Use new incremental facet implementation

This commit is contained in:
Louis Dureuil 2024-12-31 17:05:07 +01:00
parent 0780b04f6f
commit 025be5147f
No known key found for this signature in database
3 changed files with 120 additions and 65 deletions

View File

@ -95,6 +95,7 @@ use crate::{try_split_array_at, FieldId, Index, Result};
pub mod bulk; pub mod bulk;
pub mod incremental; pub mod incremental;
pub mod new_incremental;
/// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases. /// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases.
/// ///

View File

@ -36,6 +36,8 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::progress::Progress; use crate::progress::Progress;
use crate::proximity::ProximityPrecision; use crate::proximity::ProximityPrecision;
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::update::facet::new_incremental::FacetsUpdateIncremental;
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
@ -203,6 +205,7 @@ where
caches, caches,
FacetDatabases::new(index), FacetDatabases::new(index),
index, index,
&rtxn,
extractor_sender.facet_docids(), extractor_sender.facet_docids(),
)?; )?;
} }
@ -735,27 +738,56 @@ fn compute_facet_search_database(
fn compute_facet_level_database( fn compute_facet_level_database(
index: &Index, index: &Index,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
facet_field_ids_delta: FacetFieldIdsDelta, mut facet_field_ids_delta: FacetFieldIdsDelta,
) -> Result<()> { ) -> Result<()> {
if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
let _entered = span.enter(); let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0( match delta {
index, super::merger::FacetFieldIdDelta::Bulk => {
modified_facet_string_ids, tracing::info!(%fid, "bulk string facet processing");
FacetType::String, FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
) .execute(wtxn)?
.execute(wtxn)?;
} }
if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
tracing::info!(%fid, len=%delta_data.len(), "incremental string facet processing");
FacetsUpdateIncremental::new(
index,
FacetType::String,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
)
.execute(wtxn)?
}
}
}
for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter(); let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0( match delta {
super::merger::FacetFieldIdDelta::Bulk => {
tracing::info!(%fid, "bulk number facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
.execute(wtxn)?
}
super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
tracing::info!(%fid, len=%delta_data.len(), "incremental number facet processing");
FacetsUpdateIncremental::new(
index, index,
modified_facet_number_ids,
FacetType::Number, FacetType::Number,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
) )
.execute(wtxn)?; .execute(wtxn)?
}
}
} }
Ok(()) Ok(())

View File

