From 811143cbe940db843be07954a91c4db49150ac57 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 27 Mar 2025 10:17:28 +0100 Subject: [PATCH] Add more progress precision when doing post processing --- .../src/update/new/indexer/post_processing.rs | 64 +++++++++++++++---- crates/milli/src/update/new/steps.rs | 20 ++++++ 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index 2a01fccf3..aace70cff 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -7,12 +7,13 @@ use itertools::{merge_join_by, EitherOrBoth}; use super::document_changes::IndexingContext; use crate::facet::FacetType; use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; +use crate::progress::Progress; use crate::update::del_add::DelAdd; use crate::update::facet::new_incremental::FacetsUpdateIncremental; use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; use crate::update::new::facet_search_builder::FacetSearchBuilder; use crate::update::new::merger::FacetFieldIdDelta; -use crate::update::new::steps::IndexingStep; +use crate::update::new::steps::{IndexingStep, PostProcessingFacets, PostProcessingWords}; use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use crate::update::new::words_prefix_docids::{ compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids, @@ -33,11 +34,23 @@ where { let index = indexing_context.index; indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); - compute_facet_level_database(index, wtxn, facet_field_ids_delta, &mut global_fields_ids_map)?; - compute_facet_search_database(index, wtxn, global_fields_ids_map)?; + compute_facet_level_database( + index, + wtxn, + facet_field_ids_delta, + &mut global_fields_ids_map, + 
indexing_context.progress, + )?; + compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?; indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); - if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { - compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?; + if let Some(prefix_delta) = compute_word_fst(index, wtxn, indexing_context.progress)? { + compute_prefix_database( + index, + wtxn, + prefix_delta, + indexing_context.grenad_parameters, + indexing_context.progress, + )?; }; Ok(()) } @@ -48,21 +61,32 @@ fn compute_prefix_database( wtxn: &mut RwTxn, prefix_delta: PrefixDelta, grenad_parameters: &GrenadParameters, + progress: &Progress, ) -> Result<()> { let PrefixDelta { modified, deleted } = prefix_delta; - // Compute word prefix docids + + progress.update_progress(PostProcessingWords::WordPrefixDocids); compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; - // Compute exact word prefix docids + + progress.update_progress(PostProcessingWords::ExactWordPrefixDocids); compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; - // Compute word prefix fid docids + + progress.update_progress(PostProcessingWords::WordPrefixFieldIdDocids); compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; - // Compute word prefix position docids + + progress.update_progress(PostProcessingWords::WordPrefixPositionDocids); compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters) } #[tracing::instrument(level = "trace", skip_all, target = "indexing")] -fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> { +fn compute_word_fst( + index: &Index, + wtxn: &mut RwTxn, + progress: &Progress, +) -> Result<Option<PrefixDelta>> { let rtxn = index.read_txn()?; + progress.update_progress(PostProcessingWords::WordFst); + let words_fst = index.words_fst(&rtxn)?; let mut
word_fst_builder = WordFstBuilder::new(&words_fst)?; let prefix_settings = index.prefix_settings(&rtxn)?; @@ -112,8 +136,10 @@ fn compute_facet_search_database( index: &Index, wtxn: &mut RwTxn, global_fields_ids_map: GlobalFieldsIdsMap, + progress: &Progress, ) -> Result<()> { let rtxn = index.read_txn()?; + progress.update_progress(PostProcessingFacets::FacetSearch); // if the facet search is not enabled, we can skip the rest of the function if !index.facet_search(wtxn)? { @@ -171,10 +197,16 @@ fn compute_facet_level_database( wtxn: &mut RwTxn, mut facet_field_ids_delta: FacetFieldIdsDelta, global_fields_ids_map: &mut GlobalFieldsIdsMap, + progress: &Progress, ) -> Result<()> { let rtxn = index.read_txn()?; + let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?; - for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() { + let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_string_delta().collect(); + // We move all bulks at the front and incrementals (others) at the end. + deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 }); + + for (fid, delta) in deltas { // skip field ids that should not be facet leveled let Some(metadata) = global_fields_ids_map.metadata(fid) else { continue; @@ -187,11 +219,13 @@ fn compute_facet_level_database( let _entered = span.enter(); match delta { FacetFieldIdDelta::Bulk => { + progress.update_progress(PostProcessingFacets::StringsBulk); tracing::debug!(%fid, "bulk string facet processing"); FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) .execute(wtxn)? 
} FacetFieldIdDelta::Incremental(delta_data) => { + progress.update_progress(PostProcessingFacets::StringsIncremental); tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); FacetsUpdateIncremental::new( index, @@ -207,16 +241,22 @@ fn compute_facet_level_database( } } - for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() { + let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_number_delta().collect(); + // We move all bulks at the front and incrementals (others) at the end. + deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 }); + + for (fid, delta) in deltas { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let _entered = span.enter(); match delta { FacetFieldIdDelta::Bulk => { + progress.update_progress(PostProcessingFacets::NumbersBulk); tracing::debug!(%fid, "bulk number facet processing"); FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) .execute(wtxn)? } FacetFieldIdDelta::Incremental(delta_data) => { + progress.update_progress(PostProcessingFacets::NumbersIncremental); tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); FacetsUpdateIncremental::new( index, diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index da71819c6..eabf9104e 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -20,3 +20,23 @@ make_enum_progress! { Finalizing, } } + +make_enum_progress! { + pub enum PostProcessingFacets { + StringsBulk, + StringsIncremental, + NumbersBulk, + NumbersIncremental, + FacetSearch, + } +} + +make_enum_progress! { + pub enum PostProcessingWords { + WordFst, + WordPrefixDocids, + ExactWordPrefixDocids, + WordPrefixFieldIdDocids, + WordPrefixPositionDocids, + } +}