diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index feba205bf..f454269f6 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -426,21 +426,38 @@ impl WordDocidsMergerBuilders { current_docid: _, } = other; - let sorter = word_fid_docids.into_sorter()?; - let readers = sorter.into_reader_cursors()?; - self.word_fid_docids.extend(readers); - let sorter = word_docids.into_sorter()?; - let readers = sorter.into_reader_cursors()?; - self.word_docids.extend(readers); - let sorter = exact_word_docids.into_sorter()?; - let readers = sorter.into_reader_cursors()?; - self.exact_word_docids.extend(readers); - let sorter = word_position_docids.into_sorter()?; - let readers = sorter.into_reader_cursors()?; - self.word_position_docids.extend(readers); - let sorter = fid_word_count_docids.into_sorter()?; - let readers = sorter.into_reader_cursors()?; - self.fid_word_count_docids.extend(readers); + let mut word_fid_docids_readers = Ok(vec![]); + let mut word_docids_readers = Ok(vec![]); + let mut exact_word_docids_readers = Ok(vec![]); + let mut word_position_docids_readers = Ok(vec![]); + let mut fid_word_count_docids_readers = Ok(vec![]); + rayon::scope(|s| { + s.spawn(|_| { + word_fid_docids_readers = + word_fid_docids.into_sorter().and_then(|s| s.into_reader_cursors()); + }); + s.spawn(|_| { + word_docids_readers = + word_docids.into_sorter().and_then(|s| s.into_reader_cursors()); + }); + s.spawn(|_| { + exact_word_docids_readers = + exact_word_docids.into_sorter().and_then(|s| s.into_reader_cursors()); + }); + s.spawn(|_| { + word_position_docids_readers = + word_position_docids.into_sorter().and_then(|s| s.into_reader_cursors()); + }); + s.spawn(|_| { + fid_word_count_docids_readers = + fid_word_count_docids.into_sorter().and_then(|s| s.into_reader_cursors()); + }); + }); + self.word_fid_docids.extend(word_fid_docids_readers?); + self.word_docids.extend(word_docids_readers?); + self.exact_word_docids.extend(exact_word_docids_readers?); + self.word_position_docids.extend(word_position_docids_readers?); + self.fid_word_count_docids.extend(fid_word_count_docids_readers?); Ok(()) } @@ -509,25 +526,35 @@ impl WordDocidsExtractors { )) }); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - }) - })?; - - let mut builder = WordDocidsMergerBuilders::new(); - for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() { - builder.add_sorters(cache)?; + { + let span = + tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); + let _entered = span.enter(); + document_changes.into_par_iter().try_for_each(|document_change| { + context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + }) + })?; } - Ok(builder.build()) + { + let span = + tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); + let _entered = span.enter(); + let mut builder = WordDocidsMergerBuilders::new(); + for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() { + builder.add_sorters(cache)?; + } + + Ok(builder.build()) + } } fn extract_document_change( diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index f0d53833b..7b3706424 100644 --- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -107,7 +107,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { cached_sorter.insert_add_u32(key, docid)?; } } - }; + } } Ok(()) diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 468fded9a..7e096591e 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -13,7 +13,7 @@ pub use extract_word_docids::{ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; use grenad::Merger; use heed::RoTxn; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; @@ -78,27 +78,42 @@ pub trait SearchableExtractor { )) }); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - }) - })?; - - let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); - for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() { - let sorter = cache.into_sorter()?; - let readers = sorter.into_reader_cursors()?; - builder.extend(readers); + { + let span = + tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); + let _entered = span.enter(); + document_changes.into_par_iter().try_for_each(|document_change| { + context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + }) + })?; } + { + let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); + let span = + tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); + let _entered = span.enter(); - Ok(builder.build()) + let readers: Vec<_> = context_pool + .into_items() + .par_bridge() + .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { + let sorter = cached_sorter.into_sorter()?; + sorter.into_reader_cursors() + }) + .collect(); + for reader in readers { + builder.extend(reader?); + } + Ok(builder.build()) + } } fn extract_document_change(