diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index 288b9c5ed..35d0c8802 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -27,20 +27,25 @@ pub(super) fn post_process( indexing_context: IndexingContext, wtxn: &mut RwTxn<'_>, mut global_fields_ids_map: GlobalFieldsIdsMap<'_>, - facet_field_ids_delta: FacetFieldIdsDelta, + mut facet_field_ids_delta: FacetFieldIdsDelta, ) -> Result<()> where MSP: Fn() -> bool + Sync, { let index = indexing_context.index; indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); - compute_facet_level_database( + + let string_deltas = facet_field_ids_delta.consume_facet_string_delta().collect(); + compute_facet_level_string( index, wtxn, - facet_field_ids_delta, + string_deltas, &mut global_fields_ids_map, indexing_context.progress, )?; + let number_deltas = facet_field_ids_delta.consume_facet_number_delta().collect(); + compute_facet_level_number(index, wtxn, number_deltas, indexing_context.progress)?; + compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?; indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); if let Some(prefix_delta) = compute_word_fst(index, wtxn, indexing_context.progress)? { @@ -211,84 +216,82 @@ fn compute_facet_search_database( } #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] -fn compute_facet_level_database( +fn compute_facet_level_string( index: &Index, wtxn: &mut RwTxn, - mut facet_field_ids_delta: FacetFieldIdsDelta, + deltas: Vec<(u16, FacetFieldIdDelta)>, global_fields_ids_map: &mut GlobalFieldsIdsMap, progress: &Progress, ) -> Result<()> { let rtxn = index.read_txn()?; - let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?; - let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_string_delta().collect(); - // We move all bulks at the front and incrementals (others) at the end. - deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 }); - for (fid, delta) in deltas { - // skip field ids that should not be facet leveled - let Some(metadata) = global_fields_ids_map.metadata(fid) else { - continue; + // A predicate to determine if a field id should be facet leveled + let mut retain_fid = |fid: u16| { + if let Some(metadata) = global_fields_ids_map.metadata(fid) { + if metadata.require_facet_level_database(&filterable_attributes_rules) { + return true; + } }; - if !metadata.require_facet_level_database(&filterable_attributes_rules) { - continue; - } + false + }; + // We partition deltas into bulk and incremental updates + let (bulk, incremental): (Vec<_>, Vec<_>) = deltas + .into_iter() + .filter(|(fid, _)| retain_fid(*fid)) + .partition(|(_, delta)| matches!(delta, FacetFieldIdDelta::Bulk)); + + progress.update_progress(PostProcessingFacets::StringsBulk); + for (fid, _) in bulk { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); - match delta { - FacetFieldIdDelta::Bulk => { - progress.update_progress(PostProcessingFacets::StringsBulk); - tracing::debug!(%fid, "bulk string facet processing"); - FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) - .execute(wtxn)? - } - FacetFieldIdDelta::Incremental(delta_data) => { - progress.update_progress(PostProcessingFacets::StringsIncremental); - tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); - FacetsUpdateIncremental::new( - index, - FacetType::String, - fid, - delta_data, - FACET_GROUP_SIZE, - FACET_MIN_LEVEL_SIZE, - FACET_MAX_GROUP_SIZE, - ) - .execute(wtxn)? - } + tracing::debug!(%fid, "bulk string facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) + .execute(wtxn)? + } + + progress.update_progress(PostProcessingFacets::StringsIncremental); + for (fid, delta) in incremental { + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); + let _entered = span.enter(); + if let FacetFieldIdDelta::Incremental(delta_data) = delta { + tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::String, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? } } - let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_number_delta().collect(); - // We move all bulks at the front and incrementals (others) at the end. - deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 }); + Ok(()) +} - for (fid, delta) in deltas { +#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] +fn compute_facet_level_number( + index: &Index, + wtxn: &mut RwTxn, + deltas: Vec<(u16, FacetFieldIdDelta)>, + progress: &Progress, +) -> Result<()> { + // We partition deltas into bulk and incremental updates + let (bulk, incremental): (Vec<_>, Vec<_>) = + deltas.into_iter().partition(|(_, delta)| matches!(delta, FacetFieldIdDelta::Bulk)); + + progress.update_progress(PostProcessingFacets::NumbersBulk); + for (fid, _) in bulk { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let _entered = span.enter(); - match delta { - FacetFieldIdDelta::Bulk => { - progress.update_progress(PostProcessingFacets::NumbersBulk); - tracing::debug!(%fid, "bulk number facet processing"); - FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) - .execute(wtxn)? - } - FacetFieldIdDelta::Incremental(delta_data) => { - progress.update_progress(PostProcessingFacets::NumbersIncremental); - tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); - FacetsUpdateIncremental::new( - index, - FacetType::Number, - fid, - delta_data, - FACET_GROUP_SIZE, - FACET_MIN_LEVEL_SIZE, - FACET_MAX_GROUP_SIZE, - ) - .execute(wtxn)? - } - } + tracing::debug!(%fid, "bulk number facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) + .execute(wtxn)?; + debug_assert!(crate::update::facet::sanity_checks( index, wtxn, @@ -301,5 +304,35 @@ fn compute_facet_level_database( .is_ok()); } + progress.update_progress(PostProcessingFacets::NumbersIncremental); + for (fid, delta) in incremental { + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); + let _entered = span.enter(); + if let FacetFieldIdDelta::Incremental(delta_data) = delta { + tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::Number, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)?; + + debug_assert!(crate::update::facet::sanity_checks( + index, + wtxn, + fid, + FacetType::Number, + FACET_GROUP_SIZE as usize, + FACET_MIN_LEVEL_SIZE as usize, + FACET_MAX_GROUP_SIZE as usize, + ) + .is_ok()); + } + } + Ok(()) }