Merge pull request #5468 from meilisearch/more-precise-post-processing

More Precise Post Processing
This commit is contained in:
Clément Renault 2025-03-27 10:07:09 +00:00 committed by GitHub
commit bb2e9419d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 72 additions and 12 deletions

View File

@ -7,12 +7,13 @@ use itertools::{merge_join_by, EitherOrBoth};
use super::document_changes::IndexingContext;
use crate::facet::FacetType;
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::progress::Progress;
use crate::update::del_add::DelAdd;
use crate::update::facet::new_incremental::FacetsUpdateIncremental;
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
use crate::update::new::facet_search_builder::FacetSearchBuilder;
use crate::update::new::merger::FacetFieldIdDelta;
use crate::update::new::steps::IndexingStep;
use crate::update::new::steps::{IndexingStep, PostProcessingFacets, PostProcessingWords};
use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use crate::update::new::words_prefix_docids::{
compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids,
@ -33,11 +34,23 @@ where
{
let index = indexing_context.index;
indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
compute_facet_level_database(index, wtxn, facet_field_ids_delta, &mut global_fields_ids_map)?;
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
compute_facet_level_database(
index,
wtxn,
facet_field_ids_delta,
&mut global_fields_ids_map,
indexing_context.progress,
)?;
compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?;
if let Some(prefix_delta) = compute_word_fst(index, wtxn, indexing_context.progress)? {
compute_prefix_database(
index,
wtxn,
prefix_delta,
indexing_context.grenad_parameters,
indexing_context.progress,
)?;
};
Ok(())
}
@ -48,21 +61,32 @@ fn compute_prefix_database(
wtxn: &mut RwTxn,
prefix_delta: PrefixDelta,
grenad_parameters: &GrenadParameters,
progress: &Progress,
) -> Result<()> {
let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
progress.update_progress(PostProcessingWords::WordPrefixDocids);
compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute exact word prefix docids
progress.update_progress(PostProcessingWords::ExactWordPrefixDocids);
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix fid docids
progress.update_progress(PostProcessingWords::WordPrefixFieldIdDocids);
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix position docids
progress.update_progress(PostProcessingWords::WordPrefixPositionDocids);
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
fn compute_word_fst(
index: &Index,
wtxn: &mut RwTxn,
progress: &Progress,
) -> Result<Option<PrefixDelta>> {
let rtxn = index.read_txn()?;
progress.update_progress(PostProcessingWords::WordFst);
let words_fst = index.words_fst(&rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(&rtxn)?;
@ -112,8 +136,10 @@ fn compute_facet_search_database(
index: &Index,
wtxn: &mut RwTxn,
global_fields_ids_map: GlobalFieldsIdsMap,
progress: &Progress,
) -> Result<()> {
let rtxn = index.read_txn()?;
progress.update_progress(PostProcessingFacets::FacetSearch);
// if the facet search is not enabled, we can skip the rest of the function
if !index.facet_search(wtxn)? {
@ -171,10 +197,16 @@ fn compute_facet_level_database(
wtxn: &mut RwTxn,
mut facet_field_ids_delta: FacetFieldIdsDelta,
global_fields_ids_map: &mut GlobalFieldsIdsMap,
progress: &Progress,
) -> Result<()> {
let rtxn = index.read_txn()?;
let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?;
for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() {
let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_string_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
for (fid, delta) in deltas {
// skip field ids that should not be facet leveled
let Some(metadata) = global_fields_ids_map.metadata(fid) else {
continue;
@ -187,11 +219,13 @@ fn compute_facet_level_database(
let _entered = span.enter();
match delta {
FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::StringsBulk);
tracing::debug!(%fid, "bulk string facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
.execute(wtxn)?
}
FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::StringsIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
FacetsUpdateIncremental::new(
index,
@ -207,16 +241,22 @@ fn compute_facet_level_database(
}
}
for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() {
let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_number_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
for (fid, delta) in deltas {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter();
match delta {
FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::NumbersBulk);
tracing::debug!(%fid, "bulk number facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
.execute(wtxn)?
}
FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::NumbersIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
FacetsUpdateIncremental::new(
index,

View File

@ -20,3 +20,23 @@ make_enum_progress! {
Finalizing,
}
}
make_enum_progress! {
pub enum PostProcessingFacets {
StringsBulk,
StringsIncremental,
NumbersBulk,
NumbersIncremental,
FacetSearch,
}
}
make_enum_progress! {
pub enum PostProcessingWords {
WordFst,
WordPrefixDocids,
ExactWordPrefixDocids,
WordPrefixFieldIdDocids,
WordPrefixPositionDocids,
}
}