integrate thread_pool

Louis Dureuil 2025-03-01 23:49:16 +01:00
parent 93ba4b924a
commit 50268b930c
32 changed files with 610 additions and 367 deletions

View File

@@ -1788,6 +1788,7 @@ pub(crate) mod tests {
     use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS};
     use crate::progress::Progress;
     use crate::update::new::indexer;
+    use crate::update::new::indexer::document_changes::CHUNK_SIZE;
     use crate::update::settings::InnerIndexSettings;
     use crate::update::{
         self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting, Settings,
@@ -1837,7 +1838,7 @@ pub(crate) mod tests {
         ) -> Result<(), crate::error::Error> {
             let local_pool;
             let indexer_config = &self.indexer_config;
-            let pool = match &indexer_config.thread_pool {
+            let pool = match &indexer_config.rayon_thread_pool {
                 Some(pool) => pool,
                 None => {
                     local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
@@ -1845,6 +1846,11 @@
                 }
             };
+            let thread_pool = match &indexer_config.thread_pool {
+                Some(thread_pool) => thread_pool,
+                None => &scoped_thread_pool::ThreadPool::with_available_parallelism("index".into()),
+            };
+
             let rtxn = self.inner.read_txn()?;
             let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
             let mut new_fields_ids_map = db_fields_ids_map.clone();
@@ -1864,17 +1870,19 @@ pub(crate) mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                thread_pool,
+                CHUNK_SIZE,
             )?;

             if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) {
                 return Err(error.into());
             }

-            pool.install(|| {
             indexer::index(
                 wtxn,
                 &self.inner,
-                &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
+                thread_pool,
+                &pool,
                 indexer_config.grenad_parameters(),
                 &db_fields_ids_map,
                 new_fields_ids_map,
@@ -1883,10 +1891,7 @@ pub(crate) mod tests {
                 embedders,
                 &|| false,
                 &Progress::default(),
-                )
-            })
-            .unwrap()?;
+            )?;

             Ok(())
         }
@@ -1925,7 +1930,7 @@ pub(crate) mod tests {
         ) -> Result<(), crate::error::Error> {
             let local_pool;
             let indexer_config = &self.indexer_config;
-            let pool = match &indexer_config.thread_pool {
+            let pool = match &indexer_config.rayon_thread_pool {
                 Some(pool) => pool,
                 None => {
                     local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
@@ -1933,6 +1938,11 @@
                 }
             };
+            let thread_pool = match &indexer_config.thread_pool {
+                Some(thread_pool) => thread_pool,
+                None => &scoped_thread_pool::ThreadPool::with_available_parallelism("index".into()),
+            };
+
             let rtxn = self.inner.read_txn()?;
             let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
             let mut new_fields_ids_map = db_fields_ids_map.clone();
@@ -1955,17 +1965,19 @@ pub(crate) mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                thread_pool,
+                CHUNK_SIZE,
             )?;

             if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) {
                 return Err(error.into());
             }

-            pool.install(|| {
             indexer::index(
                 wtxn,
                 &self.inner,
-                &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
+                thread_pool,
+                &pool,
                 indexer_config.grenad_parameters(),
                 &db_fields_ids_map,
                 new_fields_ids_map,
@@ -1974,9 +1986,7 @@ pub(crate) mod tests {
                 embedders,
                 &|| false,
                 &Progress::default(),
-                )
-            })
-            .unwrap()?;
+            )?;

             Ok(())
         }
@@ -2005,7 +2015,7 @@ pub(crate) mod tests {
         let local_pool;
         let indexer_config = &index.indexer_config;
-        let pool = match &indexer_config.thread_pool {
+        let pool = match &indexer_config.rayon_thread_pool {
            Some(pool) => pool,
            None => {
                local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
@@ -2013,6 +2023,11 @@
            }
        };
+        let thread_pool = match &indexer_config.thread_pool {
+            Some(thread_pool) => thread_pool,
+            None => &scoped_thread_pool::ThreadPool::with_available_parallelism("index".into()),
+        };
+
        let rtxn = index.inner.read_txn().unwrap();
        let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap();
        let mut new_fields_ids_map = db_fields_ids_map.clone();
@@ -2036,6 +2051,8 @@ pub(crate) mod tests {
            &mut new_fields_ids_map,
            &|| false,
            Progress::default(),
+            thread_pool,
+            CHUNK_SIZE,
        )
        .unwrap();
@@ -2046,7 +2063,8 @@ pub(crate) mod tests {
        indexer::index(
            &mut wtxn,
            &index.inner,
-            &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
+            thread_pool,
+            &pool,
            indexer_config.grenad_parameters(),
            &db_fields_ids_map,
            new_fields_ids_map,
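One detail worth calling out in the hunks above: the `None` arm borrows a pool that is constructed inside the `match` itself. That borrow only lives long enough because temporary lifetime extension applies to `match` arms used as a `let` initializer (stabilized in Rust 1.79, as far as I know). A self-contained miniature of the same pattern:

// Miniature of the `None => &temporary` fallback used above: the temporary
// created in the match arm is lifetime-extended to the `name` binding.
fn main() {
    let configured: Option<String> = None;
    let name: &String = match &configured {
        Some(name) => name,
        None => &String::from("index"), // temporary, lifetime-extended
    };
    assert_eq!(name, "index");
}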

View File

@@ -7,6 +7,7 @@ use maplit::{btreemap, hashset};
 use crate::progress::Progress;
 use crate::update::new::indexer;
+use crate::update::new::indexer::document_changes::CHUNK_SIZE;
 use crate::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use crate::vector::EmbeddingConfigs;
 use crate::{db_snap, Criterion, Index};
@@ -65,6 +66,9 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
     // index documents
     indexer.add_documents(&payload).unwrap();

+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
+
     let indexer_alloc = Bump::new();
     let (document_changes, operation_stats, primary_key) = indexer
         .into_changes(
@@ -75,6 +79,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
             &mut new_fields_ids_map,
             &|| false,
             Progress::default(),
+            &thread_pool,
+            CHUNK_SIZE,
         )
         .unwrap();
@@ -85,6 +91,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,

View File

@@ -28,7 +28,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
 ) -> Result<grenad::Reader<BufReader<File>>> {
     let max_positions_per_attributes = max_positions_per_attributes
         .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();
     let force_reindexing = settings_diff.reindex_searchable();

     // initialize destination values.

View File

@@ -23,7 +23,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     _settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut facet_number_docids_sorter = create_sorter(
         grenad::SortAlgorithm::Unstable,

View File

@@ -55,7 +55,7 @@ fn extract_facet_string_docids_document_update<R: io::Read + io::Seek>(
     localized_field_ids: &LocalizedFieldIds,
     facet_search: bool,
 ) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut facet_string_docids_sorter = create_sorter(
         grenad::SortAlgorithm::Stable,
@@ -145,7 +145,7 @@ fn extract_facet_string_docids_settings<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut facet_string_docids_sorter = create_sorter(
         grenad::SortAlgorithm::Stable,

View File

@@ -44,7 +44,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<ExtractedFacetValues> {
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut fid_docid_facet_numbers_sorter = create_sorter(
         grenad::SortAlgorithm::Stable,

View File

@@ -26,7 +26,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     _settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut fid_word_count_docids_sorter = create_sorter(
         grenad::SortAlgorithm::Unstable,

View File

@@ -35,7 +35,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
     grenad::Reader<BufReader<File>>,
     grenad::Reader<BufReader<File>>,
 )> {
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut word_fid_docids_sorter = create_sorter(
         grenad::SortAlgorithm::Unstable,

View File

@@ -39,7 +39,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
     let any_deletion = settings_diff.old.proximity_precision == ProximityPrecision::ByWord;
     let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
         .map(|_| {
             create_sorter(

View File

@@ -24,7 +24,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     _settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
-    let max_memory = indexer.max_memory_by_thread();
+    let max_memory = indexer.max_memory_by_rayon_thread();

     let mut word_position_docids_sorter = create_sorter(
         grenad::SortAlgorithm::Unstable,

View File

@@ -119,7 +119,11 @@ impl GrenadParameters {
     ///
     /// This should be called inside of a rayon thread pool,
     /// otherwise, it will take the global number of threads.
-    pub fn max_memory_by_thread(&self) -> Option<usize> {
+    pub fn max_memory_by_thread(&self, thread_count: usize) -> Option<usize> {
+        self.max_memory.map(|max_memory| (max_memory / thread_count))
+    }
+
+    pub fn max_memory_by_rayon_thread(&self) -> Option<usize> {
         self.max_memory.map(|max_memory| (max_memory / rayon::current_num_threads()))
     }
 }
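The split makes the per-thread budget explicit: `max_memory_by_thread` now divides the global cap by a caller-supplied thread count (typically the scoped pool's `thread_count()`), while `max_memory_by_rayon_thread` keeps the old rayon-derived behaviour for the legacy extractors. The arithmetic in miniature, with a pared-down stand-in for `GrenadParameters`:

// Pared-down stand-in for GrenadParameters, to show the budget arithmetic.
struct Params {
    max_memory: Option<usize>,
}

impl Params {
    // The caller decides the divisor (e.g. the scoped pool's thread_count()).
    fn max_memory_by_thread(&self, thread_count: usize) -> Option<usize> {
        self.max_memory.map(|max| max / thread_count)
    }
}

fn main() {
    let params = Params { max_memory: Some(2 * 1024 * 1024 * 1024) }; // 2 GiB total
    // With 8 extraction threads, each gets a 256 MiB sorter budget.
    assert_eq!(params.max_memory_by_thread(8), Some(256 * 1024 * 1024));
    // No global cap means no per-thread cap either.
    assert_eq!(Params { max_memory: None }.max_memory_by_thread(8), None);
}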

View File

@@ -227,7 +227,7 @@ where
         crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);

     let backup_pool;
-    let pool = match self.indexer_config.thread_pool {
+    let pool = match self.indexer_config.rayon_thread_pool {
         Some(ref pool) => pool,
         None => {
             // We initialize a backup pool with the default
@@ -770,6 +770,7 @@ mod tests {
     use crate::progress::Progress;
     use crate::search::TermsMatchingStrategy;
     use crate::update::new::indexer;
+    use crate::update::new::indexer::document_changes::CHUNK_SIZE;
     use crate::update::Setting;
     use crate::{db_snap, Filter, Search, UserError};
@@ -1967,6 +1968,8 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string()),
+                CHUNK_SIZE,
             )
             .unwrap();
@@ -2115,6 +2118,9 @@ mod tests {
         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
         indexer.add_documents(&documents).unwrap();
         indexer.delete_documents(&["2"]);
+
+        let thread_pool =
+            scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
         let (document_changes, _operation_stats, primary_key) = indexer
             .into_changes(
@@ -2124,12 +2130,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2177,6 +2186,9 @@ mod tests {
         let indexer_alloc = Bump::new();
         let embedders = EmbeddingConfigs::default();
+        let thread_pool =
+            scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
+
         let (document_changes, _operation_stats, primary_key) = indexer
             .into_changes(
                 &indexer_alloc,
@@ -2186,12 +2198,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2229,6 +2244,8 @@ mod tests {
         let embedders = EmbeddingConfigs::default();
         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments);
         indexer.add_documents(&documents).unwrap();
+        let thread_pool =
+            scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());

         let (document_changes, _operation_stats, primary_key) = indexer
             .into_changes(
@@ -2239,12 +2256,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2291,12 +2311,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2327,6 +2350,8 @@ mod tests {
         let indexer_alloc = Bump::new();
         let embedders = EmbeddingConfigs::default();
+        let thread_pool =
+            scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());

         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments);
         indexer.delete_documents(&["1", "2"]);
@@ -2345,12 +2370,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2382,6 +2410,8 @@ mod tests {
         let indexer_alloc = Bump::new();
         let embedders = EmbeddingConfigs::default();
+        let thread_pool =
+            scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());

         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments);
         indexer.delete_documents(&["1", "2", "1", "2"]);
@@ -2404,12 +2434,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2440,6 +2473,8 @@ mod tests {
         let indexer_alloc = Bump::new();
         let embedders = EmbeddingConfigs::default();
+        let thread_pool =
+            scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());

         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments);
         let documents = documents!([
@@ -2456,12 +2491,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2508,12 +2546,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2683,6 +2724,8 @@ mod tests {
         let indexer_alloc = Bump::new();
         let embedders = EmbeddingConfigs::default();
+        let thread_pool =
+            scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());

         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);

         // OP
@@ -2702,12 +2745,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2761,12 +2807,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,
@@ -2817,12 +2866,15 @@ mod tests {
                 &mut new_fields_ids_map,
                 &|| false,
                 Progress::default(),
+                &thread_pool,
+                CHUNK_SIZE,
             )
             .unwrap();

         indexer::index(
             &mut wtxn,
             &index.inner,
+            &thread_pool,
             &crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
             indexer_config.grenad_parameters(),
             &db_fields_ids_map,

View File

@@ -11,7 +11,8 @@ pub struct IndexerConfig {
     pub max_memory: Option<usize>,
     pub chunk_compression_type: CompressionType,
     pub chunk_compression_level: Option<u32>,
-    pub thread_pool: Option<ThreadPoolNoAbort>,
+    pub rayon_thread_pool: Option<ThreadPoolNoAbort>,
+    pub thread_pool: Option<scoped_thread_pool::ThreadPool<crate::Error>>,
     pub max_positions_per_attributes: Option<u32>,
     pub skip_index_budget: bool,
 }
@@ -36,6 +37,7 @@ impl Default for IndexerConfig {
             max_memory: None,
             chunk_compression_type: CompressionType::None,
             chunk_compression_level: None,
+            rayon_thread_pool: None,
             thread_pool: None,
             max_positions_per_attributes: None,
             skip_index_budget: false,
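For callers that want to supply their own pools, construction could look like the sketch below. The field names come from the struct above; `num_threads` on `ThreadPoolNoAbortBuilder` is assumed from milli's existing builder API:

// Sketch: an IndexerConfig carrying both pools explicitly (inside milli).
// `num_threads` is an assumption about ThreadPoolNoAbortBuilder's API.
let config = IndexerConfig {
    rayon_thread_pool: Some(
        ThreadPoolNoAbortBuilder::new().num_threads(8).build().unwrap(),
    ),
    thread_pool: Some(scoped_thread_pool::ThreadPool::with_available_parallelism(
        "indexing".to_string(),
    )),
    ..Default::default()
};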

View File

@@ -52,12 +52,12 @@ const MAX_FRAME_HEADER_SIZE: usize = 9;
 /// a message in this queue only if it is empty to avoid filling
 /// the channel *and* the BBQueue.
 pub fn extractor_writer_bbqueue<'a>(
-    thread_pool: &mut scoped_thread_pool::ThreadPool<crate::Error>,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     bbbuffers: &'a mut Vec<BBBuffer>,
     total_bbbuffer_capacity: usize,
     channel_capacity: usize,
 ) -> (ExtractorBbqueueSender<'a>, WriterBbqueueReceiver<'a>) {
-    let current_num_threads = rayon::current_num_threads();
+    let current_num_threads = thread_pool.thread_count();
     let bbbuffer_capacity = total_bbbuffer_capacity.checked_div(current_num_threads).unwrap();
     bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
@@ -67,12 +67,18 @@ pub fn extractor_writer_bbqueue<'a>(
     let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();

     let producers = ThreadLocal::with_capacity(bbbuffers.len());
-    let consumers = thread_pool.broadcast(|thread_index| {
-        let bbqueue: &BBBuffer = &bbbuffers[thread_index];
-        let (producer, consumer) = bbqueue.try_split_framed().unwrap();
-        producers.get_or(|| FullySend(RefCell::new(producer)));
-        consumer
-    });
+    let consumers = ThreadLocal::with_capacity(bbbuffers.len());
+    thread_pool
+        .broadcast(|thread_index| {
+            let bbqueue: &BBBuffer = &bbbuffers[thread_index];
+            let (producer, consumer) = bbqueue.try_split_framed().unwrap();
+            producers.get_or(|| FullySend(RefCell::new(producer)));
+            consumers.get_or(|| FullySend(consumer));
+            Ok(())
+        })
+        .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors))
+        .unwrap();
+    let consumers: Vec<_> = consumers.into_iter().map(|consumer| consumer.0).collect();

     let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
     let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
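Note the shape change around `broadcast`: its closure used to return the per-thread consumer directly, but now it returns `Result<(), E>`, so per-thread values leave the closure through a `ThreadLocal` that is drained once the broadcast finishes. The same gather pattern in a self-contained miniature, with `std::thread::scope` standing in for the pool's `broadcast` (assumed here to run the closure exactly once per pool thread):

use thread_local::ThreadLocal;

// Gather one value per pool thread when the broadcast closure can only
// report errors: stash each thread's value in a ThreadLocal, then drain it.
fn gather_per_thread<T: Send>(
    thread_count: usize,
    make: impl Fn(usize) -> T + Sync,
) -> Vec<T> {
    let slots: ThreadLocal<T> = ThreadLocal::with_capacity(thread_count);
    std::thread::scope(|s| {
        for thread_index in 0..thread_count {
            let slots = &slots;
            let make = &make;
            s.spawn(move || {
                // Each OS thread fills its own slot exactly once.
                slots.get_or(|| make(thread_index));
            });
        }
    });
    // Drain the slots after all threads have finished (order unspecified).
    slots.into_iter().collect()
}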
@@ -964,28 +970,70 @@
             .map_err(|_| SendError(()))
     }

-    pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> crate::Result<()> {
+    pub fn set_geo_faceted(
+        &self,
+        bitmap: &RoaringBitmap,
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
+    ) -> crate::Result<()> {
+        let writer = GeoWriter { bitmap, channel: *self };
+        thread_pool
+            .execute(&writer)
+            .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors))
+    }
+}
+
+struct GeoWriter<'a, 'b> {
+    bitmap: &'a RoaringBitmap,
+    channel: GeoSender<'a, 'b>,
+}
+
+impl<'a, 'b> scoped_thread_pool::Workload<'static> for GeoWriter<'a, 'b> {
+    type Context = ();
+    type Error = crate::Error;
+
+    fn context(
+        &self,
+        _thread_count: usize,
+        _thread_index: usize,
+    ) -> Result<Self::Context, Self::Error> {
+        Ok(())
+    }
+
+    fn run_task(
+        &self,
+        _thread_count: usize,
+        thread_index: usize,
+        task_index: usize,
+        _context: &mut Self::Context,
+    ) -> Option<Result<(), Self::Error>> {
+        if thread_index != 0 || task_index != 0 {
+            return None;
+        }
         let database = Database::Main;
-        let value_length = bitmap.serialized_size();
+        let value_length = self.bitmap.serialized_size();
         let key = GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes();
-        let key_length = key.len().try_into().ok().and_then(NonZeroU16::new).ok_or_else(|| {
-            InternalError::StorePut {
-                database_name: database.database_name(),
-                key: key.into(),
-                value_length,
-                error: MdbError::BadValSize.into(),
-            }
-        })?;
+        let key_length = match key.len().try_into().ok().and_then(NonZeroU16::new) {
+            Some(key_length) => key_length,
+            None => {
+                return Some(Err(InternalError::StorePut {
+                    database_name: database.database_name(),
+                    key: key.into(),
+                    value_length,
+                    error: MdbError::BadValSize.into(),
+                }
+                .into()))
+            }
+        };

-        self.0.write_key_value_with(
+        Some(self.channel.0.write_key_value_with(
             database,
             key_length,
             value_length,
             |key_buffer, value_buffer| {
                 key_buffer.copy_from_slice(key);
-                bitmap.serialize_into(value_buffer)?;
+                self.bitmap.serialize_into(value_buffer)?;
                 Ok(())
             },
-        )
+        ))
     }
 }
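From the two implementations in this commit (`GeoWriter` above and `Extract` in `document_changes.rs`), the `scoped_thread_pool::Workload` contract can be read off: the pool builds one `Context` per thread, then keeps calling `run_task` with an increasing `task_index` until it returns `None`, collecting errors along the way. Below is a reconstruction of that shape under those assumptions, not the crate's actual source:

// Reconstructed Workload contract; all signatures here are inferred.
pub trait Workload<'a>: Sync {
    type Context: 'a;
    type Error: Send;

    /// Build one context per pool thread before any task runs on it.
    fn context(&self, thread_count: usize, thread_index: usize)
        -> Result<Self::Context, Self::Error>;

    /// Called with increasing `task_index` until it returns `None`,
    /// which tells the pool this thread has run out of tasks.
    fn run_task(
        &self,
        thread_count: usize,
        thread_index: usize,
        task_index: usize,
        context: &mut Self::Context,
    ) -> Option<Result<(), Self::Error>>;
}

/// Toy single-threaded driver showing the assumed calling convention of
/// `ThreadPool::execute`, which collects every task error before returning.
fn execute_on_one_thread<W: Workload<'static>>(workload: &W) -> Result<(), Vec<W::Error>> {
    let mut context = workload.context(1, 0).map_err(|e| vec![e])?;
    let mut errors = Vec::new();
    let mut task_index = 0;
    while let Some(result) = workload.run_task(1, 0, task_index, &mut context) {
        if let Err(error) = result {
            errors.push(error);
        }
        task_index += 1;
    }
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}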

View File

@@ -38,7 +38,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b>
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
         Ok(RefCell::new(BalancedCaches::new_in(
             self.buckets,
-            self.grenad_parameters.max_memory_by_thread(),
+            self.grenad_parameters.max_memory_by_thread(self.buckets),
             extractor_alloc,
         )))
     }
@@ -388,6 +388,7 @@ fn truncate_str(s: &str) -> &str {
 impl FacetedDocidsExtractor {
     #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
     pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
@@ -412,10 +413,11 @@ impl FacetedDocidsExtractor {
         let extractor = FacetedExtractorData {
             attributes_to_extract: &attributes_to_extract,
             grenad_parameters: indexing_context.grenad_parameters,
-            buckets: rayon::current_num_threads(),
+            buckets: thread_pool.thread_count(),
             sender,
         };
         extract(
+            thread_pool,
             document_changes,
             &extractor,
             indexing_context,

View File

@@ -21,6 +21,7 @@ use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
 pub struct GeoExtractor {
     grenad_parameters: GrenadParameters,
+    thread_count: usize,
 }

 impl GeoExtractor {
@@ -28,11 +29,12 @@ impl GeoExtractor {
         rtxn: &RoTxn,
         index: &Index,
         grenad_parameters: GrenadParameters,
+        thread_count: usize,
     ) -> Result<Option<Self>> {
         let is_sortable = index.sortable_fields(rtxn)?.contains(RESERVED_GEO_FIELD_NAME);
         let is_filterable = index.filterable_fields(rtxn)?.contains(RESERVED_GEO_FIELD_NAME);
         if is_sortable || is_filterable {
-            Ok(Some(GeoExtractor { grenad_parameters }))
+            Ok(Some(GeoExtractor { grenad_parameters, thread_count }))
         } else {
             Ok(None)
         }
@@ -157,7 +159,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
     ) -> Result<()> {
         let rtxn = &context.rtxn;
         let index = context.index;
-        let max_memory = self.grenad_parameters.max_memory_by_thread();
+        let max_memory = self.grenad_parameters.max_memory_by_thread(self.thread_count);
         let db_fields_ids_map = context.db_fields_ids_map;
         let mut data_ref = context.data.borrow_mut_or_yield();

View File

@@ -5,7 +5,6 @@ mod geo;
 mod searchable;
 mod vectors;

-use bumpalo::Bump;
 pub use cache::{
     merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
 };
@@ -15,22 +14,6 @@ pub use geo::*;
 pub use searchable::*;
 pub use vectors::EmbeddingExtractor;

-use super::indexer::document_changes::{DocumentChanges, IndexingContext};
-use super::steps::IndexingStep;
-use super::thread_local::{FullySend, ThreadLocal};
-use crate::Result;
-
-pub trait DocidsExtractor {
-    fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
-        document_changes: &DC,
-        indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
-        extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-        step: IndexingStep,
-    ) -> Result<Vec<BalancedCaches<'extractor>>>
-    where
-        MSP: Fn() -> bool + Sync;
-}
-
 /// TODO move in permissive json pointer
 pub mod perm_json_p {
     use serde_json::{Map, Value};

View File

@@ -218,7 +218,7 @@ impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> {
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
         Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in(
             self.buckets,
-            self.grenad_parameters.max_memory_by_thread(),
+            self.grenad_parameters.max_memory_by_thread(self.buckets),
             extractor_alloc,
         ))))
     }
@@ -240,6 +240,7 @@ pub struct WordDocidsExtractors;

 impl WordDocidsExtractors {
     pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
@@ -288,10 +289,11 @@ impl WordDocidsExtractors {
         let extractor = WordDocidsExtractorData {
             tokenizer: &document_tokenizer,
             grenad_parameters: indexing_context.grenad_parameters,
-            buckets: rayon::current_num_threads(),
+            buckets: thread_pool.thread_count(),
         };

         extract(
+            thread_pool,
             document_changes,
             &extractor,
             indexing_context,

View File

@@ -2,29 +2,62 @@ use std::cell::RefCell;
 use std::collections::VecDeque;
 use std::rc::Rc;

+use bumpalo::Bump;
 use heed::RoTxn;

-use super::tokenize_document::DocumentTokenizer;
-use super::SearchableExtractor;
+use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
 use crate::proximity::{index_proximity, MAX_DISTANCE};
 use crate::update::new::document::Document;
 use crate::update::new::extract::cache::BalancedCaches;
-use crate::update::new::indexer::document_changes::DocumentChangeContext;
+use crate::update::new::indexer::document_changes::{
+    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
+};
 use crate::update::new::ref_cell_ext::RefCellExt as _;
+use crate::update::new::steps::IndexingStep;
+use crate::update::new::thread_local::{FullySend, ThreadLocal};
 use crate::update::new::DocumentChange;
-use crate::{FieldId, GlobalFieldsIdsMap, Index, Result};
+use crate::update::GrenadParameters;
+use crate::{FieldId, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};

-pub struct WordPairProximityDocidsExtractor;
+impl<'a, 'extractor> Extractor<'extractor> for WordPairProximityDocidsExtractor<'a> {
+    type Data = RefCell<BalancedCaches<'extractor>>;

-impl SearchableExtractor for WordPairProximityDocidsExtractor {
-    fn attributes_to_extract<'a>(
-        rtxn: &'a RoTxn,
-        index: &'a Index,
-    ) -> Result<Option<Vec<&'a str>>> {
+    fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
+        Ok(RefCell::new(BalancedCaches::new_in(
+            self.buckets,
+            self.grenad_parameters.max_memory_by_thread(self.buckets),
+            extractor_alloc,
+        )))
+    }
+
+    fn process<'doc>(
+        &self,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
+    ) -> Result<()> {
+        for change in changes {
+            let change = change?;
+            self.extract_document_change(context, change)?;
+        }
+        Ok(())
+    }
+}
+
+pub struct WordPairProximityDocidsExtractor<'a> {
+    tokenizer: &'a DocumentTokenizer<'a>,
+    grenad_parameters: &'a GrenadParameters,
+    buckets: usize,
+}
+
+impl<'a> WordPairProximityDocidsExtractor<'a> {
+    fn attributes_to_extract<'b>(
+        rtxn: &'b RoTxn,
+        index: &'b Index,
+    ) -> Result<Option<Vec<&'b str>>> {
         index.user_defined_searchable_fields(rtxn).map_err(Into::into)
     }

-    fn attributes_to_skip<'a>(_rtxn: &'a RoTxn, _index: &'a Index) -> Result<Vec<&'a str>> {
+    fn attributes_to_skip<'b>(_rtxn: &'b RoTxn, _index: &'b Index) -> Result<Vec<&'b str>> {
         Ok(Vec::new())
     }
@@ -32,10 +65,11 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
     // and to store the docids of the documents that have a number of words in a given field
     // equal to or under than MAX_COUNTED_WORDS.
     fn extract_document_change(
+        &self,
         context: &DocumentChangeContext<RefCell<BalancedCaches>>,
-        document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {
+        let document_tokenizer = self.tokenizer;
         let doc_alloc = &context.doc_alloc;
         let index = context.index;
@@ -129,6 +163,70 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
         }
         Ok(())
     }
+
+    pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
+        document_changes: &DC,
+        indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
+        extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+        step: IndexingStep,
+    ) -> Result<Vec<BalancedCaches<'extractor>>>
+    where
+        MSP: Fn() -> bool + Sync,
+    {
+        let rtxn = indexing_context.index.read_txn()?;
+        let stop_words = indexing_context.index.stop_words(&rtxn)?;
+        let allowed_separators = indexing_context.index.allowed_separators(&rtxn)?;
+        let allowed_separators: Option<Vec<_>> =
+            allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect());
+        let dictionary = indexing_context.index.dictionary(&rtxn)?;
+        let dictionary: Option<Vec<_>> =
+            dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
+        let mut builder = tokenizer_builder(
+            stop_words.as_ref(),
+            allowed_separators.as_deref(),
+            dictionary.as_deref(),
+        );
+        let tokenizer = builder.build();
+        let attributes_to_extract = Self::attributes_to_extract(&rtxn, indexing_context.index)?;
+        let attributes_to_skip = Self::attributes_to_skip(&rtxn, indexing_context.index)?;
+        let localized_attributes_rules =
+            indexing_context.index.localized_attributes_rules(&rtxn)?.unwrap_or_default();
+        let document_tokenizer = DocumentTokenizer {
+            tokenizer: &tokenizer,
+            attribute_to_extract: attributes_to_extract.as_deref(),
+            attribute_to_skip: attributes_to_skip.as_slice(),
+            localized_attributes_rules: &localized_attributes_rules,
+            max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
+        };
+        let extractor_data: WordPairProximityDocidsExtractor = WordPairProximityDocidsExtractor {
+            tokenizer: &document_tokenizer,
+            grenad_parameters: indexing_context.grenad_parameters,
+            buckets: thread_pool.thread_count(),
+        };
+        let datastore = ThreadLocal::new();
+        {
+            let span =
+                tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
+            let _entered = span.enter();
+            extract(
+                thread_pool,
+                document_changes,
+                &extractor_data,
+                indexing_context,
+                extractor_allocs,
+                &datastore,
+                step,
+            )?;
+        }
+        Ok(datastore.into_iter().map(RefCell::into_inner).collect())
+    }
 }

 fn build_key<'a>(

View File

@@ -1,146 +1,5 @@
 mod extract_word_docids;
 mod extract_word_pair_proximity_docids;
 mod tokenize_document;

-use std::cell::RefCell;
-use std::marker::PhantomData;
-
-use bumpalo::Bump;
 pub use extract_word_docids::{WordDocidsCaches, WordDocidsExtractors};
 pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
-use heed::RoTxn;
-use tokenize_document::{tokenizer_builder, DocumentTokenizer};
-
-use super::cache::BalancedCaches;
-use super::DocidsExtractor;
-use crate::update::new::indexer::document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
-};
-use crate::update::new::steps::IndexingStep;
-use crate::update::new::thread_local::{FullySend, ThreadLocal};
-use crate::update::new::DocumentChange;
-use crate::update::GrenadParameters;
-use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
-
-pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
-    tokenizer: &'a DocumentTokenizer<'a>,
-    grenad_parameters: &'a GrenadParameters,
-    buckets: usize,
-    _ex: PhantomData<EX>,
-}
-
-impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
-    for SearchableExtractorData<'a, EX>
-{
-    type Data = RefCell<BalancedCaches<'extractor>>;
-
-    fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(RefCell::new(BalancedCaches::new_in(
-            self.buckets,
-            self.grenad_parameters.max_memory_by_thread(),
-            extractor_alloc,
-        )))
-    }
-
-    fn process<'doc>(
-        &self,
-        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
-    ) -> Result<()> {
-        for change in changes {
-            let change = change?;
-            EX::extract_document_change(context, self.tokenizer, change)?;
-        }
-        Ok(())
-    }
-}
-
-pub trait SearchableExtractor: Sized + Sync {
-    fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
-        document_changes: &DC,
-        indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
-        extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-        step: IndexingStep,
-    ) -> Result<Vec<BalancedCaches<'extractor>>>
-    where
-        MSP: Fn() -> bool + Sync,
-    {
-        let rtxn = indexing_context.index.read_txn()?;
-        let stop_words = indexing_context.index.stop_words(&rtxn)?;
-        let allowed_separators = indexing_context.index.allowed_separators(&rtxn)?;
-        let allowed_separators: Option<Vec<_>> =
-            allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect());
-        let dictionary = indexing_context.index.dictionary(&rtxn)?;
-        let dictionary: Option<Vec<_>> =
-            dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
-        let mut builder = tokenizer_builder(
-            stop_words.as_ref(),
-            allowed_separators.as_deref(),
-            dictionary.as_deref(),
-        );
-        let tokenizer = builder.build();
-        let attributes_to_extract = Self::attributes_to_extract(&rtxn, indexing_context.index)?;
-        let attributes_to_skip = Self::attributes_to_skip(&rtxn, indexing_context.index)?;
-        let localized_attributes_rules =
-            indexing_context.index.localized_attributes_rules(&rtxn)?.unwrap_or_default();
-        let document_tokenizer = DocumentTokenizer {
-            tokenizer: &tokenizer,
-            attribute_to_extract: attributes_to_extract.as_deref(),
-            attribute_to_skip: attributes_to_skip.as_slice(),
-            localized_attributes_rules: &localized_attributes_rules,
-            max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
-        };
-        let extractor_data: SearchableExtractorData<Self> = SearchableExtractorData {
-            tokenizer: &document_tokenizer,
-            grenad_parameters: indexing_context.grenad_parameters,
-            buckets: rayon::current_num_threads(),
-            _ex: PhantomData,
-        };
-        let datastore = ThreadLocal::new();
-        {
-            let span =
-                tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
-            let _entered = span.enter();
-            extract(
-                document_changes,
-                &extractor_data,
-                indexing_context,
-                extractor_allocs,
-                &datastore,
-                step,
-            )?;
-        }
-        Ok(datastore.into_iter().map(RefCell::into_inner).collect())
-    }
-
-    fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
-        document_tokenizer: &DocumentTokenizer,
-        document_change: DocumentChange,
-    ) -> Result<()>;
-
-    fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index)
-        -> Result<Option<Vec<&'a str>>>;
-
-    fn attributes_to_skip<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<Vec<&'a str>>;
-}
-
-impl<T: SearchableExtractor> DocidsExtractor for T {
-    fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
-        document_changes: &DC,
-        indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
-        extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-        step: IndexingStep,
-    ) -> Result<Vec<BalancedCaches<'extractor>>>
-    where
-        MSP: Fn() -> bool + Sync,
-    {
-        Self::run_extraction(document_changes, indexing_context, extractor_allocs, step)
-    }
-}

View File

@@ -1,15 +1,14 @@
 use std::cell::{Cell, RefCell};
-use std::sync::atomic::Ordering;
+use std::marker::PhantomData;
+use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, RwLock};

 use bumpalo::Bump;
 use heed::RoTxn;
-use rayon::iter::IndexedParallelIterator;

 use super::super::document_change::DocumentChange;
 use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
 use crate::progress::{AtomicDocumentStep, Progress};
-use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
 use crate::update::new::steps::IndexingStep;
 use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
 use crate::update::GrenadParameters;
@@ -114,7 +113,7 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
 >: Sync {
     type Item: Send;

-    fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
+    fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]>;

     fn len(&self) -> usize;
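With rayon gone, `DocumentChanges` no longer hands out an `IndexedParallelIterator`; the pool instead asks each thread for its `task_index`-th slice. One plausible slice-backed implementation of the new accessor follows; the round-robin chunk-to-thread mapping here is an assumption for illustration, not necessarily the commit's exact scheduling:

// Hypothetical slice-backed implementation of the new `items` accessor.
struct VecChanges<T> {
    items: Vec<T>,
    thread_count: usize,
    chunk_size: usize, // CHUNK_SIZE = 100 in this commit
}

impl<T: Send> VecChanges<T> {
    // Chunk k of the payload is served as task `k / thread_count` on
    // thread `k % thread_count` (round-robin; an assumption).
    fn items(&self, thread_index: usize, task_index: usize) -> Option<&[T]> {
        let chunk_index = task_index * self.thread_count + thread_index;
        let start = chunk_index * self.chunk_size;
        if start >= self.items.len() {
            return None; // this thread has run out of tasks
        }
        let end = usize::min(start + self.chunk_size, self.items.len());
        Some(&self.items[start..end])
    }

    fn len(&self) -> usize {
        self.items.len()
    }
}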
@@ -186,9 +185,10 @@ where
     }
 }

-const CHUNK_SIZE: usize = 100;
+pub const CHUNK_SIZE: usize = 100;

-pub fn extract<
+struct Extract<
+    'shared, // covariant lifetime for shared borrows
     'pl, // covariant lifetime of the underlying payload
     'extractor, // invariant lifetime of extractor_alloc
     'fid, // invariant lifetime of fields ids map
@@ -196,31 +196,121 @@ pub fn extract<
     'indexer, // covariant lifetime of objects that are borrowed during the entire indexing
     'data, // invariant on EX::Data lifetime of datastore
     'index, // covariant lifetime of the index
     EX,
-    DC: DocumentChanges<'pl>,
-    MSP,
->(
-    document_changes: &DC,
-    extractor: &EX,
-    IndexingContext {
-        index,
-        db_fields_ids_map,
-        new_fields_ids_map,
-        doc_allocs,
-        fields_ids_map_store,
-        must_stop_processing,
-        progress,
-        grenad_parameters: _,
-    }: IndexingContext<'fid, 'indexer, 'index, MSP>,
-    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    datastore: &'data ThreadLocal<EX::Data>,
-    step: IndexingStep,
-) -> Result<()>
-where
-    EX: Extractor<'extractor>,
-    MSP: Fn() -> bool + Sync,
-{
+    DC,
+    MSP,
+> where
+    DC: DocumentChanges<'pl>,
+    EX: Extractor<'extractor>,
+    MSP: Fn() -> bool + Sync,
+{
+    document_changes: &'shared DC,
+    extractor: &'shared EX,
+    indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
+    extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
+    datastore: &'data ThreadLocal<EX::Data>,
+    step: Arc<AtomicU32>,
+    _marker: PhantomData<&'pl ()>,
+}
+
+impl<
+        'doc,
+        'extractor: 'doc, // invariant lifetime of extractor_alloc
+        'shared,
+        'pl, // covariant lifetime of the underlying payload
+        'fid: 'doc, // invariant lifetime of fields ids map
+        'indexer: 'doc, // covariant lifetime of objects that are borrowed during the entire indexing
+        'data: 'doc, // invariant on EX::Data lifetime of datastore
+        'index: 'doc + 'indexer, // covariant lifetime of the index
+        EX,
+        DC: DocumentChanges<'pl>,
+        MSP,
+    > scoped_thread_pool::Workload<'doc>
+    for Extract<'shared, 'pl, 'extractor, 'fid, 'indexer, 'data, 'index, EX, DC, MSP>
+where
+    EX: Extractor<'extractor>,
+    MSP: Fn() -> bool + Sync,
+{
+    type Context = DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, EX::Data>;
+    type Error = crate::Error;
+
+    fn context(
+        &self,
+        _thread_count: usize,
+        _thread_index: usize,
+    ) -> std::result::Result<
+        DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, EX::Data>,
+        Self::Error,
+    > {
+        let extractor = self.extractor;
+        DocumentChangeContext::new(
+            self.indexing_context.index,
+            self.indexing_context.db_fields_ids_map,
+            self.indexing_context.new_fields_ids_map,
+            self.extractor_allocs,
+            self.indexing_context.doc_allocs,
+            self.datastore,
+            self.indexing_context.fields_ids_map_store,
+            move |index_alloc| extractor.init_data(index_alloc),
+        )
+    }
+
+    fn run_task(
+        &self,
+        _thread_count: usize,
+        thread_index: usize,
+        task_index: usize,
+        context: &mut Self::Context,
+    ) -> Option<std::result::Result<(), Self::Error>> {
+        let items = self.document_changes.items(thread_index, task_index)?;
+        if (self.indexing_context.must_stop_processing)() {
+            return Some(Err(InternalError::AbortedIndexation.into()));
+        }
+
+        // Clean up and reuse the document-specific allocator
+        context.doc_alloc.reset();
+        let changes = items.iter().filter_map(|item| {
+            self.document_changes.item_to_document_change(context, item).transpose()
+        });
+
+        let res = self.extractor.process(changes, context);
+        self.step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
+
+        // send back the doc_alloc in the pool
+        context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
+
+        Some(res)
+    }
+}
+
+pub fn extract<
+    'pool, // invariant lifetime of the thread pool
+    'pl, // covariant lifetime of the underlying payload
+    'extractor, // invariant lifetime of extractor_alloc
+    'fid, // invariant lifetime of fields ids map
+    'indexer, // covariant lifetime of objects that are borrowed during the entire indexing
+    'data, // invariant on EX::Data lifetime of datastore
+    'index, // covariant lifetime of the index
+    EX,
+    DC,
+    MSP,
+>(
+    thread_pool: &'pool scoped_thread_pool::ThreadPool<crate::Error>,
+    document_changes: &DC,
+    extractor: &EX,
+    indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
+    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+    datastore: &'data ThreadLocal<EX::Data>,
+    step: IndexingStep,
+) -> Result<()>
+where
+    DC: DocumentChanges<'pl>,
+    EX: Extractor<'extractor>,
+    MSP: Fn() -> bool + Sync,
+{
     tracing::trace!("We are resetting the extractor allocators");
-    progress.update_progress(step);
+    indexing_context.progress.update_progress(step);
     // Clean up and reuse the extractor allocs
     for extractor_alloc in extractor_allocs.iter_mut() {
         tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes());
@@ -229,45 +319,22 @@ where
     let total_documents = document_changes.len() as u32;
     let (step, progress_step) = AtomicDocumentStep::new(total_documents);
-    progress.update_progress(progress_step);
+    indexing_context.progress.update_progress(progress_step);

-    let pi = document_changes.iter(CHUNK_SIZE);
-    pi.try_arc_for_each_try_init(
-        || {
-            DocumentChangeContext::new(
-                index,
-                db_fields_ids_map,
-                new_fields_ids_map,
-                extractor_allocs,
-                doc_allocs,
-                datastore,
-                fields_ids_map_store,
-                move |index_alloc| extractor.init_data(index_alloc),
-            )
-        },
-        |context, items| {
-            if (must_stop_processing)() {
-                return Err(Arc::new(InternalError::AbortedIndexation.into()));
-            }
-
-            // Clean up and reuse the document-specific allocator
-            context.doc_alloc.reset();
-
-            let items = items.as_ref();
-            let changes = items.iter().filter_map(|item| {
-                document_changes.item_to_document_change(context, item).transpose()
-            });
-
-            let res = extractor.process(changes, context).map_err(Arc::new);
-
-            step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
-
-            // send back the doc_alloc in the pool
-            context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
-
-            res
-        },
-    )?;
+    let extract = Extract {
+        document_changes,
+        extractor,
+        indexing_context,
+        extractor_allocs,
+        datastore,
+        step,
+        _marker: PhantomData,
+    };
+    thread_pool
+        .execute(&extract)
+        .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors))?;

-    step.store(total_documents, Ordering::Relaxed);
+    extract.step.store(total_documents, Ordering::Relaxed);

     Ok(())
 }
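The progress counter also changes form: it moves into the workload as an `Arc<AtomicU32>` so every pool thread can bump it from `run_task`, while the caller keeps a handle to snap it to the final total. In miniature, with plain `std` and no milli types:

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn main() {
    let step = Arc::new(AtomicU32::new(0));
    let total_documents = 250u32;

    std::thread::scope(|s| {
        for chunk_len in [100u32, 100, 50] {
            let step = Arc::clone(&step);
            // Each worker reports the documents it just processed...
            s.spawn(move || {
                step.fetch_add(chunk_len, Ordering::Relaxed);
            });
        }
    });

    // ...and the caller snaps the counter to the total once all tasks end,
    // mirroring `extract.step.store(total_documents, Ordering::Relaxed)`.
    step.store(total_documents, Ordering::Relaxed);
    assert_eq!(step.load(Ordering::Relaxed), total_documents);
}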

View File

@@ -95,7 +95,7 @@ mod test {
     use crate::index::tests::TempIndex;
     use crate::progress::Progress;
     use crate::update::new::indexer::document_changes::{
-        extract, DocumentChangeContext, Extractor, IndexingContext,
+        extract, DocumentChangeContext, Extractor, IndexingContext, CHUNK_SIZE,
     };
     use crate::update::new::indexer::DocumentDeletion;
     use crate::update::new::steps::IndexingStep;
@@ -136,7 +136,7 @@ mod test {
         }
     }

-        let mut thread_pool =
+        let thread_pool =
             scoped_thread_pool::ThreadPool::new(NonZeroUsize::new(1).unwrap(), "test".into());

         let mut deletions = DocumentDeletion::new();
@@ -159,8 +159,12 @@ mod test {
         let deletion_tracker = TrackDeletion(PhantomData);

-        let changes = deletions
-            .into_changes(&indexer, crate::documents::PrimaryKey::Flat { name: "id", field_id: 0 });
+        let changes = deletions.into_changes(
+            &indexer,
+            crate::documents::PrimaryKey::Flat { name: "id", field_id: 0 },
+            &thread_pool,
+            CHUNK_SIZE,
+        );

         let context = IndexingContext {
             index: &index,
@@ -177,7 +181,7 @@ mod test {
         let datastore = ThreadLocal::new();

         extract(
-            &mut thread_pool,
+            &thread_pool,
             &changes,
             &deletion_tracker,
             context,

View File

@@ -22,6 +22,7 @@ use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
 #[allow(clippy::too_many_arguments)]
 pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     document_changes: &DC,
     indexing_context: IndexingContext<MSP>,
     indexer_span: Span,
@@ -47,11 +48,12 @@ where
     // document but we need to create a function that collects and compresses documents.
     let document_sender = extractor_sender.documents();
     let document_extractor = DocumentsExtractor::new(document_sender, embedders);
-    let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+    let datastore = ThreadLocal::with_capacity(thread_pool.thread_count());
     {
         let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
         let _entered = span.enter();
         extract(
+            thread_pool,
             document_changes,
             &document_extractor,
             indexing_context,
@@ -84,6 +86,7 @@ where
             let _entered = span.enter();
             FacetedDocidsExtractor::run_extraction(
+                thread_pool,
                 document_changes,
                 indexing_context,
                 extractor_allocs,
@@ -97,6 +100,7 @@ where
             let _entered = span.enter();
             facet_field_ids_delta = merge_and_send_facet_docids(
+                thread_pool,
                 caches,
                 FacetDatabases::new(index),
                 index,
@@ -118,6 +122,7 @@ where
             let _entered = span.enter();
             WordDocidsExtractors::run_extraction(
+                thread_pool,
                 document_changes,
                 indexing_context,
                 extractor_allocs,
@@ -129,6 +134,7 @@ where
             let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
             let _entered = span.enter();
             merge_and_send_docids(
+                thread_pool,
                 word_docids,
                 index.word_docids.remap_types(),
                 index,
@@ -142,6 +148,7 @@ where
                 tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
             let _entered = span.enter();
             merge_and_send_docids(
+                thread_pool,
                 word_fid_docids,
                 index.word_fid_docids.remap_types(),
                 index,
@@ -155,6 +162,7 @@ where
                 tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
             let _entered = span.enter();
             merge_and_send_docids(
+                thread_pool,
                 exact_word_docids,
                 index.exact_word_docids.remap_types(),
                 index,
@@ -168,6 +176,7 @@ where
                 tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
             let _entered = span.enter();
             merge_and_send_docids(
+                thread_pool,
                 word_position_docids,
                 index.word_position_docids.remap_types(),
                 index,
@@ -181,6 +190,7 @@ where
                 tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
             let _entered = span.enter();
             merge_and_send_docids(
+                thread_pool,
                 fid_word_count_docids,
                 index.field_id_word_count_docids.remap_types(),
                 index,
@@ -198,7 +208,8 @@ where
             let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
             let _entered = span.enter();
-            <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
+            WordPairProximityDocidsExtractor::run_extraction(
+                thread_pool,
                 document_changes,
                 indexing_context,
                 extractor_allocs,
@@ -211,6 +222,7 @@ where
             let _entered = span.enter();
             merge_and_send_docids(
+                thread_pool,
                 caches,
                 index.word_pair_proximity_docids.remap_types(),
                 index,
@@ -232,12 +244,13 @@ where
         field_distribution,
         request_threads(),
     );
-    let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+    let mut datastore = ThreadLocal::with_capacity(thread_pool.thread_count());
     {
         let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors");
         let _entered = span.enter();
         extract(
+            thread_pool,
             document_changes,
             &extractor,
             indexing_context,
@@ -263,17 +276,23 @@ where
     }
     'geo: {
-        let Some(extractor) = GeoExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)?
+        let Some(extractor) = GeoExtractor::new(
+            &rtxn,
+            index,
+            *indexing_context.grenad_parameters,
+            thread_pool.thread_count(),
+        )?
         else {
            break 'geo;
         };
-        let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+        let datastore = ThreadLocal::with_capacity(thread_pool.thread_count());
         {
             let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
             let _entered = span.enter();
             extract(
+                thread_pool,
                 document_changes,
                 &extractor,
                 indexing_context,
@@ -289,6 +308,7 @@ where
             index,
             extractor_sender.geo(),
             &indexing_context.must_stop_processing,
+            thread_pool,
         )?;
     }
     indexing_context.progress.update_progress(IndexingStep::WritingToDatabase);
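
Note: every `ThreadLocal::with_capacity(rayon::current_num_threads())` in this file becomes `thread_pool.thread_count()`, so per-worker state is sized to the pool that actually runs the extractors rather than to rayon's global pool. A hedged sketch of that sizing with the `thread_local` crate already used above; `parallel_sum` and its chunking are illustrative, and the capacity is only a pre-allocation hint:

```rust
use std::cell::Cell;
use thread_local::ThreadLocal;

/// Sums `values` with one accumulator per worker, sized to `thread_count`.
fn parallel_sum(values: &[u64], thread_count: usize) -> u64 {
    let locals: ThreadLocal<Cell<u64>> = ThreadLocal::with_capacity(thread_count);
    let chunk = values.len().div_ceil(thread_count.max(1)).max(1);
    std::thread::scope(|scope| {
        for part in values.chunks(chunk) {
            let locals = &locals;
            scope.spawn(move || {
                // One cell per worker thread; no locking on the hot path.
                let cell = locals.get_or(|| Cell::new(0));
                cell.set(cell.get() + part.iter().sum::<u64>());
            });
        }
    });
    locals.into_iter().map(Cell::into_inner).sum()
}
```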

View File

@@ -44,6 +44,7 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
 pub fn index<'pl, 'indexer, 'index, DC, MSP>(
     wtxn: &mut RwTxn,
     index: &'index Index,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     pool: &ThreadPoolNoAbort,
     grenad_parameters: GrenadParameters,
     db_fields_ids_map: &'indexer FieldsIdsMap,
@@ -110,9 +111,9 @@ where
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
     let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
     let new_fields_ids_map = RwLock::new(new_fields_ids_map);
-    let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
-    let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
-    let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
+    let fields_ids_map_store = ThreadLocal::with_capacity(thread_pool.thread_count());
+    let mut extractor_allocs = ThreadLocal::with_capacity(thread_pool.thread_count());
+    let doc_allocs = ThreadLocal::with_capacity(thread_pool.thread_count());
     let indexing_context = IndexingContext {
         index,
@@ -203,6 +204,7 @@ where
         wtxn,
         global_fields_ids_map,
         facet_field_ids_delta,
+        thread_pool,
     )?;
     indexing_context.progress.update_progress(IndexingStep::Finalizing);

View File

@@ -1,7 +1,6 @@
 use std::ops::DerefMut;
 use bumparaw_collections::RawMap;
-use rayon::iter::IndexedParallelIterator;
 use rustc_hash::FxBuildHasher;
 use scoped_thread_pool::ThreadPool;
 use serde_json::value::RawValue;
@@ -26,8 +25,8 @@ impl PartialDump {
         self,
         concurrent_available_ids: &'index ConcurrentAvailableIds,
         primary_key: &'index PrimaryKey,
-        thread_pool: &ThreadPool<crate::Error>,
-        chunk_size: usize,
+        _thread_pool: &ThreadPool<crate::Error>,
+        _chunk_size: usize,
     ) -> PartialDumpChanges<'index> {
         // Note for future self:
         // - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items).

View File

@@ -27,6 +27,7 @@ pub(super) fn post_process<MSP>(
     wtxn: &mut RwTxn<'_>,
     global_fields_ids_map: GlobalFieldsIdsMap<'_>,
     facet_field_ids_delta: FacetFieldIdsDelta,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
 ) -> Result<()>
 where
     MSP: Fn() -> bool + Sync,
@@ -39,7 +40,13 @@ where
     compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
     indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
     if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
-        compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?;
+        compute_prefix_database(
+            index,
+            wtxn,
+            prefix_delta,
+            indexing_context.grenad_parameters,
+            thread_pool,
+        )?;
     };
     Ok(())
 }
@@ -50,16 +57,38 @@ fn compute_prefix_database(
     wtxn: &mut RwTxn,
     prefix_delta: PrefixDelta,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
 ) -> Result<()> {
     let PrefixDelta { modified, deleted } = prefix_delta;
     // Compute word prefix docids
-    compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
+    compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters, thread_pool)?;
     // Compute exact word prefix docids
-    compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
+    compute_exact_word_prefix_docids(
+        wtxn,
+        index,
+        &modified,
+        &deleted,
+        grenad_parameters,
+        thread_pool,
+    )?;
     // Compute word prefix fid docids
-    compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
+    compute_word_prefix_fid_docids(
+        wtxn,
+        index,
+        &modified,
+        &deleted,
+        grenad_parameters,
+        thread_pool,
+    )?;
     // Compute word prefix position docids
-    compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
+    compute_word_prefix_position_docids(
+        wtxn,
+        index,
+        &modified,
+        &deleted,
+        grenad_parameters,
+        thread_pool,
+    )
 }
 #[tracing::instrument(level = "trace", skip_all, target = "indexing")]

View File

@@ -22,6 +22,7 @@ pub fn merge_and_send_rtree<'extractor, MSP>(
     index: &Index,
     geo_sender: GeoSender<'_, '_>,
     must_stop_processing: &MSP,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
 ) -> Result<()>
 where
     MSP: Fn() -> bool + Sync,
@@ -57,14 +58,14 @@ where
     let rtree_mmap = unsafe { Mmap::map(&file)? };
     geo_sender.set_rtree(rtree_mmap).unwrap();
-    geo_sender.set_geo_faceted(&faceted)?;
+    geo_sender.set_geo_faceted(&faceted, thread_pool)?;
     Ok(())
 }
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
 pub fn merge_and_send_docids<'extractor, MSP, D>(
-    thread_pool: &mut scoped_thread_pool::ThreadPool<crate::Error>,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: Database<Bytes, Bytes>,
     index: &Index,
@@ -96,7 +97,7 @@ where
             }
             Operation::Ignore => Ok(()),
         }
-        });
+        })?;
         Ok(())
     }) {
         Ok(()) => Ok(()),
@@ -106,7 +107,7 @@ where
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
 pub fn merge_and_send_facet_docids<'extractor>(
-    thread_pool: &mut scoped_thread_pool::ThreadPool<crate::Error>,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: FacetDatabases,
     index: &Index,
@@ -119,12 +120,15 @@ pub fn merge_and_send_facet_docids<'extractor>(
     let max_number_count = max_number_count.clamp(1000, 100_000);
     let transposed_frozen_caches = Mutex::new(transpose_and_freeze_caches(&mut caches)?);
     let output = Mutex::new(FacetFieldIdsDelta::new(max_string_count, max_number_count));
-    thread_pool.broadcast(|thread_index| {
-        // TODO: we can probably spare the mutex here since it is guaranteed that each thread will access its own cell of the vec
-        let frozen =
-            std::mem::take(transposed_frozen_caches.lock().unwrap().get_mut(thread_index).unwrap());
-        let mut facet_field_ids_delta = FacetFieldIdsDelta::new(max_string_count, max_number_count);
+    thread_pool
+        .broadcast(|thread_index| {
+            // TODO: we can probably spare the mutex here since it is guaranteed that each thread will access its own cell of the vec
+            let frozen = std::mem::take(
+                transposed_frozen_caches.lock().unwrap().get_mut(thread_index).unwrap(),
+            );
+            let mut facet_field_ids_delta =
+                FacetFieldIdsDelta::new(max_string_count, max_number_count);
             let rtxn = index.read_txn()?;
             merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
                 let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
@@ -151,7 +155,8 @@ pub fn merge_and_send_facet_docids<'extractor>(
                 .merge(facet_field_ids_delta);
             }
             Ok(())
-    });
+        })
+        .map_err(|errors| crate::Error::from_scoped_thread_pool_errors(thread_pool, errors))?;
     Ok(output.into_inner().unwrap())
 }
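
Note on the TODO above: the `Mutex<Vec<_>>` is only there to satisfy the borrow checker, since thread `i` exclusively takes cell `i`. One hedged way to make that contention-free without `unsafe` is a mutex per cell, as sketched below; the names are illustrative, not from the codebase:

```rust
use std::sync::Mutex;

/// One lock per cell: thread `i` only ever locks `cells[i]`, so every
/// `lock()` call is uncontended by construction.
fn take_own_cell<T: Default>(cells: &[Mutex<T>], thread_index: usize) -> T {
    std::mem::take(&mut *cells[thread_index].lock().unwrap())
}

fn main() {
    let cells: Vec<Mutex<Vec<u32>>> = (0..4u32).map(|i| Mutex::new(vec![i])).collect();
    std::thread::scope(|scope| {
        for thread_index in 0..cells.len() {
            let cells = &cells;
            scope.spawn(move || {
                let own = take_own_cell(cells, thread_index);
                assert_eq!(own, vec![thread_index as u32]);
            });
        }
    });
}
```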

View File

@@ -26,11 +26,13 @@ impl WordPrefixDocids {
         database: Database<Bytes, CboRoaringBitmapCodec>,
         prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
         grenad_parameters: &GrenadParameters,
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     ) -> WordPrefixDocids {
         WordPrefixDocids {
             database,
             prefix_database,
-            max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
+            max_memory_by_thread: grenad_parameters
+                .max_memory_by_thread(thread_pool.thread_count()),
         }
     }
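
Note: `max_memory_by_thread` now takes the worker count explicitly instead of asking rayon. A hedged guess at the arithmetic, shown only to make the parameter change concrete; the real method lives on `GrenadParameters` and may differ:

```rust
/// Assumed shape of the budget split: divide the optional global memory cap
/// evenly across the pool's workers. An assumption, not the verbatim method.
fn max_memory_by_thread(max_memory: Option<usize>, thread_count: usize) -> Option<usize> {
    max_memory.map(|total| total / thread_count.max(1))
}

fn main() {
    assert_eq!(max_memory_by_thread(Some(1 << 30), 8), Some(1 << 27));
    assert_eq!(max_memory_by_thread(None, 8), None);
}
```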
@@ -39,9 +41,10 @@ impl WordPrefixDocids {
         wtxn: &mut heed::RwTxn,
         prefix_to_compute: &BTreeSet<Prefix>,
         prefix_to_delete: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     ) -> Result<()> {
         delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
-        self.recompute_modified_prefixes(wtxn, prefix_to_compute)
+        self.recompute_modified_prefixes(wtxn, prefix_to_compute, thread_pool)
     }
     #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -49,6 +52,7 @@ impl WordPrefixDocids {
         &self,
         wtxn: &mut RwTxn,
         prefixes: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     ) -> Result<()> {
         // We fetch the docids associated to the newly added word prefix fst only.
         // And collect the CboRoaringBitmaps pointers in an HashMap.
@@ -56,7 +60,7 @@ impl WordPrefixDocids {
         // We access this HashMap in parallel to compute the *union* of all
         // of them and *serialize* them into files. There is one file by CPU.
-        let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
+        let local_entries = ThreadLocal::with_capacity(thread_pool.thread_count());
         prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
             let refcell = local_entries.get_or(|| {
                 let file = BufWriter::new(spooled_tempfile(
@@ -162,11 +166,13 @@ impl WordPrefixIntegerDocids {
         database: Database<Bytes, CboRoaringBitmapCodec>,
         prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
         grenad_parameters: &GrenadParameters,
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     ) -> WordPrefixIntegerDocids {
         WordPrefixIntegerDocids {
             database,
             prefix_database,
-            max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
+            max_memory_by_thread: grenad_parameters
+                .max_memory_by_thread(thread_pool.thread_count()),
         }
     }
@@ -175,9 +181,10 @@ impl WordPrefixIntegerDocids {
         wtxn: &mut heed::RwTxn,
         prefix_to_compute: &BTreeSet<Prefix>,
         prefix_to_delete: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     ) -> Result<()> {
         delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
-        self.recompute_modified_prefixes(wtxn, prefix_to_compute)
+        self.recompute_modified_prefixes(wtxn, prefix_to_compute, thread_pool)
     }
     #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -185,6 +192,7 @@ impl WordPrefixIntegerDocids {
         &self,
         wtxn: &mut RwTxn,
         prefixes: &BTreeSet<Prefix>,
+        thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
     ) -> Result<()> {
         // We fetch the docids associated to the newly added word prefix fst only.
         // And collect the CboRoaringBitmaps pointers in an HashMap.
@@ -192,7 +200,7 @@ impl WordPrefixIntegerDocids {
         // We access this HashMap in parallel to compute the *union* of all
         // of them and *serialize* them into files. There is one file by CPU.
-        let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
+        let local_entries = ThreadLocal::with_capacity(thread_pool.thread_count());
         prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
             let refcell = local_entries.get_or(|| {
                 let file = BufWriter::new(spooled_tempfile(
@@ -312,13 +320,15 @@ pub fn compute_word_prefix_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
 ) -> Result<()> {
     WordPrefixDocids::new(
         index.word_docids.remap_key_type(),
         index.word_prefix_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -328,13 +338,15 @@ pub fn compute_exact_word_prefix_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
 ) -> Result<()> {
     WordPrefixDocids::new(
         index.exact_word_docids.remap_key_type(),
         index.exact_word_prefix_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -344,13 +356,15 @@ pub fn compute_word_prefix_fid_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
 ) -> Result<()> {
     WordPrefixIntegerDocids::new(
         index.word_fid_docids.remap_key_type(),
         index.word_prefix_fid_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -360,11 +374,13 @@ pub fn compute_word_prefix_position_docids(
     prefix_to_compute: &BTreeSet<Prefix>,
     prefix_to_delete: &BTreeSet<Prefix>,
     grenad_parameters: &GrenadParameters,
+    thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
 ) -> Result<()> {
     WordPrefixIntegerDocids::new(
         index.word_position_docids.remap_key_type(),
         index.word_prefix_position_docids.remap_key_type(),
         grenad_parameters,
+        thread_pool,
     )
-    .execute(wtxn, prefix_to_compute, prefix_to_delete)
+    .execute(wtxn, prefix_to_compute, prefix_to_delete, thread_pool)
 }
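
Note: all four prefix helpers now forward the pool, and their callers convert per-worker failures with `crate::Error::from_scoped_thread_pool_errors`, as seen earlier in `extract` and `merge_and_send_facet_docids`. A hedged sketch of that funnel shape, assuming the pool reports one optional error per worker; the helper name and the aggregation policy (keep the first failure) are illustrative:

```rust
/// Assumed shape: the pool hands back one slot per worker and the caller
/// keeps the first failure, discarding the rest.
fn first_thread_error<E>(errors: Vec<Option<E>>) -> Result<(), E> {
    match errors.into_iter().flatten().next() {
        Some(error) => Err(error),
        None => Ok(()),
    }
}

fn main() {
    let ok: Result<(), &str> = first_thread_error(vec![None, None]);
    assert!(ok.is_ok());
    let err = first_thread_error(vec![None, Some("worker 2 failed")]);
    assert_eq!(err, Err("worker 2 failed"));
}
```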

View File

@@ -5,6 +5,7 @@ use maplit::hashset;
 use milli::documents::mmap_from_objects;
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{FacetDistribution, Index, Object, OrderBy};
@@ -36,6 +37,8 @@ fn test_facet_distribution_with_no_facet_values() {
     let mut new_fields_ids_map = db_fields_ids_map.clone();
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
     let doc1: Object = from_value(
@@ -59,12 +62,15 @@ fn test_facet_distribution_with_no_facet_values() {
         &mut new_fields_ids_map,
         &|| false,
         Progress::default(),
+        &thread_pool,
+        CHUNK_SIZE,
     )
     .unwrap();
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,

View File

@@ -9,6 +9,7 @@ use heed::EnvOpenOptions;
 use maplit::{btreemap, hashset};
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{AscDesc, Criterion, DocumentId, Index, Member, TermsMatchingStrategy};
@@ -72,6 +73,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
     let mut new_fields_ids_map = db_fields_ids_map.clone();
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
     let mut file = tempfile::tempfile().unwrap();
@@ -92,6 +95,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         &mut new_fields_ids_map,
         &|| false,
         Progress::default(),
+        &thread_pool,
+        CHUNK_SIZE,
     )
     .unwrap();
@@ -102,6 +107,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,

View File

@@ -7,6 +7,7 @@ use itertools::Itertools;
 use maplit::hashset;
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{AscDesc, Criterion, Index, Member, Search, SearchResult, TermsMatchingStrategy};
@@ -288,6 +289,8 @@ fn criteria_ascdesc() {
     let mut new_fields_ids_map = db_fields_ids_map.clone();
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
     let mut file = tempfile::tempfile().unwrap();
@@ -328,12 +331,15 @@ fn criteria_ascdesc() {
         &mut new_fields_ids_map,
         &|| false,
         Progress::default(),
+        &thread_pool,
+        CHUNK_SIZE,
     )
     .unwrap();
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,

View File

@@ -5,6 +5,7 @@ use heed::EnvOpenOptions;
 use milli::documents::mmap_from_objects;
 use milli::progress::Progress;
 use milli::update::new::indexer;
+use milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
 use milli::vector::EmbeddingConfigs;
 use milli::{Criterion, Index, Object, Search, TermsMatchingStrategy};
@@ -123,6 +124,8 @@ fn test_typo_disabled_on_word() {
     let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
     let mut new_fields_ids_map = db_fields_ids_map.clone();
     let embedders = EmbeddingConfigs::default();
+    let thread_pool =
+        scoped_thread_pool::ThreadPool::with_available_parallelism("index".to_string());
     let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
     indexer.add_documents(&documents).unwrap();
@@ -137,12 +140,15 @@ fn test_typo_disabled_on_word() {
         &mut new_fields_ids_map,
         &|| false,
         Progress::default(),
+        &thread_pool,
+        CHUNK_SIZE,
     )
     .unwrap();
     indexer::index(
         &mut wtxn,
         &index,
+        &thread_pool,
         &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
         config.grenad_parameters(),
         &db_fields_ids_map,