From 11a11fc870b6fd5d26bbe5827d9ddbc48cf6ee73 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 18 Feb 2025 18:33:19 +0100 Subject: [PATCH 01/12] Accumulate step durations from the progress system --- crates/index-scheduler/src/scheduler/mod.rs | 1 + crates/milli/src/progress.rs | 61 ++++++++++++++++++--- 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 9268bf3e7..a7937ddd3 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -339,6 +339,7 @@ impl IndexScheduler { // We must re-add the canceled task so they're part of the same batch. ids |= canceled; + eprintln!("{:#?}", progress.accumulated_durations()); self.queue.write_batch(&mut wtxn, processing_batch, &ids)?; #[cfg(test)] diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index 3837e173a..884f49241 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -3,7 +3,9 @@ use std::borrow::Cow; use std::marker::PhantomData; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use indexmap::IndexMap; use serde::Serialize; use utoipa::ToSchema; @@ -15,28 +17,60 @@ pub trait Step: 'static + Send + Sync { #[derive(Clone, Default)] pub struct Progress { - steps: Arc)>>>, + steps: Arc>, +} + +#[derive(Default)] +struct InnerProgress { + /// The hierarchy of progress steps. + steps: Vec<(TypeId, Box, Instant)>, + /// The durations associated to the top level steps (*first*). + durations: Vec<(String, Duration)>, +} + +fn name_from_steps<'a, I>(steps: I) -> String +where + I: Iterator> + ExactSizeIterator, +{ + let len = steps.len(); + let mut name = String::new(); + for (i, step) in steps.into_iter().enumerate() { + name.push_str(&step.name()); + if i + 1 < len { + name.push_str(" > "); + } + } + name } impl Progress { pub fn update_progress(&self, sub_progress: P) { - let mut steps = self.steps.write().unwrap(); + let mut inner = self.steps.write().unwrap(); + let InnerProgress { steps, durations } = &mut *inner; + + let now = Instant::now(); let step_type = TypeId::of::

(); - if let Some(idx) = steps.iter().position(|(id, _)| *id == step_type) { + if let Some(idx) = steps.iter().position(|(id, _, _)| *id == step_type) { + for (i, (_, _, started_at)) in steps[idx..].iter().enumerate() { + let full_name = name_from_steps(steps.iter().take(idx + i + 1).map(|(_, s, _)| s)); + durations.push((full_name, now.duration_since(*started_at))); + } steps.truncate(idx); } - steps.push((step_type, Box::new(sub_progress))); + + steps.push((step_type, Box::new(sub_progress), now)); } // TODO: This code should be in meilisearch_types but cannot because milli can't depend on meilisearch_types pub fn as_progress_view(&self) -> ProgressView { - let steps = self.steps.read().unwrap(); + let inner = self.steps.read().unwrap(); + let InnerProgress { steps, .. } = &*inner; let mut percentage = 0.0; let mut prev_factors = 1.0; let mut step_view = Vec::with_capacity(steps.len()); - for (_, step) in steps.iter() { + for (_, step, _) in steps.iter() { prev_factors *= step.total() as f32; percentage += step.current() as f32 / prev_factors; @@ -49,6 +83,19 @@ impl Progress { ProgressView { steps: step_view, percentage: percentage * 100.0 } } + + pub fn accumulated_durations(&self) -> IndexMap { + let mut inner = self.steps.write().unwrap(); + let InnerProgress { steps, durations, .. } = &mut *inner; + + let now = Instant::now(); + for (i, (_, _, started_at)) in steps.iter().enumerate() { + let full_name = name_from_steps(steps.iter().take(i + 1).map(|(_, s, _)| s)); + durations.push((full_name, now.duration_since(*started_at))); + } + + durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect() + } } /// This trait lets you use the AtomicSubStep defined right below. @@ -164,7 +211,7 @@ pub struct ProgressStepView { /// Used when the name can change but it's still the same step. /// To avoid conflicts on the `TypeId`, create a unique type every time you use this step: /// ```text -/// enum UpgradeVersion {} +/// enum UpgradeVersion {} /// /// progress.update_progress(VariableNameStep::::new( /// "v1 to v2", From 4a058a080ed5669e25fb506c728cf55332bb3654 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 18 Feb 2025 18:48:44 +0100 Subject: [PATCH 02/12] Simplify the name generation --- crates/milli/src/progress.rs | 43 +++++++++++++++--------------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index 884f49241..d6fc65888 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use indexmap::IndexMap; +use itertools::Itertools; use serde::Serialize; use utoipa::ToSchema; @@ -22,27 +23,12 @@ pub struct Progress { #[derive(Default)] struct InnerProgress { - /// The hierarchy of progress steps. + /// The hierarchy of steps. steps: Vec<(TypeId, Box, Instant)>, - /// The durations associated to the top level steps (*first*). + /// The durations associated to each steps. durations: Vec<(String, Duration)>, } -fn name_from_steps<'a, I>(steps: I) -> String -where - I: Iterator> + ExactSizeIterator, -{ - let len = steps.len(); - let mut name = String::new(); - for (i, step) in steps.into_iter().enumerate() { - name.push_str(&step.name()); - if i + 1 < len { - name.push_str(" > "); - } - } - name -} - impl Progress { pub fn update_progress(&self, sub_progress: P) { let mut inner = self.steps.write().unwrap(); @@ -51,10 +37,7 @@ impl Progress { let now = Instant::now(); let step_type = TypeId::of::

(); if let Some(idx) = steps.iter().position(|(id, _, _)| *id == step_type) { - for (i, (_, _, started_at)) in steps[idx..].iter().enumerate() { - let full_name = name_from_steps(steps.iter().take(idx + i + 1).map(|(_, s, _)| s)); - durations.push((full_name, now.duration_since(*started_at))); - } + push_steps_durations(steps, durations, now, idx); steps.truncate(idx); } @@ -89,15 +72,25 @@ impl Progress { let InnerProgress { steps, durations, .. } = &mut *inner; let now = Instant::now(); - for (i, (_, _, started_at)) in steps.iter().enumerate() { - let full_name = name_from_steps(steps.iter().take(i + 1).map(|(_, s, _)| s)); - durations.push((full_name, now.duration_since(*started_at))); - } + push_steps_durations(steps, durations, now, 0); durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect() } } +/// Generate the names associated with the durations and push them. +fn push_steps_durations( + steps: &[(TypeId, Box, Instant)], + durations: &mut Vec<(String, Duration)>, + now: Instant, + idx: usize, +) { + for (i, (_, _, started_at)) in steps.iter().skip(idx).enumerate() { + let full_name = steps.iter().take(idx + i + 1).map(|(_, s, _)| s.name()).join(" > "); + durations.push((full_name, now.duration_since(*started_at))); + } +} + /// This trait lets you use the AtomicSubStep defined right below. /// The name must be a const that never changed but that can't be enforced by the type system because it make the trait non object-safe. /// By forcing the Default trait + the &'static str we make it harder to miss-use the trait. From e9add141893317bc755607b9e2691aebf564c147 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 18 Feb 2025 19:26:41 +0100 Subject: [PATCH 03/12] Reorder steps --- crates/milli/src/progress.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index d6fc65888..f8cd4b4cc 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -85,7 +85,7 @@ fn push_steps_durations( now: Instant, idx: usize, ) { - for (i, (_, _, started_at)) in steps.iter().skip(idx).enumerate() { + for (i, (_, _, started_at)) in steps.iter().skip(idx).enumerate().rev() { let full_name = steps.iter().take(idx + i + 1).map(|(_, s, _)| s.name()).join(" > "); durations.push((full_name, now.duration_since(*started_at))); } From 3ff1de0a21700c5eb1cd3f855ae14a698d4d83e3 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 19 Feb 2025 11:24:11 +0100 Subject: [PATCH 04/12] Expose the call trace in the batch stats --- Cargo.lock | 4 ++-- crates/dump/src/lib.rs | 1 + crates/index-scheduler/Cargo.toml | 2 +- crates/index-scheduler/src/scheduler/mod.rs | 4 +++- crates/meilisearch-types/Cargo.toml | 4 ++-- crates/meilisearch-types/src/batches.rs | 1 + 6 files changed, 10 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a42ffa26..4886dc028 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5160,9 +5160,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.135" +version = "1.0.138" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" +checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" dependencies = [ "indexmap", "itoa", diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index 905a6485d..c3861f6a1 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -321,6 +321,7 @@ pub(crate) mod test { status: maplit::btreemap! { Status::Succeeded => 1 }, types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 1 }, index_uids: maplit::btreemap! { "doggo".to_string() => 1 }, + call_trace: Default::default(), }, enqueued_at: Some(BatchEnqueuedAt { earliest: datetime!(2022-11-11 0:00 UTC), diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index 69edace77..881460d86 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -29,7 +29,7 @@ page_size = "0.6.0" rayon = "1.10.0" roaring = { version = "0.10.10", features = ["serde"] } serde = { version = "1.0.217", features = ["derive"] } -serde_json = { version = "1.0.135", features = ["preserve_order"] } +serde_json = { version = "1.0.138", features = ["preserve_order"] } synchronoise = "1.0.1" tempfile = "3.15.0" thiserror = "2.0.9" diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index a7937ddd3..08eb1c317 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -339,7 +339,9 @@ impl IndexScheduler { // We must re-add the canceled task so they're part of the same batch. ids |= canceled; - eprintln!("{:#?}", progress.accumulated_durations()); + let durations = progress.accumulated_durations(); + processing_batch.stats.call_trace = + durations.into_iter().map(|(k, v)| (k, v.into())).collect(); self.queue.write_batch(&mut wtxn, processing_batch, &ids)?; #[cfg(test)] diff --git a/crates/meilisearch-types/Cargo.toml b/crates/meilisearch-types/Cargo.toml index ce36c826b..c128e3d59 100644 --- a/crates/meilisearch-types/Cargo.toml +++ b/crates/meilisearch-types/Cargo.toml @@ -14,6 +14,7 @@ license.workspace = true actix-web = { version = "4.9.0", default-features = false } anyhow = "1.0.95" bumpalo = "3.16.0" +bumparaw-collections = "0.1.4" convert_case = "0.6.0" csv = "1.3.1" deserr = { version = "0.6.3", features = ["actix-web"] } @@ -24,12 +25,11 @@ flate2 = "1.0.35" fst = "0.4.7" memmap2 = "0.9.5" milli = { path = "../milli" } -bumparaw-collections = "0.1.4" roaring = { version = "0.10.10", features = ["serde"] } rustc-hash = "2.1.0" serde = { version = "1.0.217", features = ["derive"] } serde-cs = "0.2.4" -serde_json = "1.0.135" +serde_json = { version = "1.0.135", features = ["preserve_order"] } tar = "0.4.43" tempfile = "3.15.0" thiserror = "2.0.9" diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index 663f5cb8d..05463b758 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -60,4 +60,5 @@ pub struct BatchStats { pub status: BTreeMap, pub types: BTreeMap, pub index_uids: BTreeMap, + pub call_trace: serde_json::Map, } From 05cc8c650cecaf1d7753803c8b8284b84a845476 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 19 Feb 2025 14:00:21 +0100 Subject: [PATCH 05/12] Expose the write channel congestion in the batches --- crates/dump/src/lib.rs | 1 + crates/index-scheduler/src/scheduler/mod.rs | 16 ++- .../src/scheduler/process_batch.rs | 29 +++-- .../src/scheduler/process_index_operation.rs | 118 ++++++++++-------- crates/meilisearch-types/src/batches.rs | 2 + crates/milli/src/lib.rs | 1 + crates/milli/src/update/mod.rs | 1 + .../milli/src/update/new/indexer/extract.rs | 2 +- crates/milli/src/update/new/indexer/mod.rs | 12 +- crates/milli/src/update/new/indexer/write.rs | 43 ++++--- crates/milli/src/update/new/mod.rs | 1 + crates/milli/src/update/new/steps.rs | 4 +- 12 files changed, 138 insertions(+), 92 deletions(-) diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index c3861f6a1..70213ce69 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -322,6 +322,7 @@ pub(crate) mod test { types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 1 }, index_uids: maplit::btreemap! { "doggo".to_string() => 1 }, call_trace: Default::default(), + write_channel_congestion: None, }, enqueued_at: Some(BatchEnqueuedAt { earliest: datetime!(2022-11-11 0:00 UTC), diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 08eb1c317..bcf53127b 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -215,14 +215,16 @@ impl IndexScheduler { let mut stop_scheduler_forever = false; let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let mut canceled = RoaringBitmap::new(); + let mut congestion = None; match res { - Ok(tasks) => { + Ok((tasks, cong)) => { #[cfg(test)] self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded); let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32); progress.update_progress(task_progress_obj); + congestion = cong; let mut success = 0; let mut failure = 0; let mut canceled_by = None; @@ -339,9 +341,17 @@ impl IndexScheduler { // We must re-add the canceled task so they're part of the same batch. ids |= canceled; - let durations = progress.accumulated_durations(); + processing_batch.stats.call_trace = - durations.into_iter().map(|(k, v)| (k, v.into())).collect(); + progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect(); + processing_batch.stats.write_channel_congestion = congestion.map(|congestion| { + let mut congestion_info = serde_json::Map::new(); + congestion_info.insert("attempts".into(), congestion.attempts.into()); + congestion_info.insert("blocking_attempts".into(), congestion.blocking_attempts.into()); + congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into()); + congestion_info + }); + self.queue.write_batch(&mut wtxn, processing_batch, &ids)?; #[cfg(test)] diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 21233429c..8f3987bf6 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -5,7 +5,7 @@ use std::sync::atomic::Ordering; use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; -use meilisearch_types::milli::{self}; +use meilisearch_types::milli::{self, ChannelCongestion}; use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task}; use milli::update::Settings as MilliSettings; use roaring::RoaringBitmap; @@ -35,7 +35,7 @@ impl IndexScheduler { batch: Batch, current_batch: &mut ProcessingBatch, progress: Progress, - ) -> Result> { + ) -> Result<(Vec, Option)> { #[cfg(test)] { self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?; @@ -76,7 +76,7 @@ impl IndexScheduler { canceled_tasks.push(task); - Ok(canceled_tasks) + Ok((canceled_tasks, None)) } Batch::TaskDeletions(mut tasks) => { // 1. Retrieve the tasks that matched the query at enqueue-time. @@ -115,10 +115,14 @@ impl IndexScheduler { _ => unreachable!(), } } - Ok(tasks) + Ok((tasks, None)) + } + Batch::SnapshotCreation(tasks) => { + self.process_snapshot(progress, tasks).map(|tasks| (tasks, None)) + } + Batch::Dump(task) => { + self.process_dump_creation(progress, task).map(|tasks| (tasks, None)) } - Batch::SnapshotCreation(tasks) => self.process_snapshot(progress, tasks), - Batch::Dump(task) => self.process_dump_creation(progress, task), Batch::IndexOperation { op, must_create_index } => { let index_uid = op.index_uid().to_string(); let index = if must_create_index { @@ -135,7 +139,8 @@ impl IndexScheduler { .set_currently_updating_index(Some((index_uid.clone(), index.clone()))); let mut index_wtxn = index.write_txn()?; - let tasks = self.apply_index_operation(&mut index_wtxn, &index, op, progress)?; + let (tasks, congestion) = + self.apply_index_operation(&mut index_wtxn, &index, op, progress)?; { let span = tracing::trace_span!(target: "indexing::scheduler", "commit"); @@ -166,7 +171,7 @@ impl IndexScheduler { ), } - Ok(tasks) + Ok((tasks, congestion)) } Batch::IndexCreation { index_uid, primary_key, task } => { progress.update_progress(CreateIndexProgress::CreatingTheIndex); @@ -234,7 +239,7 @@ impl IndexScheduler { ), } - Ok(vec![task]) + Ok((vec![task], None)) } Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => { progress.update_progress(DeleteIndexProgress::DeletingTheIndex); @@ -268,7 +273,7 @@ impl IndexScheduler { }; } - Ok(tasks) + Ok((tasks, None)) } Batch::IndexSwap { mut task } => { progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap); @@ -316,7 +321,7 @@ impl IndexScheduler { } wtxn.commit()?; task.status = Status::Succeeded; - Ok(vec![task]) + Ok((vec![task], None)) } Batch::UpgradeDatabase { mut tasks } => { let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { @@ -346,7 +351,7 @@ impl IndexScheduler { task.error = None; } - Ok(tasks) + Ok((tasks, None)) } } } diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 630ab62e4..690fe2efd 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey; use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; use meilisearch_types::milli::update::DocumentAdditionResult; -use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder}; +use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder}; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; use meilisearch_types::Index; @@ -33,9 +33,8 @@ impl IndexScheduler { index: &'i Index, operation: IndexOperation, progress: Progress, - ) -> Result> { + ) -> Result<(Vec, Option)> { let indexer_alloc = Bump::new(); - let started_processing_at = std::time::Instant::now(); let must_stop_processing = self.scheduler.must_stop_processing.clone(); @@ -60,7 +59,7 @@ impl IndexScheduler { }; } - Ok(tasks) + Ok((tasks, None)) } IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => { progress.update_progress(DocumentOperationProgress::RetrievingConfig); @@ -173,21 +172,24 @@ impl IndexScheduler { } progress.update_progress(DocumentOperationProgress::Indexing); + let mut congestion = None; if tasks.iter().any(|res| res.error.is_none()) { - indexer::index( - index_wtxn, - index, - pool, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| must_stop_processing.get(), - &progress, - ) - .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; + congestion = Some( + indexer::index( + index_wtxn, + index, + pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| must_stop_processing.get(), + &progress, + ) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?, + ); let addition = DocumentAdditionResult { indexed_documents: candidates_count, @@ -199,7 +201,7 @@ impl IndexScheduler { tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } - Ok(tasks) + Ok((tasks, congestion)) } IndexOperation::DocumentEdition { index_uid, mut task } => { progress.update_progress(DocumentEditionProgress::RetrievingConfig); @@ -247,7 +249,7 @@ impl IndexScheduler { edited_documents: Some(0), }); - return Ok(vec![task]); + return Ok((vec![task], None)); } let rtxn = index.read_txn()?; @@ -262,6 +264,7 @@ impl IndexScheduler { let result_count = Ok((candidates.len(), candidates.len())) as Result<_>; + let mut congestion = None; if task.error.is_none() { let local_pool; let indexer_config = self.index_mapper.indexer_config(); @@ -292,20 +295,22 @@ impl IndexScheduler { let embedders = self.embedders(index_uid.clone(), embedders)?; progress.update_progress(DocumentEditionProgress::Indexing); - indexer::index( - index_wtxn, - index, - pool, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - None, // cannot change primary key in DocumentEdition - &document_changes, - embedders, - &|| must_stop_processing.get(), - &progress, - ) - .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; + congestion = Some( + indexer::index( + index_wtxn, + index, + pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + None, // cannot change primary key in DocumentEdition + &document_changes, + embedders, + &|| must_stop_processing.get(), + &progress, + ) + .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, + ); let addition = DocumentAdditionResult { indexed_documents: candidates_count, @@ -341,7 +346,7 @@ impl IndexScheduler { } } - Ok(vec![task]) + Ok((vec![task], congestion)) } IndexOperation::DocumentDeletion { mut tasks, index_uid } => { progress.update_progress(DocumentDeletionProgress::RetrievingConfig); @@ -408,7 +413,7 @@ impl IndexScheduler { } if to_delete.is_empty() { - return Ok(tasks); + return Ok((tasks, None)); } let rtxn = index.read_txn()?; @@ -422,6 +427,7 @@ impl IndexScheduler { PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map) .map_err(|err| Error::from_milli(err.into(), Some(index_uid.clone())))?; + let mut congestion = None; if !tasks.iter().all(|res| res.error.is_some()) { let local_pool; let indexer_config = self.index_mapper.indexer_config(); @@ -447,20 +453,22 @@ impl IndexScheduler { let embedders = self.embedders(index_uid.clone(), embedders)?; progress.update_progress(DocumentDeletionProgress::Indexing); - indexer::index( - index_wtxn, - index, - pool, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - None, // document deletion never changes primary key - &document_changes, - embedders, - &|| must_stop_processing.get(), - &progress, - ) - .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; + congestion = Some( + indexer::index( + index_wtxn, + index, + pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + None, // document deletion never changes primary key + &document_changes, + embedders, + &|| must_stop_processing.get(), + &progress, + ) + .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, + ); let addition = DocumentAdditionResult { indexed_documents: candidates_count, @@ -472,7 +480,7 @@ impl IndexScheduler { tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } - Ok(tasks) + Ok((tasks, congestion)) } IndexOperation::Settings { index_uid, settings, mut tasks } => { progress.update_progress(SettingsProgress::RetrievingAndMergingTheSettings); @@ -497,7 +505,7 @@ impl IndexScheduler { ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; - Ok(tasks) + Ok((tasks, None)) } IndexOperation::DocumentClearAndSetting { index_uid, @@ -505,7 +513,7 @@ impl IndexScheduler { settings, settings_tasks, } => { - let mut import_tasks = self.apply_index_operation( + let (mut import_tasks, _congestion) = self.apply_index_operation( index_wtxn, index, IndexOperation::DocumentClear { @@ -515,7 +523,7 @@ impl IndexScheduler { progress.clone(), )?; - let settings_tasks = self.apply_index_operation( + let (settings_tasks, _congestion) = self.apply_index_operation( index_wtxn, index, IndexOperation::Settings { index_uid, settings, tasks: settings_tasks }, @@ -524,7 +532,7 @@ impl IndexScheduler { let mut tasks = settings_tasks; tasks.append(&mut import_tasks); - Ok(tasks) + Ok((tasks, None)) } } } diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index 05463b758..40309ce06 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -61,4 +61,6 @@ pub struct BatchStats { pub types: BTreeMap, pub index_uids: BTreeMap, pub call_trace: serde_json::Map, + #[serde(skip_serializing_if = "Option::is_none")] + pub write_channel_congestion: Option>, } diff --git a/crates/milli/src/lib.rs b/crates/milli/src/lib.rs index ea88d2b78..bb1532c1a 100644 --- a/crates/milli/src/lib.rs +++ b/crates/milli/src/lib.rs @@ -73,6 +73,7 @@ pub use self::search::{ FacetDistribution, Filter, FormatOptions, MatchBounds, MatcherBuilder, MatchingWords, OrderBy, Search, SearchResult, SemanticSearch, TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET, }; +pub use self::update::ChannelCongestion; pub type Result = std::result::Result; diff --git a/crates/milli/src/update/mod.rs b/crates/milli/src/update/mod.rs index 68268db35..9a783ffd2 100644 --- a/crates/milli/src/update/mod.rs +++ b/crates/milli/src/update/mod.rs @@ -5,6 +5,7 @@ pub use self::facet::bulk::FacetsUpdateBulk; pub use self::facet::incremental::FacetsUpdateIncrementalInner; pub use self::index_documents::*; pub use self::indexer_config::IndexerConfig; +pub use self::new::ChannelCongestion; pub use self::settings::{validate_embedding_settings, Setting, Settings}; pub use self::update_step::UpdateIndexingStep; pub use self::word_prefix_docids::WordPrefixDocids; diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 53478f029..1606851cb 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -291,7 +291,7 @@ where &indexing_context.must_stop_processing, )?; } - indexing_context.progress.update_progress(IndexingStep::WritingToDatabase); + indexing_context.progress.update_progress(IndexingStep::TailWritingToDatabase); finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); Result::Ok((facet_field_ids_delta, index_embeddings)) diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 890191323..58c60d502 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -10,6 +10,7 @@ use hashbrown::HashMap; use heed::RwTxn; pub use partial_dump::PartialDump; pub use update_by_function::UpdateByFunction; +pub use write::ChannelCongestion; use write::{build_vectors, update_index, write_to_db}; use super::channel::*; @@ -53,7 +54,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP>( embedders: EmbeddingConfigs, must_stop_processing: &'indexer MSP, progress: &'indexer Progress, -) -> Result<()> +) -> Result where DC: DocumentChanges<'pl>, MSP: Fn() -> bool + Sync, @@ -130,7 +131,7 @@ where let mut field_distribution = index.field_distribution(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?; - thread::scope(|s| -> Result<()> { + let congestion = thread::scope(|s| -> Result { let indexer_span = tracing::Span::current(); let embedders = &embedders; let finished_extraction = &finished_extraction; @@ -182,7 +183,8 @@ where let mut arroy_writers = arroy_writers?; - write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?; + let congestion = + write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?; indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors); @@ -210,7 +212,7 @@ where indexing_context.progress.update_progress(IndexingStep::Finalizing); - Ok(()) as Result<_> + Ok(congestion) as Result<_> })?; // required to into_inner the new_fields_ids_map @@ -227,5 +229,5 @@ where document_ids, )?; - Ok(()) + Ok(congestion) } diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index 707599ba3..c7e449243 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -14,13 +14,13 @@ use crate::update::settings::InnerIndexSettings; use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings}; use crate::{Error, Index, InternalError, Result}; -pub(super) fn write_to_db( +pub fn write_to_db( mut writer_receiver: WriterBbqueueReceiver<'_>, finished_extraction: &AtomicBool, index: &Index, wtxn: &mut RwTxn<'_>, arroy_writers: &HashMap, -) -> Result<()> { +) -> Result { // Used by by the ArroySetVector to copy the embedding into an // aligned memory area, required by arroy to accept a new vector. let mut aligned_embedding = Vec::new(); @@ -75,21 +75,36 @@ pub(super) fn write_to_db( write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?; - let direct_attempts = writer_receiver.sent_messages_attempts(); - let blocking_attempts = writer_receiver.blocking_sent_messages_attempts(); - let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0; - tracing::debug!( - "Channel congestion metrics - \ - Attempts: {direct_attempts}, \ - Blocked attempts: {blocking_attempts} \ - ({congestion_pct:.1}% congestion)" - ); + Ok(ChannelCongestion { + attempts: writer_receiver.sent_messages_attempts(), + blocking_attempts: writer_receiver.blocking_sent_messages_attempts(), + }) +} - Ok(()) +/// Stats exposing the congestion of a channel. +#[derive(Debug, Copy, Clone)] +pub struct ChannelCongestion { + /// Number of attempts to send a message into the bbqueue buffer. + pub attempts: usize, + /// Number of blocking attempts which require a retry. + pub blocking_attempts: usize, +} + +impl ChannelCongestion { + pub fn congestion_ratio(&self) -> f32 { + // tracing::debug!( + // "Channel congestion metrics - \ + // Attempts: {direct_attempts}, \ + // Blocked attempts: {blocking_attempts} \ + // ({congestion_pct:.1}% congestion)" + // ); + + self.blocking_attempts as f32 / self.attempts as f32 + } } #[tracing::instrument(level = "debug", skip_all, target = "indexing::vectors")] -pub(super) fn build_vectors( +pub fn build_vectors( index: &Index, wtxn: &mut RwTxn<'_>, index_embeddings: Vec, @@ -113,7 +128,7 @@ where Ok(()) } -pub(super) fn update_index( +pub fn update_index( index: &Index, wtxn: &mut RwTxn<'_>, new_fields_ids_map: FieldIdMapWithMetadata, diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs index b7e08a461..81ff93e54 100644 --- a/crates/milli/src/update/new/mod.rs +++ b/crates/milli/src/update/new/mod.rs @@ -1,4 +1,5 @@ pub use document_change::{Deletion, DocumentChange, Insertion, Update}; +pub use indexer::ChannelCongestion; pub use merger::{ merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta, }; diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index 9eb7d376d..38964d8ec 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -14,7 +14,7 @@ pub enum IndexingStep { ExtractingWordProximity, ExtractingEmbeddings, WritingGeoPoints, - WritingToDatabase, + TailWritingToDatabase, WaitingForExtractors, WritingEmbeddingsToDatabase, PostProcessingFacets, @@ -32,7 +32,7 @@ impl Step for IndexingStep { IndexingStep::ExtractingWordProximity => "extracting word proximity", IndexingStep::ExtractingEmbeddings => "extracting embeddings", IndexingStep::WritingGeoPoints => "writing geo points", - IndexingStep::WritingToDatabase => "writing to database", + IndexingStep::TailWritingToDatabase => "tail writing to database", IndexingStep::WaitingForExtractors => "waiting for extractors", IndexingStep::WritingEmbeddingsToDatabase => "writing embeddings to database", IndexingStep::PostProcessingFacets => "post-processing facets", From 1d99c8465c2538b241dd4b12904ea8a9a58f69cb Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 20 Feb 2025 10:03:48 +0100 Subject: [PATCH 06/12] Hide the batch stats to make insta pass --- crates/index-scheduler/src/insta_snapshot.rs | 9 +++++++-- crates/meilisearch-types/src/batches.rs | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index bb8827fdc..7b667c67b 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -1,7 +1,7 @@ use std::collections::BTreeSet; use std::fmt::Write; -use meilisearch_types::batches::{Batch, BatchEnqueuedAt}; +use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats}; use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{Database, RoTxn}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; @@ -342,6 +342,11 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database String { let mut snap = String::new(); let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch; + let stats = BatchStats { + call_trace: Default::default(), + write_channel_congestion: None, + ..stats.clone() + }; if let Some(finished_at) = finished_at { assert!(finished_at > started_at); } @@ -352,7 +357,7 @@ pub fn snapshot_batch(batch: &Batch) -> String { snap.push('{'); snap.push_str(&format!("uid: {uid}, ")); snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap())); - snap.push_str(&format!("stats: {}, ", serde_json::to_string(stats).unwrap())); + snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap())); snap.push('}'); snap } diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index 40309ce06..5ec9cbd41 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -60,7 +60,8 @@ pub struct BatchStats { pub status: BTreeMap, pub types: BTreeMap, pub index_uids: BTreeMap, + #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")] pub call_trace: serde_json::Map, - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default, skip_serializing_if = "Option::is_none")] pub write_channel_congestion: Option>, } From 1b1172ad16a07c38c84f8efaa6e184a603139d07 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 20 Feb 2025 10:44:53 +0100 Subject: [PATCH 07/12] Fix dump tests --- crates/meilisearch/tests/batches/mod.rs | 181 ++++++++++++++++-- crates/meilisearch/tests/dumps/mod.rs | 8 +- .../tests/upgrade/v1_12/v1_12_0.rs | 2 +- 3 files changed, 169 insertions(+), 22 deletions(-) diff --git a/crates/meilisearch/tests/batches/mod.rs b/crates/meilisearch/tests/batches/mod.rs index 6ef40be8e..dac1a2339 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -275,7 +275,14 @@ async fn test_summarized_document_addition_or_update() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(0).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, @@ -307,7 +314,14 @@ async fn test_summarized_document_addition_or_update() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(1).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 1, @@ -343,7 +357,14 @@ async fn test_summarized_delete_documents_by_batch() { index.wait_task(task.uid()).await.failed(); let (batch, _) = index.get_batch(0).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, @@ -375,7 +396,14 @@ async fn test_summarized_delete_documents_by_batch() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(2).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 2, @@ -413,7 +441,14 @@ async fn test_summarized_delete_documents_by_filter() { index.wait_task(task.uid()).await.failed(); let (batch, _) = index.get_batch(0).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, @@ -447,7 +482,14 @@ async fn test_summarized_delete_documents_by_filter() { index.wait_task(task.uid()).await.failed(); let (batch, _) = index.get_batch(2).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 2, @@ -481,7 +523,14 @@ async fn test_summarized_delete_documents_by_filter() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(4).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 4, @@ -549,7 +598,14 @@ async fn test_summarized_delete_document_by_id() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(2).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 2, @@ -597,7 +653,14 @@ async fn test_summarized_settings_update() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(0).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, @@ -642,7 +705,14 @@ async fn test_summarized_index_creation() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(0).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, @@ -670,7 +740,14 @@ async fn test_summarized_index_creation() { index.wait_task(task.uid()).await.failed(); let (batch, _) = index.get_batch(1).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 1, @@ -815,7 +892,14 @@ async fn test_summarized_index_update() { index.wait_task(task.uid()).await.failed(); let (batch, _) = index.get_batch(0).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, @@ -843,7 +927,14 @@ async fn test_summarized_index_update() { index.wait_task(task.uid()).await.failed(); let (batch, _) = index.get_batch(1).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 1, @@ -876,7 +967,14 @@ async fn test_summarized_index_update() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(3).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 3, @@ -904,7 +1002,14 @@ async fn test_summarized_index_update() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(4).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 4, @@ -942,7 +1047,14 @@ async fn test_summarized_index_swap() { server.wait_task(task.uid()).await.failed(); let (batch, _) = server.get_batch(0).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, @@ -983,7 +1095,14 @@ async fn test_summarized_index_swap() { server.wait_task(task.uid()).await.succeeded(); let (batch, _) = server.get_batch(1).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 1, @@ -1019,7 +1138,14 @@ async fn test_summarized_batch_cancelation() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(1).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 1, @@ -1057,7 +1183,14 @@ async fn test_summarized_batch_deletion() { index.wait_task(task.uid()).await.succeeded(); let (batch, _) = index.get_batch(1).await; assert_json_snapshot!(batch, - { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 1, @@ -1091,7 +1224,15 @@ async fn test_summarized_dump_creation() { server.wait_task(task.uid()).await; let (batch, _) = server.get_batch(0).await; assert_json_snapshot!(batch, - { ".details.dumpUid" => "[dumpUid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }, + { + ".details.dumpUid" => "[dumpUid]", + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": 0, diff --git a/crates/meilisearch/tests/dumps/mod.rs b/crates/meilisearch/tests/dumps/mod.rs index 1b07afdfd..6d83c9be5 100644 --- a/crates/meilisearch/tests/dumps/mod.rs +++ b/crates/meilisearch/tests/dumps/mod.rs @@ -2202,7 +2202,13 @@ async fn import_dump_v6_containing_batches_and_enqueued_tasks() { let (tasks, _) = server.tasks().await; snapshot!(json_string!(tasks, { ".results[1].startedAt" => "[date]", ".results[1].finishedAt" => "[date]", ".results[1].duration" => "[date]" }), name: "tasks"); let (batches, _) = server.batches().await; - snapshot!(json_string!(batches, { ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].duration" => "[date]" }), name: "batches"); + snapshot!(json_string!(batches, { + ".results[0].startedAt" => "[date]", + ".results[0].finishedAt" => "[date]", + ".results[0].duration" => "[date]", + ".results[0].stats.callTrace" => "[callTrace]", + ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]", + }), name: "batches"); let (indexes, code) = server.list_indexes(None, None).await; assert_eq!(code, 200, "{indexes}"); diff --git a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs index 6aab2861a..2e71450d8 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs +++ b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs @@ -129,7 +129,7 @@ async fn check_the_index_scheduler(server: &Server) { snapshot!(stats, @r###" { "databaseSize": 438272, - "usedDatabaseSize": 196608, + "usedDatabaseSize": 200704, "lastUpdate": "2025-01-23T11:36:22.634859166Z", "indexes": { "kefir": { From 9d314ace09fa5d376f3b5e7c30f894b634f7c445 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 20 Feb 2025 11:25:50 +0100 Subject: [PATCH 08/12] Fix the insta tests --- crates/meilisearch/tests/batches/mod.rs | 144 +++++++++++------- .../batches.snap | 5 +- ...rEnqueuedAt_equal_2025-01-16T16_47_41.snap | 3 +- ...rFinishedAt_equal_2025-01-16T16_47_41.snap | 3 +- ...erStartedAt_equal_2025-01-16T16_47_41.snap | 3 +- ...ue_once_everything_has_been_processed.snap | 4 +- .../tests/upgrade/v1_12/v1_12_0.rs | 24 +-- 7 files changed, 111 insertions(+), 75 deletions(-) diff --git a/crates/meilisearch/tests/batches/mod.rs b/crates/meilisearch/tests/batches/mod.rs index dac1a2339..2f9fbbca7 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -283,7 +283,7 @@ async fn test_summarized_document_addition_or_update() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -301,13 +301,15 @@ async fn test_summarized_document_addition_or_update() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]", + "writeChannelCongestion": "[writeChannelCongestion]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); let (task, _status_code) = index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), Some("id")).await; @@ -322,7 +324,7 @@ async fn test_summarized_document_addition_or_update() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 1, "progress": null, @@ -340,13 +342,15 @@ async fn test_summarized_document_addition_or_update() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]", + "writeChannelCongestion": "[writeChannelCongestion]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -365,7 +369,7 @@ async fn test_summarized_delete_documents_by_batch() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -383,13 +387,14 @@ async fn test_summarized_delete_documents_by_batch() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); index.create(None).await; let (task, _status_code) = index.delete_batch(vec![42]).await; @@ -404,7 +409,7 @@ async fn test_summarized_delete_documents_by_batch() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 2, "progress": null, @@ -422,13 +427,14 @@ async fn test_summarized_delete_documents_by_batch() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -449,7 +455,7 @@ async fn test_summarized_delete_documents_by_filter() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -468,13 +474,14 @@ async fn test_summarized_delete_documents_by_filter() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); index.create(None).await; let (task, _status_code) = @@ -490,7 +497,7 @@ async fn test_summarized_delete_documents_by_filter() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 2, "progress": null, @@ -509,13 +516,14 @@ async fn test_summarized_delete_documents_by_filter() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); index.update_settings(json!({ "filterableAttributes": ["doggo"] })).await; let (task, _status_code) = @@ -550,7 +558,8 @@ async fn test_summarized_delete_documents_by_filter() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", @@ -566,7 +575,16 @@ async fn test_summarized_delete_document_by_id() { let (task, _status_code) = index.delete_document(1).await; index.wait_task(task.uid()).await.failed(); let (batch, _) = index.get_batch(0).await; - snapshot!(batch, + assert_json_snapshot!(batch, + { + ".uid" => "[uid]", + ".duration" => "[duration]", + ".enqueuedAt" => "[date]", + ".startedAt" => "[date]", + ".finishedAt" => "[date]", + ".stats.callTrace" => "[callTrace]", + ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + }, @r#" { "uid": "[uid]", @@ -585,7 +603,8 @@ async fn test_summarized_delete_document_by_id() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", @@ -624,7 +643,8 @@ async fn test_summarized_delete_document_by_id() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", @@ -661,7 +681,7 @@ async fn test_summarized_settings_update() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -688,13 +708,14 @@ async fn test_summarized_settings_update() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -713,7 +734,7 @@ async fn test_summarized_index_creation() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -728,13 +749,14 @@ async fn test_summarized_index_creation() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); let (task, _status_code) = index.create(Some("doggos")).await; index.wait_task(task.uid()).await.failed(); @@ -748,7 +770,7 @@ async fn test_summarized_index_creation() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 1, "progress": null, @@ -765,13 +787,14 @@ async fn test_summarized_index_creation() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -900,7 +923,7 @@ async fn test_summarized_index_update() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -915,13 +938,14 @@ async fn test_summarized_index_update() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); let (task, _status_code) = index.update(Some("bones")).await; index.wait_task(task.uid()).await.failed(); @@ -935,7 +959,7 @@ async fn test_summarized_index_update() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 1, "progress": null, @@ -952,13 +976,14 @@ async fn test_summarized_index_update() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); // And run the same two tests once the index do exists. index.create(None).await; @@ -990,7 +1015,8 @@ async fn test_summarized_index_update() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", @@ -1010,7 +1036,7 @@ async fn test_summarized_index_update() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 4, "progress": null, @@ -1027,13 +1053,14 @@ async fn test_summarized_index_update() { }, "indexUids": { "test": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -1055,7 +1082,7 @@ async fn test_summarized_index_swap() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -1077,13 +1104,14 @@ async fn test_summarized_index_swap() { "types": { "indexSwap": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); server.index("doggos").create(None).await; let (task, _status_code) = server.index("cattos").create(None).await; @@ -1103,7 +1131,7 @@ async fn test_summarized_index_swap() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 1, "progress": null, @@ -1118,13 +1146,14 @@ async fn test_summarized_index_swap() { }, "indexUids": { "doggos": 1 - } + }, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -1146,7 +1175,7 @@ async fn test_summarized_batch_cancelation() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 1, "progress": null, @@ -1163,13 +1192,14 @@ async fn test_summarized_batch_cancelation() { "types": { "taskCancelation": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -1191,7 +1221,7 @@ async fn test_summarized_batch_deletion() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 1, "progress": null, @@ -1208,13 +1238,14 @@ async fn test_summarized_batch_deletion() { "types": { "taskDeletion": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } #[actix_web::test] @@ -1233,7 +1264,7 @@ async fn test_summarized_dump_creation() { ".stats.callTrace" => "[callTrace]", ".stats.writeChannelCongestion" => "[writeChannelCongestion]" }, - @r#" + @r###" { "uid": 0, "progress": null, @@ -1248,11 +1279,12 @@ async fn test_summarized_dump_creation() { "types": { "dumpCreation": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", "finishedAt": "[date]" } - "#); + "###); } diff --git a/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap b/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap index aeac6cf55..8d28aa706 100644 --- a/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap +++ b/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap @@ -1,6 +1,5 @@ --- source: crates/meilisearch/tests/dumps/mod.rs -snapshot_kind: text --- { "results": [ @@ -21,7 +20,9 @@ snapshot_kind: text }, "indexUids": { "kefir": 1 - } + }, + "callTrace": "[callTrace]", + "writeChannelCongestion": "[writeChannelCongestion]" }, "duration": "[date]", "startedAt": "[date]", diff --git a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41.snap b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41.snap index 6fe049b02..21fcdaffb 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41.snap +++ b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41.snap @@ -18,7 +18,8 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs "types": { "upgradeDatabase": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", diff --git a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41.snap b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41.snap index 6fe049b02..21fcdaffb 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41.snap +++ b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41.snap @@ -18,7 +18,8 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs "types": { "upgradeDatabase": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", diff --git a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterStartedAt_equal_2025-01-16T16_47_41.snap b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterStartedAt_equal_2025-01-16T16_47_41.snap index 6fe049b02..21fcdaffb 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterStartedAt_equal_2025-01-16T16_47_41.snap +++ b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/batches_filter_afterStartedAt_equal_2025-01-16T16_47_41.snap @@ -18,7 +18,8 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs "types": { "upgradeDatabase": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", diff --git a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/the_whole_batch_queue_once_everything_has_been_processed.snap b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/the_whole_batch_queue_once_everything_has_been_processed.snap index 63308dc64..4ed0abe17 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/the_whole_batch_queue_once_everything_has_been_processed.snap +++ b/crates/meilisearch/tests/upgrade/v1_12/snapshots/v1_12_0.rs/check_the_index_scheduler/the_whole_batch_queue_once_everything_has_been_processed.snap @@ -1,6 +1,5 @@ --- source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs -snapshot_kind: text --- { "results": [ @@ -19,7 +18,8 @@ snapshot_kind: text "types": { "upgradeDatabase": 1 }, - "indexUids": {} + "indexUids": {}, + "callTrace": "[callTrace]" }, "duration": "[duration]", "startedAt": "[date]", diff --git a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs index 2e71450d8..f3bff7467 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs +++ b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs @@ -159,7 +159,7 @@ async fn check_the_index_scheduler(server: &Server) { let (tasks, _) = server.tasks_filter("limit=1000").await; snapshot!(json_string!(tasks, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "the_whole_task_queue_once_everything_has_been_processed"); let (batches, _) = server.batches_filter("limit=1000").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "the_whole_batch_queue_once_everything_has_been_processed"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "the_whole_batch_queue_once_everything_has_been_processed"); // Tests all the tasks query parameters let (tasks, _) = server.tasks_filter("uids=10").await; @@ -186,32 +186,32 @@ async fn check_the_index_scheduler(server: &Server) { // Tests all the batches query parameters let (batches, _) = server.batches_filter("uids=10").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_uids_equal_10"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10"); let (batches, _) = server.batches_filter("batchUids=10").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_batchUids_equal_10"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10"); let (batches, _) = server.batches_filter("statuses=canceled").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_statuses_equal_canceled"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled"); // types has already been tested above to retrieve the upgrade database let (batches, _) = server.batches_filter("canceledBy=19").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_canceledBy_equal_19"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19"); let (batches, _) = server.batches_filter("beforeEnqueuedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("afterEnqueuedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("beforeStartedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("afterStartedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("beforeFinishedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41"); let (stats, _) = server.stats().await; snapshot!(stats, @r###" { "databaseSize": 438272, - "usedDatabaseSize": 196608, + "usedDatabaseSize": 200704, "lastUpdate": "2025-01-23T11:36:22.634859166Z", "indexes": { "kefir": { From 243a5fa6a8d126b9ae1df0bb388c445b42d0a653 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 20 Feb 2025 14:17:34 +0100 Subject: [PATCH 09/12] Log the call trace and congestion --- crates/index-scheduler/src/scheduler/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index bcf53127b..42ed92839 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -352,6 +352,17 @@ impl IndexScheduler { congestion_info }); + if let Some(congestion) = congestion { + tracing::debug!( + "Channel congestion metrics - Attempts: {}, Blocked attempts: {} ({:.1}% congestion)", + congestion.attempts, + congestion.blocking_attempts, + congestion.congestion_ratio(), + ); + } + + tracing::debug!("call trace: {:?}", progress.accumulated_durations()); + self.queue.write_batch(&mut wtxn, processing_batch, &ids)?; #[cfg(test)] From 434fad5327c0d157e841a48f52a79f43738b1c1d Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 20 Feb 2025 16:02:23 +0100 Subject: [PATCH 10/12] Fix insta tests again --- .../tests/upgrade/v1_12/v1_12_0.rs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs index f3bff7467..3a93ba81e 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs +++ b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs @@ -2,6 +2,7 @@ // It must test pretty much all the features of meilisearch because the other tests will only tests // the new features they introduced. +use insta::assert_json_snapshot; use manifest_dir_macros::exist_relative_path; use meili_snap::{json_string, snapshot}; use meilisearch::Opt; @@ -126,10 +127,14 @@ async fn check_the_index_scheduler(server: &Server) { "#); // And their metadata are still right let (stats, _) = server.stats().await; - snapshot!(stats, @r###" + assert_json_snapshot!(stats, { + ".databaseSize" => "[bytes]", + ".usedDatabaseSize" => "[bytes]" + }, + @r###" { - "databaseSize": 438272, - "usedDatabaseSize": 200704, + "databaseSize": [bytes], + "usedDatabaseSize": [bytes], "lastUpdate": "2025-01-23T11:36:22.634859166Z", "indexes": { "kefir": { @@ -208,10 +213,14 @@ async fn check_the_index_scheduler(server: &Server) { snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41"); let (stats, _) = server.stats().await; - snapshot!(stats, @r###" + snapshot!(stats, { + ".databaseSize" => "[bytes]", + ".usedDatabaseSize" => "[bytes]" + }, + @r###" { - "databaseSize": 438272, - "usedDatabaseSize": 200704, + "databaseSize": [bytes], + "usedDatabaseSize": [bytes], "lastUpdate": "2025-01-23T11:36:22.634859166Z", "indexes": { "kefir": { From 245a55722a8f7de244d23b95f59a7a718e6d585d Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 20 Feb 2025 16:48:18 +0100 Subject: [PATCH 11/12] Remove commented code --- crates/milli/src/update/new/indexer/write.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index c7e449243..1dad993f0 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -92,13 +92,6 @@ pub struct ChannelCongestion { impl ChannelCongestion { pub fn congestion_ratio(&self) -> f32 { - // tracing::debug!( - // "Channel congestion metrics - \ - // Attempts: {direct_attempts}, \ - // Blocked attempts: {blocking_attempts} \ - // ({congestion_pct:.1}% congestion)" - // ); - self.blocking_attempts as f32 / self.attempts as f32 } } From 76fd5d92d7744ef8d3da70aa31c74c991ebeb900 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 20 Feb 2025 16:58:45 +0100 Subject: [PATCH 12/12] Clarify the tail writing to database --- crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs | 10 +++++----- crates/milli/src/update/new/indexer/extract.rs | 2 +- crates/milli/src/update/new/steps.rs | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs index 3a93ba81e..224f53ab0 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs +++ b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs @@ -133,8 +133,8 @@ async fn check_the_index_scheduler(server: &Server) { }, @r###" { - "databaseSize": [bytes], - "usedDatabaseSize": [bytes], + "databaseSize": "[bytes]", + "usedDatabaseSize": "[bytes]", "lastUpdate": "2025-01-23T11:36:22.634859166Z", "indexes": { "kefir": { @@ -213,14 +213,14 @@ async fn check_the_index_scheduler(server: &Server) { snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41"); let (stats, _) = server.stats().await; - snapshot!(stats, { + assert_json_snapshot!(stats, { ".databaseSize" => "[bytes]", ".usedDatabaseSize" => "[bytes]" }, @r###" { - "databaseSize": [bytes], - "usedDatabaseSize": [bytes], + "databaseSize": "[bytes]", + "usedDatabaseSize": "[bytes]", "lastUpdate": "2025-01-23T11:36:22.634859166Z", "indexes": { "kefir": { diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 1606851cb..792b0c03b 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -291,7 +291,7 @@ where &indexing_context.must_stop_processing, )?; } - indexing_context.progress.update_progress(IndexingStep::TailWritingToDatabase); + indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); Result::Ok((facet_field_ids_delta, index_embeddings)) diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index 38964d8ec..ad8fe9cb1 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -14,7 +14,7 @@ pub enum IndexingStep { ExtractingWordProximity, ExtractingEmbeddings, WritingGeoPoints, - TailWritingToDatabase, + WaitingForDatabaseWrites, WaitingForExtractors, WritingEmbeddingsToDatabase, PostProcessingFacets, @@ -32,7 +32,7 @@ impl Step for IndexingStep { IndexingStep::ExtractingWordProximity => "extracting word proximity", IndexingStep::ExtractingEmbeddings => "extracting embeddings", IndexingStep::WritingGeoPoints => "writing geo points", - IndexingStep::TailWritingToDatabase => "tail writing to database", + IndexingStep::WaitingForDatabaseWrites => "waiting for database writes", IndexingStep::WaitingForExtractors => "waiting for extractors", IndexingStep::WritingEmbeddingsToDatabase => "writing embeddings to database", IndexingStep::PostProcessingFacets => "post-processing facets",