diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index ad3f73d0d..7ea5c3816 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -285,6 +285,7 @@ where
         let index_is_empty = index_documents_ids.len() == 0;
         let mut final_documents_ids = RoaringBitmap::new();
         let mut word_pair_proximity_docids = Vec::new();
+        let mut word_position_docids = Vec::new();
         let mut word_docids = Vec::new();
 
         let mut databases_seen = 0;
@@ -321,6 +322,19 @@ where
                     let chunk = grenad::Reader::new(file)?;
                     TypedChunk::WordPairProximityDocids(chunk)
                 }
+                TypedChunk::WordPositionDocids(chunk) => {
+                    // We extract and mmap our chunk file to be able to reuse it in the next processes.
+                    let mut file = chunk.into_inner();
+                    let mmap = unsafe { memmap2::Mmap::map(&file)? };
+                    let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap));
+                    let chunk = grenad::Reader::new(cursor_mmap)?;
+                    word_position_docids.push(chunk);
+
+                    // We reconstruct our typed chunk.
+                    file.rewind()?;
+                    let chunk = grenad::Reader::new(file)?;
+                    TypedChunk::WordPositionDocids(chunk)
+                }
                 otherwise => otherwise,
             };
 
@@ -359,7 +373,11 @@ where
         let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids;
         self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
 
-        self.execute_prefix_databases(word_docids, word_pair_proximity_docids)?;
+        self.execute_prefix_databases(
+            word_docids,
+            word_pair_proximity_docids,
+            word_position_docids,
+        )?;
 
         Ok(all_documents_ids.len())
     }
@@ -369,6 +387,7 @@ where
         self,
         word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
         word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
     ) -> Result<()>
     where
        F: Fn(UpdateIndexingStep) + Sync,
@@ -453,7 +472,7 @@ where
         if let Some(value) = self.config.words_positions_min_level_size {
             builder.min_level_size(value);
         }
-        builder.execute()?;
+        builder.execute(word_position_docids, &previous_words_prefixes_fst)?;
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
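The new TypedChunk::WordPositionDocids arm applies the same double-read trick as the neighbouring WordDocids and WordPairProximityDocids arms: the chunk file is mmapped once so a cheap, clonable reader can be kept aside for the prefix-database pass, then the file cursor is rewound so the original typed chunk can still be merged into LMDB. Below is a minimal sketch of that pattern as a hypothetical standalone helper; split_chunk does not exist in milli, and the import path for ClonableMmap/CursorClonableMmap is assumed from the imports used elsewhere in this patch.

use std::fs::File;
use std::io::Seek;

use crate::update::index_documents::{ClonableMmap, CursorClonableMmap}; // assumed path
use crate::Result;

/// Hypothetical helper: turn one on-disk grenad chunk into two readers, a
/// clonable mmap-backed one for later passes and the original file-backed one.
fn split_chunk(
    chunk: grenad::Reader<File>,
) -> Result<(grenad::Reader<CursorClonableMmap>, grenad::Reader<File>)> {
    let mut file = chunk.into_inner();

    // Map the file once; the resulting mmap is cheap to clone and share.
    let mmap = unsafe { memmap2::Mmap::map(&file)? };
    let mmap_reader = grenad::Reader::new(CursorClonableMmap::new(ClonableMmap::from(mmap)))?;

    // Rewind the file cursor so the same chunk can be read again from the start.
    file.rewind()?;
    let file_reader = grenad::Reader::new(file)?;

    Ok((mmap_reader, file_reader))
}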
diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs
index a8346a1cb..b1b8273ef 100644
--- a/milli/src/update/words_prefix_position_docids.rs
+++ b/milli/src/update/words_prefix_position_docids.rs
@@ -1,17 +1,20 @@
+use std::collections::HashMap;
 use std::num::NonZeroU32;
 use std::{cmp, str};
 
 use fst::Streamer;
-use grenad::CompressionType;
+use grenad::{CompressionType, MergerBuilder};
 use heed::types::ByteSlice;
-use heed::BytesEncode;
+use heed::{BytesDecode, BytesEncode};
 use log::debug;
+use slice_group_by::GroupBy;
 
 use crate::error::SerializationError;
 use crate::heed_codec::StrBEU32Codec;
 use crate::index::main_key::WORDS_PREFIXES_FST_KEY;
 use crate::update::index_documents::{
-    create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod,
+    create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps,
+    sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod,
 };
 use crate::{Index, Result};
@@ -54,12 +57,27 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
     }
 
     #[logging_timer::time("WordPrefixPositionDocids::{}")]
-    pub fn execute(self) -> Result<()> {
+    pub fn execute<A: AsRef<[u8]>>(
+        self,
+        new_word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        old_prefix_fst: &fst::Set<A>,
+    ) -> Result<()> {
         debug!("Computing and writing the word levels positions docids into LMDB on disk...");
 
-        self.index.word_prefix_position_docids.clear(self.wtxn)?;
+        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
 
-        let mut word_prefix_positions_docids_sorter = create_sorter(
+        // We retrieve the prefixes common to the previous and new prefix fst.
+        let common_prefix_fst_keys =
+            fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection());
+        let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys
+            .as_slice()
+            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
+            .collect();
+
+        // We compute the set of prefixes that are no longer part of the prefix fst.
+        let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference());
+
+        let mut prefix_position_docids_sorter = create_sorter(
             merge_cbo_roaring_bitmaps,
             self.chunk_compression_type,
             self.chunk_compression_level,
@@ -67,39 +85,107 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
             self.max_memory,
         );
 
-        // We insert the word prefix position and
-        // corresponds to the word-prefix position where the prefixes appears
-        // in the prefix FST previously constructed.
-        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
+        let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
+        word_position_docids_merger.extend(new_word_position_docids);
+        let mut word_position_docids_iter =
+            word_position_docids_merger.build().into_merger_iter()?;
+
+        // We gather the new docids for the prefixes common to the previous and new prefix fst.
+        let mut buffer = Vec::new();
+        let mut current_prefixes: Option<&&[String]> = None;
+        let mut prefixes_cache = HashMap::new();
+        while let Some((key, data)) = word_position_docids_iter.next()? {
+            let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
+
+            current_prefixes = match current_prefixes.take() {
+                Some(prefixes) if word.starts_with(&prefixes[0]) => Some(prefixes),
+                _otherwise => {
+                    write_prefixes_in_sorter(
+                        &mut prefixes_cache,
+                        &mut prefix_position_docids_sorter,
+                    )?;
+                    common_prefix_fst_keys.iter().find(|prefixes| word.starts_with(&prefixes[0]))
+                }
+            };
+
+            if let Some(prefixes) = current_prefixes {
+                for prefix in prefixes.iter() {
+                    if word.starts_with(prefix) {
+                        buffer.clear();
+                        buffer.extend_from_slice(prefix.as_bytes());
+                        buffer.extend_from_slice(&pos.to_be_bytes());
+                        match prefixes_cache.get_mut(&buffer) {
+                            Some(value) => value.push(data.to_owned()),
+                            None => {
+                                prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_position_docids_sorter)?;
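The incremental strategy rests on the fst set operations used above: intersecting the old and new prefix fsts yields the prefixes whose entries only need the new word position docids merged in, the difference old minus new (suppr_pw) yields the prefixes whose entries must be deleted, and the difference new minus old yields the prefixes to build from scratch. A small self-contained illustration of those three stream operations, with toy data that is not part of the patch:

use fst::{Set, Streamer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Pretend these are the words prefixes fsts before and after the update.
    let old = Set::from_iter(vec!["ab", "ca", "zz"])?;
    let new = Set::from_iter(vec!["ab", "ba", "zz"])?;

    // Prefixes present in both fsts ("ab" and "zz"): merge in the new docids only.
    let mut common = old.op().add(&new).intersection();
    while let Some(bytes) = common.next() {
        println!("common: {}", std::str::from_utf8(bytes)?);
    }

    // Prefixes that disappeared ("ca"): their database entries must be deleted.
    let mut removed = old.op().add(&new).difference();
    while let Some(bytes) = removed.next() {
        println!("removed: {}", std::str::from_utf8(bytes)?);
    }

    // Prefixes that appeared ("ba"): their docids are fetched from the database.
    let mut added = new.op().add(&old).difference();
    while let Some(bytes) = added.next() {
        println!("added: {}", std::str::from_utf8(bytes)?);
    }

    Ok(())
}

Because both fsts stream their keys in lexicographic order, each comparison is a single linear merge over the prefixes and neither set has to be materialized.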
 
+        // We fetch the docids associated with the newly added prefixes only.
         let db = self.index.word_position_docids.remap_data_type::<ByteSlice>();
-        // iter over all prefixes in the prefix fst.
-        let mut word_stream = prefix_fst.stream();
-        while let Some(prefix_bytes) = word_stream.next() {
+        let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference();
+        while let Some(prefix_bytes) = new_prefixes_stream.next() {
             let prefix = str::from_utf8(prefix_bytes).map_err(|_| {
                 SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) }
             })?;
 
             // iter over all lines of the DB where the key is prefixed by the current prefix.
-            let mut iter = db
+            let iter = db
                 .remap_key_type::<ByteSlice>()
-                .prefix_iter(self.wtxn, &prefix_bytes)?
+                .prefix_iter(self.wtxn, prefix_bytes)?
                 .remap_key_type::<StrBEU32Codec>();
-            while let Some(((_word, pos), data)) = iter.next().transpose()? {
-                let key = (prefix, pos);
-                let bytes = StrBEU32Codec::bytes_encode(&key).unwrap();
-                word_prefix_positions_docids_sorter.insert(bytes, data)?;
+            for result in iter {
+                let ((word, pos), data) = result?;
+                if word.starts_with(prefix) {
+                    let key = (prefix, pos);
+                    let bytes = StrBEU32Codec::bytes_encode(&key).unwrap();
+                    prefix_position_docids_sorter.insert(bytes, data)?;
+                }
             }
         }
 
+        drop(new_prefixes_stream);
+
+        // We remove all the entries that are no longer required in this
+        // word prefix position docids database.
+        let mut iter =
+            self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data();
+        while let Some(((prefix, _), _)) = iter.next().transpose()? {
+            if suppr_pw.contains(prefix.as_bytes()) {
+                unsafe { iter.del_current()? };
+            }
+        }
+
+        drop(iter);
+
+        // We finally write all the word prefix position docids into the LMDB database.
         sorter_into_lmdb_database(
             self.wtxn,
             *self.index.word_prefix_position_docids.as_polymorph(),
-            word_prefix_positions_docids_sorter,
+            prefix_position_docids_sorter,
             merge_cbo_roaring_bitmaps,
-            WriteMethod::Append,
+            WriteMethod::GetMergePut,
         )?;
 
         Ok(())
     }
 }
+
+fn write_prefixes_in_sorter(
+    prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
+    sorter: &mut grenad::Sorter<MergeFn>,
+) -> Result<()> {
+    for (key, data_slices) in prefixes.drain() {
+        for data in data_slices {
+            sorter.insert(&key, data)?;
+        }
+    }
+
+    Ok(())
+}
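On the caller side, execute now needs the words prefixes fst as it was before this indexing run, so execute_prefix_databases has to snapshot it before the WordsPrefixesFst builder overwrites it; that snapshot is the previous_words_prefixes_fst argument visible in the mod.rs hunk above. The surrounding code is not part of this patch, so the following is only an assumed sketch of how the calls line up (the map_data snapshot and the builder constructors are assumptions based on the names used in the diff):

// Assumed shape of the calling code in execute_prefix_databases, not shown in
// this patch: snapshot the old prefix fst, let WordsPrefixesFst rebuild the
// new one, then run the incremental word prefix position update against both.
let previous_words_prefixes_fst =
    self.index.words_prefixes_fst(self.wtxn)?.map_data(|cow| cow.into_owned())?;

// Rebuilds the words prefixes fst from the current word docids.
let mut builder = WordsPrefixesFst::new(self.wtxn, self.index);
builder.execute()?;

// Incrementally patches word_prefix_position_docids using the freshly
// extracted chunks and the prefix fst snapshot taken above.
let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index);
builder.execute(word_position_docids, &previous_words_prefixes_fst)?;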