From a8239fb528f2a01197d10277341c246df61149d0 Mon Sep 17 00:00:00 2001 From: nnethercott Date: Thu, 12 Jun 2025 00:46:16 +0200 Subject: [PATCH 1/4] Move progress updates outside of loops --- .../src/update/new/indexer/post_processing.rs | 166 ++++++++++++------ 1 file changed, 109 insertions(+), 57 deletions(-) diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index b5c89d0d9..54530a6f3 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -27,20 +27,30 @@ pub(super) fn post_process( indexing_context: IndexingContext, wtxn: &mut RwTxn<'_>, mut global_fields_ids_map: GlobalFieldsIdsMap<'_>, - facet_field_ids_delta: FacetFieldIdsDelta, + mut facet_field_ids_delta: FacetFieldIdsDelta, ) -> Result<()> where MSP: Fn() -> bool + Sync, { let index = indexing_context.index; indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); - compute_facet_level_database( + + let string_deltas = facet_field_ids_delta.consume_facet_string_delta().collect(); + compute_facet_level_string( index, wtxn, - facet_field_ids_delta, + string_deltas, &mut global_fields_ids_map, indexing_context.progress, )?; + let number_deltas = facet_field_ids_delta.consume_facet_number_delta().collect(); + compute_facet_level_number( + index, + wtxn, + number_deltas, + indexing_context.progress, + )?; + compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?; indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); if let Some(prefix_delta) = compute_word_fst(index, wtxn, indexing_context.progress)? { @@ -206,21 +216,27 @@ fn compute_facet_search_database( } #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] -fn compute_facet_level_database( +fn compute_facet_level_string( index: &Index, wtxn: &mut RwTxn, - mut facet_field_ids_delta: FacetFieldIdsDelta, + deltas: Vec<(u16, FacetFieldIdDelta)>, global_fields_ids_map: &mut GlobalFieldsIdsMap, progress: &Progress, ) -> Result<()> { let rtxn = index.read_txn()?; - let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?; - let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_string_delta().collect(); - // We move all bulks at the front and incrementals (others) at the end. - deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 }); - for (fid, delta) in deltas { + // We partition bulk and incremental updates + let (bulk, incremental): (Vec<_>, Vec<_>) = deltas.into_iter().partition(|(_, delta)| { + if let FacetFieldIdDelta::Bulk = delta { + true + } else { + false + } + }); + + progress.update_progress(PostProcessingFacets::StringsBulk); + for (fid, _) in bulk { // skip field ids that should not be facet leveled let Some(metadata) = global_fields_ids_map.metadata(fid) else { continue; @@ -231,59 +247,65 @@ fn compute_facet_level_database( let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); - match delta { - FacetFieldIdDelta::Bulk => { - progress.update_progress(PostProcessingFacets::StringsBulk); - tracing::debug!(%fid, "bulk string facet processing"); - FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) - .execute(wtxn)? - } - FacetFieldIdDelta::Incremental(delta_data) => { - progress.update_progress(PostProcessingFacets::StringsIncremental); - tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); - FacetsUpdateIncremental::new( - index, - FacetType::String, - fid, - delta_data, - FACET_GROUP_SIZE, - FACET_MIN_LEVEL_SIZE, - FACET_MAX_GROUP_SIZE, - ) - .execute(wtxn)? - } + tracing::debug!(%fid, "bulk string facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) + .execute(wtxn)? + } + + progress.update_progress(PostProcessingFacets::StringsIncremental); + for (fid, delta) in incremental { + // skip field ids that should not be facet leveled + let Some(metadata) = global_fields_ids_map.metadata(fid) else { + continue; + }; + if !metadata.require_facet_level_database(&filterable_attributes_rules) { + continue; + } + + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); + let _entered = span.enter(); + if let FacetFieldIdDelta::Incremental(delta_data) = delta { + tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::String, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? } } - let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_number_delta().collect(); - // We move all bulks at the front and incrementals (others) at the end. - deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 }); + Ok(()) +} - for (fid, delta) in deltas { +#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] +fn compute_facet_level_number( + index: &Index, + wtxn: &mut RwTxn, + deltas: Vec<(u16, FacetFieldIdDelta)>, + progress: &Progress, +) -> Result<()> { + // We partition bulk and incremental updates + let (bulk, incremental): (Vec<_>, Vec<_>) = deltas.into_iter().partition(|(_, delta)| { + if let FacetFieldIdDelta::Bulk = delta { + true + } else { + false + } + }); + + progress.update_progress(PostProcessingFacets::NumbersBulk); + for (fid, _) in bulk { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let _entered = span.enter(); - match delta { - FacetFieldIdDelta::Bulk => { - progress.update_progress(PostProcessingFacets::NumbersBulk); - tracing::debug!(%fid, "bulk number facet processing"); - FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) - .execute(wtxn)? - } - FacetFieldIdDelta::Incremental(delta_data) => { - progress.update_progress(PostProcessingFacets::NumbersIncremental); - tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); - FacetsUpdateIncremental::new( - index, - FacetType::Number, - fid, - delta_data, - FACET_GROUP_SIZE, - FACET_MIN_LEVEL_SIZE, - FACET_MAX_GROUP_SIZE, - ) - .execute(wtxn)? - } - } + tracing::debug!(%fid, "bulk number facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) + .execute(wtxn)?; + debug_assert!(crate::update::facet::sanity_checks( index, wtxn, @@ -296,5 +318,35 @@ fn compute_facet_level_database( .is_ok()); } + progress.update_progress(PostProcessingFacets::NumbersIncremental); + for (fid, delta) in incremental { + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); + let _entered = span.enter(); + if let FacetFieldIdDelta::Incremental(delta_data) = delta { + tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::Number, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)?; + + debug_assert!(crate::update::facet::sanity_checks( + index, + wtxn, + fid, + FacetType::Number, + FACET_GROUP_SIZE as usize, + FACET_MIN_LEVEL_SIZE as usize, + FACET_MAX_GROUP_SIZE as usize, + ) + .is_ok()); + } + } + Ok(()) } From 4c643a5142eb49eb1246a93c0f0fb08d5fff6b49 Mon Sep 17 00:00:00 2001 From: nnethercott Date: Thu, 12 Jun 2025 00:49:19 +0200 Subject: [PATCH 2/4] fmt --- crates/milli/src/update/new/indexer/post_processing.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index 54530a6f3..3dd381594 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -44,12 +44,7 @@ where indexing_context.progress, )?; let number_deltas = facet_field_ids_delta.consume_facet_number_delta().collect(); - compute_facet_level_number( - index, - wtxn, - number_deltas, - indexing_context.progress, - )?; + compute_facet_level_number(index, wtxn, number_deltas, indexing_context.progress)?; compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?; indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); From cd712d6f7f512c0e7c12ffd54f572b2d1d7baf2d Mon Sep 17 00:00:00 2001 From: nnethercott Date: Thu, 12 Jun 2025 09:06:26 +0200 Subject: [PATCH 3/4] factorize --- .../src/update/new/indexer/post_processing.rs | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index 3dd381594..7e6b4558e 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -221,25 +221,24 @@ fn compute_facet_level_string( let rtxn = index.read_txn()?; let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?; - // We partition bulk and incremental updates - let (bulk, incremental): (Vec<_>, Vec<_>) = deltas.into_iter().partition(|(_, delta)| { - if let FacetFieldIdDelta::Bulk = delta { - true - } else { - false - } - }); + // A predicate to determine if a field id should be facet leveled + let mut retain_fid = |fid: u16| { + if let Some(metadata) = global_fields_ids_map.metadata(fid) { + if metadata.require_facet_level_database(&filterable_attributes_rules) { + return true; + } + }; + false + }; + + // We partition deltas into bulk and incremental updates + let (bulk, incremental): (Vec<_>, Vec<_>) = deltas + .into_iter() + .filter(|(fid, _)| retain_fid(*fid)) + .partition(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { true } else { false }); progress.update_progress(PostProcessingFacets::StringsBulk); for (fid, _) in bulk { - // skip field ids that should not be facet leveled - let Some(metadata) = global_fields_ids_map.metadata(fid) else { - continue; - }; - if !metadata.require_facet_level_database(&filterable_attributes_rules) { - continue; - } - let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); tracing::debug!(%fid, "bulk string facet processing"); @@ -249,14 +248,6 @@ fn compute_facet_level_string( progress.update_progress(PostProcessingFacets::StringsIncremental); for (fid, delta) in incremental { - // skip field ids that should not be facet leveled - let Some(metadata) = global_fields_ids_map.metadata(fid) else { - continue; - }; - if !metadata.require_facet_level_database(&filterable_attributes_rules) { - continue; - } - let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); if let FacetFieldIdDelta::Incremental(delta_data) = delta { @@ -284,7 +275,7 @@ fn compute_facet_level_number( deltas: Vec<(u16, FacetFieldIdDelta)>, progress: &Progress, ) -> Result<()> { - // We partition bulk and incremental updates + // We partition deltas into bulk and incremental updates let (bulk, incremental): (Vec<_>, Vec<_>) = deltas.into_iter().partition(|(_, delta)| { if let FacetFieldIdDelta::Bulk = delta { true From 32318803a6a4b121ea8a12a6ebd185bb669379f8 Mon Sep 17 00:00:00 2001 From: nnethercott Date: Thu, 12 Jun 2025 10:16:21 +0200 Subject: [PATCH 4/4] clippy --- .../milli/src/update/new/indexer/post_processing.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index 7e6b4558e..f22135239 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -235,7 +235,7 @@ fn compute_facet_level_string( let (bulk, incremental): (Vec<_>, Vec<_>) = deltas .into_iter() .filter(|(fid, _)| retain_fid(*fid)) - .partition(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { true } else { false }); + .partition(|(_, delta)| matches!(delta, FacetFieldIdDelta::Bulk)); progress.update_progress(PostProcessingFacets::StringsBulk); for (fid, _) in bulk { @@ -276,13 +276,8 @@ fn compute_facet_level_number( progress: &Progress, ) -> Result<()> { // We partition deltas into bulk and incremental updates - let (bulk, incremental): (Vec<_>, Vec<_>) = deltas.into_iter().partition(|(_, delta)| { - if let FacetFieldIdDelta::Bulk = delta { - true - } else { - false - } - }); + let (bulk, incremental): (Vec<_>, Vec<_>) = + deltas.into_iter().partition(|(_, delta)| matches!(delta, FacetFieldIdDelta::Bulk)); progress.update_progress(PostProcessingFacets::NumbersBulk); for (fid, _) in bulk {