diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 53fd8a89b..63536c559 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -100,6 +100,7 @@ where caches, FacetDatabases::new(index), index, + &rtxn, extractor_sender.facet_docids(), )?; } diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index 6bd139068..201ab9ec9 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -8,7 +8,10 @@ use super::document_changes::IndexingContext; use crate::facet::FacetType; use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::update::del_add::DelAdd; +use crate::update::facet::new_incremental::FacetsUpdateIncremental; +use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; use crate::update::new::facet_search_builder::FacetSearchBuilder; +use crate::update::new::merger::FacetFieldIdDelta; use crate::update::new::steps::IndexingStep; use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use crate::update::new::words_prefix_docids::{ @@ -160,27 +163,66 @@ fn compute_facet_search_database( fn compute_facet_level_database( index: &Index, wtxn: &mut RwTxn, - facet_field_ids_delta: FacetFieldIdsDelta, + mut facet_field_ids_delta: FacetFieldIdsDelta, ) -> Result<()> { - if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { + for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( - index, - modified_facet_string_ids, - FacetType::String, - ) - .execute(wtxn)?; + match delta { + FacetFieldIdDelta::Bulk => { + tracing::debug!(%fid, "bulk string facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) + .execute(wtxn)? + } + FacetFieldIdDelta::Incremental(delta_data) => { + tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::String, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } } - if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { + + for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( + match delta { + FacetFieldIdDelta::Bulk => { + tracing::debug!(%fid, "bulk number facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) + .execute(wtxn)? + } + FacetFieldIdDelta::Incremental(delta_data) => { + tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::Number, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } + debug_assert!(crate::update::facet::sanity_checks( index, - modified_facet_number_ids, + wtxn, + fid, FacetType::Number, + FACET_GROUP_SIZE as usize, + FACET_MIN_LEVEL_SIZE as usize, + FACET_MAX_GROUP_SIZE as usize, ) - .execute(wtxn)?; + .is_ok()); } Ok(()) diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 9e87388a2..6f3fd35cb 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; -use hashbrown::HashSet; +use hashbrown::HashMap; use heed::types::Bytes; use heed::{Database, RoTxn}; use memmap2::Mmap; @@ -12,6 +12,7 @@ use super::extract::{ merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, GeoExtractorData, }; +use crate::update::facet::new_incremental::{FacetFieldIdChange, FacetFieldIdOperation}; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] @@ -100,23 +101,34 @@ pub fn merge_and_send_facet_docids<'extractor>( mut caches: Vec>, database: FacetDatabases, index: &Index, + rtxn: &RoTxn, docids_sender: FacetDocidsSender, ) -> Result { + let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize; + let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize; + let max_string_count = max_string_count.clamp(1000, 100_000); + let max_number_count = max_number_count.clamp(1000, 100_000); transpose_and_freeze_caches(&mut caches)? .into_par_iter() .map(|frozen| { - let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); + let mut facet_field_ids_delta = + FacetFieldIdsDelta::new(max_string_count, max_number_count); let rtxn = index.read_txn()?; merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { - facet_field_ids_delta.register_from_key(key); + let operation = if current.is_some() { + FacetFieldIdOperation::InPlace + } else { + FacetFieldIdOperation::Insert + }; + facet_field_ids_delta.register_from_key(key, operation); docids_sender.write(key, &bitmap)?; Ok(()) } Operation::Delete => { - facet_field_ids_delta.register_from_key(key); + facet_field_ids_delta.register_from_key(key, FacetFieldIdOperation::Remove); docids_sender.delete(key)?; Ok(()) } @@ -126,7 +138,10 @@ pub fn merge_and_send_facet_docids<'extractor>( Ok(facet_field_ids_delta) }) - .reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?))) + .reduce( + || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)), + |lhs, rhs| Ok(lhs?.merge(rhs?)), + ) } pub struct FacetDatabases<'a> { @@ -155,60 +170,141 @@ impl<'a> FacetDatabases<'a> { } } -#[derive(Debug, Default)] +#[derive(Debug)] +pub enum FacetFieldIdDelta { + Bulk, + Incremental(Vec), +} + +impl FacetFieldIdDelta { + fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, max_count: usize) { + *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) { + FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk, + FacetFieldIdDelta::Incremental(mut v) => { + if v.len() >= max_count { + FacetFieldIdDelta::Bulk + } else { + v.push(FacetFieldIdChange { facet_value: facet_value.into(), operation }); + FacetFieldIdDelta::Incremental(v) + } + } + } + } + + fn merge(&mut self, rhs: Option, max_count: usize) { + let Some(rhs) = rhs else { + return; + }; + *self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) { + (FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk, + ( + FacetFieldIdDelta::Incremental(mut left), + FacetFieldIdDelta::Incremental(mut right), + ) => { + if left.len() + right.len() >= max_count { + FacetFieldIdDelta::Bulk + } else { + left.append(&mut right); + FacetFieldIdDelta::Incremental(left) + } + } + }; + } +} + +#[derive(Debug)] pub struct FacetFieldIdsDelta { /// The field ids that have been modified - modified_facet_string_ids: HashSet, - modified_facet_number_ids: HashSet, + modified_facet_string_ids: HashMap, + modified_facet_number_ids: HashMap, + max_string_count: usize, + max_number_count: usize, } impl FacetFieldIdsDelta { - fn register_facet_string_id(&mut self, field_id: FieldId) { - self.modified_facet_string_ids.insert(field_id); + pub fn new(max_string_count: usize, max_number_count: usize) -> Self { + Self { + max_string_count, + max_number_count, + modified_facet_string_ids: Default::default(), + modified_facet_number_ids: Default::default(), + } } - fn register_facet_number_id(&mut self, field_id: FieldId) { - self.modified_facet_number_ids.insert(field_id); + fn register_facet_string_id( + &mut self, + field_id: FieldId, + facet_value: &[u8], + operation: FacetFieldIdOperation, + ) { + self.modified_facet_string_ids + .entry(field_id) + .or_insert(FacetFieldIdDelta::Incremental(Default::default())) + .push(facet_value, operation, self.max_string_count); } - fn register_from_key(&mut self, key: &[u8]) { - let (facet_kind, field_id) = self.extract_key_data(key); - match facet_kind { - FacetKind::Number => self.register_facet_number_id(field_id), - FacetKind::String => self.register_facet_string_id(field_id), + fn register_facet_number_id( + &mut self, + field_id: FieldId, + facet_value: &[u8], + operation: FacetFieldIdOperation, + ) { + self.modified_facet_number_ids + .entry(field_id) + .or_insert(FacetFieldIdDelta::Incremental(Default::default())) + .push(facet_value, operation, self.max_number_count); + } + + fn register_from_key(&mut self, key: &[u8], operation: FacetFieldIdOperation) { + let (facet_kind, field_id, facet_value) = self.extract_key_data(key); + match (facet_kind, facet_value) { + (FacetKind::Number, Some(facet_value)) => { + self.register_facet_number_id(field_id, facet_value, operation) + } + (FacetKind::String, Some(facet_value)) => { + self.register_facet_string_id(field_id, facet_value, operation) + } _ => (), } } - fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId) { + fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) { let facet_kind = FacetKind::from(key[0]); let field_id = FieldId::from_be_bytes([key[1], key[2]]); - (facet_kind, field_id) + let facet_value = if key.len() >= 4 { + // level is also stored in the key at [3] (always 0) + Some(&key[4..]) + } else { + None + }; + + (facet_kind, field_id, facet_value) } - pub fn modified_facet_string_ids(&self) -> Option> { - if self.modified_facet_string_ids.is_empty() { - None - } else { - Some(self.modified_facet_string_ids.iter().copied().collect()) - } + pub fn consume_facet_string_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_string_ids.drain() } - pub fn modified_facet_number_ids(&self) -> Option> { - if self.modified_facet_number_ids.is_empty() { - None - } else { - Some(self.modified_facet_number_ids.iter().copied().collect()) - } + pub fn consume_facet_number_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_number_ids.drain() } pub fn merge(mut self, rhs: Self) -> Self { - let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs; - modified_facet_number_ids.into_iter().for_each(|fid| { - self.modified_facet_number_ids.insert(fid); + // rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused + let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs; + modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_number_ids.remove(&fid); + delta.merge(old_delta, self.max_number_count); + self.modified_facet_number_ids.insert(fid, delta); }); - modified_facet_string_ids.into_iter().for_each(|fid| { - self.modified_facet_string_ids.insert(fid); + modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_string_ids.remove(&fid); + delta.merge(old_delta, self.max_string_count); + self.modified_facet_string_ids.insert(fid, delta); }); self }