diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs index c8b7f0f6a..dc7c0409a 100644 --- a/milli/src/update/del_add.rs +++ b/milli/src/update/del_add.rs @@ -102,3 +102,17 @@ pub fn del_add_from_two_obkvs( pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool { del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition) } + +/// A function that extracts and returns the Add side of a DelAdd obkv. +/// This is useful when there are no previous value in the database and +/// therefore we don't need to do a diff with what's already there. +/// +/// If there is no Add side we currently write an empty buffer +/// which is a valid CboRoaringBitmap. +#[allow(clippy::ptr_arg)] // required to avoid signature mismatch +pub fn deladd_serialize_add_side<'a>( + obkv: &'a [u8], + _buffer: &mut Vec, +) -> crate::Result<&'a [u8]> { + Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default()) +} diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index 4f764ab95..f520ea7b0 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -9,6 +9,7 @@ use log::debug; use super::{ClonableMmap, MergeFn}; use crate::error::InternalError; +use crate::update::index_documents::valid_lmdb_key; use crate::Result; pub type CursorClonableMmap = io::Cursor; @@ -282,6 +283,49 @@ pub fn sorter_into_lmdb_database( Ok(()) } +/// Write provided sorter in database using serialize_value function. +/// merge_values function is used if an entry already exist in the database. +pub fn write_sorter_into_database( + sorter: Sorter, + database: &heed::Database, + wtxn: &mut heed::RwTxn, + index_is_empty: bool, + serialize_value: FS, + merge_values: FM, +) -> Result<()> +where + FS: for<'a> Fn(&'a [u8], &'a mut Vec) -> Result<&'a [u8]>, + FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec) -> Result>, +{ + puffin::profile_function!(); + + let mut buffer = Vec::new(); + let database = database.remap_types::(); + + let mut merger_iter = sorter.into_stream_merger_iter()?; + while let Some((key, value)) = merger_iter.next()? { + if valid_lmdb_key(key) { + buffer.clear(); + let value = if index_is_empty { + Some(serialize_value(value, &mut buffer)?) + } else { + match database.get(wtxn, key)? { + Some(prev_value) => merge_values(value, prev_value, &mut buffer)?, + None => Some(serialize_value(value, &mut buffer)?), + } + }; + match value { + Some(value) => database.put(wtxn, key, value)?, + None => { + database.delete(wtxn, key)?; + } + } + } + } + + Ok(()) +} + /// Used when trying to merge readers, but you don't actually care about the values. pub fn merge_ignore_values<'a>(_key: &[u8], _values: &[Cow<'a, [u8]>]) -> Result> { Ok(Cow::Owned(Vec::new())) diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 98c1c1a04..5d9ca7ef2 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -239,3 +239,19 @@ pub fn merge_deladd_cbo_roaring_bitmaps<'a>( output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) } } + +/// A function that merges a DelAdd of bitmao into an already existing bitmap. +/// +/// The first argument is the DelAdd obkv of CboRoaringBitmaps and +/// the second one is the CboRoaringBitmap to merge into. +pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>( + deladd_obkv: &[u8], + previous: &[u8], + buffer: &'a mut Vec, +) -> Result> { + Ok(CboRoaringBitmapCodec::merge_deladd_into( + KvReaderDelAdd::new(deladd_obkv), + previous, + buffer, + )?) +} diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 1f2f8e6ef..c167f1cd3 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -9,12 +9,13 @@ pub use clonable_mmap::{ClonableMmap, CursorClonableMmap}; use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::{ as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, - merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, writer_into_reader, - GrenadParameters, MergeableReader, + merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, write_sorter_into_database, + writer_into_reader, GrenadParameters, MergeableReader, }; pub use merge_functions::{ concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string, - merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_roaring_bitmaps, + merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions, serialize_roaring_bitmap, MergeFn, }; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 8552cf52b..5dbb4dd0b 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -26,8 +26,10 @@ pub use self::enrich::{ }; pub use self::helpers::{ as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, - fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, - sorter_into_lmdb_database, valid_lmdb_key, writer_into_reader, ClonableMmap, MergeFn, + fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, + merge_roaring_bitmaps, sorter_into_lmdb_database, valid_lmdb_key, write_sorter_into_database, + writer_into_reader, ClonableMmap, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index b53d859cd..90f9b7739 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -13,7 +13,10 @@ use obkv::{KvReader, KvWriter}; use ordered_float::OrderedFloat; use roaring::RoaringBitmap; -use super::helpers::{self, merge_ignore_values, valid_lmdb_key, CursorClonableMmap}; +use super::helpers::{ + self, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values, + valid_lmdb_key, CursorClonableMmap, +}; use super::{ClonableMmap, MergeFn}; use crate::distance::NDotProductPoint; use crate::error::UserError; @@ -21,12 +24,11 @@ use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; use crate::index::db_name::DOCUMENTS; use crate::index::Hnsw; -use crate::update::del_add::{DelAdd, KvReaderDelAdd}; +use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd}; use crate::update::facet::FacetsUpdate; use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at}; use crate::{ - lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, Result, - SerializationError, BEU32, + lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, Result, SerializationError, BEU32, }; pub(crate) enum TypedChunk { @@ -186,7 +188,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } @@ -202,7 +204,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?; @@ -212,7 +214,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?; @@ -222,7 +224,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; // create fst from word docids @@ -244,7 +246,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } @@ -265,7 +267,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } @@ -276,7 +278,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } @@ -287,7 +289,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } @@ -298,7 +300,7 @@ pub(crate) fn write_typed_chunk_into_index( wtxn, index_is_empty, deladd_serialize_add_side, - merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } @@ -495,33 +497,6 @@ fn merge_word_docids_reader_into_fst( Ok(builder.into_set()) } -/// A function that extracts and returns the Add side of a DelAdd obkv. -/// This is useful when there are no previous value in the database and -/// therefore we don't need to do a diff with what's already there. -/// -/// If there is no Add side we currently write an empty buffer -/// which is a valid CboRoaringBitmap. -#[allow(clippy::ptr_arg)] // required to avoid signature mismatch -fn deladd_serialize_add_side<'a>(obkv: &'a [u8], _buffer: &mut Vec) -> Result<&'a [u8]> { - Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default()) -} - -/// A function that merges a DelAdd of bitmao into an already existing bitmap. -/// -/// The first argument is the DelAdd obkv of CboRoaringBitmaps and -/// the second one is the CboRoaringBitmap to merge into. -fn merge_deladd_cbo_roaring_bitmaps<'a>( - deladd_obkv: &[u8], - previous: &[u8], - buffer: &'a mut Vec, -) -> Result> { - Ok(CboRoaringBitmapCodec::merge_deladd_into( - KvReaderDelAdd::new(deladd_obkv), - previous, - buffer, - )?) -} - /// Write provided entries in database using serialize_value function. /// merge_values function is used if an entry already exist in the database. fn write_entries_into_database( diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 8220aa777..618f451dc 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -4,9 +4,11 @@ use grenad::CompressionType; use heed::types::{ByteSlice, Str}; use heed::Database; +use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, valid_lmdb_key, - CursorClonableMmap, MergeFn, + create_sorter, merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, + write_sorter_into_database, CursorClonableMmap, MergeFn, }; use crate::{CboRoaringBitmapCodec, Result}; @@ -51,7 +53,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { // and write into it at the same time, therefore we write into another file. let mut prefix_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, self.chunk_compression_type, self.chunk_compression_level, self.max_nb_chunks, @@ -92,11 +94,16 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { // We fetch the docids associated to the newly added word prefix fst only. let db = self.word_docids.remap_data_type::(); + let mut buffer = Vec::new(); for prefix in new_prefix_fst_words { let prefix = std::str::from_utf8(prefix.as_bytes())?; for result in db.prefix_iter(self.wtxn, prefix)? { let (_word, data) = result?; - prefix_docids_sorter.insert(prefix, data)?; + buffer.clear(); + let mut writer = KvWriterDelAdd::new(&mut buffer); + writer.insert(DelAdd::Addition, data)?; + + prefix_docids_sorter.insert(prefix, writer.into_inner()?)?; } } @@ -110,12 +117,16 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { drop(iter); + let database_is_empty = self.word_prefix_docids.is_empty(self.wtxn)?; + // We finally write the word prefix docids into the LMDB database. - sorter_into_lmdb_database( - self.wtxn, - *self.word_prefix_docids.as_polymorph(), + write_sorter_into_database( prefix_docids_sorter, - merge_cbo_roaring_bitmaps, + &self.word_prefix_docids, + self.wtxn, + database_is_empty, + deladd_serialize_add_side, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; Ok(())