diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index 911296577..8742cd904 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -95,6 +95,7 @@ use crate::{try_split_array_at, FieldId, Index, Result}; pub mod bulk; pub mod incremental; +pub mod new_incremental; /// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases. /// diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index a850c0d03..118f0f03b 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -36,6 +36,8 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::progress::Progress; use crate::proximity::ProximityPrecision; use crate::update::del_add::DelAdd; +use crate::update::facet::new_incremental::FacetsUpdateIncremental; +use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; @@ -203,6 +205,7 @@ where caches, FacetDatabases::new(index), index, + &rtxn, extractor_sender.facet_docids(), )?; } @@ -735,27 +738,56 @@ fn compute_facet_search_database( fn compute_facet_level_database( index: &Index, wtxn: &mut RwTxn, - facet_field_ids_delta: FacetFieldIdsDelta, + mut facet_field_ids_delta: FacetFieldIdsDelta, ) -> Result<()> { - if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { + for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( - index, - modified_facet_string_ids, - FacetType::String, - ) - .execute(wtxn)?; + match delta { + super::merger::FacetFieldIdDelta::Bulk => { + tracing::info!(%fid, "bulk string facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) + .execute(wtxn)? + } + super::merger::FacetFieldIdDelta::Incremental(delta_data) => { + tracing::info!(%fid, len=%delta_data.len(), "incremental string facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::String, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } } - if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { + + for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( - index, - modified_facet_number_ids, - FacetType::Number, - ) - .execute(wtxn)?; + match delta { + super::merger::FacetFieldIdDelta::Bulk => { + tracing::info!(%fid, "bulk number facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) + .execute(wtxn)? + } + super::merger::FacetFieldIdDelta::Incremental(delta_data) => { + tracing::info!(%fid, len=%delta_data.len(), "incremental number facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::Number, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } } Ok(()) diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 09b023ce3..90fadd379 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; -use hashbrown::{HashMap, HashSet}; +use hashbrown::HashMap; use heed::types::Bytes; use heed::{Database, RoTxn}; use memmap2::Mmap; @@ -12,6 +12,7 @@ use super::extract::{ merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, GeoExtractorData, }; +use crate::update::facet::new_incremental::{FacetFieldIdChange, FacetFieldIdOperation}; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] @@ -100,23 +101,32 @@ pub fn merge_and_send_facet_docids<'extractor>( mut caches: Vec>, database: FacetDatabases, index: &Index, + rtxn: &RoTxn, docids_sender: FacetDocidsSender, ) -> Result { + let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize; + let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize; transpose_and_freeze_caches(&mut caches)? .into_par_iter() .map(|frozen| { - let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); + let mut facet_field_ids_delta = + FacetFieldIdsDelta::new(max_string_count, max_number_count); let rtxn = index.read_txn()?; merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { - facet_field_ids_delta.register_from_key(key); + let operation = if current.is_some() { + FacetFieldIdOperation::InPlace + } else { + FacetFieldIdOperation::Insert + }; + facet_field_ids_delta.register_from_key(key, operation); docids_sender.write(key, &bitmap)?; Ok(()) } Operation::Delete => { - facet_field_ids_delta.register_from_key(key); + facet_field_ids_delta.register_from_key(key, FacetFieldIdOperation::Remove); docids_sender.delete(key)?; Ok(()) } @@ -126,7 +136,10 @@ pub fn merge_and_send_facet_docids<'extractor>( Ok(facet_field_ids_delta) }) - .reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?))) + .reduce( + || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)), + |lhs, rhs| Ok(lhs?.merge(rhs?)), + ) } pub struct FacetDatabases<'a> { @@ -162,11 +175,11 @@ pub enum FacetFieldIdDelta { } impl FacetFieldIdDelta { - fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, db_size: usize) { + fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, max_count: usize) { *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) { FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk, FacetFieldIdDelta::Incremental(mut v) => { - if v.len() >= (db_size / 500) { + if v.len() >= max_count { FacetFieldIdDelta::Bulk } else { v.push(FacetFieldIdChange { facet_value: facet_value.into(), operation }); @@ -175,39 +188,47 @@ impl FacetFieldIdDelta { } } } + + fn merge(&mut self, rhs: Option, max_count: usize) { + let Some(rhs) = rhs else { + return; + }; + *self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) { + (FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk, + ( + FacetFieldIdDelta::Incremental(mut left), + FacetFieldIdDelta::Incremental(mut right), + ) => { + if left.len() + right.len() >= max_count { + FacetFieldIdDelta::Bulk + } else { + left.append(&mut right); + FacetFieldIdDelta::Incremental(left) + } + } + }; + } } #[derive(Debug)] -pub struct FacetFieldIdChange { - facet_value: Box<[u8]>, - operation: FacetFieldIdOperation, -} - -#[derive(Debug, Clone, Copy)] -pub enum FacetFieldIdOperation { - /// The docids have been modified for an existing facet value - /// - /// The modification must be propagated to upper levels, without changing the structure of the tree - InPlace, - /// A new value has been inserted - /// - /// The modification must be propagated to upper levels, splitting nodes and adding new levels as necessary. - Insert, - /// An existing value has been deleted - /// - /// The modification must be propagated to upper levels, merging nodes and removing levels as necessary. - Remove, -} - -#[derive(Debug, Default)] pub struct FacetFieldIdsDelta { /// The field ids that have been modified modified_facet_string_ids: HashMap, modified_facet_number_ids: HashMap, - db_size: usize, + max_string_count: usize, + max_number_count: usize, } impl FacetFieldIdsDelta { + pub fn new(max_string_count: usize, max_number_count: usize) -> Self { + Self { + max_string_count, + max_number_count, + modified_facet_string_ids: Default::default(), + modified_facet_number_ids: Default::default(), + } + } + fn register_facet_string_id( &mut self, field_id: FieldId, @@ -217,7 +238,7 @@ impl FacetFieldIdsDelta { self.modified_facet_string_ids .entry(field_id) .or_insert(FacetFieldIdDelta::Incremental(Default::default())) - .push(facet_value, operation, self.db_size); + .push(facet_value, operation, self.max_string_count); } fn register_facet_number_id( @@ -229,7 +250,7 @@ impl FacetFieldIdsDelta { self.modified_facet_number_ids .entry(field_id) .or_insert(FacetFieldIdDelta::Incremental(Default::default())) - .push(facet_value, operation, self.db_size); + .push(facet_value, operation, self.max_number_count); } fn register_from_key(&mut self, key: &[u8], operation: FacetFieldIdOperation) { @@ -241,36 +262,37 @@ impl FacetFieldIdsDelta { } } - fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId, &[u8]) { + fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, &'key [u8]) { let facet_kind = FacetKind::from(key[0]); let field_id = FieldId::from_be_bytes([key[1], key[2]]); let facet_value = &key[2..]; (facet_kind, field_id, facet_value) } - pub fn modified_facet_string_ids(&self) -> Option> { - if self.modified_facet_string_ids.is_empty() { - None - } else { - Some(self.modified_facet_string_ids.iter().copied().collect()) - } + pub fn consume_facet_string_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_string_ids.drain() } - pub fn modified_facet_number_ids(&self) -> Option> { - if self.modified_facet_number_ids.is_empty() { - None - } else { - Some(self.modified_facet_number_ids.iter().copied().collect()) - } + pub fn consume_facet_number_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_number_ids.drain() } pub fn merge(mut self, rhs: Self) -> Self { - let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs; - modified_facet_number_ids.into_iter().for_each(|fid| { - self.modified_facet_number_ids.insert(fid); + // rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused + let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs; + modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_number_ids.remove(&fid); + delta.merge(old_delta, self.max_number_count); + self.modified_facet_number_ids.insert(fid, delta); }); - modified_facet_string_ids.into_iter().for_each(|fid| { - self.modified_facet_string_ids.insert(fid); + modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_string_ids.remove(&fid); + delta.merge(old_delta, self.max_string_count); + self.modified_facet_string_ids.insert(fid, delta); }); self }