Move the rayon thread pool outside the extract method

This commit is contained in:
Clément Renault 2024-11-14 10:40:32 +01:00
parent 0e3c5d91ab
commit 9e8367f1e6
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
10 changed files with 328 additions and 291 deletions

View file

@ -39,7 +39,7 @@ use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSe
use meilisearch_types::milli::vector::parsed_vectors::{
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
};
use meilisearch_types::milli::{self, Filter};
use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@ -1277,7 +1277,6 @@ impl IndexScheduler {
operations,
mut tasks,
} => {
let indexer_config = self.index_mapper.indexer_config();
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it
// to a fresh thread.
@ -1386,10 +1385,16 @@ impl IndexScheduler {
}
if tasks.iter().any(|res| res.error.is_none()) {
/// TODO create a pool if needed
// let pool = indexer_config.thread_pool.unwrap();
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
let local_pool;
let pool = match &self.index_mapper.indexer_config().thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
// TODO we want to multithread this
let document_changes = indexer.into_changes(
&indexer_alloc,
index,
@ -1398,18 +1403,20 @@ impl IndexScheduler {
&mut new_fields_ids_map,
)?;
indexer::index(
index_wtxn,
index,
&db_fields_ids_map,
new_fields_ids_map,
primary_key_has_been_set.then_some(primary_key),
&pool,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
pool.install(|| {
indexer::index(
index_wtxn,
index,
&db_fields_ids_map,
new_fields_ids_map,
primary_key_has_been_set.then_some(primary_key),
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)
})
.unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}
@ -1489,27 +1496,37 @@ impl IndexScheduler {
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
if task.error.is_none() {
/// TODO create a pool if needed
// let pool = indexer_config.thread_pool.unwrap();
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
let local_pool;
let pool = match &self.index_mapper.indexer_config().thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
let document_changes = indexer.into_changes(&primary_key)?;
let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?;
pool.install(|| {
let indexer =
UpdateByFunction::new(candidates, context.clone(), code.clone());
let document_changes = indexer.into_changes(&primary_key)?;
let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?;
indexer::index(
index_wtxn,
index,
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&pool,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
indexer::index(
index_wtxn,
index,
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
Result::Ok(())
})
.unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}
@ -1629,9 +1646,14 @@ impl IndexScheduler {
.map_err(milli::Error::from)?;
if !tasks.iter().all(|res| res.error.is_some()) {
/// TODO create a pool if needed
// let pool = indexer_config.thread_pool.unwrap();
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
let local_pool;
let pool = match &self.index_mapper.indexer_config().thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let mut indexer = indexer::DocumentDeletion::new();
indexer.delete_documents_by_docids(to_delete);
@ -1639,18 +1661,20 @@ impl IndexScheduler {
let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?;
indexer::index(
index_wtxn,
index,
&db_fields_ids_map,
new_fields_ids_map,
None, // document deletion never changes primary key
&pool,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
pool.install(|| {
indexer::index(
index_wtxn,
index,
&db_fields_ids_map,
new_fields_ids_map,
None, // document deletion never changes primary key
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)
})
.unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}