From f38db861200dba1b112ae1d0207c749543f919c5 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Jan 2025 15:08:36 +0100 Subject: [PATCH] Add new incremental facet indexing --- crates/milli/src/update/facet/mod.rs | 1 + .../milli/src/update/facet/new_incremental.rs | 469 ++++++++++++++++++ 2 files changed, 470 insertions(+) create mode 100644 crates/milli/src/update/facet/new_incremental.rs diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index 44f499f8c..02b6e7649 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -101,6 +101,7 @@ use crate::{try_split_array_at, FieldId, Index, Result}; pub mod bulk; pub mod incremental; +pub mod new_incremental; /// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases. /// diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs new file mode 100644 index 000000000..57358888e --- /dev/null +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -0,0 +1,469 @@ +use std::ops::Bound; + +use heed::types::{Bytes, DecodeIgnore}; +use heed::{BytesDecode as _, Database, RwTxn}; +use roaring::RoaringBitmap; + +use crate::facet::FacetType; +use crate::heed_codec::facet::{ + FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec, +}; +use crate::heed_codec::BytesRefCodec; +use crate::search::facet::get_highest_level; +use crate::update::valid_facet_value; +use crate::{FieldId, Index, Result}; + +pub struct FacetsUpdateIncremental { + inner: FacetsUpdateIncrementalInner, + delta_data: Vec, +} + +struct FacetsUpdateIncrementalInner { + db: Database, FacetGroupValueCodec>, + field_id: FieldId, + group_size: u8, + min_level_size: u8, + max_group_size: u8, +} + +impl FacetsUpdateIncremental { + pub fn new( + index: &Index, + facet_type: FacetType, + field_id: FieldId, + delta_data: Vec, + group_size: u8, + min_level_size: u8, + max_group_size: u8, + ) -> Self { + FacetsUpdateIncremental { + inner: FacetsUpdateIncrementalInner { + db: match facet_type { + FacetType::String => index + .facet_id_string_docids + .remap_key_type::>(), + FacetType::Number => index + .facet_id_f64_docids + .remap_key_type::>(), + }, + field_id, + group_size, + min_level_size, + max_group_size, + }, + + delta_data, + } + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::incremental")] + pub fn execute(mut self, wtxn: &mut RwTxn) -> Result<()> { + if self.delta_data.is_empty() { + return Ok(()); + } + self.delta_data.sort_unstable_by( + |FacetFieldIdChange { facet_value: left, .. }, + FacetFieldIdChange { facet_value: right, .. }| left.cmp(right), + ); + + self.inner.find_touched_parents( + wtxn, + 0, + self.delta_data + .into_iter() + // reverse lexicographic order + .rev() + .map(|change| change.facet_value.into()), + )?; + + self.inner.add_or_delete_level(wtxn) + } +} + +impl FacetsUpdateIncrementalInner { + /// WARNING: `touched_children` must be sorted in **reverse** lexicographic order. + fn find_touched_parents( + &self, + wtxn: &mut RwTxn, + child_level: u8, + mut touched_children: impl Iterator>, + ) -> Result<()> { + let mut touched_parents = vec![]; + let Some(parent_level) = child_level.checked_add(1) else { return Ok(()) }; + let parent_level_left_bound: FacetGroupKey<&[u8]> = + FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; + + let mut last_parent: Option> = None; + + for child in &mut touched_children { + if !valid_facet_value(&child) { + continue; + } + + if let Some(last_parent) = &last_parent { + if child.as_slice() >= last_parent.as_slice() { + self.compute_parent_group(wtxn, child_level, child)?; + continue; + } + } + + // need to find a new parent + let parent_key_prefix = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: child.as_slice(), + }; + + let parent = self + .db + .remap_data_type::() + .rev_range( + wtxn, + &( + Bound::Excluded(&parent_level_left_bound), + Bound::Included(&parent_key_prefix), + ), + )? + .next(); + + match parent { + Some(Ok((parent_key, _parent_value))) => { + // found parent, cache it for next keys + last_parent = Some(parent_key.left_bound.to_owned()); + + // add to modified list for parent level + touched_parents.push(parent_key.left_bound.to_owned()); + self.compute_parent_group(wtxn, child_level, child)?; + } + Some(Err(err)) => return Err(err.into()), + None => { + self.compute_parent_group(wtxn, child_level, child)?; + break; + } + } + } + // do we have children without parents? + if let Some(child) = touched_children.next() { + // no parent for that key + let mut it = self + .db + .remap_data_type::() + .prefix_iter_mut(wtxn, &parent_level_left_bound)?; + match it.next() { + // 1. left of the current left bound, or + Some(Ok((first_key, _first_value))) => 'change_left_bound: { + // make sure we don't spill on the neighboring fid (level also included defensively) + if first_key.field_id != self.field_id || first_key.level != parent_level { + break 'change_left_bound; + } + // remove old left bound + unsafe { it.del_current()? }; + drop(it); + // pop all elements and order to visit the new left bound + touched_parents.push(child.clone()); + self.compute_parent_group(wtxn, child_level, child)?; + for child in touched_children { + let new_left_bound = touched_parents.last_mut().unwrap(); + new_left_bound.clear(); + new_left_bound.extend_from_slice(&child); + self.compute_parent_group(wtxn, child_level, child)?; + } + } + Some(Err(err)) => return Err(err.into()), + // 2. max level reached, exit + None => { + drop(it); + self.compute_parent_group(wtxn, child_level, child)?; + for child in touched_children { + self.compute_parent_group(wtxn, child_level, child)?; + } + } + } + } + self.find_touched_parents( + wtxn, + parent_level, + touched_parents + // no need to `rev` here because the parents were already visited in reverse order + .into_iter(), + ) + } + + fn compute_parent_group( + &self, + wtxn: &mut RwTxn<'_>, + parent_level: u8, + parent_left_bound: Vec, + ) -> Result<()> { + let mut range_left_bound = parent_left_bound; + if parent_level == 0 { + return Ok(()); + } + let child_level = parent_level - 1; + + let parent_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: range_left_bound.as_slice(), + }; + let child_right_bound = self + .db + .remap_data_type::() + .get_greater_than(wtxn, &parent_key)? + .and_then( + |( + FacetGroupKey { + level: right_level, + field_id: right_fid, + left_bound: right_bound, + }, + _, + )| { + if parent_level != right_level || self.field_id != right_fid { + // there was a greater key, but with a greater level or fid, so not a sibling to the parent: ignore + return None; + } + Some(right_bound.to_owned()) + }, + ); + let child_right_bound = match &child_right_bound { + Some(right_bound) => Bound::Excluded(FacetGroupKey { + left_bound: right_bound.as_slice(), + field_id: self.field_id, + level: child_level, + }), + None => Bound::Unbounded, + }; + + let child_left_key = FacetGroupKey { + field_id: self.field_id, + level: child_level, + left_bound: range_left_bound.as_slice(), + }; + let mut child_left_bound = Bound::Included(child_left_key); + + loop { + let mut child_it = self.db.range(wtxn, &(child_left_bound, child_right_bound))?; + let res: Result<_> = child_it + .by_ref() + .take(self.max_group_size as usize) + // stop if we go to the next level or field id + .take_while(|res| match res { + Ok((child_key, _)) => { + child_key.field_id == self.field_id && child_key.level == child_level + } + Err(_) => true, + }) + .try_fold( + (None, FacetGroupValue { size: 0, bitmap: Default::default() }), + |(bounds, mut group_value), child_res| { + let (child_key, child_value) = child_res?; + let bounds = match bounds { + Some((left_bound, _)) => Some((left_bound, child_key.left_bound)), + None => Some((child_key.left_bound, child_key.left_bound)), + }; + // max_group_size <= u8::MAX + group_value.size += 1; + group_value.bitmap |= &child_value.bitmap; + Ok((bounds, group_value)) + }, + ); + + let (bounds, group_value) = res?; + + let Some((group_left_bound, right_bound)) = bounds else { + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: range_left_bound.as_slice(), + }; + drop(child_it); + if let Bound::Included(_) = child_left_bound { + self.db.delete(wtxn, &update_key)?; + } + + break; + }; + + drop(child_it); + let current_left_bound = group_left_bound.to_owned(); + + let delete_old_bound = match child_left_bound { + Bound::Included(bound) => { + if bound.left_bound != current_left_bound { + Some(range_left_bound.clone()) + } else { + None + } + } + _ => None, + }; + + range_left_bound.clear(); + range_left_bound.extend_from_slice(right_bound); + let child_left_key = FacetGroupKey { + field_id: self.field_id, + level: child_level, + left_bound: range_left_bound.as_slice(), + }; + child_left_bound = Bound::Excluded(child_left_key); + + if let Some(old_bound) = delete_old_bound { + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: old_bound.as_slice(), + }; + self.db.delete(wtxn, &update_key)?; + } + + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: current_left_bound.as_slice(), + }; + if group_value.bitmap.is_empty() { + self.db.delete(wtxn, &update_key)?; + } else { + self.db.put(wtxn, &update_key, &group_value)?; + } + } + + Ok(()) + } + + /// Check whether the highest level has exceeded `min_level_size` * `self.group_size`. + /// If it has, we must build an addition level above it. + /// Then check whether the highest level is under `min_level_size`. + /// If it has, we must remove the complete level. + pub(crate) fn add_or_delete_level(&self, txn: &mut RwTxn<'_>) -> Result<()> { + let highest_level = get_highest_level(txn, self.db, self.field_id)?; + let mut highest_level_prefix = vec![]; + highest_level_prefix.extend_from_slice(&self.field_id.to_be_bytes()); + highest_level_prefix.push(highest_level); + + let size_highest_level = + self.db.remap_types::().prefix_iter(txn, &highest_level_prefix)?.count(); + + if size_highest_level >= self.group_size as usize * self.min_level_size as usize { + self.add_level(txn, highest_level, &highest_level_prefix, size_highest_level) + } else if size_highest_level < self.min_level_size as usize && highest_level != 0 { + self.delete_level(txn, &highest_level_prefix) + } else { + Ok(()) + } + } + + /// Delete a level. + fn delete_level(&self, txn: &mut RwTxn<'_>, highest_level_prefix: &[u8]) -> Result<()> { + let mut to_delete = vec![]; + let mut iter = + self.db.remap_types::().prefix_iter(txn, highest_level_prefix)?; + for el in iter.by_ref() { + let (k, _) = el?; + to_delete.push( + FacetGroupKeyCodec::::bytes_decode(k) + .map_err(heed::Error::Encoding)? + .into_owned(), + ); + } + drop(iter); + for k in to_delete { + self.db.delete(txn, &k.as_ref())?; + } + Ok(()) + } + + /// Build an additional level for the field id. + fn add_level( + &self, + txn: &mut RwTxn<'_>, + highest_level: u8, + highest_level_prefix: &[u8], + size_highest_level: usize, + ) -> Result<()> { + let mut groups_iter = self + .db + .remap_types::() + .prefix_iter(txn, highest_level_prefix)?; + + let nbr_new_groups = size_highest_level / self.group_size as usize; + let nbr_leftover_elements = size_highest_level % self.group_size as usize; + + let mut to_add = vec![]; + for _ in 0..nbr_new_groups { + let mut first_key = None; + let mut values = RoaringBitmap::new(); + for _ in 0..self.group_size { + let (key_bytes, value_i) = groups_iter.next().unwrap()?; + let key_i = FacetGroupKeyCodec::::bytes_decode(key_bytes) + .map_err(heed::Error::Encoding)?; + + if first_key.is_none() { + first_key = Some(key_i); + } + values |= value_i.bitmap; + } + let key = FacetGroupKey { + field_id: self.field_id, + level: highest_level + 1, + left_bound: first_key.unwrap().left_bound, + }; + let value = FacetGroupValue { size: self.group_size, bitmap: values }; + to_add.push((key.into_owned(), value)); + } + // now we add the rest of the level, in case its size is > group_size * min_level_size + // this can indeed happen if the min_level_size parameter changes between two calls to `insert` + if nbr_leftover_elements > 0 { + let mut first_key = None; + let mut values = RoaringBitmap::new(); + for _ in 0..nbr_leftover_elements { + let (key_bytes, value_i) = groups_iter.next().unwrap()?; + let key_i = FacetGroupKeyCodec::::bytes_decode(key_bytes) + .map_err(heed::Error::Encoding)?; + + if first_key.is_none() { + first_key = Some(key_i); + } + values |= value_i.bitmap; + } + let key = FacetGroupKey { + field_id: self.field_id, + level: highest_level + 1, + left_bound: first_key.unwrap().left_bound, + }; + // Note: nbr_leftover_elements can be casted to a u8 since it is bounded by `max_group_size` + // when it is created above. + let value = FacetGroupValue { size: nbr_leftover_elements as u8, bitmap: values }; + to_add.push((key.into_owned(), value)); + } + + drop(groups_iter); + for (key, value) in to_add { + self.db.put(txn, &key.as_ref(), &value)?; + } + Ok(()) + } +} + +#[derive(Debug)] +pub struct FacetFieldIdChange { + pub facet_value: Box<[u8]>, + pub operation: FacetFieldIdOperation, +} + +#[derive(Debug, Clone, Copy)] +pub enum FacetFieldIdOperation { + /// The docids have been modified for an existing facet value + /// + /// The modification must be propagated to upper levels, without changing the structure of the tree + InPlace, + /// A new value has been inserted + /// + /// The modification must be propagated to upper levels, splitting nodes and adding new levels as necessary. + Insert, + /// An existing value has been deleted + /// + /// The modification must be propagated to upper levels, merging nodes and removing levels as necessary. + Remove, +}