5422: Add more progress levels to measure merging r=Kerollmops a=Kerollmops

I found out that Meilisearch was not correctly reporting the long indexing times in the progress and that a lot of time was spent on extracting words with all documents already extracted. The reason was that there was no step to report merging the cache and sending the entries to write to the writer thread. This PR adds these entries to the progress.

Co-authored-by: Kerollmops <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-03-17 12:02:46 +00:00 committed by GitHub
commit cbdf80893d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 48 additions and 61 deletions

View File

@ -190,8 +190,18 @@ macro_rules! make_atomic_progress {
};
}
make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
make_atomic_progress!(Payload alias AtomicPayloadStep => "payload" );
make_atomic_progress!(Document alias AtomicDocumentStep => "document");
make_atomic_progress!(Payload alias AtomicPayloadStep => "payload");
make_enum_progress! {
pub enum MergingWordCache {
WordDocids,
WordFieldIdDocids,
ExactWordDocids,
WordPositionDocids,
FieldIdWordCountDocids,
}
}
#[derive(Debug, Serialize, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]

View File

@ -13,6 +13,7 @@ use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext};
use crate::index::IndexEmbeddingConfig;
use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree;
@ -96,6 +97,7 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
let _entered = span.enter();
indexing_context.progress.update_progress(IndexingStep::MergingFacetCaches);
facet_field_ids_delta = merge_and_send_facet_docids(
caches,
@ -117,7 +119,6 @@ where
} = {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
WordDocidsExtractors::run_extraction(
document_changes,
indexing_context,
@ -126,9 +127,13 @@ where
)?
};
indexing_context.progress.update_progress(IndexingStep::MergingWordCaches);
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordDocids);
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
@ -142,6 +147,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordFieldIdDocids);
merge_and_send_docids(
word_fid_docids,
index.word_fid_docids.remap_types(),
@ -155,6 +162,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::ExactWordDocids);
merge_and_send_docids(
exact_word_docids,
index.exact_word_docids.remap_types(),
@ -168,6 +177,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordPositionDocids);
merge_and_send_docids(
word_position_docids,
index.word_position_docids.remap_types(),
@ -181,6 +192,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::FieldIdWordCountDocids);
merge_and_send_docids(
fid_word_count_docids,
index.field_id_word_count_docids.remap_types(),
@ -210,6 +223,7 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(IndexingStep::MergingWordProximity);
merge_and_send_docids(
caches,

View File

@ -82,14 +82,8 @@ where
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
docids_sender.write(key, &bitmap)?;
Ok(())
}
Operation::Delete => {
docids_sender.delete(key)?;
Ok(())
}
Operation::Write(bitmap) => docids_sender.write(key, &bitmap),
Operation::Delete => docids_sender.delete(key),
Operation::Ignore => Ok(()),
}
})
@ -130,7 +124,6 @@ pub fn merge_and_send_facet_docids<'extractor>(
Operation::Ignore => Ok(()),
}
})?;
Ok(facet_field_ids_delta)
})
.reduce(

View File

@ -1,52 +1,22 @@
use std::borrow::Cow;
use crate::make_enum_progress;
use enum_iterator::Sequence;
use crate::progress::Step;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)]
#[repr(u8)]
pub enum IndexingStep {
PreparingPayloads,
ExtractingDocuments,
ExtractingFacets,
ExtractingWords,
ExtractingWordProximity,
ExtractingEmbeddings,
WritingGeoPoints,
WaitingForDatabaseWrites,
WaitingForExtractors,
WritingEmbeddingsToDatabase,
PostProcessingFacets,
PostProcessingWords,
Finalizing,
}
impl Step for IndexingStep {
fn name(&self) -> Cow<'static, str> {
match self {
IndexingStep::PreparingPayloads => "preparing update file",
IndexingStep::ExtractingDocuments => "extracting documents",
IndexingStep::ExtractingFacets => "extracting facets",
IndexingStep::ExtractingWords => "extracting words",
IndexingStep::ExtractingWordProximity => "extracting word proximity",
IndexingStep::ExtractingEmbeddings => "extracting embeddings",
IndexingStep::WritingGeoPoints => "writing geo points",
IndexingStep::WaitingForDatabaseWrites => "waiting for database writes",
IndexingStep::WaitingForExtractors => "waiting for extractors",
IndexingStep::WritingEmbeddingsToDatabase => "writing embeddings to database",
IndexingStep::PostProcessingFacets => "post-processing facets",
IndexingStep::PostProcessingWords => "post-processing words",
IndexingStep::Finalizing => "finalizing",
}
.into()
}
fn current(&self) -> u32 {
*self as u32
}
fn total(&self) -> u32 {
Self::CARDINALITY as u32
make_enum_progress! {
pub enum IndexingStep {
PreparingPayloads,
ExtractingDocuments,
ExtractingFacets,
ExtractingWords,
ExtractingWordProximity,
ExtractingEmbeddings,
MergingFacetCaches,
MergingWordCaches,
MergingWordProximity,
WritingGeoPoints,
WaitingForDatabaseWrites,
WaitingForExtractors,
WritingEmbeddingsToDatabase,
PostProcessingFacets,
PostProcessingWords,
Finalizing,
}
}