diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 8f4e38790..bc771e5cd 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -441,8 +441,9 @@ where (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingWords)); if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { - compute_prefix_database(index, wtxn, prefix_delta)?; + compute_prefix_database(index, wtxn, prefix_delta, grenad_parameters)?; } + (indexing_context.send_progress)(Progress::from_step(Step::Finalizing)); Ok(()) as Result<_> @@ -474,16 +475,17 @@ fn compute_prefix_database( index: &Index, wtxn: &mut RwTxn, prefix_delta: PrefixDelta, + grenad_parameters: GrenadParameters, ) -> Result<()> { let PrefixDelta { modified, deleted } = prefix_delta; // Compute word prefix docids - compute_word_prefix_docids(wtxn, index, &modified, &deleted)?; + compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; // Compute exact word prefix docids - compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted)?; + compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; // Compute word prefix fid docids - compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?; + compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; // Compute word prefix position docids - compute_word_prefix_position_docids(wtxn, index, &modified, &deleted) + compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters) } #[tracing::instrument(level = "trace", skip_all, target = "indexing")] diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs index 5454d815e..338d22505 100644 --- a/crates/milli/src/update/new/words_prefix_docids.rs +++ b/crates/milli/src/update/new/words_prefix_docids.rs @@ -7,24 +7,31 @@ use heed::types::Bytes; use heed::{BytesDecode, Database, Error, RoTxn, RwTxn}; use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; use roaring::MultiOps; -use tempfile::tempfile; +use tempfile::spooled_tempfile; use thread_local::ThreadLocal; use super::ref_cell_ext::RefCellExt as _; use crate::heed_codec::StrBEU16Codec; +use crate::update::GrenadParameters; use crate::{CboRoaringBitmapCodec, Index, Prefix, Result}; struct WordPrefixDocids { database: Database, prefix_database: Database, + max_memory_by_thread: Option, } impl WordPrefixDocids { fn new( database: Database, prefix_database: Database, + grenad_parameters: GrenadParameters, ) -> WordPrefixDocids { - WordPrefixDocids { database, prefix_database } + WordPrefixDocids { + database, + prefix_database, + max_memory_by_thread: grenad_parameters.max_memory_by_thread(), + } } fn execute( @@ -51,9 +58,12 @@ impl WordPrefixDocids { // of them and *serialize* them into files. There is one file by CPU. let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads()); prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| { - let refcell = local_entries.get_or_try(|| { - tempfile().map(BufWriter::new).map(|f| RefCell::new((Vec::new(), f, Vec::new()))) - })?; + let refcell = local_entries.get_or(|| { + let file = BufWriter::new(spooled_tempfile( + self.max_memory_by_thread.unwrap_or(usize::MAX), + )); + RefCell::new((Vec::new(), file, Vec::new())) + }); let mut refmut = refcell.borrow_mut_or_yield(); let (ref mut index, ref mut file, ref mut buffer) = *refmut; @@ -144,14 +154,20 @@ unsafe impl<'a, 'rtxn> Sync for FrozenPrefixBitmaps<'a, 'rtxn> {} struct WordPrefixIntegerDocids { database: Database, prefix_database: Database, + max_memory_by_thread: Option, } impl WordPrefixIntegerDocids { fn new( database: Database, prefix_database: Database, + grenad_parameters: GrenadParameters, ) -> WordPrefixIntegerDocids { - WordPrefixIntegerDocids { database, prefix_database } + WordPrefixIntegerDocids { + database, + prefix_database, + max_memory_by_thread: grenad_parameters.max_memory_by_thread(), + } } fn execute( @@ -178,9 +194,12 @@ impl WordPrefixIntegerDocids { // of them and *serialize* them into files. There is one file by CPU. let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads()); prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| { - let refcell = local_entries.get_or_try(|| { - tempfile().map(BufWriter::new).map(|f| RefCell::new((Vec::new(), f, Vec::new()))) - })?; + let refcell = local_entries.get_or(|| { + let file = BufWriter::new(spooled_tempfile( + self.max_memory_by_thread.unwrap_or(usize::MAX), + )); + RefCell::new((Vec::new(), file, Vec::new())) + }); let mut refmut = refcell.borrow_mut_or_yield(); let (ref mut index, ref mut file, ref mut buffer) = *refmut; @@ -292,10 +311,12 @@ pub fn compute_word_prefix_docids( index: &Index, prefix_to_compute: &HashSet, prefix_to_delete: &HashSet, + grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( index.word_docids.remap_key_type(), index.word_prefix_docids.remap_key_type(), + grenad_parameters, ) .execute(wtxn, prefix_to_compute, prefix_to_delete) } @@ -306,10 +327,12 @@ pub fn compute_exact_word_prefix_docids( index: &Index, prefix_to_compute: &HashSet, prefix_to_delete: &HashSet, + grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( index.exact_word_docids.remap_key_type(), index.exact_word_prefix_docids.remap_key_type(), + grenad_parameters, ) .execute(wtxn, prefix_to_compute, prefix_to_delete) } @@ -320,10 +343,12 @@ pub fn compute_word_prefix_fid_docids( index: &Index, prefix_to_compute: &HashSet, prefix_to_delete: &HashSet, + grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new( index.word_fid_docids.remap_key_type(), index.word_prefix_fid_docids.remap_key_type(), + grenad_parameters, ) .execute(wtxn, prefix_to_compute, prefix_to_delete) } @@ -334,10 +359,12 @@ pub fn compute_word_prefix_position_docids( index: &Index, prefix_to_compute: &HashSet, prefix_to_delete: &HashSet, + grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new( index.word_position_docids.remap_key_type(), index.word_prefix_position_docids.remap_key_type(), + grenad_parameters, ) .execute(wtxn, prefix_to_compute, prefix_to_delete) }