From 823da19745d4c4ada50873d7eddc4e0332c506bc Mon Sep 17 00:00:00 2001 From: many Date: Tue, 17 Aug 2021 10:56:06 +0200 Subject: [PATCH] Fix test and use progress callback --- .../src/update/index_documents/extract/mod.rs | 2 + milli/src/update/index_documents/mod.rs | 78 +++++++++++++++---- .../src/update/index_documents/typed_chunk.rs | 15 +++- 3 files changed, 79 insertions(+), 16 deletions(-) diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index b24c80da4..a389f36cf 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -11,6 +11,7 @@ use std::collections::HashSet; use std::fs::File; use crossbeam_channel::Sender; +use log::debug; use rayon::prelude::*; use self::extract_docid_word_positions::extract_docid_word_positions; @@ -192,6 +193,7 @@ fn spawn_extraction_task( .map(|chunk| extract_fn(chunk, indexer.clone()).unwrap()) .collect(); rayon::spawn(move || { + debug!("merge {} database", name); let reader = merge_readers(chunks, merge_fn, indexer).unwrap(); lmdb_writer_sx.send(serialize_fn(reader)).unwrap(); }); diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 4f488337c..51b0a6613 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -31,6 +31,10 @@ use crate::update::{ }; use crate::{Index, Result}; +static MERGED_DATABASE_COUNT: usize = 7; +static PREFIX_DATABASE_COUNT: usize = 5; +static TOTAL_POSTING_DATABASE_COUNT: usize = MERGED_DATABASE_COUNT + PREFIX_DATABASE_COUNT; + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct DocumentAdditionResult { pub nb_documents: usize, @@ -278,15 +282,34 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { let index_is_empty = index_documents_ids.len() == 0; let mut final_documents_ids = RoaringBitmap::new(); + let mut databases_seen = 0; + progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + for typed_chunk in lmdb_writer_rx { - let docids = + let (docids, is_merged_database) = write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?; - final_documents_ids |= docids; - debug!( - "We have seen {} documents on {} total document so far", - final_documents_ids.len(), - documents_count - ); + if !docids.is_empty() { + final_documents_ids |= docids; + let documents_seen_count = final_documents_ids.len(); + progress_callback(UpdateIndexingStep::IndexDocuments { + documents_seen: documents_seen_count as usize, + total_documents: documents_count, + }); + debug!( + "We have seen {} documents on {} total document so far", + documents_seen_count, documents_count + ); + } + if is_merged_database { + databases_seen += 1; + progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen: databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + } } // We write the field distribution into the main database @@ -298,20 +321,19 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { // We write the external documents ids into the main database. self.index.put_external_documents_ids(self.wtxn, &external_documents_ids)?; - let all_documents_ids = index_documents_ids | new_documents_ids; + let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; self.execute_prefix_databases(progress_callback) } - pub fn execute_prefix_databases( - self, - // output: TransformOutput, - progress_callback: F, - ) -> Result<()> + pub fn execute_prefix_databases(self, progress_callback: F) -> Result<()> where F: Fn(UpdateIndexingStep) + Sync, { + // Merged databases are already been indexed, we start from this count; + let mut databases_seen = MERGED_DATABASE_COUNT; + // Run the facets update operation. let mut builder = Facets::new(self.wtxn, self.index, self.update_id); builder.chunk_compression_type = self.chunk_compression_type; @@ -324,6 +346,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { } builder.execute()?; + databases_seen += 1; + progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen: databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + // Run the words prefixes update operation. let mut builder = WordsPrefixesFst::new(self.wtxn, self.index, self.update_id); if let Some(value) = self.words_prefix_threshold { @@ -334,6 +362,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { } builder.execute()?; + databases_seen += 1; + progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen: databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + // Run the word prefix docids update operation. let mut builder = WordPrefixDocids::new(self.wtxn, self.index); builder.chunk_compression_type = self.chunk_compression_type; @@ -342,6 +376,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { builder.max_memory = self.max_memory; builder.execute()?; + databases_seen += 1; + progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen: databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + // Run the word prefix pair proximity docids update operation. let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index); builder.chunk_compression_type = self.chunk_compression_type; @@ -350,6 +390,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { builder.max_memory = self.max_memory; builder.execute()?; + databases_seen += 1; + progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen: databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + // Run the words level positions update operation. let mut builder = WordsLevelPositions::new(self.wtxn, self.index); builder.chunk_compression_type = self.chunk_compression_type; @@ -362,6 +408,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { } builder.execute()?; + databases_seen += 1; + progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen: databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + Ok(()) } } diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index e7617bdab..e8790af16 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -32,7 +32,8 @@ pub(crate) fn write_typed_chunk_into_index( index: &Index, wtxn: &mut RwTxn, index_is_empty: bool, -) -> Result { +) -> Result<(RoaringBitmap, bool)> { + let mut is_merged_database = false; match typed_chunk { TypedChunk::DocidWordPositions(docid_word_positions_iter) => { write_entries_into_database( @@ -71,8 +72,11 @@ pub(crate) fn write_typed_chunk_into_index( |value, _buffer| Ok(value), merge_cbo_roaring_bitmaps, )?; + is_merged_database = true; + } + TypedChunk::NewDocumentsIds(documents_ids) => { + return Ok((documents_ids, is_merged_database)) } - TypedChunk::NewDocumentsIds(documents_ids) => return Ok(documents_ids), TypedChunk::WordDocids(word_docids_iter) => { let mut word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?; append_entries_into_database( @@ -100,6 +104,7 @@ pub(crate) fn write_typed_chunk_into_index( builder.extend_stream(union_stream)?; let fst = builder.into_set(); index.put_words_fst(wtxn, &fst)?; + is_merged_database = true; } TypedChunk::WordLevelPositionDocids(word_level_position_docids_iter) => { append_entries_into_database( @@ -110,6 +115,7 @@ pub(crate) fn write_typed_chunk_into_index( |value, _buffer| Ok(value), merge_cbo_roaring_bitmaps, )?; + is_merged_database = true; } TypedChunk::FieldIdFacetNumberDocids(facet_id_f64_docids_iter) => { append_entries_into_database( @@ -120,6 +126,7 @@ pub(crate) fn write_typed_chunk_into_index( |value, _buffer| Ok(value), merge_cbo_roaring_bitmaps, )?; + is_merged_database = true; } TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => { append_entries_into_database( @@ -130,6 +137,7 @@ pub(crate) fn write_typed_chunk_into_index( |value, _buffer| Ok(value), merge_cbo_roaring_bitmaps, )?; + is_merged_database = true; } TypedChunk::FieldIdDocidFacetNumbers(mut fid_docid_facet_number) => { let index_fid_docid_facet_numbers = @@ -166,10 +174,11 @@ pub(crate) fn write_typed_chunk_into_index( Ok(values.serialize_into(buffer)?) }, )?; + is_merged_database = true; } } - Ok(RoaringBitmap::new()) + Ok((RoaringBitmap::new(), is_merged_database)) } fn merge_roaring_bitmaps(new_value: &[u8], db_value: &[u8], buffer: &mut Vec) -> Result<()> {