From cb16baab18cbd8abce27207fe065f9f8b3b70687 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Sun, 16 Mar 2025 19:15:39 +0100 Subject: [PATCH] Add more progress levels to measure merging --- crates/milli/src/progress.rs | 14 ++++++++++++-- crates/milli/src/update/new/indexer/extract.rs | 16 +++++++++++++++- crates/milli/src/update/new/merger.rs | 11 ++--------- crates/milli/src/update/new/steps.rs | 6 ++++++ 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index 7eb0cbd6b..75dafa8ec 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -190,8 +190,18 @@ macro_rules! make_atomic_progress { }; } -make_atomic_progress!(Document alias AtomicDocumentStep => "document" ); -make_atomic_progress!(Payload alias AtomicPayloadStep => "payload" ); +make_atomic_progress!(Document alias AtomicDocumentStep => "document"); +make_atomic_progress!(Payload alias AtomicPayloadStep => "payload"); + +make_enum_progress! { + pub enum MergingWordCache { + WordDocids, + WordFieldIdDocids, + ExactWordDocids, + WordPositionDocids, + FieldIdWordCountDocids, + } +} #[derive(Debug, Serialize, Clone, ToSchema)] #[serde(rename_all = "camelCase")] diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 907a4d1df..bb36ddc37 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -13,6 +13,7 @@ use super::super::thread_local::{FullySend, ThreadLocal}; use super::super::FacetFieldIdsDelta; use super::document_changes::{extract, DocumentChanges, IndexingContext}; use crate::index::IndexEmbeddingConfig; +use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::merger::merge_and_send_rtree; @@ -96,6 +97,7 @@ where { let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); let _entered = span.enter(); + indexing_context.progress.update_progress(IndexingStep::MergingFacetCaches); facet_field_ids_delta = merge_and_send_facet_docids( caches, @@ -117,7 +119,6 @@ where } = { let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); let _entered = span.enter(); - WordDocidsExtractors::run_extraction( document_changes, indexing_context, @@ -126,9 +127,13 @@ where )? }; + indexing_context.progress.update_progress(IndexingStep::MergingWordCaches); + { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); let _entered = span.enter(); + indexing_context.progress.update_progress(MergingWordCache::WordDocids); + merge_and_send_docids( word_docids, index.word_docids.remap_types(), @@ -142,6 +147,8 @@ where let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); let _entered = span.enter(); + indexing_context.progress.update_progress(MergingWordCache::WordFieldIdDocids); + merge_and_send_docids( word_fid_docids, index.word_fid_docids.remap_types(), @@ -155,6 +162,8 @@ where let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); let _entered = span.enter(); + indexing_context.progress.update_progress(MergingWordCache::ExactWordDocids); + merge_and_send_docids( exact_word_docids, index.exact_word_docids.remap_types(), @@ -168,6 +177,8 @@ where let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); let _entered = span.enter(); + indexing_context.progress.update_progress(MergingWordCache::WordPositionDocids); + merge_and_send_docids( word_position_docids, index.word_position_docids.remap_types(), @@ -181,6 +192,8 @@ where let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); let _entered = span.enter(); + indexing_context.progress.update_progress(MergingWordCache::FieldIdWordCountDocids); + merge_and_send_docids( fid_word_count_docids, index.field_id_word_count_docids.remap_types(), @@ -210,6 +223,7 @@ where { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); let _entered = span.enter(); + indexing_context.progress.update_progress(IndexingStep::MergingWordProximity); merge_and_send_docids( caches, diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 090add6bd..15f06c67d 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -82,14 +82,8 @@ where merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { - Operation::Write(bitmap) => { - docids_sender.write(key, &bitmap)?; - Ok(()) - } - Operation::Delete => { - docids_sender.delete(key)?; - Ok(()) - } + Operation::Write(bitmap) => docids_sender.write(key, &bitmap), + Operation::Delete => docids_sender.delete(key), Operation::Ignore => Ok(()), } }) @@ -130,7 +124,6 @@ pub fn merge_and_send_facet_docids<'extractor>( Operation::Ignore => Ok(()), } })?; - Ok(facet_field_ids_delta) }) .reduce( diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index ad8fe9cb1..e026b4d0d 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -13,6 +13,9 @@ pub enum IndexingStep { ExtractingWords, ExtractingWordProximity, ExtractingEmbeddings, + MergingFacetCaches, + MergingWordCaches, + MergingWordProximity, WritingGeoPoints, WaitingForDatabaseWrites, WaitingForExtractors, @@ -31,6 +34,9 @@ impl Step for IndexingStep { IndexingStep::ExtractingWords => "extracting words", IndexingStep::ExtractingWordProximity => "extracting word proximity", IndexingStep::ExtractingEmbeddings => "extracting embeddings", + IndexingStep::MergingFacetCaches => "merging facet caches", + IndexingStep::MergingWordCaches => "merging word caches", + IndexingStep::MergingWordProximity => "merging word proximity", IndexingStep::WritingGeoPoints => "writing geo points", IndexingStep::WaitingForDatabaseWrites => "waiting for database writes", IndexingStep::WaitingForExtractors => "waiting for extractors",