Expose the write channel congestion in the batches

This commit is contained in:
Kerollmops 2025-02-19 14:00:21 +01:00
parent 3ff1de0a21
commit 05cc8c650c
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
12 changed files with 138 additions and 92 deletions

View file

@ -215,14 +215,16 @@ impl IndexScheduler {
let mut stop_scheduler_forever = false;
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new();
let mut congestion = None;
match res {
Ok(tasks) => {
Ok((tasks, cong)) => {
#[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
progress.update_progress(task_progress_obj);
congestion = cong;
let mut success = 0;
let mut failure = 0;
let mut canceled_by = None;
@ -339,9 +341,17 @@ impl IndexScheduler {
// We must re-add the canceled task so they're part of the same batch.
ids |= canceled;
let durations = progress.accumulated_durations();
processing_batch.stats.call_trace =
durations.into_iter().map(|(k, v)| (k, v.into())).collect();
progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
processing_batch.stats.write_channel_congestion = congestion.map(|congestion| {
let mut congestion_info = serde_json::Map::new();
congestion_info.insert("attempts".into(), congestion.attempts.into());
congestion_info.insert("blocking_attempts".into(), congestion.blocking_attempts.into());
congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
congestion_info
});
self.queue.write_batch(&mut wtxn, processing_batch, &ids)?;
#[cfg(test)]

View file

@ -5,7 +5,7 @@ use std::sync::atomic::Ordering;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task};
use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap;
@ -35,7 +35,7 @@ impl IndexScheduler {
batch: Batch,
current_batch: &mut ProcessingBatch,
progress: Progress,
) -> Result<Vec<Task>> {
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
#[cfg(test)]
{
self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?;
@ -76,7 +76,7 @@ impl IndexScheduler {
canceled_tasks.push(task);
Ok(canceled_tasks)
Ok((canceled_tasks, None))
}
Batch::TaskDeletions(mut tasks) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
@ -115,10 +115,14 @@ impl IndexScheduler {
_ => unreachable!(),
}
}
Ok(tasks)
Ok((tasks, None))
}
Batch::SnapshotCreation(tasks) => {
self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
}
Batch::Dump(task) => {
self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
}
Batch::SnapshotCreation(tasks) => self.process_snapshot(progress, tasks),
Batch::Dump(task) => self.process_dump_creation(progress, task),
Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid().to_string();
let index = if must_create_index {
@ -135,7 +139,8 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
let (tasks, congestion) =
self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
{
let span = tracing::trace_span!(target: "indexing::scheduler", "commit");
@ -166,7 +171,7 @@ impl IndexScheduler {
),
}
Ok(tasks)
Ok((tasks, congestion))
}
Batch::IndexCreation { index_uid, primary_key, task } => {
progress.update_progress(CreateIndexProgress::CreatingTheIndex);
@ -234,7 +239,7 @@ impl IndexScheduler {
),
}
Ok(vec![task])
Ok((vec![task], None))
}
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
@ -268,7 +273,7 @@ impl IndexScheduler {
};
}
Ok(tasks)
Ok((tasks, None))
}
Batch::IndexSwap { mut task } => {
progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
@ -316,7 +321,7 @@ impl IndexScheduler {
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok(vec![task])
Ok((vec![task], None))
}
Batch::UpgradeDatabase { mut tasks } => {
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
@ -346,7 +351,7 @@ impl IndexScheduler {
task.error = None;
}
Ok(tasks)
Ok((tasks, None))
}
}
}

View file

@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
use meilisearch_types::milli::update::DocumentAdditionResult;
use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder};
use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder};
use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::Index;
@ -33,9 +33,8 @@ impl IndexScheduler {
index: &'i Index,
operation: IndexOperation,
progress: Progress,
) -> Result<Vec<Task>> {
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
let indexer_alloc = Bump::new();
let started_processing_at = std::time::Instant::now();
let must_stop_processing = self.scheduler.must_stop_processing.clone();
@ -60,7 +59,7 @@ impl IndexScheduler {
};
}
Ok(tasks)
Ok((tasks, None))
}
IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
@ -173,21 +172,24 @@ impl IndexScheduler {
}
progress.update_progress(DocumentOperationProgress::Indexing);
let mut congestion = None;
if tasks.iter().any(|res| res.error.is_none()) {
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
congestion = Some(
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
);
let addition = DocumentAdditionResult {
indexed_documents: candidates_count,
@ -199,7 +201,7 @@ impl IndexScheduler {
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}
Ok(tasks)
Ok((tasks, congestion))
}
IndexOperation::DocumentEdition { index_uid, mut task } => {
progress.update_progress(DocumentEditionProgress::RetrievingConfig);
@ -247,7 +249,7 @@ impl IndexScheduler {
edited_documents: Some(0),
});
return Ok(vec![task]);
return Ok((vec![task], None));
}
let rtxn = index.read_txn()?;
@ -262,6 +264,7 @@ impl IndexScheduler {
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
let mut congestion = None;
if task.error.is_none() {
let local_pool;
let indexer_config = self.index_mapper.indexer_config();
@ -292,20 +295,22 @@ impl IndexScheduler {
let embedders = self.embedders(index_uid.clone(), embedders)?;
progress.update_progress(DocumentEditionProgress::Indexing);
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
congestion = Some(
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
let addition = DocumentAdditionResult {
indexed_documents: candidates_count,
@ -341,7 +346,7 @@ impl IndexScheduler {
}
}
Ok(vec![task])
Ok((vec![task], congestion))
}
IndexOperation::DocumentDeletion { mut tasks, index_uid } => {
progress.update_progress(DocumentDeletionProgress::RetrievingConfig);
@ -408,7 +413,7 @@ impl IndexScheduler {
}
if to_delete.is_empty() {
return Ok(tasks);
return Ok((tasks, None));
}
let rtxn = index.read_txn()?;
@ -422,6 +427,7 @@ impl IndexScheduler {
PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
.map_err(|err| Error::from_milli(err.into(), Some(index_uid.clone())))?;
let mut congestion = None;
if !tasks.iter().all(|res| res.error.is_some()) {
let local_pool;
let indexer_config = self.index_mapper.indexer_config();
@ -447,20 +453,22 @@ impl IndexScheduler {
let embedders = self.embedders(index_uid.clone(), embedders)?;
progress.update_progress(DocumentDeletionProgress::Indexing);
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // document deletion never changes primary key
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
congestion = Some(
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // document deletion never changes primary key
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
let addition = DocumentAdditionResult {
indexed_documents: candidates_count,
@ -472,7 +480,7 @@ impl IndexScheduler {
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}
Ok(tasks)
Ok((tasks, congestion))
}
IndexOperation::Settings { index_uid, settings, mut tasks } => {
progress.update_progress(SettingsProgress::RetrievingAndMergingTheSettings);
@ -497,7 +505,7 @@ impl IndexScheduler {
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
Ok(tasks)
Ok((tasks, None))
}
IndexOperation::DocumentClearAndSetting {
index_uid,
@ -505,7 +513,7 @@ impl IndexScheduler {
settings,
settings_tasks,
} => {
let mut import_tasks = self.apply_index_operation(
let (mut import_tasks, _congestion) = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::DocumentClear {
@ -515,7 +523,7 @@ impl IndexScheduler {
progress.clone(),
)?;
let settings_tasks = self.apply_index_operation(
let (settings_tasks, _congestion) = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
@ -524,7 +532,7 @@ impl IndexScheduler {
let mut tasks = settings_tasks;
tasks.append(&mut import_tasks);
Ok(tasks)
Ok((tasks, None))
}
}
}