From 6b3a2c7281b4c9ce4e7d77b33d654a375f7ec019 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 15:07:55 +0100 Subject: [PATCH 01/14] Add sanity checks for facet values --- crates/milli/src/update/facet/mod.rs | 199 ++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 1 deletion(-) diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index 911296577..44f499f8c 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -79,17 +79,23 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5; use std::collections::BTreeSet; use std::fs::File; use std::io::BufReader; +use std::ops::Bound; use grenad::Merger; use heed::types::{Bytes, DecodeIgnore}; +use heed::BytesDecode as _; +use roaring::RoaringBitmap; use time::OffsetDateTime; use tracing::debug; use self::incremental::FacetsUpdateIncremental; use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps}; use crate::facet::FacetType; -use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec}; +use crate::heed_codec::facet::{ + FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec, OrderedF64Codec, +}; use crate::heed_codec::BytesRefCodec; +use crate::search::facet::get_highest_level; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::{try_split_array_at, FieldId, Index, Result}; @@ -646,3 +652,194 @@ mod comparison_bench { } } } + +/// Run sanity checks on the specified fid tree +/// +/// 1. No "orphan" child value, any child value has a parent +/// 2. Any docid in the child appears in the parent +/// 3. No docid in the parent is missing from all its children +/// 4. no group is bigger than max_group_size +/// 5. Less than 50% of groups are bigger than group_size +/// 6. group size matches the number of children +/// 7. max_level is < 255 +pub(crate) fn sanity_checks( + index: &Index, + rtxn: &heed::RoTxn, + field_id: FieldId, + facet_type: FacetType, + group_size: usize, + _min_level_size: usize, // might add a check on level size later + max_group_size: usize, +) -> Result<()> { + tracing::info!(%field_id, ?facet_type, "performing sanity checks"); + let database = match facet_type { + FacetType::String => { + index.facet_id_string_docids.remap_key_type::>() + } + FacetType::Number => { + index.facet_id_f64_docids.remap_key_type::>() + } + }; + + let leaf_prefix: FacetGroupKey<&[u8]> = FacetGroupKey { field_id, level: 0, left_bound: &[] }; + + let leaf_it = database.prefix_iter(rtxn, &leaf_prefix)?; + + let max_level = get_highest_level(rtxn, database, field_id)?; + if max_level == u8::MAX { + panic!("max_level == 255"); + } + + for leaf in leaf_it { + let (leaf_facet_value, leaf_docids) = leaf?; + let mut current_level = 0; + + let mut current_parent_facet_value: Option> = None; + let mut current_parent_docids: Option = None; + loop { + current_level += 1; + if current_level >= max_level { + break; + } + let parent_key_right_bound = FacetGroupKey { + field_id, + level: current_level, + left_bound: leaf_facet_value.left_bound, + }; + let (parent_facet_value, parent_docids) = database + .get_lower_than_or_equal_to(rtxn, &parent_key_right_bound)? + .expect("no parent found"); + if parent_facet_value.level != current_level { + panic!( + "wrong parent level, found_level={}, expected_level={}", + parent_facet_value.level, current_level + ); + } + if parent_facet_value.field_id != field_id { + panic!("wrong parent fid"); + } + if parent_facet_value.left_bound > leaf_facet_value.left_bound { + panic!("wrong parent left bound"); + } + + if !leaf_docids.bitmap.is_subset(&parent_docids.bitmap) { + panic!( + "missing docids from leaf in parent, current_level={}, parent={}, child={}, missing={missing:?}, child_len={}, child={:?}", + current_level, + facet_to_string(parent_facet_value.left_bound, facet_type), + facet_to_string(leaf_facet_value.left_bound, facet_type), + leaf_docids.bitmap.len(), + leaf_docids.bitmap.clone(), + missing=leaf_docids.bitmap - parent_docids.bitmap, + ) + } + + if let Some(current_parent_facet_value) = current_parent_facet_value { + if current_parent_facet_value.field_id != parent_facet_value.field_id { + panic!("wrong parent parent fid"); + } + if current_parent_facet_value.level + 1 != parent_facet_value.level { + panic!("wrong parent parent level"); + } + if current_parent_facet_value.left_bound < parent_facet_value.left_bound { + panic!("wrong parent parent left bound"); + } + } + + if let Some(current_parent_docids) = current_parent_docids { + if !current_parent_docids.bitmap.is_subset(&parent_docids.bitmap) { + panic!("missing docids from intermediate node in parent, parent_level={}, parent={}, intermediate={}, missing={missing:?}, intermediate={:?}", + parent_facet_value.level, + facet_to_string(parent_facet_value.left_bound, facet_type), + facet_to_string(current_parent_facet_value.unwrap().left_bound, facet_type), + current_parent_docids.bitmap.clone(), + missing=current_parent_docids.bitmap - parent_docids.bitmap, + ); + } + } + + current_parent_facet_value = Some(parent_facet_value); + current_parent_docids = Some(parent_docids); + } + } + tracing::info!(%field_id, ?facet_type, "checked all leaves"); + + let mut current_level = max_level; + let mut greater_than_group = 0usize; + let mut total = 0usize; + loop { + if current_level == 0 { + break; + } + let child_level = current_level - 1; + tracing::info!(%field_id, ?facet_type, %current_level, "checked groups for level"); + let level_groups_prefix: FacetGroupKey<&[u8]> = + FacetGroupKey { field_id, level: current_level, left_bound: &[] }; + let mut level_groups_it = database.prefix_iter(rtxn, &level_groups_prefix)?.peekable(); + + 'group_it: loop { + let Some(group) = level_groups_it.next() else { break 'group_it }; + + let (group_facet_value, group_docids) = group?; + let child_left_bound = group_facet_value.left_bound.to_owned(); + let mut expected_docids = RoaringBitmap::new(); + let mut expected_size = 0usize; + let right_bound = level_groups_it + .peek() + .and_then(|res| res.as_ref().ok()) + .map(|(key, _)| key.left_bound); + let child_left_bound = FacetGroupKey { + field_id, + level: child_level, + left_bound: child_left_bound.as_slice(), + }; + let child_left_bound = Bound::Included(&child_left_bound); + let child_right_bound; + let child_right_bound = if let Some(right_bound) = right_bound { + child_right_bound = + FacetGroupKey { field_id, level: child_level, left_bound: right_bound }; + Bound::Excluded(&child_right_bound) + } else { + Bound::Unbounded + }; + let children = database.range(rtxn, &(child_left_bound, child_right_bound))?; + for child in children { + let (child_facet_value, child_docids) = child?; + if child_facet_value.field_id != field_id { + break; + } + if child_facet_value.level != child_level { + break; + } + expected_size += 1; + expected_docids |= &child_docids.bitmap; + } + assert_eq!(expected_size, group_docids.size as usize); + assert!(expected_size <= max_group_size); + assert_eq!(expected_docids, group_docids.bitmap); + total += 1; + if expected_size > group_size { + greater_than_group += 1; + } + } + + current_level -= 1; + } + if greater_than_group * 2 > total { + panic!("too many groups have a size > group_size"); + } + + tracing::info!("sanity checks OK"); + + Ok(()) +} + +fn facet_to_string(facet_value: &[u8], facet_type: FacetType) -> String { + match facet_type { + FacetType::String => bstr::BStr::new(facet_value).to_string(), + FacetType::Number => match OrderedF64Codec::bytes_decode(facet_value) { + Ok(value) => value.to_string(), + Err(e) => format!("error: {e} (bytes: {facet_value:?}"), + }, + } +} From 6e9aa4989377598d83d062a3d0f1fa49e192ffa3 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 15:08:07 +0100 Subject: [PATCH 02/14] add valid_facet_value utility function --- crates/milli/src/update/index_documents/helpers/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/milli/src/update/index_documents/helpers/mod.rs b/crates/milli/src/update/index_documents/helpers/mod.rs index c188e324d..195d12455 100644 --- a/crates/milli/src/update/index_documents/helpers/mod.rs +++ b/crates/milli/src/update/index_documents/helpers/mod.rs @@ -16,6 +16,10 @@ pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool { key.as_ref().len() <= MAX_WORD_LENGTH * 2 && !key.as_ref().is_empty() } +pub fn valid_facet_value(facet_value: impl AsRef<[u8]>) -> bool { + facet_value.as_ref().len() <= (MAX_WORD_LENGTH * 2) - 3 && !facet_value.as_ref().is_empty() +} + /// Divides one slice into two at an index, returns `None` if mid is out of bounds. pub fn try_split_at(slice: &[T], mid: usize) -> Option<(&[T], &[T])> { if mid <= slice.len() { From 5d219587b8d8109c8ba4745e603fba744814b0a4 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 15:08:36 +0100 Subject: [PATCH 03/14] Add new incremental facet indexing --- crates/milli/src/update/facet/mod.rs | 1 + .../milli/src/update/facet/new_incremental.rs | 469 ++++++++++++++++++ 2 files changed, 470 insertions(+) create mode 100644 crates/milli/src/update/facet/new_incremental.rs diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index 44f499f8c..02b6e7649 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -101,6 +101,7 @@ use crate::{try_split_array_at, FieldId, Index, Result}; pub mod bulk; pub mod incremental; +pub mod new_incremental; /// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases. /// diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs new file mode 100644 index 000000000..57358888e --- /dev/null +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -0,0 +1,469 @@ +use std::ops::Bound; + +use heed::types::{Bytes, DecodeIgnore}; +use heed::{BytesDecode as _, Database, RwTxn}; +use roaring::RoaringBitmap; + +use crate::facet::FacetType; +use crate::heed_codec::facet::{ + FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec, +}; +use crate::heed_codec::BytesRefCodec; +use crate::search::facet::get_highest_level; +use crate::update::valid_facet_value; +use crate::{FieldId, Index, Result}; + +pub struct FacetsUpdateIncremental { + inner: FacetsUpdateIncrementalInner, + delta_data: Vec, +} + +struct FacetsUpdateIncrementalInner { + db: Database, FacetGroupValueCodec>, + field_id: FieldId, + group_size: u8, + min_level_size: u8, + max_group_size: u8, +} + +impl FacetsUpdateIncremental { + pub fn new( + index: &Index, + facet_type: FacetType, + field_id: FieldId, + delta_data: Vec, + group_size: u8, + min_level_size: u8, + max_group_size: u8, + ) -> Self { + FacetsUpdateIncremental { + inner: FacetsUpdateIncrementalInner { + db: match facet_type { + FacetType::String => index + .facet_id_string_docids + .remap_key_type::>(), + FacetType::Number => index + .facet_id_f64_docids + .remap_key_type::>(), + }, + field_id, + group_size, + min_level_size, + max_group_size, + }, + + delta_data, + } + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::incremental")] + pub fn execute(mut self, wtxn: &mut RwTxn) -> Result<()> { + if self.delta_data.is_empty() { + return Ok(()); + } + self.delta_data.sort_unstable_by( + |FacetFieldIdChange { facet_value: left, .. }, + FacetFieldIdChange { facet_value: right, .. }| left.cmp(right), + ); + + self.inner.find_touched_parents( + wtxn, + 0, + self.delta_data + .into_iter() + // reverse lexicographic order + .rev() + .map(|change| change.facet_value.into()), + )?; + + self.inner.add_or_delete_level(wtxn) + } +} + +impl FacetsUpdateIncrementalInner { + /// WARNING: `touched_children` must be sorted in **reverse** lexicographic order. + fn find_touched_parents( + &self, + wtxn: &mut RwTxn, + child_level: u8, + mut touched_children: impl Iterator>, + ) -> Result<()> { + let mut touched_parents = vec![]; + let Some(parent_level) = child_level.checked_add(1) else { return Ok(()) }; + let parent_level_left_bound: FacetGroupKey<&[u8]> = + FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; + + let mut last_parent: Option> = None; + + for child in &mut touched_children { + if !valid_facet_value(&child) { + continue; + } + + if let Some(last_parent) = &last_parent { + if child.as_slice() >= last_parent.as_slice() { + self.compute_parent_group(wtxn, child_level, child)?; + continue; + } + } + + // need to find a new parent + let parent_key_prefix = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: child.as_slice(), + }; + + let parent = self + .db + .remap_data_type::() + .rev_range( + wtxn, + &( + Bound::Excluded(&parent_level_left_bound), + Bound::Included(&parent_key_prefix), + ), + )? + .next(); + + match parent { + Some(Ok((parent_key, _parent_value))) => { + // found parent, cache it for next keys + last_parent = Some(parent_key.left_bound.to_owned()); + + // add to modified list for parent level + touched_parents.push(parent_key.left_bound.to_owned()); + self.compute_parent_group(wtxn, child_level, child)?; + } + Some(Err(err)) => return Err(err.into()), + None => { + self.compute_parent_group(wtxn, child_level, child)?; + break; + } + } + } + // do we have children without parents? + if let Some(child) = touched_children.next() { + // no parent for that key + let mut it = self + .db + .remap_data_type::() + .prefix_iter_mut(wtxn, &parent_level_left_bound)?; + match it.next() { + // 1. left of the current left bound, or + Some(Ok((first_key, _first_value))) => 'change_left_bound: { + // make sure we don't spill on the neighboring fid (level also included defensively) + if first_key.field_id != self.field_id || first_key.level != parent_level { + break 'change_left_bound; + } + // remove old left bound + unsafe { it.del_current()? }; + drop(it); + // pop all elements and order to visit the new left bound + touched_parents.push(child.clone()); + self.compute_parent_group(wtxn, child_level, child)?; + for child in touched_children { + let new_left_bound = touched_parents.last_mut().unwrap(); + new_left_bound.clear(); + new_left_bound.extend_from_slice(&child); + self.compute_parent_group(wtxn, child_level, child)?; + } + } + Some(Err(err)) => return Err(err.into()), + // 2. max level reached, exit + None => { + drop(it); + self.compute_parent_group(wtxn, child_level, child)?; + for child in touched_children { + self.compute_parent_group(wtxn, child_level, child)?; + } + } + } + } + self.find_touched_parents( + wtxn, + parent_level, + touched_parents + // no need to `rev` here because the parents were already visited in reverse order + .into_iter(), + ) + } + + fn compute_parent_group( + &self, + wtxn: &mut RwTxn<'_>, + parent_level: u8, + parent_left_bound: Vec, + ) -> Result<()> { + let mut range_left_bound = parent_left_bound; + if parent_level == 0 { + return Ok(()); + } + let child_level = parent_level - 1; + + let parent_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: range_left_bound.as_slice(), + }; + let child_right_bound = self + .db + .remap_data_type::() + .get_greater_than(wtxn, &parent_key)? + .and_then( + |( + FacetGroupKey { + level: right_level, + field_id: right_fid, + left_bound: right_bound, + }, + _, + )| { + if parent_level != right_level || self.field_id != right_fid { + // there was a greater key, but with a greater level or fid, so not a sibling to the parent: ignore + return None; + } + Some(right_bound.to_owned()) + }, + ); + let child_right_bound = match &child_right_bound { + Some(right_bound) => Bound::Excluded(FacetGroupKey { + left_bound: right_bound.as_slice(), + field_id: self.field_id, + level: child_level, + }), + None => Bound::Unbounded, + }; + + let child_left_key = FacetGroupKey { + field_id: self.field_id, + level: child_level, + left_bound: range_left_bound.as_slice(), + }; + let mut child_left_bound = Bound::Included(child_left_key); + + loop { + let mut child_it = self.db.range(wtxn, &(child_left_bound, child_right_bound))?; + let res: Result<_> = child_it + .by_ref() + .take(self.max_group_size as usize) + // stop if we go to the next level or field id + .take_while(|res| match res { + Ok((child_key, _)) => { + child_key.field_id == self.field_id && child_key.level == child_level + } + Err(_) => true, + }) + .try_fold( + (None, FacetGroupValue { size: 0, bitmap: Default::default() }), + |(bounds, mut group_value), child_res| { + let (child_key, child_value) = child_res?; + let bounds = match bounds { + Some((left_bound, _)) => Some((left_bound, child_key.left_bound)), + None => Some((child_key.left_bound, child_key.left_bound)), + }; + // max_group_size <= u8::MAX + group_value.size += 1; + group_value.bitmap |= &child_value.bitmap; + Ok((bounds, group_value)) + }, + ); + + let (bounds, group_value) = res?; + + let Some((group_left_bound, right_bound)) = bounds else { + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: range_left_bound.as_slice(), + }; + drop(child_it); + if let Bound::Included(_) = child_left_bound { + self.db.delete(wtxn, &update_key)?; + } + + break; + }; + + drop(child_it); + let current_left_bound = group_left_bound.to_owned(); + + let delete_old_bound = match child_left_bound { + Bound::Included(bound) => { + if bound.left_bound != current_left_bound { + Some(range_left_bound.clone()) + } else { + None + } + } + _ => None, + }; + + range_left_bound.clear(); + range_left_bound.extend_from_slice(right_bound); + let child_left_key = FacetGroupKey { + field_id: self.field_id, + level: child_level, + left_bound: range_left_bound.as_slice(), + }; + child_left_bound = Bound::Excluded(child_left_key); + + if let Some(old_bound) = delete_old_bound { + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: old_bound.as_slice(), + }; + self.db.delete(wtxn, &update_key)?; + } + + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: current_left_bound.as_slice(), + }; + if group_value.bitmap.is_empty() { + self.db.delete(wtxn, &update_key)?; + } else { + self.db.put(wtxn, &update_key, &group_value)?; + } + } + + Ok(()) + } + + /// Check whether the highest level has exceeded `min_level_size` * `self.group_size`. + /// If it has, we must build an addition level above it. + /// Then check whether the highest level is under `min_level_size`. + /// If it has, we must remove the complete level. + pub(crate) fn add_or_delete_level(&self, txn: &mut RwTxn<'_>) -> Result<()> { + let highest_level = get_highest_level(txn, self.db, self.field_id)?; + let mut highest_level_prefix = vec![]; + highest_level_prefix.extend_from_slice(&self.field_id.to_be_bytes()); + highest_level_prefix.push(highest_level); + + let size_highest_level = + self.db.remap_types::().prefix_iter(txn, &highest_level_prefix)?.count(); + + if size_highest_level >= self.group_size as usize * self.min_level_size as usize { + self.add_level(txn, highest_level, &highest_level_prefix, size_highest_level) + } else if size_highest_level < self.min_level_size as usize && highest_level != 0 { + self.delete_level(txn, &highest_level_prefix) + } else { + Ok(()) + } + } + + /// Delete a level. + fn delete_level(&self, txn: &mut RwTxn<'_>, highest_level_prefix: &[u8]) -> Result<()> { + let mut to_delete = vec![]; + let mut iter = + self.db.remap_types::().prefix_iter(txn, highest_level_prefix)?; + for el in iter.by_ref() { + let (k, _) = el?; + to_delete.push( + FacetGroupKeyCodec::::bytes_decode(k) + .map_err(heed::Error::Encoding)? + .into_owned(), + ); + } + drop(iter); + for k in to_delete { + self.db.delete(txn, &k.as_ref())?; + } + Ok(()) + } + + /// Build an additional level for the field id. + fn add_level( + &self, + txn: &mut RwTxn<'_>, + highest_level: u8, + highest_level_prefix: &[u8], + size_highest_level: usize, + ) -> Result<()> { + let mut groups_iter = self + .db + .remap_types::() + .prefix_iter(txn, highest_level_prefix)?; + + let nbr_new_groups = size_highest_level / self.group_size as usize; + let nbr_leftover_elements = size_highest_level % self.group_size as usize; + + let mut to_add = vec![]; + for _ in 0..nbr_new_groups { + let mut first_key = None; + let mut values = RoaringBitmap::new(); + for _ in 0..self.group_size { + let (key_bytes, value_i) = groups_iter.next().unwrap()?; + let key_i = FacetGroupKeyCodec::::bytes_decode(key_bytes) + .map_err(heed::Error::Encoding)?; + + if first_key.is_none() { + first_key = Some(key_i); + } + values |= value_i.bitmap; + } + let key = FacetGroupKey { + field_id: self.field_id, + level: highest_level + 1, + left_bound: first_key.unwrap().left_bound, + }; + let value = FacetGroupValue { size: self.group_size, bitmap: values }; + to_add.push((key.into_owned(), value)); + } + // now we add the rest of the level, in case its size is > group_size * min_level_size + // this can indeed happen if the min_level_size parameter changes between two calls to `insert` + if nbr_leftover_elements > 0 { + let mut first_key = None; + let mut values = RoaringBitmap::new(); + for _ in 0..nbr_leftover_elements { + let (key_bytes, value_i) = groups_iter.next().unwrap()?; + let key_i = FacetGroupKeyCodec::::bytes_decode(key_bytes) + .map_err(heed::Error::Encoding)?; + + if first_key.is_none() { + first_key = Some(key_i); + } + values |= value_i.bitmap; + } + let key = FacetGroupKey { + field_id: self.field_id, + level: highest_level + 1, + left_bound: first_key.unwrap().left_bound, + }; + // Note: nbr_leftover_elements can be casted to a u8 since it is bounded by `max_group_size` + // when it is created above. + let value = FacetGroupValue { size: nbr_leftover_elements as u8, bitmap: values }; + to_add.push((key.into_owned(), value)); + } + + drop(groups_iter); + for (key, value) in to_add { + self.db.put(txn, &key.as_ref(), &value)?; + } + Ok(()) + } +} + +#[derive(Debug)] +pub struct FacetFieldIdChange { + pub facet_value: Box<[u8]>, + pub operation: FacetFieldIdOperation, +} + +#[derive(Debug, Clone, Copy)] +pub enum FacetFieldIdOperation { + /// The docids have been modified for an existing facet value + /// + /// The modification must be propagated to upper levels, without changing the structure of the tree + InPlace, + /// A new value has been inserted + /// + /// The modification must be propagated to upper levels, splitting nodes and adding new levels as necessary. + Insert, + /// An existing value has been deleted + /// + /// The modification must be propagated to upper levels, merging nodes and removing levels as necessary. + Remove, +} From 7a9290aaaef2339db42e2880ef6506a12305f0f7 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 15:08:48 +0100 Subject: [PATCH 04/14] Use new incremental facet indexing and enable sanity checks in debug --- crates/milli/src/update/new/indexer/mod.rs | 66 ++++++-- crates/milli/src/update/new/merger.rs | 168 ++++++++++++++++----- 2 files changed, 186 insertions(+), 48 deletions(-) diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index a850c0d03..84d4af3b8 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -36,6 +36,8 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::progress::Progress; use crate::proximity::ProximityPrecision; use crate::update::del_add::DelAdd; +use crate::update::facet::new_incremental::FacetsUpdateIncremental; +use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; @@ -203,6 +205,7 @@ where caches, FacetDatabases::new(index), index, + &rtxn, extractor_sender.facet_docids(), )?; } @@ -735,27 +738,66 @@ fn compute_facet_search_database( fn compute_facet_level_database( index: &Index, wtxn: &mut RwTxn, - facet_field_ids_delta: FacetFieldIdsDelta, + mut facet_field_ids_delta: FacetFieldIdsDelta, ) -> Result<()> { - if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { + for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( - index, - modified_facet_string_ids, - FacetType::String, - ) - .execute(wtxn)?; + match delta { + super::merger::FacetFieldIdDelta::Bulk => { + tracing::debug!(%fid, "bulk string facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) + .execute(wtxn)? + } + super::merger::FacetFieldIdDelta::Incremental(delta_data) => { + tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::String, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } } - if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { + + for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( + match delta { + super::merger::FacetFieldIdDelta::Bulk => { + tracing::debug!(%fid, "bulk number facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) + .execute(wtxn)? + } + super::merger::FacetFieldIdDelta::Incremental(delta_data) => { + tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::Number, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } + debug_assert!(crate::update::facet::sanity_checks( index, - modified_facet_number_ids, + wtxn, + fid, FacetType::Number, + FACET_GROUP_SIZE as usize, + FACET_MIN_LEVEL_SIZE as usize, + FACET_MAX_GROUP_SIZE as usize, ) - .execute(wtxn)?; + .is_ok()); } Ok(()) diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 9e87388a2..6f3fd35cb 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; -use hashbrown::HashSet; +use hashbrown::HashMap; use heed::types::Bytes; use heed::{Database, RoTxn}; use memmap2::Mmap; @@ -12,6 +12,7 @@ use super::extract::{ merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, GeoExtractorData, }; +use crate::update::facet::new_incremental::{FacetFieldIdChange, FacetFieldIdOperation}; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] @@ -100,23 +101,34 @@ pub fn merge_and_send_facet_docids<'extractor>( mut caches: Vec>, database: FacetDatabases, index: &Index, + rtxn: &RoTxn, docids_sender: FacetDocidsSender, ) -> Result { + let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize; + let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize; + let max_string_count = max_string_count.clamp(1000, 100_000); + let max_number_count = max_number_count.clamp(1000, 100_000); transpose_and_freeze_caches(&mut caches)? .into_par_iter() .map(|frozen| { - let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); + let mut facet_field_ids_delta = + FacetFieldIdsDelta::new(max_string_count, max_number_count); let rtxn = index.read_txn()?; merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { - facet_field_ids_delta.register_from_key(key); + let operation = if current.is_some() { + FacetFieldIdOperation::InPlace + } else { + FacetFieldIdOperation::Insert + }; + facet_field_ids_delta.register_from_key(key, operation); docids_sender.write(key, &bitmap)?; Ok(()) } Operation::Delete => { - facet_field_ids_delta.register_from_key(key); + facet_field_ids_delta.register_from_key(key, FacetFieldIdOperation::Remove); docids_sender.delete(key)?; Ok(()) } @@ -126,7 +138,10 @@ pub fn merge_and_send_facet_docids<'extractor>( Ok(facet_field_ids_delta) }) - .reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?))) + .reduce( + || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)), + |lhs, rhs| Ok(lhs?.merge(rhs?)), + ) } pub struct FacetDatabases<'a> { @@ -155,60 +170,141 @@ impl<'a> FacetDatabases<'a> { } } -#[derive(Debug, Default)] +#[derive(Debug)] +pub enum FacetFieldIdDelta { + Bulk, + Incremental(Vec), +} + +impl FacetFieldIdDelta { + fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, max_count: usize) { + *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) { + FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk, + FacetFieldIdDelta::Incremental(mut v) => { + if v.len() >= max_count { + FacetFieldIdDelta::Bulk + } else { + v.push(FacetFieldIdChange { facet_value: facet_value.into(), operation }); + FacetFieldIdDelta::Incremental(v) + } + } + } + } + + fn merge(&mut self, rhs: Option, max_count: usize) { + let Some(rhs) = rhs else { + return; + }; + *self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) { + (FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk, + ( + FacetFieldIdDelta::Incremental(mut left), + FacetFieldIdDelta::Incremental(mut right), + ) => { + if left.len() + right.len() >= max_count { + FacetFieldIdDelta::Bulk + } else { + left.append(&mut right); + FacetFieldIdDelta::Incremental(left) + } + } + }; + } +} + +#[derive(Debug)] pub struct FacetFieldIdsDelta { /// The field ids that have been modified - modified_facet_string_ids: HashSet, - modified_facet_number_ids: HashSet, + modified_facet_string_ids: HashMap, + modified_facet_number_ids: HashMap, + max_string_count: usize, + max_number_count: usize, } impl FacetFieldIdsDelta { - fn register_facet_string_id(&mut self, field_id: FieldId) { - self.modified_facet_string_ids.insert(field_id); + pub fn new(max_string_count: usize, max_number_count: usize) -> Self { + Self { + max_string_count, + max_number_count, + modified_facet_string_ids: Default::default(), + modified_facet_number_ids: Default::default(), + } } - fn register_facet_number_id(&mut self, field_id: FieldId) { - self.modified_facet_number_ids.insert(field_id); + fn register_facet_string_id( + &mut self, + field_id: FieldId, + facet_value: &[u8], + operation: FacetFieldIdOperation, + ) { + self.modified_facet_string_ids + .entry(field_id) + .or_insert(FacetFieldIdDelta::Incremental(Default::default())) + .push(facet_value, operation, self.max_string_count); } - fn register_from_key(&mut self, key: &[u8]) { - let (facet_kind, field_id) = self.extract_key_data(key); - match facet_kind { - FacetKind::Number => self.register_facet_number_id(field_id), - FacetKind::String => self.register_facet_string_id(field_id), + fn register_facet_number_id( + &mut self, + field_id: FieldId, + facet_value: &[u8], + operation: FacetFieldIdOperation, + ) { + self.modified_facet_number_ids + .entry(field_id) + .or_insert(FacetFieldIdDelta::Incremental(Default::default())) + .push(facet_value, operation, self.max_number_count); + } + + fn register_from_key(&mut self, key: &[u8], operation: FacetFieldIdOperation) { + let (facet_kind, field_id, facet_value) = self.extract_key_data(key); + match (facet_kind, facet_value) { + (FacetKind::Number, Some(facet_value)) => { + self.register_facet_number_id(field_id, facet_value, operation) + } + (FacetKind::String, Some(facet_value)) => { + self.register_facet_string_id(field_id, facet_value, operation) + } _ => (), } } - fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId) { + fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) { let facet_kind = FacetKind::from(key[0]); let field_id = FieldId::from_be_bytes([key[1], key[2]]); - (facet_kind, field_id) + let facet_value = if key.len() >= 4 { + // level is also stored in the key at [3] (always 0) + Some(&key[4..]) + } else { + None + }; + + (facet_kind, field_id, facet_value) } - pub fn modified_facet_string_ids(&self) -> Option> { - if self.modified_facet_string_ids.is_empty() { - None - } else { - Some(self.modified_facet_string_ids.iter().copied().collect()) - } + pub fn consume_facet_string_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_string_ids.drain() } - pub fn modified_facet_number_ids(&self) -> Option> { - if self.modified_facet_number_ids.is_empty() { - None - } else { - Some(self.modified_facet_number_ids.iter().copied().collect()) - } + pub fn consume_facet_number_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_number_ids.drain() } pub fn merge(mut self, rhs: Self) -> Self { - let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs; - modified_facet_number_ids.into_iter().for_each(|fid| { - self.modified_facet_number_ids.insert(fid); + // rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused + let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs; + modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_number_ids.remove(&fid); + delta.merge(old_delta, self.max_number_count); + self.modified_facet_number_ids.insert(fid, delta); }); - modified_facet_string_ids.into_iter().for_each(|fid| { - self.modified_facet_string_ids.insert(fid); + modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_string_ids.remove(&fid); + delta.merge(old_delta, self.max_string_count); + self.modified_facet_string_ids.insert(fid, delta); }); self } From 34f4602ae82d1025228b8eb97e484d5fe19e30e3 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 16:55:12 +0100 Subject: [PATCH 05/14] Update snapshot --- .../facet_id_f64_docids.snap | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap b/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap index c45c350e7..7ab60b90d 100644 --- a/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap +++ b/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap @@ -1,5 +1,5 @@ --- -source: milli/src/update/index_documents/mod.rs +source: crates/milli/src/update/index_documents/mod.rs --- 3 0 48.9021 1 [19, ] 3 0 49.9314 1 [17, ] @@ -15,6 +15,11 @@ source: milli/src/update/index_documents/mod.rs 3 0 50.7453 1 [7, ] 3 0 50.8466 1 [10, ] 3 0 51.0537 1 [9, ] +3 1 48.9021 2 [17, 19, ] +3 1 50.1793 3 [13, 14, 15, ] +3 1 50.4502 4 [0, 3, 8, 12, ] +3 1 50.6312 2 [1, 2, ] +3 1 50.7453 3 [7, 9, 10, ] 4 0 2.271 1 [17, ] 4 0 2.3708 1 [19, ] 4 0 2.7637 1 [14, ] @@ -28,4 +33,3 @@ source: milli/src/update/index_documents/mod.rs 4 0 3.6957 1 [9, ] 4 0 3.9623 1 [12, ] 4 0 4.337 1 [10, ] - From 28cc6df7a3589a64037b878dd56c742918c225bc Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 18:07:49 +0100 Subject: [PATCH 06/14] Fix uselessly deep stack trace --- .../milli/src/update/facet/new_incremental.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index 57358888e..9d8c19543 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -180,13 +180,16 @@ impl FacetsUpdateIncrementalInner { } } } - self.find_touched_parents( - wtxn, - parent_level, - touched_parents - // no need to `rev` here because the parents were already visited in reverse order - .into_iter(), - ) + if !touched_parents.is_empty() { + self.find_touched_parents( + wtxn, + parent_level, + touched_parents + // no need to `rev` here because the parents were already visited in reverse order + .into_iter(), + )?; + } + Ok(()) } fn compute_parent_group( From 4d2433de128a87ac6b8cb10520ac640576257782 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 18:23:35 +0100 Subject: [PATCH 07/14] center groups --- .../milli/src/update/facet/new_incremental.rs | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index 9d8c19543..6d5525fec 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -246,10 +246,32 @@ impl FacetsUpdateIncrementalInner { let mut child_left_bound = Bound::Included(child_left_key); loop { + // do a first pass on the range to find the number of children + let child_count = self + .db + .remap_data_type::() + .range(wtxn, &(child_left_bound, child_right_bound))? + .take(self.max_group_size as usize * 2) + .count(); let mut child_it = self.db.range(wtxn, &(child_left_bound, child_right_bound))?; + + // pick the right group_size depending on the number of children + let group_size = if child_count >= self.max_group_size as usize * 2 { + // more than twice the max_group_size => there will be space for at least 2 groups of max_group_size + self.max_group_size as usize + } else if child_count >= self.group_size as usize { + // size in [group_size, max_group_size * 2[ + // divided by 2 it is between [group_size / 2, max_group_size[ + // this ensures that the tree is balanced + child_count / 2 + } else { + // take everything + child_count + }; + let res: Result<_> = child_it .by_ref() - .take(self.max_group_size as usize) + .take(group_size) // stop if we go to the next level or field id .take_while(|res| match res { Ok((child_key, _)) => { From 3648abbfd566d18271c958881a5a464bb1ec73be Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 7 Jan 2025 15:26:09 +0100 Subject: [PATCH 08/14] Remove unused `FacetFieldIdOperation` --- .../milli/src/update/facet/new_incremental.rs | 17 -------- crates/milli/src/update/new/merger.rs | 39 ++++++------------- 2 files changed, 12 insertions(+), 44 deletions(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index 6d5525fec..f132ebf19 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -474,21 +474,4 @@ impl FacetsUpdateIncrementalInner { #[derive(Debug)] pub struct FacetFieldIdChange { pub facet_value: Box<[u8]>, - pub operation: FacetFieldIdOperation, -} - -#[derive(Debug, Clone, Copy)] -pub enum FacetFieldIdOperation { - /// The docids have been modified for an existing facet value - /// - /// The modification must be propagated to upper levels, without changing the structure of the tree - InPlace, - /// A new value has been inserted - /// - /// The modification must be propagated to upper levels, splitting nodes and adding new levels as necessary. - Insert, - /// An existing value has been deleted - /// - /// The modification must be propagated to upper levels, merging nodes and removing levels as necessary. - Remove, } diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 6f3fd35cb..090add6bd 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -12,7 +12,7 @@ use super::extract::{ merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, GeoExtractorData, }; -use crate::update::facet::new_incremental::{FacetFieldIdChange, FacetFieldIdOperation}; +use crate::update::facet::new_incremental::FacetFieldIdChange; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] @@ -118,17 +118,12 @@ pub fn merge_and_send_facet_docids<'extractor>( let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { - let operation = if current.is_some() { - FacetFieldIdOperation::InPlace - } else { - FacetFieldIdOperation::Insert - }; - facet_field_ids_delta.register_from_key(key, operation); + facet_field_ids_delta.register_from_key(key); docids_sender.write(key, &bitmap)?; Ok(()) } Operation::Delete => { - facet_field_ids_delta.register_from_key(key, FacetFieldIdOperation::Remove); + facet_field_ids_delta.register_from_key(key); docids_sender.delete(key)?; Ok(()) } @@ -177,14 +172,14 @@ pub enum FacetFieldIdDelta { } impl FacetFieldIdDelta { - fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, max_count: usize) { + fn push(&mut self, facet_value: &[u8], max_count: usize) { *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) { FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk, FacetFieldIdDelta::Incremental(mut v) => { if v.len() >= max_count { FacetFieldIdDelta::Bulk } else { - v.push(FacetFieldIdChange { facet_value: facet_value.into(), operation }); + v.push(FacetFieldIdChange { facet_value: facet_value.into() }); FacetFieldIdDelta::Incremental(v) } } @@ -231,38 +226,28 @@ impl FacetFieldIdsDelta { } } - fn register_facet_string_id( - &mut self, - field_id: FieldId, - facet_value: &[u8], - operation: FacetFieldIdOperation, - ) { + fn register_facet_string_id(&mut self, field_id: FieldId, facet_value: &[u8]) { self.modified_facet_string_ids .entry(field_id) .or_insert(FacetFieldIdDelta::Incremental(Default::default())) - .push(facet_value, operation, self.max_string_count); + .push(facet_value, self.max_string_count); } - fn register_facet_number_id( - &mut self, - field_id: FieldId, - facet_value: &[u8], - operation: FacetFieldIdOperation, - ) { + fn register_facet_number_id(&mut self, field_id: FieldId, facet_value: &[u8]) { self.modified_facet_number_ids .entry(field_id) .or_insert(FacetFieldIdDelta::Incremental(Default::default())) - .push(facet_value, operation, self.max_number_count); + .push(facet_value, self.max_number_count); } - fn register_from_key(&mut self, key: &[u8], operation: FacetFieldIdOperation) { + fn register_from_key(&mut self, key: &[u8]) { let (facet_kind, field_id, facet_value) = self.extract_key_data(key); match (facet_kind, facet_value) { (FacetKind::Number, Some(facet_value)) => { - self.register_facet_number_id(field_id, facet_value, operation) + self.register_facet_number_id(field_id, facet_value) } (FacetKind::String, Some(facet_value)) => { - self.register_facet_string_id(field_id, facet_value, operation) + self.register_facet_string_id(field_id, facet_value) } _ => (), } From 8ee37932593c8bd7fad7cae605b2e0749ccd54b9 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 8 Jan 2025 13:58:14 +0100 Subject: [PATCH 09/14] Update after review --- .../milli/src/update/facet/new_incremental.rs | 30 +++++++++---------- .../src/update/index_documents/helpers/mod.rs | 4 +-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index f132ebf19..fb2f3ae6b 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -66,7 +66,7 @@ impl FacetsUpdateIncremental { FacetFieldIdChange { facet_value: right, .. }| left.cmp(right), ); - self.inner.find_touched_parents( + self.inner.find_changed_parents( wtxn, 0, self.delta_data @@ -81,21 +81,21 @@ impl FacetsUpdateIncremental { } impl FacetsUpdateIncrementalInner { - /// WARNING: `touched_children` must be sorted in **reverse** lexicographic order. - fn find_touched_parents( + /// WARNING: `changed_children` must be sorted in **reverse** lexicographic order. + fn find_changed_parents( &self, wtxn: &mut RwTxn, child_level: u8, - mut touched_children: impl Iterator>, + mut changed_children: impl Iterator>, ) -> Result<()> { - let mut touched_parents = vec![]; + let mut changed_parents = vec![]; let Some(parent_level) = child_level.checked_add(1) else { return Ok(()) }; let parent_level_left_bound: FacetGroupKey<&[u8]> = FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; let mut last_parent: Option> = None; - for child in &mut touched_children { + for child in &mut changed_children { if !valid_facet_value(&child) { continue; } @@ -132,7 +132,7 @@ impl FacetsUpdateIncrementalInner { last_parent = Some(parent_key.left_bound.to_owned()); // add to modified list for parent level - touched_parents.push(parent_key.left_bound.to_owned()); + changed_parents.push(parent_key.left_bound.to_owned()); self.compute_parent_group(wtxn, child_level, child)?; } Some(Err(err)) => return Err(err.into()), @@ -143,7 +143,7 @@ impl FacetsUpdateIncrementalInner { } } // do we have children without parents? - if let Some(child) = touched_children.next() { + if let Some(child) = changed_children.next() { // no parent for that key let mut it = self .db @@ -160,10 +160,10 @@ impl FacetsUpdateIncrementalInner { unsafe { it.del_current()? }; drop(it); // pop all elements and order to visit the new left bound - touched_parents.push(child.clone()); + changed_parents.push(child.clone()); self.compute_parent_group(wtxn, child_level, child)?; - for child in touched_children { - let new_left_bound = touched_parents.last_mut().unwrap(); + for child in changed_children { + let new_left_bound = changed_parents.last_mut().unwrap(); new_left_bound.clear(); new_left_bound.extend_from_slice(&child); self.compute_parent_group(wtxn, child_level, child)?; @@ -174,17 +174,17 @@ impl FacetsUpdateIncrementalInner { None => { drop(it); self.compute_parent_group(wtxn, child_level, child)?; - for child in touched_children { + for child in changed_children { self.compute_parent_group(wtxn, child_level, child)?; } } } } - if !touched_parents.is_empty() { - self.find_touched_parents( + if !changed_parents.is_empty() { + self.find_changed_parents( wtxn, parent_level, - touched_parents + changed_parents // no need to `rev` here because the parents were already visited in reverse order .into_iter(), )?; diff --git a/crates/milli/src/update/index_documents/helpers/mod.rs b/crates/milli/src/update/index_documents/helpers/mod.rs index 195d12455..236c63cc3 100644 --- a/crates/milli/src/update/index_documents/helpers/mod.rs +++ b/crates/milli/src/update/index_documents/helpers/mod.rs @@ -10,14 +10,14 @@ use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::*; pub use merge_functions::*; -use crate::MAX_WORD_LENGTH; +use crate::{MAX_LMDB_KEY_LENGTH, MAX_WORD_LENGTH}; pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool { key.as_ref().len() <= MAX_WORD_LENGTH * 2 && !key.as_ref().is_empty() } pub fn valid_facet_value(facet_value: impl AsRef<[u8]>) -> bool { - facet_value.as_ref().len() <= (MAX_WORD_LENGTH * 2) - 3 && !facet_value.as_ref().is_empty() + facet_value.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !facet_value.as_ref().is_empty() } /// Divides one slice into two at an index, returns `None` if mid is out of bounds. From b4005593f4a87bbdc38e785424b24ae15e01b9ec Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 8 Jan 2025 14:57:14 +0100 Subject: [PATCH 10/14] Switch to an iterative algorithm for find_changed_parents --- .../milli/src/update/facet/new_incremental.rs | 236 ++++++++++-------- 1 file changed, 129 insertions(+), 107 deletions(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index fb2f3ae6b..e2aab0fab 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -63,18 +63,14 @@ impl FacetsUpdateIncremental { } self.delta_data.sort_unstable_by( |FacetFieldIdChange { facet_value: left, .. }, - FacetFieldIdChange { facet_value: right, .. }| left.cmp(right), + FacetFieldIdChange { facet_value: right, .. }| { + left.cmp(right) + // sort in **reverse** lexicographic order + .reverse() + }, ); - self.inner.find_changed_parents( - wtxn, - 0, - self.delta_data - .into_iter() - // reverse lexicographic order - .rev() - .map(|change| change.facet_value.into()), - )?; + self.inner.find_changed_parents(wtxn, self.delta_data)?; self.inner.add_or_delete_level(wtxn) } @@ -85,109 +81,135 @@ impl FacetsUpdateIncrementalInner { fn find_changed_parents( &self, wtxn: &mut RwTxn, - child_level: u8, - mut changed_children: impl Iterator>, + mut changed_children: Vec, ) -> Result<()> { let mut changed_parents = vec![]; - let Some(parent_level) = child_level.checked_add(1) else { return Ok(()) }; - let parent_level_left_bound: FacetGroupKey<&[u8]> = - FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; + for child_level in 0u8..u8::MAX { + // child_level < u8::MAX by construction + let parent_level = child_level + 1; + let parent_level_left_bound: FacetGroupKey<&[u8]> = + FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; - let mut last_parent: Option> = None; - - for child in &mut changed_children { - if !valid_facet_value(&child) { - continue; - } - - if let Some(last_parent) = &last_parent { - if child.as_slice() >= last_parent.as_slice() { - self.compute_parent_group(wtxn, child_level, child)?; + let mut last_parent: Option> = None; + let mut child_it = changed_children.drain(..); + 'current_level: while let Some(child) = child_it.next() { + if !valid_facet_value(&child.facet_value) { continue; } - } - // need to find a new parent - let parent_key_prefix = FacetGroupKey { - field_id: self.field_id, - level: parent_level, - left_bound: child.as_slice(), - }; - - let parent = self - .db - .remap_data_type::() - .rev_range( - wtxn, - &( - Bound::Excluded(&parent_level_left_bound), - Bound::Included(&parent_key_prefix), - ), - )? - .next(); - - match parent { - Some(Ok((parent_key, _parent_value))) => { - // found parent, cache it for next keys - last_parent = Some(parent_key.left_bound.to_owned()); - - // add to modified list for parent level - changed_parents.push(parent_key.left_bound.to_owned()); - self.compute_parent_group(wtxn, child_level, child)?; - } - Some(Err(err)) => return Err(err.into()), - None => { - self.compute_parent_group(wtxn, child_level, child)?; - break; - } - } - } - // do we have children without parents? - if let Some(child) = changed_children.next() { - // no parent for that key - let mut it = self - .db - .remap_data_type::() - .prefix_iter_mut(wtxn, &parent_level_left_bound)?; - match it.next() { - // 1. left of the current left bound, or - Some(Ok((first_key, _first_value))) => 'change_left_bound: { - // make sure we don't spill on the neighboring fid (level also included defensively) - if first_key.field_id != self.field_id || first_key.level != parent_level { - break 'change_left_bound; - } - // remove old left bound - unsafe { it.del_current()? }; - drop(it); - // pop all elements and order to visit the new left bound - changed_parents.push(child.clone()); - self.compute_parent_group(wtxn, child_level, child)?; - for child in changed_children { - let new_left_bound = changed_parents.last_mut().unwrap(); - new_left_bound.clear(); - new_left_bound.extend_from_slice(&child); - self.compute_parent_group(wtxn, child_level, child)?; + if let Some(last_parent) = &last_parent { + if &child.facet_value >= last_parent { + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + continue; } } - Some(Err(err)) => return Err(err.into()), - // 2. max level reached, exit - None => { - drop(it); - self.compute_parent_group(wtxn, child_level, child)?; - for child in changed_children { - self.compute_parent_group(wtxn, child_level, child)?; + + // need to find a new parent + let parent_key_prefix = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: &*child.facet_value, + }; + + let parent = self + .db + .remap_data_type::() + .rev_range( + wtxn, + &( + Bound::Excluded(&parent_level_left_bound), + Bound::Included(&parent_key_prefix), + ), + )? + .next(); + + match parent { + Some(Ok((parent_key, _parent_value))) => { + // found parent, cache it for next keys + last_parent = Some(parent_key.left_bound.to_owned().into_boxed_slice()); + + // add to modified list for parent level + changed_parents.push(FacetFieldIdChange { + facet_value: parent_key.left_bound.to_owned().into_boxed_slice(), + }); + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + } + Some(Err(err)) => return Err(err.into()), + None => { + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + + // do we have children without parents? + if let Some(child) = child_it.next() { + // no parent for that key + let mut parent_it = self + .db + .remap_data_type::() + .prefix_iter_mut(wtxn, &parent_level_left_bound)?; + match parent_it.next() { + // 1. left of the current left bound, or + Some(Ok((first_key, _first_value))) => 'change_left_bound: { + // make sure we don't spill on the neighboring fid (level also included defensively) + if first_key.field_id != self.field_id + || first_key.level != parent_level + { + break 'change_left_bound; + } + // remove old left bound + unsafe { parent_it.del_current()? }; + drop(parent_it); + // pop all elements and order to visit the new left bound + changed_parents.push(FacetFieldIdChange { + facet_value: child.facet_value.clone(), + }); + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + for child in child_it.by_ref() { + let new_left_bound = + &mut changed_parents.last_mut().unwrap().facet_value; + + new_left_bound.clone_from(&child.facet_value); + + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + } + + break 'current_level; + } + Some(Err(err)) => return Err(err.into()), + // 2. max level reached, exit + None => { + drop(parent_it); + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + for child in child_it.by_ref() { + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + } + return Ok(()); + } + } + } } } } - } - if !changed_parents.is_empty() { - self.find_changed_parents( - wtxn, - parent_level, - changed_parents - // no need to `rev` here because the parents were already visited in reverse order - .into_iter(), - )?; + if changed_parents.is_empty() { + return Ok(()); + } + drop(child_it); + std::mem::swap(&mut changed_children, &mut changed_parents); + // changed_parents is now empty because changed_children was emptied by the drain } Ok(()) } @@ -196,9 +218,9 @@ impl FacetsUpdateIncrementalInner { &self, wtxn: &mut RwTxn<'_>, parent_level: u8, - parent_left_bound: Vec, + parent_left_bound: Box<[u8]>, ) -> Result<()> { - let mut range_left_bound = parent_left_bound; + let mut range_left_bound: Vec = parent_left_bound.into(); if parent_level == 0 { return Ok(()); } @@ -207,7 +229,7 @@ impl FacetsUpdateIncrementalInner { let parent_key = FacetGroupKey { field_id: self.field_id, level: parent_level, - left_bound: range_left_bound.as_slice(), + left_bound: &*range_left_bound, }; let child_right_bound = self .db @@ -241,7 +263,7 @@ impl FacetsUpdateIncrementalInner { let child_left_key = FacetGroupKey { field_id: self.field_id, level: child_level, - left_bound: range_left_bound.as_slice(), + left_bound: &*range_left_bound, }; let mut child_left_bound = Bound::Included(child_left_key); @@ -300,7 +322,7 @@ impl FacetsUpdateIncrementalInner { let update_key = FacetGroupKey { field_id: self.field_id, level: parent_level, - left_bound: range_left_bound.as_slice(), + left_bound: &*range_left_bound, }; drop(child_it); if let Bound::Included(_) = child_left_bound { From fd88c834c36de2d44d4423df334fc41d1651cb1d Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 8 Jan 2025 15:22:11 +0100 Subject: [PATCH 11/14] Modernize valid_lmdb_key --- crates/milli/src/update/index_documents/helpers/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/milli/src/update/index_documents/helpers/mod.rs b/crates/milli/src/update/index_documents/helpers/mod.rs index 236c63cc3..5dec54ffc 100644 --- a/crates/milli/src/update/index_documents/helpers/mod.rs +++ b/crates/milli/src/update/index_documents/helpers/mod.rs @@ -10,10 +10,10 @@ use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::*; pub use merge_functions::*; -use crate::{MAX_LMDB_KEY_LENGTH, MAX_WORD_LENGTH}; +use crate::MAX_LMDB_KEY_LENGTH; pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool { - key.as_ref().len() <= MAX_WORD_LENGTH * 2 && !key.as_ref().is_empty() + key.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !key.as_ref().is_empty() } pub fn valid_facet_value(facet_value: impl AsRef<[u8]>) -> bool { From 6a577254fa7a6cf17f229da676ebfafbc8744dce Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 8 Jan 2025 16:25:30 +0100 Subject: [PATCH 12/14] No longer ignore the first child without parent --- .../milli/src/update/facet/new_incremental.rs | 95 ++++++++----------- 1 file changed, 39 insertions(+), 56 deletions(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index e2aab0fab..82e1c31fb 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -91,16 +91,13 @@ impl FacetsUpdateIncrementalInner { FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; let mut last_parent: Option> = None; - let mut child_it = changed_children.drain(..); + let mut child_it = + changed_children.drain(..).filter(|child| valid_facet_value(&child.facet_value)); 'current_level: while let Some(child) = child_it.next() { - if !valid_facet_value(&child.facet_value) { - continue; - } - if let Some(last_parent) = &last_parent { if &child.facet_value >= last_parent { self.compute_parent_group(wtxn, child_level, child.facet_value)?; - continue; + continue 'current_level; } } @@ -136,69 +133,55 @@ impl FacetsUpdateIncrementalInner { } Some(Err(err)) => return Err(err.into()), None => { - self.compute_parent_group(wtxn, child_level, child.facet_value)?; + // no parent for that key + let mut parent_it = self + .db + .remap_data_type::() + .prefix_iter_mut(wtxn, &parent_level_left_bound)?; + match parent_it.next() { + // 1. left of the current left bound, or + Some(Ok((first_key, _first_value))) => 'change_left_bound: { + // make sure we don't spill on the neighboring fid (level also included defensively) + if first_key.field_id != self.field_id + || first_key.level != parent_level + { + break 'change_left_bound; + } + // remove old left bound + unsafe { parent_it.del_current()? }; + drop(parent_it); + changed_parents.push(FacetFieldIdChange { + facet_value: child.facet_value.clone(), + }); + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + // pop all elements in order to visit the new left bound + let new_left_bound = + &mut changed_parents.last_mut().unwrap().facet_value; + for child in child_it.by_ref() { + new_left_bound.clone_from(&child.facet_value); - // do we have children without parents? - if let Some(child) = child_it.next() { - // no parent for that key - let mut parent_it = self - .db - .remap_data_type::() - .prefix_iter_mut(wtxn, &parent_level_left_bound)?; - match parent_it.next() { - // 1. left of the current left bound, or - Some(Ok((first_key, _first_value))) => 'change_left_bound: { - // make sure we don't spill on the neighboring fid (level also included defensively) - if first_key.field_id != self.field_id - || first_key.level != parent_level - { - break 'change_left_bound; - } - // remove old left bound - unsafe { parent_it.del_current()? }; - drop(parent_it); - // pop all elements and order to visit the new left bound - changed_parents.push(FacetFieldIdChange { - facet_value: child.facet_value.clone(), - }); self.compute_parent_group( wtxn, child_level, child.facet_value, )?; - for child in child_it.by_ref() { - let new_left_bound = - &mut changed_parents.last_mut().unwrap().facet_value; - - new_left_bound.clone_from(&child.facet_value); - - self.compute_parent_group( - wtxn, - child_level, - child.facet_value, - )?; - } - - break 'current_level; } - Some(Err(err)) => return Err(err.into()), - // 2. max level reached, exit - None => { - drop(parent_it); + + break 'current_level; + } + Some(Err(err)) => return Err(err.into()), + // 2. max level reached, exit + None => { + drop(parent_it); + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + for child in child_it.by_ref() { self.compute_parent_group( wtxn, child_level, child.facet_value, )?; - for child in child_it.by_ref() { - self.compute_parent_group( - wtxn, - child_level, - child.facet_value, - )?; - } - return Ok(()); } + return Ok(()); } } } From 7ec7200378b1b5f91b04c176c163eaef723f6b0e Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 8 Jan 2025 16:25:44 +0100 Subject: [PATCH 13/14] Check valid_facet_value as part of a filter of the iterator --- crates/milli/src/update/facet/new_incremental.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index 82e1c31fb..e917e41b6 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -91,8 +91,12 @@ impl FacetsUpdateIncrementalInner { FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; let mut last_parent: Option> = None; - let mut child_it = - changed_children.drain(..).filter(|child| valid_facet_value(&child.facet_value)); + let mut child_it = changed_children + // drain all changed children + .drain(..) + // keep only children whose value is valid in the LMDB sense + .filter(|child| valid_facet_value(&child.facet_value)); + // `while let` rather than `for` because we advance `child_it` inside of the loop 'current_level: while let Some(child) = child_it.next() { if let Some(last_parent) = &last_parent { if &child.facet_value >= last_parent { From e83c0217555b2c44680cb79ece8de061d07fde92 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 8 Jan 2025 16:50:05 +0100 Subject: [PATCH 14/14] When spilling on the next fid, no longer ignore children --- .../milli/src/update/facet/new_incremental.rs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs index e917e41b6..0890f8593 100644 --- a/crates/milli/src/update/facet/new_incremental.rs +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -144,12 +144,26 @@ impl FacetsUpdateIncrementalInner { .prefix_iter_mut(wtxn, &parent_level_left_bound)?; match parent_it.next() { // 1. left of the current left bound, or - Some(Ok((first_key, _first_value))) => 'change_left_bound: { + Some(Ok((first_key, _first_value))) => { // make sure we don't spill on the neighboring fid (level also included defensively) if first_key.field_id != self.field_id || first_key.level != parent_level { - break 'change_left_bound; + // max level reached, exit + drop(parent_it); + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + for child in child_it.by_ref() { + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + } + return Ok(()); } // remove old left bound unsafe { parent_it.del_current()? }; @@ -170,8 +184,6 @@ impl FacetsUpdateIncrementalInner { child.facet_value, )?; } - - break 'current_level; } Some(Err(err)) => return Err(err.into()), // 2. max level reached, exit