From 6a5a834f2788c90f7681eb1c0ae4a53751614897 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Thu, 3 Apr 2025 16:04:35 +0200 Subject: [PATCH] Lazily compute the FSTs during indexing --- .../src/update/new/facet_search_builder.rs | 20 +++--- .../src/update/new/fst_merger_builder.rs | 63 +++++++++++++------ .../src/update/new/indexer/post_processing.rs | 4 +- .../milli/src/update/new/word_fst_builder.rs | 10 +-- 4 files changed, 63 insertions(+), 34 deletions(-) diff --git a/crates/milli/src/update/new/facet_search_builder.rs b/crates/milli/src/update/new/facet_search_builder.rs index 6e9ffa1ed..aa918c170 100644 --- a/crates/milli/src/update/new/facet_search_builder.rs +++ b/crates/milli/src/update/new/facet_search_builder.rs @@ -144,7 +144,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> { let mut merger_iter = builder.build().into_stream_merger_iter()?; let mut current_field_id = None; let mut fst; - let mut fst_merger_builder: Option = None; + let mut fst_merger_builder: Option> = None; while let Some((key, deladd)) = merger_iter.next()? { let (field_id, normalized_facet_string) = BEU16StrCodec::bytes_decode(key).map_err(heed::Error::Encoding)?; @@ -153,12 +153,13 @@ impl<'indexer> FacetSearchBuilder<'indexer> { if let (Some(current_field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) { - let mmap = fst_merger_builder.build(&mut callback)?; - index.facet_id_string_fst.remap_data_type::().put( - wtxn, - ¤t_field_id, - &mmap, - )?; + if let Some(mmap) = fst_merger_builder.build(&mut callback)? { + index.facet_id_string_fst.remap_data_type::().put( + wtxn, + ¤t_field_id, + &mmap, + )?; + } } fst = index.facet_id_string_fst.get(rtxn, &field_id)?; @@ -209,8 +210,9 @@ impl<'indexer> FacetSearchBuilder<'indexer> { } if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) { - let mmap = fst_merger_builder.build(&mut callback)?; - index.facet_id_string_fst.remap_data_type::().put(wtxn, &field_id, &mmap)?; + if let Some(mmap) = fst_merger_builder.build(&mut callback)? { + index.facet_id_string_fst.remap_data_type::().put(wtxn, &field_id, &mmap)?; + } } Ok(()) diff --git a/crates/milli/src/update/new/fst_merger_builder.rs b/crates/milli/src/update/new/fst_merger_builder.rs index 1c584ef53..5bd0fc14c 100644 --- a/crates/milli/src/update/new/fst_merger_builder.rs +++ b/crates/milli/src/update/new/fst_merger_builder.rs @@ -1,25 +1,27 @@ use std::fs::File; use std::io::BufWriter; -use fst::{Set, SetBuilder, Streamer}; +use fst::{IntoStreamer, Set, SetBuilder, Streamer}; use memmap2::Mmap; use tempfile::tempfile; use crate::update::del_add::DelAdd; use crate::{InternalError, Result}; -pub struct FstMergerBuilder<'a> { +pub struct FstMergerBuilder<'a, D: AsRef<[u8]>> { + fst: Option<&'a Set>, stream: Option>, - fst_builder: SetBuilder>, + fst_builder: Option>>, last: Option>, inserted_words: usize, } -impl<'a> FstMergerBuilder<'a> { - pub fn new>(fst: Option<&'a Set>) -> Result { +impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> { + pub fn new(fst: Option<&'a Set>) -> Result { Ok(Self { + fst, stream: fst.map(|fst| fst.stream()), - fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?, + fst_builder: None, last: None, inserted_words: 0, }) @@ -110,11 +112,17 @@ impl<'a> FstMergerBuilder<'a> { is_modified: bool, insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, ) -> Result<()> { - // Addition: We insert the word - // Deletion: We delete the word by not inserting it - if deladd == DelAdd::Addition { - self.inserted_words += 1; - self.fst_builder.insert(bytes)?; + if is_modified && self.fst_builder.is_none() { + self.build_new_fst(bytes)?; + } + + if let Some(fst_builder) = self.fst_builder.as_mut() { + // Addition: We insert the word + // Deletion: We delete the word by not inserting it + if deladd == DelAdd::Addition { + self.inserted_words += 1; + fst_builder.insert(bytes)?; + } } insertion_callback(bytes, deladd, is_modified)?; @@ -122,6 +130,19 @@ impl<'a> FstMergerBuilder<'a> { Ok(()) } + // Lazily build the new fst + fn build_new_fst(&mut self, bytes: &[u8]) -> Result<()> { + let mut fst_builder = SetBuilder::new(BufWriter::new(tempfile()?))?; + + if let Some(fst) = self.fst { + fst_builder.extend_stream(fst.range().lt(bytes).into_stream())?; + } + + self.fst_builder = Some(fst_builder); + + Ok(()) + } + fn drain_stream( &mut self, insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, @@ -142,16 +163,20 @@ impl<'a> FstMergerBuilder<'a> { pub fn build( mut self, insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, - ) -> Result { + ) -> Result> { self.drain_stream(insertion_callback)?; - let fst_file = self - .fst_builder - .into_inner()? - .into_inner() - .map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?; - let fst_mmap = unsafe { Mmap::map(&fst_file)? }; + match self.fst_builder { + Some(fst_builder) => { + let fst_file = fst_builder + .into_inner()? + .into_inner() + .map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?; + let fst_mmap = unsafe { Mmap::map(&fst_file)? }; - Ok(fst_mmap) + Ok(Some(fst_mmap)) + } + None => Ok(None), + } } } diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index aace70cff..546d061f5 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -118,7 +118,9 @@ fn compute_word_fst( } let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; - index.main.remap_types::().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; + if let Some(word_fst_mmap) = word_fst_mmap { + index.main.remap_types::().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; + } if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { index.main.remap_types::().put( wtxn, diff --git a/crates/milli/src/update/new/word_fst_builder.rs b/crates/milli/src/update/new/word_fst_builder.rs index a9a5222be..d7a301f75 100644 --- a/crates/milli/src/update/new/word_fst_builder.rs +++ b/crates/milli/src/update/new/word_fst_builder.rs @@ -10,14 +10,14 @@ use crate::index::PrefixSettings; use crate::update::del_add::DelAdd; use crate::{InternalError, Prefix, Result}; -pub struct WordFstBuilder<'a> { - word_fst_builder: FstMergerBuilder<'a>, +pub struct WordFstBuilder<'a, D: AsRef<[u8]>> { + word_fst_builder: FstMergerBuilder<'a, D>, prefix_fst_builder: Option, registered_words: usize, } -impl<'a> WordFstBuilder<'a> { - pub fn new(words_fst: &'a Set>) -> Result { +impl<'a, D: AsRef<[u8]>> WordFstBuilder<'a, D> { + pub fn new(words_fst: &'a Set) -> Result { Ok(Self { word_fst_builder: FstMergerBuilder::new(Some(words_fst))?, prefix_fst_builder: None, @@ -50,7 +50,7 @@ impl<'a> WordFstBuilder<'a> { mut self, index: &crate::Index, rtxn: &heed::RoTxn, - ) -> Result<(Mmap, Option)> { + ) -> Result<(Option, Option)> { let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| { if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder { prefix_fst_builder.insert_word(bytes, deladd, is_modified)