@ -1,6 +1,6 @@
use std::cell::RefCell; use std::cell::RefCell;
use hashbrown::{HashMap, HashSet}; use hashbrown::HashMap;
use heed::types::Bytes; use heed::types::Bytes;
use heed::{Database, RoTxn}; use heed::{Database, RoTxn};
use memmap2::Mmap; use memmap2::Mmap;
@ -12,6 +12,7 @@ use super::extract::{
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
FacetKind, GeoExtractorData, FacetKind, GeoExtractorData,
}; };
use crate::update::facet::new_incremental::{FacetFieldIdChange, FacetFieldIdOperation};
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
@ -100,23 +101,32 @@ pub fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<BalancedCaches<'extractor>>, mut caches: Vec<BalancedCaches<'extractor>>,
database: FacetDatabases, database: FacetDatabases,
index: &Index, index: &Index,
rtxn: &RoTxn,
docids_sender: FacetDocidsSender, docids_sender: FacetDocidsSender,
) -> Result<FacetFieldIdsDelta> { ) -> Result<FacetFieldIdsDelta> {
let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize;
let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
transpose_and_freeze_caches(&mut caches)? transpose_and_freeze_caches(&mut caches)?
.into_par_iter() .into_par_iter()
.map(|frozen| { .map(|frozen| {
let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); let mut facet_field_ids_delta =
FacetFieldIdsDelta::new(max_string_count, max_number_count);
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key); let operation = if current.is_some() {
FacetFieldIdOperation::InPlace
} else {
FacetFieldIdOperation::Insert
};
facet_field_ids_delta.register_from_key(key, operation);
docids_sender.write(key, &bitmap)?; docids_sender.write(key, &bitmap)?;
Ok(()) Ok(())
} }
Operation::Delete => { Operation::Delete => {
facet_field_ids_delta.register_from_key(key); facet_field_ids_delta.register_from_key(key, FacetFieldIdOperation::Remove);
docids_sender.delete(key)?; docids_sender.delete(key)?;
Ok(()) Ok(())
} }
@ -126,7 +136,10 @@ pub fn merge_and_send_facet_docids<'extractor>(
Ok(facet_field_ids_delta) Ok(facet_field_ids_delta)
}) })
.reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?))) .reduce(
|| Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
|lhs, rhs| Ok(lhs?.merge(rhs?)),
)
} }
pub struct FacetDatabases<'a> { pub struct FacetDatabases<'a> {
@ -162,11 +175,11 @@ pub enum FacetFieldIdDelta {
} }
impl FacetFieldIdDelta { impl FacetFieldIdDelta {
fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, db_size: usize) { fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, max_count: usize) {
*self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) { *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) {
FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk, FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk,
FacetFieldIdDelta::Incremental(mut v) => { FacetFieldIdDelta::Incremental(mut v) => {
if v.len() >= (db_size / 500) { if v.len() >= max_count {
FacetFieldIdDelta::Bulk FacetFieldIdDelta::Bulk
} else { } else {
v.push(FacetFieldIdChange { facet_value: facet_value.into(), operation }); v.push(FacetFieldIdChange { facet_value: facet_value.into(), operation });
@ -175,39 +188,47 @@ impl FacetFieldIdDelta {
} }
} }
} }
fn merge(&mut self, rhs: Option<Self>, max_count: usize) {
let Some(rhs) = rhs else {
return;
};
*self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) {
(FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk,
(
FacetFieldIdDelta::Incremental(mut left),
FacetFieldIdDelta::Incremental(mut right),
) => {
if left.len() + right.len() >= max_count {
FacetFieldIdDelta::Bulk
} else {
left.append(&mut right);
FacetFieldIdDelta::Incremental(left)
}
}
};
}
} }
#[derive(Debug)] #[derive(Debug)]
pub struct FacetFieldIdChange {
facet_value: Box<[u8]>,
operation: FacetFieldIdOperation,
}
#[derive(Debug, Clone, Copy)]
pub enum FacetFieldIdOperation {
/// The docids have been modified for an existing facet value
///
/// The modification must be propagated to upper levels, without changing the structure of the tree
InPlace,
/// A new value has been inserted
///
/// The modification must be propagated to upper levels, splitting nodes and adding new levels as necessary.
Insert,
/// An existing value has been deleted
///
/// The modification must be propagated to upper levels, merging nodes and removing levels as necessary.
Remove,
}
#[derive(Debug, Default)]
pub struct FacetFieldIdsDelta { pub struct FacetFieldIdsDelta {
/// The field ids that have been modified /// The field ids that have been modified
modified_facet_string_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>, modified_facet_string_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
modified_facet_number_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>, modified_facet_number_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
db_size: usize, max_string_count: usize,
max_number_count: usize,
} }
impl FacetFieldIdsDelta { impl FacetFieldIdsDelta {
pub fn new(max_string_count: usize, max_number_count: usize) -> Self {
Self {
max_string_count,
max_number_count,
modified_facet_string_ids: Default::default(),
modified_facet_number_ids: Default::default(),
}
}
fn register_facet_string_id( fn register_facet_string_id(
&mut self, &mut self,
field_id: FieldId, field_id: FieldId,
@ -217,7 +238,7 @@ impl FacetFieldIdsDelta {
self.modified_facet_string_ids self.modified_facet_string_ids
.entry(field_id) .entry(field_id)
.or_insert(FacetFieldIdDelta::Incremental(Default::default())) .or_insert(FacetFieldIdDelta::Incremental(Default::default()))
.push(facet_value, operation, self.db_size); .push(facet_value, operation, self.max_string_count);
} }
fn register_facet_number_id( fn register_facet_number_id(
@ -229,7 +250,7 @@ impl FacetFieldIdsDelta {
self.modified_facet_number_ids self.modified_facet_number_ids
.entry(field_id) .entry(field_id)
.or_insert(FacetFieldIdDelta::Incremental(Default::default())) .or_insert(FacetFieldIdDelta::Incremental(Default::default()))
.push(facet_value, operation, self.db_size); .push(facet_value, operation, self.max_number_count);
} }
fn register_from_key(&mut self, key: &[u8], operation: FacetFieldIdOperation) { fn register_from_key(&mut self, key: &[u8], operation: FacetFieldIdOperation) {
@ -241,36 +262,37 @@ impl FacetFieldIdsDelta {
} }
} }
fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId, &[u8]) { fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, &'key [u8]) {
let facet_kind = FacetKind::from(key[0]); let facet_kind = FacetKind::from(key[0]);
let field_id = FieldId::from_be_bytes([key[1], key[2]]); let field_id = FieldId::from_be_bytes([key[1], key[2]]);
let facet_value = &key[2..]; let facet_value = &key[2..];
(facet_kind, field_id, facet_value) (facet_kind, field_id, facet_value)
} }
pub fn modified_facet_string_ids(&self) -> Option<Vec<FieldId>> { pub fn consume_facet_string_delta(
if self.modified_facet_string_ids.is_empty() { &mut self,
None ) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
} else { self.modified_facet_string_ids.drain()
Some(self.modified_facet_string_ids.iter().copied().collect())
}
} }
pub fn modified_facet_number_ids(&self) -> Option<Vec<FieldId>> { pub fn consume_facet_number_delta(
if self.modified_facet_number_ids.is_empty() { &mut self,
None ) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
} else { self.modified_facet_number_ids.drain()
Some(self.modified_facet_number_ids.iter().copied().collect())
}
} }
pub fn merge(mut self, rhs: Self) -> Self { pub fn merge(mut self, rhs: Self) -> Self {
let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs; // rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused
modified_facet_number_ids.into_iter().for_each(|fid| { let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs;
self.modified_facet_number_ids.insert(fid); modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| {
let old_delta = self.modified_facet_number_ids.remove(&fid);
delta.merge(old_delta, self.max_number_count);
self.modified_facet_number_ids.insert(fid, delta);
}); });
modified_facet_string_ids.into_iter().for_each(|fid| { modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| {
self.modified_facet_string_ids.insert(fid); let old_delta = self.modified_facet_string_ids.remove(&fid);
delta.merge(old_delta, self.max_string_count);
self.modified_facet_string_ids.insert(fid, delta);
}); });
self self
} }