From 8d26f3040cdb75559a4ec71bad249b54127accbc Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Mon, 28 Feb 2022 10:14:54 +0100
Subject: [PATCH 1/2] Remove a useless grenad file merging

---
 milli/src/update/index_documents/mod.rs      | 24 +++++++++----------
 milli/src/update/word_prefix_docids.rs       | 13 ++++------
 .../word_prefix_pair_proximity_docids.rs     | 15 ++++--------
 .../update/words_prefix_position_docids.rs   | 13 ++++------
 4 files changed, 24 insertions(+), 41 deletions(-)

diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index 9b1c73b36..93b86617c 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -279,9 +279,9 @@ where
         let index_documents_ids = self.index.documents_ids(self.wtxn)?;
         let index_is_empty = index_documents_ids.len() == 0;
         let mut final_documents_ids = RoaringBitmap::new();
-        let mut word_pair_proximity_docids = Vec::new();
-        let mut word_position_docids = Vec::new();
-        let mut word_docids = Vec::new();
+        let mut word_pair_proximity_docids = None;
+        let mut word_position_docids = None;
+        let mut word_docids = None;
 
         let mut databases_seen = 0;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@@ -293,17 +293,17 @@
             let typed_chunk = match result? {
                 TypedChunk::WordDocids(chunk) => {
                     let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
-                    word_docids.push(cloneable_chunk);
+                    word_docids = Some(cloneable_chunk);
                     TypedChunk::WordDocids(chunk)
                 }
                 TypedChunk::WordPairProximityDocids(chunk) => {
                     let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
-                    word_pair_proximity_docids.push(cloneable_chunk);
+                    word_pair_proximity_docids = Some(cloneable_chunk);
                     TypedChunk::WordPairProximityDocids(chunk)
                 }
                 TypedChunk::WordPositionDocids(chunk) => {
                     let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
-                    word_position_docids.push(cloneable_chunk);
+                    word_position_docids = Some(cloneable_chunk);
                     TypedChunk::WordPositionDocids(chunk)
                 }
                 otherwise => otherwise,
@@ -345,9 +345,9 @@ where
         self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
 
         self.execute_prefix_databases(
-            word_docids,
-            word_pair_proximity_docids,
-            word_position_docids,
+            word_docids.unwrap(),
+            word_pair_proximity_docids.unwrap(),
+            word_position_docids.unwrap(),
         )?;
 
         Ok(all_documents_ids.len())
@@ -356,9 +356,9 @@ where
     #[logging_timer::time("IndexDocuments::{}")]
     pub fn execute_prefix_databases(
         self,
-        word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
-        word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
-        word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        word_docids: grenad::Reader<CursorClonableMmap>,
+        word_pair_proximity_docids: grenad::Reader<CursorClonableMmap>,
+        word_position_docids: grenad::Reader<CursorClonableMmap>,
     ) -> Result<()>
     where
         F: Fn(UpdateIndexingStep) + Sync,
diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs
index 0bb5edb9a..2baaf2f19 100644
--- a/milli/src/update/word_prefix_docids.rs
+++ b/milli/src/update/word_prefix_docids.rs
@@ -1,6 +1,6 @@
 use std::collections::{HashMap, HashSet};
 
-use grenad::{CompressionType, MergerBuilder};
+use grenad::CompressionType;
 use heed::types::ByteSlice;
 
 use crate::update::index_documents::{
@@ -35,7 +35,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
     #[logging_timer::time("WordPrefixDocids::{}")]
     pub fn execute(
         self,
-        new_word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        new_word_docids: grenad::Reader<CursorClonableMmap>,
         new_prefix_fst_words: &[String],
         common_prefix_fst_words: &[&[String]],
         del_prefix_fst_words: &HashSet<Vec<u8>>,
@@ -50,15 +50,10 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
             self.max_memory,
         );
 
-        let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps);
-        for reader in new_word_docids {
-            word_docids_merger.push(reader.into_cursor()?);
-        }
-        let mut word_docids_iter = word_docids_merger.build().into_stream_merger_iter()?;
-
+        let mut new_word_docids_iter = new_word_docids.into_cursor()?;
         let mut current_prefixes: Option<&&[String]> = None;
         let mut prefixes_cache = HashMap::new();
-        while let Some((word, data)) = word_docids_iter.next()? {
+        while let Some((word, data)) = new_word_docids_iter.move_on_next()? {
             current_prefixes = match current_prefixes.take() {
                 Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
                 _otherwise => {
diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs
index b498d5850..692dd1568 100644
--- a/milli/src/update/word_prefix_pair_proximity_docids.rs
+++ b/milli/src/update/word_prefix_pair_proximity_docids.rs
@@ -1,6 +1,6 @@
 use std::collections::{HashMap, HashSet};
 
-use grenad::{CompressionType, MergerBuilder};
+use grenad::CompressionType;
 use heed::types::ByteSlice;
 use heed::BytesDecode;
 use log::debug;
@@ -64,7 +64,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
     #[logging_timer::time("WordPrefixPairProximityDocids::{}")]
     pub fn execute(
         self,
-        new_word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        new_word_pair_proximity_docids: grenad::Reader<CursorClonableMmap>,
         new_prefix_fst_words: &[String],
         common_prefix_fst_words: &[&[String]],
         del_prefix_fst_words: &HashSet<Vec<u8>>,
@@ -74,14 +74,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
         let new_prefix_fst_words: Vec<_> =
             new_prefix_fst_words.linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect();
 
-        // We retrieve and merge the created word pair proximities docids entries
-        // for the newly added documents.
-        let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
-        for reader in new_word_pair_proximity_docids {
-            wppd_merger.push(reader.into_cursor()?);
-        }
-        let mut wppd_iter = wppd_merger.build().into_stream_merger_iter()?;
-
+        let mut new_wppd_iter = new_word_pair_proximity_docids.into_cursor()?;
         let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
             merge_cbo_roaring_bitmaps,
             self.chunk_compression_type,
@@ -95,7 +88,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
         let mut buffer = Vec::new();
         let mut current_prefixes: Option<&&[String]> = None;
         let mut prefixes_cache = HashMap::new();
-        while let Some((key, data)) = wppd_iter.next()? {
+        while let Some((key, data)) = new_wppd_iter.move_on_next()? {
             let (w1, w2, prox) = StrStrU8Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
             if prox > self.max_proximity {
                 continue;
diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs
index 9e15f4d6c..324516325 100644
--- a/milli/src/update/words_prefix_position_docids.rs
+++ b/milli/src/update/words_prefix_position_docids.rs
@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
 use std::num::NonZeroU32;
 use std::{cmp, str};
 
-use grenad::{CompressionType, MergerBuilder};
+use grenad::CompressionType;
 use heed::types::ByteSlice;
 use heed::{BytesDecode, BytesEncode};
 use log::debug;
@@ -57,7 +57,7 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
     #[logging_timer::time("WordPrefixPositionDocids::{}")]
     pub fn execute(
         self,
-        new_word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        new_word_position_docids: grenad::Reader<CursorClonableMmap>,
         new_prefix_fst_words: &[String],
         common_prefix_fst_words: &[&[String]],
         del_prefix_fst_words: &HashSet<Vec<u8>>,
@@ -72,18 +72,13 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
             self.max_memory,
         );
 
-        let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
-        for reader in new_word_position_docids {
-            word_position_docids_merger.push(reader.into_cursor()?);
-        }
-        let mut word_position_docids_iter =
-            word_position_docids_merger.build().into_stream_merger_iter()?;
+        let mut new_word_position_docids_iter = new_word_position_docids.into_cursor()?;
 
         // We fetch all the new common prefixes between the previous and new prefix fst.
         let mut buffer = Vec::new();
         let mut current_prefixes: Option<&&[String]> = None;
         let mut prefixes_cache = HashMap::new();
-        while let Some((key, data)) = word_position_docids_iter.next()? {
+        while let Some((key, data)) = new_word_position_docids_iter.move_on_next()? {
             let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
 
             current_prefixes = match current_prefixes.take() {

From d5b8b5a2f846d26e7c65f0634dd34bd7da460581 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Mon, 28 Feb 2022 16:00:33 +0100
Subject: [PATCH 2/2] Replace the ugly unwraps by clean if let Somes

---
 milli/src/update/index_documents/mod.rs | 100 +++++++++++++-----------
 1 file changed, 53 insertions(+), 47 deletions(-)

diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index 93b86617c..2d3004444 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -345,9 +345,9 @@ where
         self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
 
         self.execute_prefix_databases(
-            word_docids.unwrap(),
-            word_pair_proximity_docids.unwrap(),
-            word_position_docids.unwrap(),
+            word_docids,
+            word_pair_proximity_docids,
+            word_position_docids,
         )?;
 
         Ok(all_documents_ids.len())
@@ -356,9 +356,9 @@ where
     #[logging_timer::time("IndexDocuments::{}")]
     pub fn execute_prefix_databases(
         self,
-        word_docids: grenad::Reader<CursorClonableMmap>,
-        word_pair_proximity_docids: grenad::Reader<CursorClonableMmap>,
-        word_position_docids: grenad::Reader<CursorClonableMmap>,
+        word_docids: Option<grenad::Reader<CursorClonableMmap>>,
+        word_pair_proximity_docids: Option<grenad::Reader<CursorClonableMmap>>,
+        word_position_docids: Option<grenad::Reader<CursorClonableMmap>>,
     ) -> Result<()>
     where
         F: Fn(UpdateIndexingStep) + Sync,
@@ -424,18 +424,20 @@
             total_databases: TOTAL_POSTING_DATABASE_COUNT,
         });
 
-        // Run the word prefix docids update operation.
-        let mut builder = WordPrefixDocids::new(self.wtxn, self.index);
-        builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
-        builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
-        builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
-        builder.max_memory = self.indexer_config.max_memory;
-        builder.execute(
-            word_docids,
-            &new_prefix_fst_words,
-            &common_prefix_fst_words,
-            &del_prefix_fst_words,
-        )?;
+        if let Some(word_docids) = word_docids {
+            // Run the word prefix docids update operation.
+            let mut builder = WordPrefixDocids::new(self.wtxn, self.index);
+            builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
+            builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
+            builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
+            builder.max_memory = self.indexer_config.max_memory;
+            builder.execute(
+                word_docids,
+                &new_prefix_fst_words,
+                &common_prefix_fst_words,
+                &del_prefix_fst_words,
+            )?;
+        }
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@@ -443,18 +445,20 @@
             total_databases: TOTAL_POSTING_DATABASE_COUNT,
         });
 
-        // Run the word prefix pair proximity docids update operation.
-        let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index);
-        builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
-        builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
-        builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
-        builder.max_memory = self.indexer_config.max_memory;
-        builder.execute(
-            word_pair_proximity_docids,
-            &new_prefix_fst_words,
-            &common_prefix_fst_words,
-            &del_prefix_fst_words,
-        )?;
+        if let Some(word_pair_proximity_docids) = word_pair_proximity_docids {
+            // Run the word prefix pair proximity docids update operation.
+            let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index);
+            builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
+            builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
+            builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
+            builder.max_memory = self.indexer_config.max_memory;
+            builder.execute(
+                word_pair_proximity_docids,
+                &new_prefix_fst_words,
+                &common_prefix_fst_words,
+                &del_prefix_fst_words,
+            )?;
+        }
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@@ -462,24 +466,26 @@
             total_databases: TOTAL_POSTING_DATABASE_COUNT,
         });
 
-        // Run the words prefix position docids update operation.
-        let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index);
-        builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
-        builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
-        builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
-        builder.max_memory = self.indexer_config.max_memory;
-        if let Some(value) = self.config.words_positions_level_group_size {
-            builder.level_group_size(value);
+        if let Some(word_position_docids) = word_position_docids {
+            // Run the words prefix position docids update operation.
+            let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index);
+            builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
+            builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
+            builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
+            builder.max_memory = self.indexer_config.max_memory;
+            if let Some(value) = self.config.words_positions_level_group_size {
+                builder.level_group_size(value);
+            }
+            if let Some(value) = self.config.words_positions_min_level_size {
+                builder.min_level_size(value);
+            }
+            builder.execute(
+                word_position_docids,
+                &new_prefix_fst_words,
+                &common_prefix_fst_words,
+                &del_prefix_fst_words,
+            )?;
         }
-        if let Some(value) = self.config.words_positions_min_level_size {
-            builder.min_level_size(value);
-        }
-        builder.execute(
-            word_position_docids,
-            &new_prefix_fst_words,
-            &common_prefix_fst_words,
-            &del_prefix_fst_words,
-        )?;
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {