diff --git a/Cargo.lock b/Cargo.lock index 79292a637..172a67806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -503,7 +503,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2 [[package]] name = "benchmarks" -version = "1.12.0" +version = "1.12.2" dependencies = [ "anyhow", "bumpalo", @@ -694,7 +694,7 @@ dependencies = [ [[package]] name = "build-info" -version = "1.12.0" +version = "1.12.2" dependencies = [ "anyhow", "time", @@ -1671,7 +1671,7 @@ dependencies = [ [[package]] name = "dump" -version = "1.12.0" +version = "1.12.2" dependencies = [ "anyhow", "big_s", @@ -1873,7 +1873,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "file-store" -version = "1.12.0" +version = "1.12.2" dependencies = [ "tempfile", "thiserror 2.0.9", @@ -1895,7 +1895,7 @@ dependencies = [ [[package]] name = "filter-parser" -version = "1.12.0" +version = "1.12.2" dependencies = [ "insta", "nom", @@ -1915,7 +1915,7 @@ dependencies = [ [[package]] name = "flatten-serde-json" -version = "1.12.0" +version = "1.12.2" dependencies = [ "criterion", "serde_json", @@ -2054,7 +2054,7 @@ dependencies = [ [[package]] name = "fuzzers" -version = "1.12.0" +version = "1.12.2" dependencies = [ "arbitrary", "bumpalo", @@ -2743,7 +2743,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d" [[package]] name = "index-scheduler" -version = "1.12.0" +version = "1.12.2" dependencies = [ "anyhow", "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2950,7 +2950,7 @@ dependencies = [ [[package]] name = "json-depth-checker" -version = "1.12.0" +version = "1.12.2" dependencies = [ "criterion", "serde_json", @@ -3569,7 +3569,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "meili-snap" -version = "1.12.0" +version = "1.12.2" dependencies = [ "insta", "md5", @@ -3578,7 +3578,7 @@ dependencies = [ [[package]] name = "meilisearch" -version = "1.12.0" +version = "1.12.2" dependencies = [ "actix-cors", "actix-http", @@ -3670,7 +3670,7 @@ dependencies = [ [[package]] name = "meilisearch-auth" -version = "1.12.0" +version = "1.12.2" dependencies = [ "base64 0.22.1", "enum-iterator", @@ -3689,7 +3689,7 @@ dependencies = [ [[package]] name = "meilisearch-types" -version = "1.12.0" +version = "1.12.2" dependencies = [ "actix-web", "anyhow", @@ -3723,7 +3723,7 @@ dependencies = [ [[package]] name = "meilitool" -version = "1.12.0" +version = "1.12.2" dependencies = [ "anyhow", "arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)", @@ -3758,7 +3758,7 @@ dependencies = [ [[package]] name = "milli" -version = "1.12.0" +version = "1.12.2" dependencies = [ "allocator-api2", "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4269,7 +4269,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "permissive-json-pointer" -version = "1.12.0" +version = "1.12.2" dependencies = [ "big_s", "serde_json", @@ -6846,7 +6846,7 @@ dependencies = [ [[package]] name = "xtask" -version = "1.12.0" +version = "1.12.2" dependencies = [ "anyhow", "build-info", diff --git a/Cargo.toml b/Cargo.toml index 89a17d8fc..6a6610b15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ members = [ ] [workspace.package] -version = "1.12.0" +version = "1.12.2" authors = [ "Quentin de Quelen ", "Clément Renault ", diff --git a/crates/index-scheduler/src/queue/tasks.rs b/crates/index-scheduler/src/queue/tasks.rs index bb6930b4b..c88192e17 100644 --- a/crates/index-scheduler/src/queue/tasks.rs +++ b/crates/index-scheduler/src/queue/tasks.rs @@ -94,6 +94,10 @@ impl TaskQueue { debug_assert!(old_task != *task); debug_assert_eq!(old_task.uid, task.uid); debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some()); + debug_assert!( + old_task.batch_uid.is_none() && task.batch_uid.is_some(), + "\n==> old: {old_task:?}\n==> new: {task:?}" + ); if old_task.status != task.status { self.update_status(wtxn, old_task.status, |bitmap| { diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 365f7acd4..eff3740a0 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -78,9 +78,7 @@ impl IndexScheduler { if let DocumentOperation::Add(content_uuid) = operation { let content_file = self.queue.file_store.get_update(*content_uuid)?; let mmap = unsafe { memmap2::Mmap::map(&content_file)? }; - if !mmap.is_empty() { - content_files.push(mmap); - } + content_files.push(mmap); } } diff --git a/crates/meilisearch/tests/documents/add_documents.rs b/crates/meilisearch/tests/documents/add_documents.rs index d72b1a7a8..7eedeed3d 100644 --- a/crates/meilisearch/tests/documents/add_documents.rs +++ b/crates/meilisearch/tests/documents/add_documents.rs @@ -1220,9 +1220,89 @@ async fn replace_document() { #[actix_rt::test] async fn add_no_documents() { let server = Server::new().await; - let index = server.index("test"); - let (_response, code) = index.add_documents(json!([]), None).await; + let index = server.index("kefir"); + let (task, code) = index.add_documents(json!([]), None).await; snapshot!(code, @"202 Accepted"); + let task = server.wait_task(task.uid()).await; + let task = task.succeeded(); + snapshot!(task, @r#" + { + "uid": "[uid]", + "batchUid": "[batch_uid]", + "indexUid": "kefir", + "status": "succeeded", + "type": "documentAdditionOrUpdate", + "canceledBy": null, + "details": { + "receivedDocuments": 0, + "indexedDocuments": 0 + }, + "error": null, + "duration": "[duration]", + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]" + } + "#); + + let (task, _code) = index.add_documents(json!([]), Some("kefkef")).await; + let task = server.wait_task(task.uid()).await; + let task = task.succeeded(); + snapshot!(task, @r#" + { + "uid": "[uid]", + "batchUid": "[batch_uid]", + "indexUid": "kefir", + "status": "succeeded", + "type": "documentAdditionOrUpdate", + "canceledBy": null, + "details": { + "receivedDocuments": 0, + "indexedDocuments": 0 + }, + "error": null, + "duration": "[duration]", + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]" + } + "#); + + let (task, _code) = index.add_documents(json!([{ "kefkef": 1 }]), None).await; + let task = server.wait_task(task.uid()).await; + let task = task.succeeded(); + snapshot!(task, @r#" + { + "uid": "[uid]", + "batchUid": "[batch_uid]", + "indexUid": "kefir", + "status": "succeeded", + "type": "documentAdditionOrUpdate", + "canceledBy": null, + "details": { + "receivedDocuments": 1, + "indexedDocuments": 1 + }, + "error": null, + "duration": "[duration]", + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]" + } + "#); + let (documents, _status) = index.get_all_documents(GetAllDocumentsOptions::default()).await; + snapshot!(documents, @r#" + { + "results": [ + { + "kefkef": 1 + } + ], + "offset": 0, + "limit": 20, + "total": 1 + } + "#); } #[actix_rt::test] diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index c78610e23..dbacf6248 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -79,22 +79,29 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5; use std::collections::BTreeSet; use std::fs::File; use std::io::BufReader; +use std::ops::Bound; use grenad::Merger; use heed::types::{Bytes, DecodeIgnore}; +use heed::BytesDecode as _; +use roaring::RoaringBitmap; use time::OffsetDateTime; use tracing::debug; use self::incremental::FacetsUpdateIncremental; use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps}; use crate::facet::FacetType; -use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec}; +use crate::heed_codec::facet::{ + FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec, OrderedF64Codec, +}; use crate::heed_codec::BytesRefCodec; +use crate::search::facet::get_highest_level; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::{try_split_array_at, FieldId, Index, Result}; pub mod bulk; pub mod incremental; +pub mod new_incremental; /// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases. /// @@ -597,3 +604,194 @@ mod comparison_bench { } } } + +/// Run sanity checks on the specified fid tree +/// +/// 1. No "orphan" child value, any child value has a parent +/// 2. Any docid in the child appears in the parent +/// 3. No docid in the parent is missing from all its children +/// 4. no group is bigger than max_group_size +/// 5. Less than 50% of groups are bigger than group_size +/// 6. group size matches the number of children +/// 7. max_level is < 255 +pub(crate) fn sanity_checks( + index: &Index, + rtxn: &heed::RoTxn, + field_id: FieldId, + facet_type: FacetType, + group_size: usize, + _min_level_size: usize, // might add a check on level size later + max_group_size: usize, +) -> Result<()> { + tracing::info!(%field_id, ?facet_type, "performing sanity checks"); + let database = match facet_type { + FacetType::String => { + index.facet_id_string_docids.remap_key_type::>() + } + FacetType::Number => { + index.facet_id_f64_docids.remap_key_type::>() + } + }; + + let leaf_prefix: FacetGroupKey<&[u8]> = FacetGroupKey { field_id, level: 0, left_bound: &[] }; + + let leaf_it = database.prefix_iter(rtxn, &leaf_prefix)?; + + let max_level = get_highest_level(rtxn, database, field_id)?; + if max_level == u8::MAX { + panic!("max_level == 255"); + } + + for leaf in leaf_it { + let (leaf_facet_value, leaf_docids) = leaf?; + let mut current_level = 0; + + let mut current_parent_facet_value: Option> = None; + let mut current_parent_docids: Option = None; + loop { + current_level += 1; + if current_level >= max_level { + break; + } + let parent_key_right_bound = FacetGroupKey { + field_id, + level: current_level, + left_bound: leaf_facet_value.left_bound, + }; + let (parent_facet_value, parent_docids) = database + .get_lower_than_or_equal_to(rtxn, &parent_key_right_bound)? + .expect("no parent found"); + if parent_facet_value.level != current_level { + panic!( + "wrong parent level, found_level={}, expected_level={}", + parent_facet_value.level, current_level + ); + } + if parent_facet_value.field_id != field_id { + panic!("wrong parent fid"); + } + if parent_facet_value.left_bound > leaf_facet_value.left_bound { + panic!("wrong parent left bound"); + } + + if !leaf_docids.bitmap.is_subset(&parent_docids.bitmap) { + panic!( + "missing docids from leaf in parent, current_level={}, parent={}, child={}, missing={missing:?}, child_len={}, child={:?}", + current_level, + facet_to_string(parent_facet_value.left_bound, facet_type), + facet_to_string(leaf_facet_value.left_bound, facet_type), + leaf_docids.bitmap.len(), + leaf_docids.bitmap.clone(), + missing=leaf_docids.bitmap - parent_docids.bitmap, + ) + } + + if let Some(current_parent_facet_value) = current_parent_facet_value { + if current_parent_facet_value.field_id != parent_facet_value.field_id { + panic!("wrong parent parent fid"); + } + if current_parent_facet_value.level + 1 != parent_facet_value.level { + panic!("wrong parent parent level"); + } + if current_parent_facet_value.left_bound < parent_facet_value.left_bound { + panic!("wrong parent parent left bound"); + } + } + + if let Some(current_parent_docids) = current_parent_docids { + if !current_parent_docids.bitmap.is_subset(&parent_docids.bitmap) { + panic!("missing docids from intermediate node in parent, parent_level={}, parent={}, intermediate={}, missing={missing:?}, intermediate={:?}", + parent_facet_value.level, + facet_to_string(parent_facet_value.left_bound, facet_type), + facet_to_string(current_parent_facet_value.unwrap().left_bound, facet_type), + current_parent_docids.bitmap.clone(), + missing=current_parent_docids.bitmap - parent_docids.bitmap, + ); + } + } + + current_parent_facet_value = Some(parent_facet_value); + current_parent_docids = Some(parent_docids); + } + } + tracing::info!(%field_id, ?facet_type, "checked all leaves"); + + let mut current_level = max_level; + let mut greater_than_group = 0usize; + let mut total = 0usize; + loop { + if current_level == 0 { + break; + } + let child_level = current_level - 1; + tracing::info!(%field_id, ?facet_type, %current_level, "checked groups for level"); + let level_groups_prefix: FacetGroupKey<&[u8]> = + FacetGroupKey { field_id, level: current_level, left_bound: &[] }; + let mut level_groups_it = database.prefix_iter(rtxn, &level_groups_prefix)?.peekable(); + + 'group_it: loop { + let Some(group) = level_groups_it.next() else { break 'group_it }; + + let (group_facet_value, group_docids) = group?; + let child_left_bound = group_facet_value.left_bound.to_owned(); + let mut expected_docids = RoaringBitmap::new(); + let mut expected_size = 0usize; + let right_bound = level_groups_it + .peek() + .and_then(|res| res.as_ref().ok()) + .map(|(key, _)| key.left_bound); + let child_left_bound = FacetGroupKey { + field_id, + level: child_level, + left_bound: child_left_bound.as_slice(), + }; + let child_left_bound = Bound::Included(&child_left_bound); + let child_right_bound; + let child_right_bound = if let Some(right_bound) = right_bound { + child_right_bound = + FacetGroupKey { field_id, level: child_level, left_bound: right_bound }; + Bound::Excluded(&child_right_bound) + } else { + Bound::Unbounded + }; + let children = database.range(rtxn, &(child_left_bound, child_right_bound))?; + for child in children { + let (child_facet_value, child_docids) = child?; + if child_facet_value.field_id != field_id { + break; + } + if child_facet_value.level != child_level { + break; + } + expected_size += 1; + expected_docids |= &child_docids.bitmap; + } + assert_eq!(expected_size, group_docids.size as usize); + assert!(expected_size <= max_group_size); + assert_eq!(expected_docids, group_docids.bitmap); + total += 1; + if expected_size > group_size { + greater_than_group += 1; + } + } + + current_level -= 1; + } + if greater_than_group * 2 > total { + panic!("too many groups have a size > group_size"); + } + + tracing::info!("sanity checks OK"); + + Ok(()) +} + +fn facet_to_string(facet_value: &[u8], facet_type: FacetType) -> String { + match facet_type { + FacetType::String => bstr::BStr::new(facet_value).to_string(), + FacetType::Number => match OrderedF64Codec::bytes_decode(facet_value) { + Ok(value) => value.to_string(), + Err(e) => format!("error: {e} (bytes: {facet_value:?}"), + }, + } +} diff --git a/crates/milli/src/update/facet/new_incremental.rs b/crates/milli/src/update/facet/new_incremental.rs new file mode 100644 index 000000000..0890f8593 --- /dev/null +++ b/crates/milli/src/update/facet/new_incremental.rs @@ -0,0 +1,498 @@ +use std::ops::Bound; + +use heed::types::{Bytes, DecodeIgnore}; +use heed::{BytesDecode as _, Database, RwTxn}; +use roaring::RoaringBitmap; + +use crate::facet::FacetType; +use crate::heed_codec::facet::{ + FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec, +}; +use crate::heed_codec::BytesRefCodec; +use crate::search::facet::get_highest_level; +use crate::update::valid_facet_value; +use crate::{FieldId, Index, Result}; + +pub struct FacetsUpdateIncremental { + inner: FacetsUpdateIncrementalInner, + delta_data: Vec, +} + +struct FacetsUpdateIncrementalInner { + db: Database, FacetGroupValueCodec>, + field_id: FieldId, + group_size: u8, + min_level_size: u8, + max_group_size: u8, +} + +impl FacetsUpdateIncremental { + pub fn new( + index: &Index, + facet_type: FacetType, + field_id: FieldId, + delta_data: Vec, + group_size: u8, + min_level_size: u8, + max_group_size: u8, + ) -> Self { + FacetsUpdateIncremental { + inner: FacetsUpdateIncrementalInner { + db: match facet_type { + FacetType::String => index + .facet_id_string_docids + .remap_key_type::>(), + FacetType::Number => index + .facet_id_f64_docids + .remap_key_type::>(), + }, + field_id, + group_size, + min_level_size, + max_group_size, + }, + + delta_data, + } + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::incremental")] + pub fn execute(mut self, wtxn: &mut RwTxn) -> Result<()> { + if self.delta_data.is_empty() { + return Ok(()); + } + self.delta_data.sort_unstable_by( + |FacetFieldIdChange { facet_value: left, .. }, + FacetFieldIdChange { facet_value: right, .. }| { + left.cmp(right) + // sort in **reverse** lexicographic order + .reverse() + }, + ); + + self.inner.find_changed_parents(wtxn, self.delta_data)?; + + self.inner.add_or_delete_level(wtxn) + } +} + +impl FacetsUpdateIncrementalInner { + /// WARNING: `changed_children` must be sorted in **reverse** lexicographic order. + fn find_changed_parents( + &self, + wtxn: &mut RwTxn, + mut changed_children: Vec, + ) -> Result<()> { + let mut changed_parents = vec![]; + for child_level in 0u8..u8::MAX { + // child_level < u8::MAX by construction + let parent_level = child_level + 1; + let parent_level_left_bound: FacetGroupKey<&[u8]> = + FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] }; + + let mut last_parent: Option> = None; + let mut child_it = changed_children + // drain all changed children + .drain(..) + // keep only children whose value is valid in the LMDB sense + .filter(|child| valid_facet_value(&child.facet_value)); + // `while let` rather than `for` because we advance `child_it` inside of the loop + 'current_level: while let Some(child) = child_it.next() { + if let Some(last_parent) = &last_parent { + if &child.facet_value >= last_parent { + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + continue 'current_level; + } + } + + // need to find a new parent + let parent_key_prefix = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: &*child.facet_value, + }; + + let parent = self + .db + .remap_data_type::() + .rev_range( + wtxn, + &( + Bound::Excluded(&parent_level_left_bound), + Bound::Included(&parent_key_prefix), + ), + )? + .next(); + + match parent { + Some(Ok((parent_key, _parent_value))) => { + // found parent, cache it for next keys + last_parent = Some(parent_key.left_bound.to_owned().into_boxed_slice()); + + // add to modified list for parent level + changed_parents.push(FacetFieldIdChange { + facet_value: parent_key.left_bound.to_owned().into_boxed_slice(), + }); + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + } + Some(Err(err)) => return Err(err.into()), + None => { + // no parent for that key + let mut parent_it = self + .db + .remap_data_type::() + .prefix_iter_mut(wtxn, &parent_level_left_bound)?; + match parent_it.next() { + // 1. left of the current left bound, or + Some(Ok((first_key, _first_value))) => { + // make sure we don't spill on the neighboring fid (level also included defensively) + if first_key.field_id != self.field_id + || first_key.level != parent_level + { + // max level reached, exit + drop(parent_it); + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + for child in child_it.by_ref() { + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + } + return Ok(()); + } + // remove old left bound + unsafe { parent_it.del_current()? }; + drop(parent_it); + changed_parents.push(FacetFieldIdChange { + facet_value: child.facet_value.clone(), + }); + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + // pop all elements in order to visit the new left bound + let new_left_bound = + &mut changed_parents.last_mut().unwrap().facet_value; + for child in child_it.by_ref() { + new_left_bound.clone_from(&child.facet_value); + + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + } + } + Some(Err(err)) => return Err(err.into()), + // 2. max level reached, exit + None => { + drop(parent_it); + self.compute_parent_group(wtxn, child_level, child.facet_value)?; + for child in child_it.by_ref() { + self.compute_parent_group( + wtxn, + child_level, + child.facet_value, + )?; + } + return Ok(()); + } + } + } + } + } + if changed_parents.is_empty() { + return Ok(()); + } + drop(child_it); + std::mem::swap(&mut changed_children, &mut changed_parents); + // changed_parents is now empty because changed_children was emptied by the drain + } + Ok(()) + } + + fn compute_parent_group( + &self, + wtxn: &mut RwTxn<'_>, + parent_level: u8, + parent_left_bound: Box<[u8]>, + ) -> Result<()> { + let mut range_left_bound: Vec = parent_left_bound.into(); + if parent_level == 0 { + return Ok(()); + } + let child_level = parent_level - 1; + + let parent_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: &*range_left_bound, + }; + let child_right_bound = self + .db + .remap_data_type::() + .get_greater_than(wtxn, &parent_key)? + .and_then( + |( + FacetGroupKey { + level: right_level, + field_id: right_fid, + left_bound: right_bound, + }, + _, + )| { + if parent_level != right_level || self.field_id != right_fid { + // there was a greater key, but with a greater level or fid, so not a sibling to the parent: ignore + return None; + } + Some(right_bound.to_owned()) + }, + ); + let child_right_bound = match &child_right_bound { + Some(right_bound) => Bound::Excluded(FacetGroupKey { + left_bound: right_bound.as_slice(), + field_id: self.field_id, + level: child_level, + }), + None => Bound::Unbounded, + }; + + let child_left_key = FacetGroupKey { + field_id: self.field_id, + level: child_level, + left_bound: &*range_left_bound, + }; + let mut child_left_bound = Bound::Included(child_left_key); + + loop { + // do a first pass on the range to find the number of children + let child_count = self + .db + .remap_data_type::() + .range(wtxn, &(child_left_bound, child_right_bound))? + .take(self.max_group_size as usize * 2) + .count(); + let mut child_it = self.db.range(wtxn, &(child_left_bound, child_right_bound))?; + + // pick the right group_size depending on the number of children + let group_size = if child_count >= self.max_group_size as usize * 2 { + // more than twice the max_group_size => there will be space for at least 2 groups of max_group_size + self.max_group_size as usize + } else if child_count >= self.group_size as usize { + // size in [group_size, max_group_size * 2[ + // divided by 2 it is between [group_size / 2, max_group_size[ + // this ensures that the tree is balanced + child_count / 2 + } else { + // take everything + child_count + }; + + let res: Result<_> = child_it + .by_ref() + .take(group_size) + // stop if we go to the next level or field id + .take_while(|res| match res { + Ok((child_key, _)) => { + child_key.field_id == self.field_id && child_key.level == child_level + } + Err(_) => true, + }) + .try_fold( + (None, FacetGroupValue { size: 0, bitmap: Default::default() }), + |(bounds, mut group_value), child_res| { + let (child_key, child_value) = child_res?; + let bounds = match bounds { + Some((left_bound, _)) => Some((left_bound, child_key.left_bound)), + None => Some((child_key.left_bound, child_key.left_bound)), + }; + // max_group_size <= u8::MAX + group_value.size += 1; + group_value.bitmap |= &child_value.bitmap; + Ok((bounds, group_value)) + }, + ); + + let (bounds, group_value) = res?; + + let Some((group_left_bound, right_bound)) = bounds else { + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: &*range_left_bound, + }; + drop(child_it); + if let Bound::Included(_) = child_left_bound { + self.db.delete(wtxn, &update_key)?; + } + + break; + }; + + drop(child_it); + let current_left_bound = group_left_bound.to_owned(); + + let delete_old_bound = match child_left_bound { + Bound::Included(bound) => { + if bound.left_bound != current_left_bound { + Some(range_left_bound.clone()) + } else { + None + } + } + _ => None, + }; + + range_left_bound.clear(); + range_left_bound.extend_from_slice(right_bound); + let child_left_key = FacetGroupKey { + field_id: self.field_id, + level: child_level, + left_bound: range_left_bound.as_slice(), + }; + child_left_bound = Bound::Excluded(child_left_key); + + if let Some(old_bound) = delete_old_bound { + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: old_bound.as_slice(), + }; + self.db.delete(wtxn, &update_key)?; + } + + let update_key = FacetGroupKey { + field_id: self.field_id, + level: parent_level, + left_bound: current_left_bound.as_slice(), + }; + if group_value.bitmap.is_empty() { + self.db.delete(wtxn, &update_key)?; + } else { + self.db.put(wtxn, &update_key, &group_value)?; + } + } + + Ok(()) + } + + /// Check whether the highest level has exceeded `min_level_size` * `self.group_size`. + /// If it has, we must build an addition level above it. + /// Then check whether the highest level is under `min_level_size`. + /// If it has, we must remove the complete level. + pub(crate) fn add_or_delete_level(&self, txn: &mut RwTxn<'_>) -> Result<()> { + let highest_level = get_highest_level(txn, self.db, self.field_id)?; + let mut highest_level_prefix = vec![]; + highest_level_prefix.extend_from_slice(&self.field_id.to_be_bytes()); + highest_level_prefix.push(highest_level); + + let size_highest_level = + self.db.remap_types::().prefix_iter(txn, &highest_level_prefix)?.count(); + + if size_highest_level >= self.group_size as usize * self.min_level_size as usize { + self.add_level(txn, highest_level, &highest_level_prefix, size_highest_level) + } else if size_highest_level < self.min_level_size as usize && highest_level != 0 { + self.delete_level(txn, &highest_level_prefix) + } else { + Ok(()) + } + } + + /// Delete a level. + fn delete_level(&self, txn: &mut RwTxn<'_>, highest_level_prefix: &[u8]) -> Result<()> { + let mut to_delete = vec![]; + let mut iter = + self.db.remap_types::().prefix_iter(txn, highest_level_prefix)?; + for el in iter.by_ref() { + let (k, _) = el?; + to_delete.push( + FacetGroupKeyCodec::::bytes_decode(k) + .map_err(heed::Error::Encoding)? + .into_owned(), + ); + } + drop(iter); + for k in to_delete { + self.db.delete(txn, &k.as_ref())?; + } + Ok(()) + } + + /// Build an additional level for the field id. + fn add_level( + &self, + txn: &mut RwTxn<'_>, + highest_level: u8, + highest_level_prefix: &[u8], + size_highest_level: usize, + ) -> Result<()> { + let mut groups_iter = self + .db + .remap_types::() + .prefix_iter(txn, highest_level_prefix)?; + + let nbr_new_groups = size_highest_level / self.group_size as usize; + let nbr_leftover_elements = size_highest_level % self.group_size as usize; + + let mut to_add = vec![]; + for _ in 0..nbr_new_groups { + let mut first_key = None; + let mut values = RoaringBitmap::new(); + for _ in 0..self.group_size { + let (key_bytes, value_i) = groups_iter.next().unwrap()?; + let key_i = FacetGroupKeyCodec::::bytes_decode(key_bytes) + .map_err(heed::Error::Encoding)?; + + if first_key.is_none() { + first_key = Some(key_i); + } + values |= value_i.bitmap; + } + let key = FacetGroupKey { + field_id: self.field_id, + level: highest_level + 1, + left_bound: first_key.unwrap().left_bound, + }; + let value = FacetGroupValue { size: self.group_size, bitmap: values }; + to_add.push((key.into_owned(), value)); + } + // now we add the rest of the level, in case its size is > group_size * min_level_size + // this can indeed happen if the min_level_size parameter changes between two calls to `insert` + if nbr_leftover_elements > 0 { + let mut first_key = None; + let mut values = RoaringBitmap::new(); + for _ in 0..nbr_leftover_elements { + let (key_bytes, value_i) = groups_iter.next().unwrap()?; + let key_i = FacetGroupKeyCodec::::bytes_decode(key_bytes) + .map_err(heed::Error::Encoding)?; + + if first_key.is_none() { + first_key = Some(key_i); + } + values |= value_i.bitmap; + } + let key = FacetGroupKey { + field_id: self.field_id, + level: highest_level + 1, + left_bound: first_key.unwrap().left_bound, + }; + // Note: nbr_leftover_elements can be casted to a u8 since it is bounded by `max_group_size` + // when it is created above. + let value = FacetGroupValue { size: nbr_leftover_elements as u8, bitmap: values }; + to_add.push((key.into_owned(), value)); + } + + drop(groups_iter); + for (key, value) in to_add { + self.db.put(txn, &key.as_ref(), &value)?; + } + Ok(()) + } +} + +#[derive(Debug)] +pub struct FacetFieldIdChange { + pub facet_value: Box<[u8]>, +} diff --git a/crates/milli/src/update/index_documents/helpers/mod.rs b/crates/milli/src/update/index_documents/helpers/mod.rs index c188e324d..5dec54ffc 100644 --- a/crates/milli/src/update/index_documents/helpers/mod.rs +++ b/crates/milli/src/update/index_documents/helpers/mod.rs @@ -10,10 +10,14 @@ use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::*; pub use merge_functions::*; -use crate::MAX_WORD_LENGTH; +use crate::MAX_LMDB_KEY_LENGTH; pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool { - key.as_ref().len() <= MAX_WORD_LENGTH * 2 && !key.as_ref().is_empty() + key.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !key.as_ref().is_empty() +} + +pub fn valid_facet_value(facet_value: impl AsRef<[u8]>) -> bool { + facet_value.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !facet_value.as_ref().is_empty() } /// Divides one slice into two at an index, returns `None` if mid is out of bounds. diff --git a/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap b/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap index c45c350e7..7ab60b90d 100644 --- a/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap +++ b/crates/milli/src/update/index_documents/snapshots/mod.rs/geo_filtered_placeholder_search_should_not_return_deleted_documents/facet_id_f64_docids.snap @@ -1,5 +1,5 @@ --- -source: milli/src/update/index_documents/mod.rs +source: crates/milli/src/update/index_documents/mod.rs --- 3 0 48.9021 1 [19, ] 3 0 49.9314 1 [17, ] @@ -15,6 +15,11 @@ source: milli/src/update/index_documents/mod.rs 3 0 50.7453 1 [7, ] 3 0 50.8466 1 [10, ] 3 0 51.0537 1 [9, ] +3 1 48.9021 2 [17, 19, ] +3 1 50.1793 3 [13, 14, 15, ] +3 1 50.4502 4 [0, 3, 8, 12, ] +3 1 50.6312 2 [1, 2, ] +3 1 50.7453 3 [7, 9, 10, ] 4 0 2.271 1 [17, ] 4 0 2.3708 1 [19, ] 4 0 2.7637 1 [14, ] @@ -28,4 +33,3 @@ source: milli/src/update/index_documents/mod.rs 4 0 3.6957 1 [9, ] 4 0 3.9623 1 [12, ] 4 0 4.337 1 [10, ] - diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 8f14fa7ed..8216742ec 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -252,6 +252,24 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( previous_offset = iter.byte_offset(); } + if payload.is_empty() { + let result = retrieve_or_guess_primary_key( + rtxn, + index, + new_fields_ids_map, + primary_key_from_op, + None, + ); + match result { + Ok(Ok((pk, _))) => { + primary_key.get_or_insert(pk); + } + Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (), + Ok(Err(user_error)) => return Err(Error::UserError(user_error)), + Err(error) => return Err(error), + }; + } + Ok(new_docids_version_offsets) } diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 53fd8a89b..63536c559 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -100,6 +100,7 @@ where caches, FacetDatabases::new(index), index, + &rtxn, extractor_sender.facet_docids(), )?; } diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs index 6bd139068..201ab9ec9 100644 --- a/crates/milli/src/update/new/indexer/post_processing.rs +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -8,7 +8,10 @@ use super::document_changes::IndexingContext; use crate::facet::FacetType; use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::update::del_add::DelAdd; +use crate::update::facet::new_incremental::FacetsUpdateIncremental; +use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; use crate::update::new::facet_search_builder::FacetSearchBuilder; +use crate::update::new::merger::FacetFieldIdDelta; use crate::update::new::steps::IndexingStep; use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use crate::update::new::words_prefix_docids::{ @@ -160,27 +163,66 @@ fn compute_facet_search_database( fn compute_facet_level_database( index: &Index, wtxn: &mut RwTxn, - facet_field_ids_delta: FacetFieldIdsDelta, + mut facet_field_ids_delta: FacetFieldIdsDelta, ) -> Result<()> { - if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { + for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( - index, - modified_facet_string_ids, - FacetType::String, - ) - .execute(wtxn)?; + match delta { + FacetFieldIdDelta::Bulk => { + tracing::debug!(%fid, "bulk string facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) + .execute(wtxn)? + } + FacetFieldIdDelta::Incremental(delta_data) => { + tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::String, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } } - if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { + + for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() { let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( + match delta { + FacetFieldIdDelta::Bulk => { + tracing::debug!(%fid, "bulk number facet processing"); + FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) + .execute(wtxn)? + } + FacetFieldIdDelta::Incremental(delta_data) => { + tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); + FacetsUpdateIncremental::new( + index, + FacetType::Number, + fid, + delta_data, + FACET_GROUP_SIZE, + FACET_MIN_LEVEL_SIZE, + FACET_MAX_GROUP_SIZE, + ) + .execute(wtxn)? + } + } + debug_assert!(crate::update::facet::sanity_checks( index, - modified_facet_number_ids, + wtxn, + fid, FacetType::Number, + FACET_GROUP_SIZE as usize, + FACET_MIN_LEVEL_SIZE as usize, + FACET_MAX_GROUP_SIZE as usize, ) - .execute(wtxn)?; + .is_ok()); } Ok(()) diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 9e87388a2..090add6bd 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; -use hashbrown::HashSet; +use hashbrown::HashMap; use heed::types::Bytes; use heed::{Database, RoTxn}; use memmap2::Mmap; @@ -12,6 +12,7 @@ use super::extract::{ merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, GeoExtractorData, }; +use crate::update::facet::new_incremental::FacetFieldIdChange; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] @@ -100,12 +101,18 @@ pub fn merge_and_send_facet_docids<'extractor>( mut caches: Vec>, database: FacetDatabases, index: &Index, + rtxn: &RoTxn, docids_sender: FacetDocidsSender, ) -> Result { + let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize; + let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize; + let max_string_count = max_string_count.clamp(1000, 100_000); + let max_number_count = max_number_count.clamp(1000, 100_000); transpose_and_freeze_caches(&mut caches)? .into_par_iter() .map(|frozen| { - let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); + let mut facet_field_ids_delta = + FacetFieldIdsDelta::new(max_string_count, max_number_count); let rtxn = index.read_txn()?; merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; @@ -126,7 +133,10 @@ pub fn merge_and_send_facet_docids<'extractor>( Ok(facet_field_ids_delta) }) - .reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?))) + .reduce( + || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)), + |lhs, rhs| Ok(lhs?.merge(rhs?)), + ) } pub struct FacetDatabases<'a> { @@ -155,60 +165,131 @@ impl<'a> FacetDatabases<'a> { } } -#[derive(Debug, Default)] +#[derive(Debug)] +pub enum FacetFieldIdDelta { + Bulk, + Incremental(Vec), +} + +impl FacetFieldIdDelta { + fn push(&mut self, facet_value: &[u8], max_count: usize) { + *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) { + FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk, + FacetFieldIdDelta::Incremental(mut v) => { + if v.len() >= max_count { + FacetFieldIdDelta::Bulk + } else { + v.push(FacetFieldIdChange { facet_value: facet_value.into() }); + FacetFieldIdDelta::Incremental(v) + } + } + } + } + + fn merge(&mut self, rhs: Option, max_count: usize) { + let Some(rhs) = rhs else { + return; + }; + *self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) { + (FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk, + ( + FacetFieldIdDelta::Incremental(mut left), + FacetFieldIdDelta::Incremental(mut right), + ) => { + if left.len() + right.len() >= max_count { + FacetFieldIdDelta::Bulk + } else { + left.append(&mut right); + FacetFieldIdDelta::Incremental(left) + } + } + }; + } +} + +#[derive(Debug)] pub struct FacetFieldIdsDelta { /// The field ids that have been modified - modified_facet_string_ids: HashSet, - modified_facet_number_ids: HashSet, + modified_facet_string_ids: HashMap, + modified_facet_number_ids: HashMap, + max_string_count: usize, + max_number_count: usize, } impl FacetFieldIdsDelta { - fn register_facet_string_id(&mut self, field_id: FieldId) { - self.modified_facet_string_ids.insert(field_id); + pub fn new(max_string_count: usize, max_number_count: usize) -> Self { + Self { + max_string_count, + max_number_count, + modified_facet_string_ids: Default::default(), + modified_facet_number_ids: Default::default(), + } } - fn register_facet_number_id(&mut self, field_id: FieldId) { - self.modified_facet_number_ids.insert(field_id); + fn register_facet_string_id(&mut self, field_id: FieldId, facet_value: &[u8]) { + self.modified_facet_string_ids + .entry(field_id) + .or_insert(FacetFieldIdDelta::Incremental(Default::default())) + .push(facet_value, self.max_string_count); + } + + fn register_facet_number_id(&mut self, field_id: FieldId, facet_value: &[u8]) { + self.modified_facet_number_ids + .entry(field_id) + .or_insert(FacetFieldIdDelta::Incremental(Default::default())) + .push(facet_value, self.max_number_count); } fn register_from_key(&mut self, key: &[u8]) { - let (facet_kind, field_id) = self.extract_key_data(key); - match facet_kind { - FacetKind::Number => self.register_facet_number_id(field_id), - FacetKind::String => self.register_facet_string_id(field_id), + let (facet_kind, field_id, facet_value) = self.extract_key_data(key); + match (facet_kind, facet_value) { + (FacetKind::Number, Some(facet_value)) => { + self.register_facet_number_id(field_id, facet_value) + } + (FacetKind::String, Some(facet_value)) => { + self.register_facet_string_id(field_id, facet_value) + } _ => (), } } - fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId) { + fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) { let facet_kind = FacetKind::from(key[0]); let field_id = FieldId::from_be_bytes([key[1], key[2]]); - (facet_kind, field_id) + let facet_value = if key.len() >= 4 { + // level is also stored in the key at [3] (always 0) + Some(&key[4..]) + } else { + None + }; + + (facet_kind, field_id, facet_value) } - pub fn modified_facet_string_ids(&self) -> Option> { - if self.modified_facet_string_ids.is_empty() { - None - } else { - Some(self.modified_facet_string_ids.iter().copied().collect()) - } + pub fn consume_facet_string_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_string_ids.drain() } - pub fn modified_facet_number_ids(&self) -> Option> { - if self.modified_facet_number_ids.is_empty() { - None - } else { - Some(self.modified_facet_number_ids.iter().copied().collect()) - } + pub fn consume_facet_number_delta( + &mut self, + ) -> impl Iterator + '_ { + self.modified_facet_number_ids.drain() } pub fn merge(mut self, rhs: Self) -> Self { - let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs; - modified_facet_number_ids.into_iter().for_each(|fid| { - self.modified_facet_number_ids.insert(fid); + // rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused + let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs; + modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_number_ids.remove(&fid); + delta.merge(old_delta, self.max_number_count); + self.modified_facet_number_ids.insert(fid, delta); }); - modified_facet_string_ids.into_iter().for_each(|fid| { - self.modified_facet_string_ids.insert(fid); + modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| { + let old_delta = self.modified_facet_string_ids.remove(&fid); + delta.merge(old_delta, self.max_string_count); + self.modified_facet_string_ids.insert(fid, delta); }); self }