From 50268b930cd147be9939ebb44112828e2594eee1 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Sat, 1 Mar 2025 23:49:16 +0100 Subject: [PATCH] integrate thread_pool --- crates/milli/src/index.rs | 92 +++++---- .../milli/src/search/new/tests/integration.rs | 7 + .../extract/extract_docid_word_positions.rs | 2 +- .../extract/extract_facet_number_docids.rs | 2 +- .../extract/extract_facet_string_docids.rs | 4 +- .../extract/extract_fid_docid_facet_values.rs | 2 +- .../extract/extract_fid_word_count_docids.rs | 2 +- .../extract/extract_word_docids.rs | 2 +- .../extract_word_pair_proximity_docids.rs | 2 +- .../extract/extract_word_position_docids.rs | 2 +- .../index_documents/helpers/grenad_helpers.rs | 6 +- .../milli/src/update/index_documents/mod.rs | 54 +++++- crates/milli/src/update/indexer_config.rs | 4 +- crates/milli/src/update/new/channel.rs | 94 +++++++--- .../new/extract/faceted/extract_facets.rs | 6 +- .../milli/src/update/new/extract/geo/mod.rs | 6 +- crates/milli/src/update/new/extract/mod.rs | 17 -- .../extract/searchable/extract_word_docids.rs | 6 +- .../extract_word_pair_proximity_docids.rs | 122 ++++++++++-- .../src/update/new/extract/searchable/mod.rs | 141 -------------- .../update/new/indexer/document_changes.rs | 175 ++++++++++++------ .../update/new/indexer/document_deletion.rs | 14 +- .../milli/src/update/new/indexer/extract.rs | 30 ++- crates/milli/src/update/new/indexer/mod.rs | 8 +- .../src/update/new/indexer/partial_dump.rs | 5 +- .../src/update/new/indexer/post_processing.rs | 39 +++- crates/milli/src/update/new/merger.rs | 73 ++++---- .../src/update/new/words_prefix_docids.rs | 36 +++- .../milli/tests/search/facet_distribution.rs | 6 + crates/milli/tests/search/mod.rs | 6 + crates/milli/tests/search/query_criteria.rs | 6 + crates/milli/tests/search/typo_tolerance.rs | 6 + 32 files changed, 610 insertions(+), 367 deletions(-) diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 0550965ed..4d9280dfa 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1788,6 +1788,7 @@ pub(crate) mod tests { use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS}; use crate::progress::Progress; use crate::update::new::indexer; + use crate::update::new::indexer::document_changes::CHUNK_SIZE; use crate::update::settings::InnerIndexSettings; use crate::update::{ self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting, Settings, @@ -1837,7 +1838,7 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool = match &indexer_config.rayon_thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -1845,6 +1846,11 @@ pub(crate) mod tests { } }; + let thread_pool = match &indexer_config.thread_pool { + Some(thread_pool) => thread_pool, + None => &scoped_thread_pool::ThreadPool::with_available_parallelism("index".into()), + }; + let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -1864,29 +1870,28 @@ pub(crate) mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + thread_pool, + CHUNK_SIZE, )?; if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) { return Err(error.into()); } - pool.install(|| { - indexer::index( - wtxn, - &self.inner, - &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), - 
indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &Progress::default(), - ) - }) - .unwrap()?; - + indexer::index( + wtxn, + &self.inner, + thread_pool, + &pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &Progress::default(), + )?; Ok(()) } @@ -1925,7 +1930,7 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool = match &indexer_config.rayon_thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -1933,6 +1938,11 @@ pub(crate) mod tests { } }; + let thread_pool = match &indexer_config.thread_pool { + Some(thread_pool) => thread_pool, + None => &scoped_thread_pool::ThreadPool::with_available_parallelism("index".into()), + }; + let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -1955,28 +1965,28 @@ pub(crate) mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + thread_pool, + CHUNK_SIZE, )?; if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) { return Err(error.into()); } - pool.install(|| { - indexer::index( - wtxn, - &self.inner, - &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &Progress::default(), - ) - }) - .unwrap()?; + indexer::index( + wtxn, + &self.inner, + thread_pool, + &pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &Progress::default(), + )?; Ok(()) } @@ -2005,7 +2015,7 @@ pub(crate) mod tests { let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool = match &indexer_config.rayon_thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2013,6 +2023,11 @@ pub(crate) mod tests { } }; + let thread_pool = match &indexer_config.thread_pool { + Some(thread_pool) => thread_pool, + None => &scoped_thread_pool::ThreadPool::with_available_parallelism("index".into()), + }; + let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2036,6 +2051,8 @@ pub(crate) mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + thread_pool, + CHUNK_SIZE, ) .unwrap(); @@ -2046,7 +2063,8 @@ pub(crate) mod tests { indexer::index( &mut wtxn, &index.inner, - &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), + thread_pool, + &pool, indexer_config.grenad_parameters(), &db_fields_ids_map, new_fields_ids_map, diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs index 99d5dc033..5232424cf 100644 --- a/crates/milli/src/search/new/tests/integration.rs +++ b/crates/milli/src/search/new/tests/integration.rs @@ -7,6 +7,7 @@ use maplit::{btreemap, hashset}; use crate::progress::Progress; use crate::update::new::indexer; +use crate::update::new::indexer::document_changes::CHUNK_SIZE; use crate::update::{IndexDocumentsMethod, 
IndexerConfig, Settings}; use crate::vector::EmbeddingConfigs; use crate::{db_snap, Criterion, Index}; @@ -65,6 +66,9 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { // index documents indexer.add_documents(&payload).unwrap(); + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); + let indexer_alloc = Bump::new(); let (document_changes, operation_stats, primary_key) = indexer .into_changes( @@ -75,6 +79,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); @@ -85,6 +91,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { indexer::index( &mut wtxn, &index, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), config.grenad_parameters(), &db_fields_ids_map, diff --git a/crates/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/crates/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index 606ae6b54..245d94d6a 100644 --- a/crates/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/crates/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -28,7 +28,7 @@ pub fn extract_docid_word_positions( ) -> Result>> { let max_positions_per_attributes = max_positions_per_attributes .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let force_reindexing = settings_diff.reindex_searchable(); // initialize destination values. diff --git a/crates/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/crates/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index 34bece989..203400488 100644 --- a/crates/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/crates/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -23,7 +23,7 @@ pub fn extract_facet_number_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut facet_number_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, diff --git a/crates/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/crates/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index d330ea5a0..5aaad916a 100644 --- a/crates/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/crates/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -55,7 +55,7 @@ fn extract_facet_string_docids_document_update( localized_field_ids: &LocalizedFieldIds, facet_search: bool, ) -> Result<(grenad::Reader>, grenad::Reader>)> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, @@ -145,7 +145,7 @@ fn extract_facet_string_docids_settings( indexer: GrenadParameters, settings_diff: &InnerIndexSettingsDiff, ) -> Result<(grenad::Reader>, grenad::Reader>)> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, diff --git 
a/crates/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/crates/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 88c02fe70..357ff23d6 100644 --- a/crates/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/crates/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -44,7 +44,7 @@ pub fn extract_fid_docid_facet_values( indexer: GrenadParameters, settings_diff: &InnerIndexSettingsDiff, ) -> Result { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut fid_docid_facet_numbers_sorter = create_sorter( grenad::SortAlgorithm::Stable, diff --git a/crates/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/crates/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index 5739a5e15..18113293a 100644 --- a/crates/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/crates/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -26,7 +26,7 @@ pub fn extract_fid_word_count_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut fid_word_count_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, diff --git a/crates/milli/src/update/index_documents/extract/extract_word_docids.rs b/crates/milli/src/update/index_documents/extract/extract_word_docids.rs index 829da768c..d9ff8b8be 100644 --- a/crates/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/crates/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -35,7 +35,7 @@ pub fn extract_word_docids( grenad::Reader>, grenad::Reader>, )> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut word_fid_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, diff --git a/crates/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 6194da23d..ff2872655 100644 --- a/crates/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/crates/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -39,7 +39,7 @@ pub fn extract_word_pair_proximity_docids( let any_deletion = settings_diff.old.proximity_precision == ProximityPrecision::ByWord; let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord; - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE) .map(|_| { create_sorter( diff --git a/crates/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/crates/milli/src/update/index_documents/extract/extract_word_position_docids.rs index f870fbe1b..a8ce0dc59 100644 --- a/crates/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/crates/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -24,7 +24,7 @@ pub fn extract_word_position_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = indexer.max_memory_by_rayon_thread(); let mut 
word_position_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, diff --git a/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs b/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs index 62dc40edc..97e092732 100644 --- a/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -119,7 +119,11 @@ impl GrenadParameters { /// /// This should be called inside of a rayon thread pool, /// otherwise, it will take the global number of threads. - pub fn max_memory_by_thread(&self) -> Option { + pub fn max_memory_by_thread(&self, thread_count: usize) -> Option { + self.max_memory.map(|max_memory| (max_memory / thread_count)) + } + + pub fn max_memory_by_rayon_thread(&self) -> Option { self.max_memory.map(|max_memory| (max_memory / rayon::current_num_threads())) } } diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 154db7875..f37a9d1cf 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -227,7 +227,7 @@ where crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); let backup_pool; - let pool = match self.indexer_config.thread_pool { + let pool = match self.indexer_config.rayon_thread_pool { Some(ref pool) => pool, None => { // We initialize a backup pool with the default @@ -770,6 +770,7 @@ mod tests { use crate::progress::Progress; use crate::search::TermsMatchingStrategy; use crate::update::new::indexer; + use crate::update::new::indexer::document_changes::CHUNK_SIZE; use crate::update::Setting; use crate::{db_snap, Filter, Search, UserError}; @@ -1967,6 +1968,8 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()), + CHUNK_SIZE, ) .unwrap(); @@ -2115,6 +2118,9 @@ mod tests { let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); indexer.add_documents(&documents).unwrap(); indexer.delete_documents(&["2"]); + + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); let (document_changes, _operation_stats, primary_key) = indexer .into_changes( &indexer_alloc, @@ -2124,12 +2130,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2177,6 +2186,9 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); + let (document_changes, _operation_stats, primary_key) = indexer .into_changes( &indexer_alloc, @@ -2186,12 +2198,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2229,6 +2244,8 @@ mod tests { let embedders = EmbeddingConfigs::default(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); indexer.add_documents(&documents).unwrap(); + let thread_pool = + 
scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); let (document_changes, _operation_stats, primary_key) = indexer .into_changes( @@ -2239,12 +2256,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2291,12 +2311,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2327,6 +2350,8 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); indexer.delete_documents(&["1", "2"]); @@ -2345,12 +2370,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2382,6 +2410,8 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); indexer.delete_documents(&["1", "2", "1", "2"]); @@ -2404,12 +2434,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2440,6 +2473,8 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); let documents = documents!([ @@ -2456,12 +2491,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2508,12 +2546,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2683,6 +2724,8 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); // OP @@ -2702,12 +2745,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, 
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2761,12 +2807,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, @@ -2817,12 +2866,15 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index.inner, + &thread_pool, &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map, diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index 6fb33ad78..e18538ffb 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -11,7 +11,8 @@ pub struct IndexerConfig { pub max_memory: Option, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, - pub thread_pool: Option, + pub rayon_thread_pool: Option, + pub thread_pool: Option>, pub max_positions_per_attributes: Option, pub skip_index_budget: bool, } @@ -36,6 +37,7 @@ impl Default for IndexerConfig { max_memory: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, + rayon_thread_pool: None, thread_pool: None, max_positions_per_attributes: None, skip_index_budget: false, diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 3d576a7e5..0665944eb 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -52,12 +52,12 @@ const MAX_FRAME_HEADER_SIZE: usize = 9; /// a message in this queue only if it is empty to avoid filling /// the channel *and* the BBQueue. 
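// ---- editor's illustrative sketch (not part of the patch) -----------------
// The function below splits one BBQueue per worker thread: a broadcast over
// the scoped thread pool leaves a framed producer on each worker, while every
// consumer is collected for the single writer thread. A minimal standalone
// version of that split, assuming the `bbqueue` crate's framed API
// (`BBBuffer::try_split_framed`); names here are illustrative only.
use bbqueue::framed::{FrameConsumer, FrameProducer};
use bbqueue::BBBuffer;

fn split_queues<const N: usize>(
    buffers: &[BBBuffer<N>],
) -> (Vec<FrameProducer<'_, N>>, Vec<FrameConsumer<'_, N>>) {
    let mut producers = Vec::with_capacity(buffers.len());
    let mut consumers = Vec::with_capacity(buffers.len());
    for bbqueue in buffers {
        // In the patch this runs inside `thread_pool.broadcast`, so each
        // producer is created on, and stays with, its own worker thread.
        let (producer, consumer) = bbqueue.try_split_framed().unwrap();
        producers.push(producer);
        consumers.push(consumer);
    }
    (producers, consumers)
}
// ----------------------------------------------------------------------------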
pub fn extractor_writer_bbqueue<'a>( - thread_pool: &mut scoped_thread_pool::ThreadPool, + thread_pool: &scoped_thread_pool::ThreadPool, bbbuffers: &'a mut Vec, total_bbbuffer_capacity: usize, channel_capacity: usize, ) -> (ExtractorBbqueueSender<'a>, WriterBbqueueReceiver<'a>) { - let current_num_threads = rayon::current_num_threads(); + let current_num_threads = thread_pool.thread_count(); let bbbuffer_capacity = total_bbbuffer_capacity.checked_div(current_num_threads).unwrap(); bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity)); @@ -67,12 +67,18 @@ pub fn extractor_writer_bbqueue<'a>( let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap(); let producers = ThreadLocal::with_capacity(bbbuffers.len()); - let consumers = thread_pool.broadcast(|thread_index| { - let bbqueue: &BBBuffer = &bbbuffers[thread_index]; - let (producer, consumer) = bbqueue.try_split_framed().unwrap(); - producers.get_or(|| FullySend(RefCell::new(producer))); - consumer - }); + let consumers = ThreadLocal::with_capacity(bbbuffers.len()); + thread_pool + .broadcast(|thread_index| { + let bbqueue: &BBBuffer = &bbbuffers[thread_index]; + let (producer, consumer) = bbqueue.try_split_framed().unwrap(); + producers.get_or(|| FullySend(RefCell::new(producer))); + consumers.get_or(|| FullySend(consumer)); + Ok(()) + }) + .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors)) + .unwrap(); + let consumers: Vec<_> = consumers.into_iter().map(|consumer| consumer.0).collect(); let sent_messages_attempts = Arc::new(AtomicUsize::new(0)); let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0)); @@ -964,28 +970,70 @@ impl GeoSender<'_, '_> { .map_err(|_| SendError(())) } - pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> crate::Result<()> { - let database = Database::Main; - let value_length = bitmap.serialized_size(); - let key = GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(); - let key_length = key.len().try_into().ok().and_then(NonZeroU16::new).ok_or_else(|| { - InternalError::StorePut { - database_name: database.database_name(), - key: key.into(), - value_length, - error: MdbError::BadValSize.into(), - } - })?; + pub fn set_geo_faceted( + &self, + bitmap: &RoaringBitmap, + thread_pool: &scoped_thread_pool::ThreadPool, + ) -> crate::Result<()> { + let writer = GeoWriter { bitmap, channel: *self }; + thread_pool + .execute(&writer) + .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors)) + } +} - self.0.write_key_value_with( +struct GeoWriter<'a, 'b> { + bitmap: &'a RoaringBitmap, + channel: GeoSender<'a, 'b>, +} +impl<'a, 'b> scoped_thread_pool::Workload<'static> for GeoWriter<'a, 'b> { + type Context = (); + + type Error = crate::Error; + + fn context( + &self, + _thread_count: usize, + _thread_index: usize, + ) -> Result { + Ok(()) + } + + fn run_task( + &self, + _thread_count: usize, + thread_index: usize, + task_index: usize, + _context: &mut Self::Context, + ) -> Option> { + if thread_index != 0 || task_index != 0 { + return None; + } + let database = Database::Main; + let value_length = self.bitmap.serialized_size(); + let key = GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(); + let key_length = match key.len().try_into().ok().and_then(NonZeroU16::new) { + Some(key_length) => key_length, + None => { + return Some(Err(InternalError::StorePut { + database_name: database.database_name(), + key: key.into(), + value_length, + error: MdbError::BadValSize.into(), + } + .into())) + } + }; + + 
Some(self.channel.0.write_key_value_with( database, key_length, value_length, |key_buffer, value_buffer| { key_buffer.copy_from_slice(key); - bitmap.serialize_into(value_buffer)?; + self.bitmap.serialize_into(value_buffer)?; Ok(()) }, - ) + )) } } diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index 41b6a12a2..34744506a 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -38,7 +38,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b> fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(BalancedCaches::new_in( self.buckets, - self.grenad_parameters.max_memory_by_thread(), + self.grenad_parameters.max_memory_by_thread(self.buckets), extractor_alloc, ))) } @@ -388,6 +388,7 @@ fn truncate_str(s: &str) -> &str { impl FacetedDocidsExtractor { #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( + thread_pool: &scoped_thread_pool::ThreadPool, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, @@ -412,10 +413,11 @@ impl FacetedDocidsExtractor { let extractor = FacetedExtractorData { attributes_to_extract: &attributes_to_extract, grenad_parameters: indexing_context.grenad_parameters, - buckets: rayon::current_num_threads(), + buckets: thread_pool.thread_count(), sender, }; extract( + thread_pool, document_changes, &extractor, indexing_context, diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index f2af0b229..45f723958 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -21,6 +21,7 @@ use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result}; pub struct GeoExtractor { grenad_parameters: GrenadParameters, + thread_count: usize, } impl GeoExtractor { @@ -28,11 +29,12 @@ impl GeoExtractor { rtxn: &RoTxn, index: &Index, grenad_parameters: GrenadParameters, + thread_count: usize, ) -> Result> { let is_sortable = index.sortable_fields(rtxn)?.contains(RESERVED_GEO_FIELD_NAME); let is_filterable = index.filterable_fields(rtxn)?.contains(RESERVED_GEO_FIELD_NAME); if is_sortable || is_filterable { - Ok(Some(GeoExtractor { grenad_parameters })) + Ok(Some(GeoExtractor { grenad_parameters, thread_count })) } else { Ok(None) } @@ -157,7 +159,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { ) -> Result<()> { let rtxn = &context.rtxn; let index = context.index; - let max_memory = self.grenad_parameters.max_memory_by_thread(); + let max_memory = self.grenad_parameters.max_memory_by_thread(self.thread_count); let db_fields_ids_map = context.db_fields_ids_map; let mut data_ref = context.data.borrow_mut_or_yield(); diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index aa0a3d333..c27d33a6e 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -5,7 +5,6 @@ mod geo; mod searchable; mod vectors; -use bumpalo::Bump; pub use cache::{ merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, }; @@ -15,22 +14,6 @@ pub use geo::*; pub use searchable::*; pub use 
vectors::EmbeddingExtractor; -use super::indexer::document_changes::{DocumentChanges, IndexingContext}; -use super::steps::IndexingStep; -use super::thread_local::{FullySend, ThreadLocal}; -use crate::Result; - -pub trait DocidsExtractor { - fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, - extractor_allocs: &'extractor mut ThreadLocal>, - step: IndexingStep, - ) -> Result>> - where - MSP: Fn() -> bool + Sync; -} - /// TODO move in permissive json pointer pub mod perm_json_p { use serde_json::{Map, Value}; diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index 49259cd64..c6484a4b8 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -218,7 +218,7 @@ impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> { fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in( self.buckets, - self.grenad_parameters.max_memory_by_thread(), + self.grenad_parameters.max_memory_by_thread(self.buckets), extractor_alloc, )))) } @@ -240,6 +240,7 @@ pub struct WordDocidsExtractors; impl WordDocidsExtractors { pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( + thread_pool: &scoped_thread_pool::ThreadPool, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, @@ -288,10 +289,11 @@ impl WordDocidsExtractors { let extractor = WordDocidsExtractorData { tokenizer: &document_tokenizer, grenad_parameters: indexing_context.grenad_parameters, - buckets: rayon::current_num_threads(), + buckets: thread_pool.thread_count(), }; extract( + thread_pool, document_changes, &extractor, indexing_context, diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index e58c0efd2..4945adf43 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -2,29 +2,62 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::rc::Rc; +use bumpalo::Bump; use heed::RoTxn; -use super::tokenize_document::DocumentTokenizer; -use super::SearchableExtractor; +use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::proximity::{index_proximity, MAX_DISTANCE}; use crate::update::new::document::Document; use crate::update::new::extract::cache::BalancedCaches; -use crate::update::new::indexer::document_changes::DocumentChangeContext; +use crate::update::new::indexer::document_changes::{ + extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, +}; use crate::update::new::ref_cell_ext::RefCellExt as _; +use crate::update::new::steps::IndexingStep; +use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::DocumentChange; -use crate::{FieldId, GlobalFieldsIdsMap, Index, Result}; +use crate::update::GrenadParameters; +use crate::{FieldId, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; -pub struct WordPairProximityDocidsExtractor; +impl<'a, 'extractor> 
Extractor<'extractor> for WordPairProximityDocidsExtractor<'a> { + type Data = RefCell>; -impl SearchableExtractor for WordPairProximityDocidsExtractor { - fn attributes_to_extract<'a>( - rtxn: &'a RoTxn, - index: &'a Index, - ) -> Result>> { + fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { + Ok(RefCell::new(BalancedCaches::new_in( + self.buckets, + self.grenad_parameters.max_memory_by_thread(self.buckets), + extractor_alloc, + ))) + } + + fn process<'doc>( + &self, + changes: impl Iterator>>, + context: &DocumentChangeContext, + ) -> Result<()> { + for change in changes { + let change = change?; + self.extract_document_change(context, change)?; + } + Ok(()) + } +} + +pub struct WordPairProximityDocidsExtractor<'a> { + tokenizer: &'a DocumentTokenizer<'a>, + grenad_parameters: &'a GrenadParameters, + buckets: usize, +} + +impl<'a> WordPairProximityDocidsExtractor<'a> { + fn attributes_to_extract<'b>( + rtxn: &'b RoTxn, + index: &'b Index, + ) -> Result>> { index.user_defined_searchable_fields(rtxn).map_err(Into::into) } - fn attributes_to_skip<'a>(_rtxn: &'a RoTxn, _index: &'a Index) -> Result> { + fn attributes_to_skip<'b>(_rtxn: &'b RoTxn, _index: &'b Index) -> Result> { Ok(Vec::new()) } @@ -32,10 +65,11 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { // and to store the docids of the documents that have a number of words in a given field // equal to or under than MAX_COUNTED_WORDS. fn extract_document_change( + &self, context: &DocumentChangeContext>, - document_tokenizer: &DocumentTokenizer, document_change: DocumentChange, ) -> Result<()> { + let document_tokenizer = self.tokenizer; let doc_alloc = &context.doc_alloc; let index = context.index; @@ -129,6 +163,70 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { } Ok(()) } + + pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( + thread_pool: &scoped_thread_pool::ThreadPool, + document_changes: &DC, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, + extractor_allocs: &'extractor mut ThreadLocal>, + step: IndexingStep, + ) -> Result>> + where + MSP: Fn() -> bool + Sync, + { + let rtxn = indexing_context.index.read_txn()?; + let stop_words = indexing_context.index.stop_words(&rtxn)?; + let allowed_separators = indexing_context.index.allowed_separators(&rtxn)?; + let allowed_separators: Option> = + allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect()); + let dictionary = indexing_context.index.dictionary(&rtxn)?; + let dictionary: Option> = + dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); + let mut builder = tokenizer_builder( + stop_words.as_ref(), + allowed_separators.as_deref(), + dictionary.as_deref(), + ); + let tokenizer = builder.build(); + + let attributes_to_extract = Self::attributes_to_extract(&rtxn, indexing_context.index)?; + let attributes_to_skip = Self::attributes_to_skip(&rtxn, indexing_context.index)?; + let localized_attributes_rules = + indexing_context.index.localized_attributes_rules(&rtxn)?.unwrap_or_default(); + + let document_tokenizer = DocumentTokenizer { + tokenizer: &tokenizer, + attribute_to_extract: attributes_to_extract.as_deref(), + attribute_to_skip: attributes_to_skip.as_slice(), + localized_attributes_rules: &localized_attributes_rules, + max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, + }; + + let extractor_data: WordPairProximityDocidsExtractor = WordPairProximityDocidsExtractor { + tokenizer: &document_tokenizer, + grenad_parameters: 
indexing_context.grenad_parameters, + buckets: thread_pool.thread_count(), + }; + + let datastore = ThreadLocal::new(); + + { + let span = + tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); + let _entered = span.enter(); + extract( + thread_pool, + document_changes, + &extractor_data, + indexing_context, + extractor_allocs, + &datastore, + step, + )?; + } + + Ok(datastore.into_iter().map(RefCell::into_inner).collect()) + } } fn build_key<'a>( diff --git a/crates/milli/src/update/new/extract/searchable/mod.rs b/crates/milli/src/update/new/extract/searchable/mod.rs index 7c949a3ce..bf0607653 100644 --- a/crates/milli/src/update/new/extract/searchable/mod.rs +++ b/crates/milli/src/update/new/extract/searchable/mod.rs @@ -1,146 +1,5 @@ mod extract_word_docids; mod extract_word_pair_proximity_docids; mod tokenize_document; - -use std::cell::RefCell; -use std::marker::PhantomData; - -use bumpalo::Bump; pub use extract_word_docids::{WordDocidsCaches, WordDocidsExtractors}; pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; -use heed::RoTxn; -use tokenize_document::{tokenizer_builder, DocumentTokenizer}; - -use super::cache::BalancedCaches; -use super::DocidsExtractor; -use crate::update::new::indexer::document_changes::{ - extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, -}; -use crate::update::new::steps::IndexingStep; -use crate::update::new::thread_local::{FullySend, ThreadLocal}; -use crate::update::new::DocumentChange; -use crate::update::GrenadParameters; -use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE}; - -pub struct SearchableExtractorData<'a, EX: SearchableExtractor> { - tokenizer: &'a DocumentTokenizer<'a>, - grenad_parameters: &'a GrenadParameters, - buckets: usize, - _ex: PhantomData, -} - -impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> - for SearchableExtractorData<'a, EX> -{ - type Data = RefCell>; - - fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { - Ok(RefCell::new(BalancedCaches::new_in( - self.buckets, - self.grenad_parameters.max_memory_by_thread(), - extractor_alloc, - ))) - } - - fn process<'doc>( - &self, - changes: impl Iterator>>, - context: &DocumentChangeContext, - ) -> Result<()> { - for change in changes { - let change = change?; - EX::extract_document_change(context, self.tokenizer, change)?; - } - Ok(()) - } -} - -pub trait SearchableExtractor: Sized + Sync { - fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, - extractor_allocs: &'extractor mut ThreadLocal>, - step: IndexingStep, - ) -> Result>> - where - MSP: Fn() -> bool + Sync, - { - let rtxn = indexing_context.index.read_txn()?; - let stop_words = indexing_context.index.stop_words(&rtxn)?; - let allowed_separators = indexing_context.index.allowed_separators(&rtxn)?; - let allowed_separators: Option> = - allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect()); - let dictionary = indexing_context.index.dictionary(&rtxn)?; - let dictionary: Option> = - dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); - let mut builder = tokenizer_builder( - stop_words.as_ref(), - allowed_separators.as_deref(), - dictionary.as_deref(), - ); - let tokenizer = builder.build(); - - let attributes_to_extract = Self::attributes_to_extract(&rtxn, indexing_context.index)?; - let attributes_to_skip = Self::attributes_to_skip(&rtxn, 
indexing_context.index)?; - let localized_attributes_rules = - indexing_context.index.localized_attributes_rules(&rtxn)?.unwrap_or_default(); - - let document_tokenizer = DocumentTokenizer { - tokenizer: &tokenizer, - attribute_to_extract: attributes_to_extract.as_deref(), - attribute_to_skip: attributes_to_skip.as_slice(), - localized_attributes_rules: &localized_attributes_rules, - max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, - }; - - let extractor_data: SearchableExtractorData = SearchableExtractorData { - tokenizer: &document_tokenizer, - grenad_parameters: indexing_context.grenad_parameters, - buckets: rayon::current_num_threads(), - _ex: PhantomData, - }; - - let datastore = ThreadLocal::new(); - - { - let span = - tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); - let _entered = span.enter(); - extract( - document_changes, - &extractor_data, - indexing_context, - extractor_allocs, - &datastore, - step, - )?; - } - - Ok(datastore.into_iter().map(RefCell::into_inner).collect()) - } - - fn extract_document_change( - context: &DocumentChangeContext>, - document_tokenizer: &DocumentTokenizer, - document_change: DocumentChange, - ) -> Result<()>; - - fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) - -> Result>>; - - fn attributes_to_skip<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result>; -} - -impl DocidsExtractor for T { - fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, - extractor_allocs: &'extractor mut ThreadLocal>, - step: IndexingStep, - ) -> Result>> - where - MSP: Fn() -> bool + Sync, - { - Self::run_extraction(document_changes, indexing_context, extractor_allocs, step) - } -} diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index f77ac7658..dfa925b3c 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -1,15 +1,14 @@ use std::cell::{Cell, RefCell}; -use std::sync::atomic::Ordering; +use std::marker::PhantomData; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use bumpalo::Bump; use heed::RoTxn; -use rayon::iter::IndexedParallelIterator; use super::super::document_change::DocumentChange; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::progress::{AtomicDocumentStep, Progress}; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::update::GrenadParameters; @@ -114,7 +113,7 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload >: Sync { type Item: Send; - fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator>; + fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]>; fn len(&self) -> usize; @@ -186,9 +185,10 @@ where } } -const CHUNK_SIZE: usize = 100; +pub const CHUNK_SIZE: usize = 100; -pub fn extract< +struct Extract< + 'shared, // covariant lifetime for shared borrows 'pl, // covariant lifetime of the underlying payload 'extractor, // invariant lifetime of extractor_alloc 'fid, // invariant lifetime of fields ids map @@ -196,31 +196,121 @@ pub fn extract< 'data, // invariant on EX::Data lifetime of datastore 'index, // covariant lifetime of the index EX, + DC, + MSP, +> where 
DC: DocumentChanges<'pl>, + EX: Extractor<'extractor>, + MSP: Fn() -> bool + Sync, +{ + document_changes: &'shared DC, + extractor: &'shared EX, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, + extractor_allocs: &'extractor ThreadLocal>, + datastore: &'data ThreadLocal, + step: Arc, + _marker: PhantomData<&'pl ()>, +} + +impl< + 'doc, + 'extractor: 'doc, // invariant lifetime of extractor_alloc + 'shared, + 'pl, // covariant lifetime of the underlying payload + 'fid: 'doc, // invariant lifetime of fields ids map + 'indexer: 'doc, // covariant lifetime of objects that are borrowed during the entire indexing + 'data: 'doc, // invariant on EX::Data lifetime of datastore + 'index: 'doc + 'indexer, // covariant lifetime of the index + EX, + DC: DocumentChanges<'pl>, + MSP, + > scoped_thread_pool::Workload<'doc> + for Extract<'shared, 'pl, 'extractor, 'fid, 'indexer, 'data, 'index, EX, DC, MSP> +where + EX: Extractor<'extractor>, + MSP: Fn() -> bool + Sync, +{ + type Context = DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, EX::Data>; + + type Error = crate::Error; + + fn context( + &self, + _thread_count: usize, + _thread_index: usize, + ) -> std::result::Result< + DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, EX::Data>, + Self::Error, + > { + let extractor = self.extractor; + DocumentChangeContext::new( + self.indexing_context.index, + self.indexing_context.db_fields_ids_map, + self.indexing_context.new_fields_ids_map, + self.extractor_allocs, + self.indexing_context.doc_allocs, + self.datastore, + self.indexing_context.fields_ids_map_store, + move |index_alloc| extractor.init_data(index_alloc), + ) + } + + fn run_task( + &self, + _thread_count: usize, + thread_index: usize, + task_index: usize, + context: &mut Self::Context, + ) -> Option> { + let items = self.document_changes.items(thread_index, task_index)?; + if (self.indexing_context.must_stop_processing)() { + return Some(Err(InternalError::AbortedIndexation.into())); + } + + // Clean up and reuse the document-specific allocator + context.doc_alloc.reset(); + + let changes = items.iter().filter_map(|item| { + self.document_changes.item_to_document_change(context, item).transpose() + }); + + let res = self.extractor.process(changes, context); + self.step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed); + + // send back the doc_alloc in the pool + context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); + + Some(res) + } +} + +pub fn extract< + 'pool, // invariant lifetime of the thread pool + 'pl, // covariant lifetime of the underlying payload + 'extractor, // invariant lifetime of extractor_alloc + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing + 'data, // invariant on EX::Data lifetime of datastore + 'index, // covariant lifetime of the index + EX, + DC, MSP, >( + thread_pool: &'pool scoped_thread_pool::ThreadPool, document_changes: &DC, extractor: &EX, - IndexingContext { - index, - db_fields_ids_map, - new_fields_ids_map, - doc_allocs, - fields_ids_map_store, - must_stop_processing, - progress, - grenad_parameters: _, - }: IndexingContext<'fid, 'indexer, 'index, MSP>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, datastore: &'data ThreadLocal, step: IndexingStep, ) -> Result<()> where + DC: DocumentChanges<'pl>, EX: Extractor<'extractor>, MSP: Fn() -> bool + Sync, { tracing::trace!("We are resetting the 
extractor allocators"); - progress.update_progress(step); + indexing_context.progress.update_progress(step); // Clean up and reuse the extractor allocs for extractor_alloc in extractor_allocs.iter_mut() { tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes()); @@ -229,45 +319,22 @@ where let total_documents = document_changes.len() as u32; let (step, progress_step) = AtomicDocumentStep::new(total_documents); - progress.update_progress(progress_step); + indexing_context.progress.update_progress(progress_step); - let pi = document_changes.iter(CHUNK_SIZE); - pi.try_arc_for_each_try_init( - || { - DocumentChangeContext::new( - index, - db_fields_ids_map, - new_fields_ids_map, - extractor_allocs, - doc_allocs, - datastore, - fields_ids_map_store, - move |index_alloc| extractor.init_data(index_alloc), - ) - }, - |context, items| { - if (must_stop_processing)() { - return Err(Arc::new(InternalError::AbortedIndexation.into())); - } + let extract = Extract { + document_changes, + extractor, + indexing_context, + extractor_allocs, + datastore, + step, + _marker: PhantomData, + }; + thread_pool + .execute(&extract) + .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors))?; - // Clean up and reuse the document-specific allocator - context.doc_alloc.reset(); - - let items = items.as_ref(); - let changes = items.iter().filter_map(|item| { - document_changes.item_to_document_change(context, item).transpose() - }); - - let res = extractor.process(changes, context).map_err(Arc::new); - step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed); - - // send back the doc_alloc in the pool - context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); - - res - }, - )?; - step.store(total_documents, Ordering::Relaxed); + extract.step.store(total_documents, Ordering::Relaxed); Ok(()) } diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index 39e35ff34..4defaa497 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -95,7 +95,7 @@ mod test { use crate::index::tests::TempIndex; use crate::progress::Progress; use crate::update::new::indexer::document_changes::{ - extract, DocumentChangeContext, Extractor, IndexingContext, + extract, DocumentChangeContext, Extractor, IndexingContext, CHUNK_SIZE, }; use crate::update::new::indexer::DocumentDeletion; use crate::update::new::steps::IndexingStep; @@ -136,7 +136,7 @@ mod test { } } - let mut thread_pool = + let thread_pool = scoped_thread_pool::ThreadPool::new(NonZeroUsize::new(1).unwrap(), "test".into()); let mut deletions = DocumentDeletion::new(); @@ -159,8 +159,12 @@ mod test { let deletion_tracker = TrackDeletion(PhantomData); - let changes = deletions - .into_changes(&indexer, crate::documents::PrimaryKey::Flat { name: "id", field_id: 0 }); + let changes = deletions.into_changes( + &indexer, + crate::documents::PrimaryKey::Flat { name: "id", field_id: 0 }, + &thread_pool, + CHUNK_SIZE, + ); let context = IndexingContext { index: &index, @@ -177,7 +181,7 @@ mod test { let datastore = ThreadLocal::new(); extract( - &mut thread_pool, + &thread_pool, &changes, &deletion_tracker, context, diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 53478f029..b33d7b1f2 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs 
@@ -22,6 +22,7 @@ use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; #[allow(clippy::too_many_arguments)] pub(super) fn extract_all<'pl, 'extractor, DC, MSP>( + thread_pool: &scoped_thread_pool::ThreadPool, document_changes: &DC, indexing_context: IndexingContext, indexer_span: Span, @@ -47,11 +48,12 @@ where // document but we need to create a function that collects and compresses documents. let document_sender = extractor_sender.documents(); let document_extractor = DocumentsExtractor::new(document_sender, embedders); - let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + let datastore = ThreadLocal::with_capacity(thread_pool.thread_count()); { let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); let _entered = span.enter(); extract( + thread_pool, document_changes, &document_extractor, indexing_context, @@ -84,6 +86,7 @@ where let _entered = span.enter(); FacetedDocidsExtractor::run_extraction( + thread_pool, document_changes, indexing_context, extractor_allocs, @@ -97,6 +100,7 @@ where let _entered = span.enter(); facet_field_ids_delta = merge_and_send_facet_docids( + thread_pool, caches, FacetDatabases::new(index), index, @@ -118,6 +122,7 @@ where let _entered = span.enter(); WordDocidsExtractors::run_extraction( + thread_pool, document_changes, indexing_context, extractor_allocs, @@ -129,6 +134,7 @@ where let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); let _entered = span.enter(); merge_and_send_docids( + thread_pool, word_docids, index.word_docids.remap_types(), index, @@ -142,6 +148,7 @@ where tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); let _entered = span.enter(); merge_and_send_docids( + thread_pool, word_fid_docids, index.word_fid_docids.remap_types(), index, @@ -155,6 +162,7 @@ where tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); let _entered = span.enter(); merge_and_send_docids( + thread_pool, exact_word_docids, index.exact_word_docids.remap_types(), index, @@ -168,6 +176,7 @@ where tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); let _entered = span.enter(); merge_and_send_docids( + thread_pool, word_position_docids, index.word_position_docids.remap_types(), index, @@ -181,6 +190,7 @@ where tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); let _entered = span.enter(); merge_and_send_docids( + thread_pool, fid_word_count_docids, index.field_id_word_count_docids.remap_types(), index, @@ -198,7 +208,8 @@ where let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let _entered = span.enter(); - ::run_extraction( + WordPairProximityDocidsExtractor::run_extraction( + thread_pool, document_changes, indexing_context, extractor_allocs, @@ -211,6 +222,7 @@ where let _entered = span.enter(); merge_and_send_docids( + thread_pool, caches, index.word_pair_proximity_docids.remap_types(), index, @@ -232,12 +244,13 @@ where field_distribution, request_threads(), ); - let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + let mut datastore = ThreadLocal::with_capacity(thread_pool.thread_count()); { let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors"); let _entered = span.enter(); extract( + thread_pool, document_changes, &extractor, indexing_context, @@ -263,17 +276,23 @@ where } 'geo: { - let Some(extractor) = 
GeoExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)? + let Some(extractor) = GeoExtractor::new( + &rtxn, + index, + *indexing_context.grenad_parameters, + thread_pool.thread_count(), + )? else { break 'geo; }; - let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + let datastore = ThreadLocal::with_capacity(thread_pool.thread_count()); { let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); let _entered = span.enter(); extract( + thread_pool, document_changes, &extractor, indexing_context, @@ -289,6 +308,7 @@ where index, extractor_sender.geo(), &indexing_context.must_stop_processing, + thread_pool, )?; } indexing_context.progress.update_progress(IndexingStep::WritingToDatabase); diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index fd196c4f7..144fc61d3 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -44,6 +44,7 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new(); pub fn index<'pl, 'indexer, 'index, DC, MSP>( wtxn: &mut RwTxn, index: &'index Index, + thread_pool: &scoped_thread_pool::ThreadPool, pool: &ThreadPoolNoAbort, grenad_parameters: GrenadParameters, db_fields_ids_map: &'indexer FieldsIdsMap, @@ -110,9 +111,9 @@ where let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder); let new_fields_ids_map = RwLock::new(new_fields_ids_map); - let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads()); - let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads()); - let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads()); + let fields_ids_map_store = ThreadLocal::with_capacity(thread_pool.thread_count()); + let mut extractor_allocs = ThreadLocal::with_capacity(thread_pool.thread_count()); + let doc_allocs = ThreadLocal::with_capacity(thread_pool.thread_count()); let indexing_context = IndexingContext { index, @@ -203,6 +204,7 @@ where wtxn, global_fields_ids_map, facet_field_ids_delta, + thread_pool, )?; indexing_context.progress.update_progress(IndexingStep::Finalizing); diff --git a/crates/milli/src/update/new/indexer/partial_dump.rs b/crates/milli/src/update/new/indexer/partial_dump.rs index 3069310bf..eb924b19d 100644 --- a/crates/milli/src/update/new/indexer/partial_dump.rs +++ b/crates/milli/src/update/new/indexer/partial_dump.rs @@ -1,7 +1,6 @@ use std::ops::DerefMut; use bumparaw_collections::RawMap; -use rayon::iter::IndexedParallelIterator; use rustc_hash::FxBuildHasher; use scoped_thread_pool::ThreadPool; use serde_json::value::RawValue; @@ -26,8 +25,8 @@ impl PartialDump { self, concurrent_available_ids: &'index ConcurrentAvailableIds, primary_key: &'index PrimaryKey, - thread_pool: &ThreadPool, - chunk_size: usize, + _thread_pool: &ThreadPool, + _chunk_size: usize, ) -> PartialDumpChanges<'index> { // Note for future self: // - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items). 
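// ---- editor's illustrative sketch (not part of the patch) -----------------
// The "note for future self" above asks for a `take_while_size` helper that
// cuts chunks on a byte budget without dropping the item that crosses it.
// A hypothetical shape for such a helper; the name, signature, and the
// size-estimation closure are assumptions, not existing milli code.
fn take_while_size<T>(
    items: &mut impl Iterator<Item = T>,
    budget: usize,
    size_of: impl Fn(&T) -> usize,
) -> Vec<T> {
    let mut chunk = Vec::new();
    let mut used = 0;
    for item in items {
        used += size_of(&item);
        // Unlike a plain `take_while`, the overflowing item is kept, not lost;
        // the next call resumes from the following item.
        chunk.push(item);
        if used >= budget {
            break;
        }
    }
    chunk
}
// ----------------------------------------------------------------------------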
diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs
index 201ab9ec9..3457126cb 100644
--- a/crates/milli/src/update/new/indexer/post_processing.rs
+++ b/crates/milli/src/update/new/indexer/post_processing.rs
@@ -27,6 +27,7 @@ pub(super) fn post_process(
     wtxn: &mut RwTxn<'_>,
     global_fields_ids_map: GlobalFieldsIdsMap<'_>,
     facet_field_ids_delta: FacetFieldIdsDelta,
+    thread_pool: &scoped_thread_pool::ThreadPool,
 ) -> Result<()>
 where
     MSP: Fn() -> bool + Sync,
@@ -39,7 +40,13 @@ where
     compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
     indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
     if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
-        compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?;
+        compute_prefix_database(
+            index,
+            wtxn,
+            prefix_delta,
+            indexing_context.grenad_parameters,
+            thread_pool,
+        )?;
     };
     Ok(())
 }
@@ -50,16 +57,38 @@ fn compute_prefix_database(
     wtxn: &mut RwTxn,
     prefix_delta: PrefixDelta,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool,
 ) -> Result<()> {
     let PrefixDelta { modified, deleted } = prefix_delta;
     // Compute word prefix docids
-    compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
+    compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters, thread_pool)?;
     // Compute exact word prefix docids
-    compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
+    compute_exact_word_prefix_docids(
+        wtxn,
+        index,
+        &modified,
+        &deleted,
+        grenad_parameters,
+        thread_pool,
+    )?;
     // Compute word prefix fid docids
-    compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
+    compute_word_prefix_fid_docids(
+        wtxn,
+        index,
+        &modified,
+        &deleted,
+        grenad_parameters,
+        thread_pool,
+    )?;
     // Compute word prefix position docids
-    compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
+    compute_word_prefix_position_docids(
+        wtxn,
+        index,
+        &modified,
+        &deleted,
+        grenad_parameters,
+        thread_pool,
+    )
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing")]
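All four prefix computations now receive the same pool, and each one runs the same delete-then-recompute flow over its prefix database. A toy model of that flow, with a `BTreeMap` standing in for the LMDB prefix database and `recompute` standing in for the docids union (all names here are illustrative):

    use std::collections::{BTreeMap, BTreeSet};

    // Deleted prefixes are purged first, then modified prefixes are recomputed.
    fn apply_prefix_delta(
        prefix_db: &mut BTreeMap<String, Vec<u32>>,
        modified: &BTreeSet<String>,
        deleted: &BTreeSet<String>,
        recompute: impl Fn(&str) -> Vec<u32>,
    ) {
        for prefix in deleted {
            prefix_db.remove(prefix);
        }
        for prefix in modified {
            prefix_db.insert(prefix.clone(), recompute(prefix));
        }
    }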
diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs
index 10e9b77c7..274469d80 100644
--- a/crates/milli/src/update/new/merger.rs
+++ b/crates/milli/src/update/new/merger.rs
@@ -22,6 +22,7 @@ pub fn merge_and_send_rtree<'extractor, MSP>(
     index: &Index,
     geo_sender: GeoSender<'_, '_>,
     must_stop_processing: &MSP,
+    thread_pool: &scoped_thread_pool::ThreadPool,
 ) -> Result<()>
 where
     MSP: Fn() -> bool + Sync,
@@ -57,14 +58,14 @@ where
     let rtree_mmap = unsafe { Mmap::map(&file)? };
 
     geo_sender.set_rtree(rtree_mmap).unwrap();
-    geo_sender.set_geo_faceted(&faceted)?;
+    geo_sender.set_geo_faceted(&faceted, thread_pool)?;
 
     Ok(())
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
 pub fn merge_and_send_docids<'extractor, MSP, D>(
-    thread_pool: &mut scoped_thread_pool::ThreadPool,
+    thread_pool: &scoped_thread_pool::ThreadPool,
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: Database<Bytes, Bytes>,
     index: &Index,
@@ -96,7 +97,7 @@ where
                 }
                 Operation::Ignore => Ok(()),
             }
-        });
+        })?;
         Ok(())
     }) {
         Ok(()) => Ok(()),
@@ -106,7 +107,7 @@ pub fn merge_and_send_facet_docids<'extractor>(
-    thread_pool: &mut scoped_thread_pool::ThreadPool,
+    thread_pool: &scoped_thread_pool::ThreadPool,
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: FacetDatabases,
     index: &Index,
@@ -119,39 +120,43 @@ pub fn merge_and_send_facet_docids<'extractor>(
     let max_number_count = max_number_count.clamp(1000, 100_000);
     let transposed_frozen_caches = Mutex::new(transpose_and_freeze_caches(&mut caches)?);
     let output = Mutex::new(FacetFieldIdsDelta::new(max_string_count, max_number_count));
-    thread_pool.broadcast(|thread_index| {
-        // TODO: we can probably spare the mutex here since it is guaranteed that each thread will access its own cell of the vec
-        let frozen =
-            std::mem::take(transposed_frozen_caches.lock().unwrap().get_mut(thread_index).unwrap());
-
-        let mut facet_field_ids_delta = FacetFieldIdsDelta::new(max_string_count, max_number_count);
-        let rtxn = index.read_txn()?;
-        merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
-            let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
-            match merge_cbo_bitmaps(current, del, add)? {
-                Operation::Write(bitmap) => {
-                    facet_field_ids_delta.register_from_key(key);
-                    docids_sender.write(key, &bitmap)?;
-                    Ok(())
-                }
-                Operation::Delete => {
-                    facet_field_ids_delta.register_from_key(key);
-                    docids_sender.delete(key)?;
-                    Ok(())
-                }
-                Operation::Ignore => Ok(()),
-            }
-        })?;
-        {
-            let mut common = output.lock().unwrap();
-            *common = std::mem::replace(
-                &mut *common,
-                FacetFieldIdsDelta::new(max_string_count, max_number_count),
-            )
-            .merge(facet_field_ids_delta);
-        }
-        Ok(())
-    });
+    thread_pool
+        .broadcast(|thread_index| {
+            // TODO: we can probably spare the mutex here since it is guaranteed that each thread will access its own cell of the vec
+            let frozen = std::mem::take(
+                transposed_frozen_caches.lock().unwrap().get_mut(thread_index).unwrap(),
+            );
+
+            let mut facet_field_ids_delta =
+                FacetFieldIdsDelta::new(max_string_count, max_number_count);
+            let rtxn = index.read_txn()?;
+            merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
+                let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
+                match merge_cbo_bitmaps(current, del, add)? {
+                    Operation::Write(bitmap) => {
+                        facet_field_ids_delta.register_from_key(key);
+                        docids_sender.write(key, &bitmap)?;
+                        Ok(())
+                    }
+                    Operation::Delete => {
+                        facet_field_ids_delta.register_from_key(key);
+                        docids_sender.delete(key)?;
+                        Ok(())
+                    }
+                    Operation::Ignore => Ok(()),
+                }
+            })?;
+            {
+                let mut common = output.lock().unwrap();
+                *common = std::mem::replace(
+                    &mut *common,
+                    FacetFieldIdsDelta::new(max_string_count, max_number_count),
+                )
+                .merge(facet_field_ids_delta);
+            }
+            Ok(())
+        })
+        .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors))?;
 
     Ok(output.into_inner().unwrap())
 }
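`broadcast` now reports per-thread failures, which `merge_and_send_facet_docids` folds into one error via `crate::Error::from_scoped_thread_pool_errors`. A minimal model of that contract, with `std::thread::scope` standing in for the pool and an invented error collection (illustrative only):

    use std::sync::Mutex;

    // Run `job` once per pool thread and surface every thread's failure,
    // instead of silently discarding the closures' `Result`s.
    fn broadcast_all<E: Send>(
        thread_count: usize,
        job: impl Fn(usize) -> Result<(), E> + Sync,
    ) -> Result<(), Vec<E>> {
        let errors = Mutex::new(Vec::new());
        std::thread::scope(|s| {
            for thread_index in 0..thread_count {
                let (errors, job) = (&errors, &job);
                s.spawn(move || {
                    if let Err(e) = job(thread_index) {
                        errors.lock().unwrap().push(e);
                    }
                });
            }
        });
        let errors = errors.into_inner().unwrap();
        if errors.is_empty() { Ok(()) } else { Err(errors) }
    }

The `});` to `})?;` fix in `merge_and_send_docids` is the same concern in miniature: a fallible call's `Result` was being silently discarded before this patch.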
diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs
index 7ba2b9b71..73aaa21cd 100644
--- a/crates/milli/src/update/new/words_prefix_docids.rs
+++ b/crates/milli/src/update/new/words_prefix_docids.rs
@@ -26,11 +26,13 @@ impl WordPrefixDocids {
         database: Database<Bytes, CboRoaringBitmapCodec>,
         prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
         grenad_parameters: &GrenadParameters,
+        thread_pool: &scoped_thread_pool::ThreadPool,
     ) -> WordPrefixDocids {
         WordPrefixDocids {
             database,
             prefix_database,
-            max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
+            max_memory_by_thread: grenad_parameters
+                .max_memory_by_thread(thread_pool.thread_count()),
         }
     }
@@ -39,9 +41,10 @@ impl WordPrefixDocids {
         wtxn: &mut heed::RwTxn,
         prefix_to_compute: &BTreeSet<Prefix>,
         prefix_to_delete: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool,
     ) -> Result<()> {
         delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
-        self.recompute_modified_prefixes(wtxn, prefix_to_compute)
+        self.recompute_modified_prefixes(wtxn, prefix_to_compute, thread_pool)
     }
 
     #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -49,6 +52,7 @@ impl WordPrefixDocids {
         &self,
         wtxn: &mut RwTxn,
         prefixes: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool,
     ) -> Result<()> {
         // We fetch the docids associated to the newly added word prefix fst only.
         // And collect the CboRoaringBitmaps pointers in an HashMap.
@@ -56,7 +60,7 @@ impl WordPrefixDocids {
         // We access this HashMap in parallel to compute the *union* of all
         // of them and *serialize* them into files. There is one file by CPU.
-        let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
+        let local_entries = ThreadLocal::with_capacity(thread_pool.thread_count());
         prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
             let refcell = local_entries.get_or(|| {
                 let file = BufWriter::new(spooled_tempfile(
@@ -162,11 +166,13 @@ impl WordPrefixIntegerDocids {
         database: Database<Bytes, CboRoaringBitmapCodec>,
         prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
         grenad_parameters: &GrenadParameters,
+        thread_pool: &scoped_thread_pool::ThreadPool,
     ) -> WordPrefixIntegerDocids {
         WordPrefixIntegerDocids {
             database,
             prefix_database,
-            max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
+            max_memory_by_thread: grenad_parameters
+                .max_memory_by_thread(thread_pool.thread_count()),
         }
     }
@@ -175,9 +181,10 @@ impl WordPrefixIntegerDocids {
         wtxn: &mut heed::RwTxn,
         prefix_to_compute: &BTreeSet<Prefix>,
         prefix_to_delete: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool,
     ) -> Result<()> {
         delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
-        self.recompute_modified_prefixes(wtxn, prefix_to_compute)
+        self.recompute_modified_prefixes(wtxn, prefix_to_compute, thread_pool)
     }
 
     #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -185,6 +192,7 @@ impl WordPrefixIntegerDocids {
         &self,
         wtxn: &mut RwTxn,
         prefixes: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool,
     ) -> Result<()> {
         // We fetch the docids associated to the newly added word prefix fst only.
         // And collect the CboRoaringBitmaps pointers in an HashMap.
@@ -192,7 +200,7 @@ impl WordPrefixIntegerDocids {
         // We access this HashMap in parallel to compute the *union* of all
         // of them and *serialize* them into files. There is one file by CPU.
-        let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
+        let local_entries = ThreadLocal::with_capacity(thread_pool.thread_count());
         prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
             let refcell = local_entries.get_or(|| {
                 let file = BufWriter::new(spooled_tempfile(
@@ -312,13 +320,15 @@ pub fn compute_word_prefix_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool,
 ) -> Result<()> {
     WordPrefixDocids::new(
         index.word_docids.remap_key_type(),
         index.word_prefix_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -328,13 +338,15 @@ pub fn compute_exact_word_prefix_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool,
 ) -> Result<()> {
     WordPrefixDocids::new(
         index.exact_word_docids.remap_key_type(),
         index.exact_word_prefix_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -344,13 +356,15 @@ pub fn compute_word_prefix_fid_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool,
 ) -> Result<()> {
     WordPrefixIntegerDocids::new(
         index.word_fid_docids.remap_key_type(),
         index.word_prefix_fid_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -360,11 +374,13 @@ pub fn compute_word_prefix_position_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool,
 ) -> Result<()> {
     WordPrefixIntegerDocids::new(
         index.word_position_docids.remap_key_type(),
         index.word_prefix_position_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
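Both prefix structs now derive their memory budget from the pool that will actually run the work: `grenad_parameters.max_memory_by_thread(thread_pool.thread_count())` rather than a zero-argument call that had to consult rayon's global size. Assuming an even split of the global budget, the arithmetic looks like this (a free-function sketch; milli's real method lives on `GrenadParameters`):

    // Plausible shape of the per-thread budget split, shown for illustration.
    fn max_memory_by_thread(max_memory: Option<usize>, thread_count: usize) -> Option<usize> {
        max_memory.map(|max| max / thread_count.max(1))
    }

    fn main() {
        // 1 GiB across 8 indexing threads leaves 128 MiB per thread.
        assert_eq!(max_memory_by_thread(Some(1 << 30), 8), Some(1 << 27));
    }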
diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs
index db9f86357..4529df7d2 100644
--- a/crates/milli/tests/search/facet_distribution.rs
+++ b/crates/milli/tests/search/facet_distribution.rs
@@ -5,6 +5,7 @@ use maplit::hashset;
 use milli::documents::mmap_from_objects;
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{FacetDistribution, Index, Object, OrderBy};
@@ -36,6 +37,8 @@ fn test_facet_distribution_with_no_facet_values() {
     let mut new_fields_ids_map = db_fields_ids_map.clone();
 
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
 
     let doc1: Object = from_value(
@@ -59,12 +62,15 @@ fn test_facet_distribution_with_no_facet_values() {
             &mut new_fields_ids_map,
             &|| false,
             Progress::default(),
+            &thread_pool,
+            CHUNK_SIZE,
         )
         .unwrap();
 
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,
diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs
index 662715638..34bb0dbc9 100644
--- a/crates/milli/tests/search/mod.rs
+++ b/crates/milli/tests/search/mod.rs
@@ -9,6 +9,7 @@ use heed::EnvOpenOptions;
 use maplit::{btreemap, hashset};
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{AscDesc, Criterion, DocumentId, Index, Member, TermsMatchingStrategy};
@@ -72,6 +73,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
     let mut new_fields_ids_map = db_fields_ids_map.clone();
 
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
 
     let mut file = tempfile::tempfile().unwrap();
@@ -92,6 +95,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
             &mut new_fields_ids_map,
             &|| false,
             Progress::default(),
+            &thread_pool,
+            CHUNK_SIZE,
         )
         .unwrap();
@@ -102,6 +107,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,
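These test diffs all follow the same recipe: build one scoped pool per test, then pass it both to `into_changes` (together with the new `CHUNK_SIZE` constant) and to `indexer::index`. The explicit chunk size suggests document changes are handed to the pool in fixed-size batches rather than one at a time; a generic sketch of that distribution strategy (the chunk size and names here are invented for illustration):

    // Illustrative only: fixed-size chunks let a pool hand whole batches
    // of documents to its worker threads.
    fn process_in_chunks<T: Sync>(items: &[T], chunk_size: usize, work: impl Fn(&T) + Sync) {
        std::thread::scope(|s| {
            for chunk in items.chunks(chunk_size) {
                let work = &work;
                s.spawn(move || chunk.iter().for_each(work));
            }
        });
    }

    fn main() {
        let docs: Vec<u32> = (0..1000).collect();
        process_in_chunks(&docs, 64, |doc| {
            // Stand-in for per-document extraction work.
            let _ = doc;
        });
    }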
diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs
index d47c9539d..8e4c67c2d 100644
--- a/crates/milli/tests/search/query_criteria.rs
+++ b/crates/milli/tests/search/query_criteria.rs
@@ -7,6 +7,7 @@ use itertools::Itertools;
 use maplit::hashset;
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{AscDesc, Criterion, Index, Member, Search, SearchResult, TermsMatchingStrategy};
@@ -288,6 +289,8 @@ fn criteria_ascdesc() {
     let mut new_fields_ids_map = db_fields_ids_map.clone();
 
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
 
     let mut file = tempfile::tempfile().unwrap();
@@ -328,12 +331,15 @@ fn criteria_ascdesc() {
             &mut new_fields_ids_map,
             &|| false,
             Progress::default(),
+            &thread_pool,
+            CHUNK_SIZE,
         )
         .unwrap();
 
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,
diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs
index b640fa910..f4cbe083f 100644
--- a/crates/milli/tests/search/typo_tolerance.rs
+++ b/crates/milli/tests/search/typo_tolerance.rs
@@ -5,6 +5,7 @@ use heed::EnvOpenOptions;
 use milli::documents::mmap_from_objects;
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{Criterion, Index, Object, Search, TermsMatchingStrategy};
@@ -123,6 +124,8 @@ fn test_typo_disabled_on_word() {
     let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
     let mut new_fields_ids_map = db_fields_ids_map.clone();
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
 
     indexer.add_documents(&documents).unwrap();
@@ -137,12 +140,15 @@ fn test_typo_disabled_on_word() {
             &mut new_fields_ids_map,
             &|| false,
             Progress::default(),
+            &thread_pool,
+            CHUNK_SIZE,
        )
        .unwrap();
 
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,