This commit is contained in:
Nate Nethercott 2025-07-03 09:18:22 +12:00 committed by GitHub
commit 2b0534b631
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -27,20 +27,25 @@ pub(super) fn post_process<MSP>(
indexing_context: IndexingContext<MSP>,
wtxn: &mut RwTxn<'_>,
mut global_fields_ids_map: GlobalFieldsIdsMap<'_>,
facet_field_ids_delta: FacetFieldIdsDelta,
mut facet_field_ids_delta: FacetFieldIdsDelta,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
{
let index = indexing_context.index;
indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
compute_facet_level_database(
let string_deltas = facet_field_ids_delta.consume_facet_string_delta().collect();
compute_facet_level_string(
index,
wtxn,
facet_field_ids_delta,
string_deltas,
&mut global_fields_ids_map,
indexing_context.progress,
)?;
let number_deltas = facet_field_ids_delta.consume_facet_number_delta().collect();
compute_facet_level_number(index, wtxn, number_deltas, indexing_context.progress)?;
compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
if let Some(prefix_delta) = compute_word_fst(index, wtxn, indexing_context.progress)? {
@ -211,84 +216,82 @@ fn compute_facet_search_database(
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
fn compute_facet_level_database(
fn compute_facet_level_string(
index: &Index,
wtxn: &mut RwTxn,
mut facet_field_ids_delta: FacetFieldIdsDelta,
deltas: Vec<(u16, FacetFieldIdDelta)>,
global_fields_ids_map: &mut GlobalFieldsIdsMap,
progress: &Progress,
) -> Result<()> {
let rtxn = index.read_txn()?;
let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?;
let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_string_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
for (fid, delta) in deltas {
// skip field ids that should not be facet leveled
let Some(metadata) = global_fields_ids_map.metadata(fid) else {
continue;
// A predicate to determine if a field id should be facet leveled
let mut retain_fid = |fid: u16| {
if let Some(metadata) = global_fields_ids_map.metadata(fid) {
if metadata.require_facet_level_database(&filterable_attributes_rules) {
return true;
}
};
if !metadata.require_facet_level_database(&filterable_attributes_rules) {
continue;
}
false
};
// We partition deltas into bulk and incremental updates
let (bulk, incremental): (Vec<_>, Vec<_>) = deltas
.into_iter()
.filter(|(fid, _)| retain_fid(*fid))
.partition(|(_, delta)| matches!(delta, FacetFieldIdDelta::Bulk));
progress.update_progress(PostProcessingFacets::StringsBulk);
for (fid, _) in bulk {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
let _entered = span.enter();
match delta {
FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::StringsBulk);
tracing::debug!(%fid, "bulk string facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
.execute(wtxn)?
}
FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::StringsIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
FacetsUpdateIncremental::new(
index,
FacetType::String,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
)
.execute(wtxn)?
}
tracing::debug!(%fid, "bulk string facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
.execute(wtxn)?
}
progress.update_progress(PostProcessingFacets::StringsIncremental);
for (fid, delta) in incremental {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
let _entered = span.enter();
if let FacetFieldIdDelta::Incremental(delta_data) = delta {
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
FacetsUpdateIncremental::new(
index,
FacetType::String,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
)
.execute(wtxn)?
}
}
let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_number_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
Ok(())
}
for (fid, delta) in deltas {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
fn compute_facet_level_number(
index: &Index,
wtxn: &mut RwTxn,
deltas: Vec<(u16, FacetFieldIdDelta)>,
progress: &Progress,
) -> Result<()> {
// We partition deltas into bulk and incremental updates
let (bulk, incremental): (Vec<_>, Vec<_>) =
deltas.into_iter().partition(|(_, delta)| matches!(delta, FacetFieldIdDelta::Bulk));
progress.update_progress(PostProcessingFacets::NumbersBulk);
for (fid, _) in bulk {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter();
match delta {
FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::NumbersBulk);
tracing::debug!(%fid, "bulk number facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
.execute(wtxn)?
}
FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::NumbersIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
FacetsUpdateIncremental::new(
index,
FacetType::Number,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
)
.execute(wtxn)?
}
}
tracing::debug!(%fid, "bulk number facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
.execute(wtxn)?;
debug_assert!(crate::update::facet::sanity_checks(
index,
wtxn,
@ -301,5 +304,35 @@ fn compute_facet_level_database(
.is_ok());
}
progress.update_progress(PostProcessingFacets::NumbersIncremental);
for (fid, delta) in incremental {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter();
if let FacetFieldIdDelta::Incremental(delta_data) = delta {
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
FacetsUpdateIncremental::new(
index,
FacetType::Number,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
)
.execute(wtxn)?;
debug_assert!(crate::update::facet::sanity_checks(
index,
wtxn,
fid,
FacetType::Number,
FACET_GROUP_SIZE as usize,
FACET_MIN_LEVEL_SIZE as usize,
FACET_MAX_GROUP_SIZE as usize,
)
.is_ok());
}
}
Ok(())
}