Add more progress levels to measure merging

This commit is contained in:
Kerollmops 2025-03-16 19:15:39 +01:00
parent 2500e3c067
commit cb16baab18
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
4 changed files with 35 additions and 12 deletions

View File

@ -190,8 +190,18 @@ macro_rules! make_atomic_progress {
};
}
make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
make_atomic_progress!(Payload alias AtomicPayloadStep => "payload" );
make_atomic_progress!(Document alias AtomicDocumentStep => "document");
make_atomic_progress!(Payload alias AtomicPayloadStep => "payload");
make_enum_progress! {
pub enum MergingWordCache {
WordDocids,
WordFieldIdDocids,
ExactWordDocids,
WordPositionDocids,
FieldIdWordCountDocids,
}
}
#[derive(Debug, Serialize, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]

View File

@ -13,6 +13,7 @@ use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext};
use crate::index::IndexEmbeddingConfig;
use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree;
@ -96,6 +97,7 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
let _entered = span.enter();
indexing_context.progress.update_progress(IndexingStep::MergingFacetCaches);
facet_field_ids_delta = merge_and_send_facet_docids(
caches,
@ -117,7 +119,6 @@ where
} = {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
WordDocidsExtractors::run_extraction(
document_changes,
indexing_context,
@ -126,9 +127,13 @@ where
)?
};
indexing_context.progress.update_progress(IndexingStep::MergingWordCaches);
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordDocids);
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
@ -142,6 +147,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordFieldIdDocids);
merge_and_send_docids(
word_fid_docids,
index.word_fid_docids.remap_types(),
@ -155,6 +162,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::ExactWordDocids);
merge_and_send_docids(
exact_word_docids,
index.exact_word_docids.remap_types(),
@ -168,6 +177,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordPositionDocids);
merge_and_send_docids(
word_position_docids,
index.word_position_docids.remap_types(),
@ -181,6 +192,8 @@ where
let span =
tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::FieldIdWordCountDocids);
merge_and_send_docids(
fid_word_count_docids,
index.field_id_word_count_docids.remap_types(),
@ -210,6 +223,7 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(IndexingStep::MergingWordProximity);
merge_and_send_docids(
caches,

View File

@ -82,14 +82,8 @@ where
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
docids_sender.write(key, &bitmap)?;
Ok(())
}
Operation::Delete => {
docids_sender.delete(key)?;
Ok(())
}
Operation::Write(bitmap) => docids_sender.write(key, &bitmap),
Operation::Delete => docids_sender.delete(key),
Operation::Ignore => Ok(()),
}
})
@ -130,7 +124,6 @@ pub fn merge_and_send_facet_docids<'extractor>(
Operation::Ignore => Ok(()),
}
})?;
Ok(facet_field_ids_delta)
})
.reduce(

View File

@ -13,6 +13,9 @@ pub enum IndexingStep {
ExtractingWords,
ExtractingWordProximity,
ExtractingEmbeddings,
MergingFacetCaches,
MergingWordCaches,
MergingWordProximity,
WritingGeoPoints,
WaitingForDatabaseWrites,
WaitingForExtractors,
@ -31,6 +34,9 @@ impl Step for IndexingStep {
IndexingStep::ExtractingWords => "extracting words",
IndexingStep::ExtractingWordProximity => "extracting word proximity",
IndexingStep::ExtractingEmbeddings => "extracting embeddings",
IndexingStep::MergingFacetCaches => "merging facet caches",
IndexingStep::MergingWordCaches => "merging word caches",
IndexingStep::MergingWordProximity => "merging word proximity",
IndexingStep::WritingGeoPoints => "writing geo points",
IndexingStep::WaitingForDatabaseWrites => "waiting for database writes",
IndexingStep::WaitingForExtractors => "waiting for extractors",