From 096d7705c7b127069d9a61b0a9a386a68009c133 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Thu, 12 Oct 2023 11:46:56 +0200 Subject: [PATCH] Make the transform struct return diff-based documents obkvs --- milli/src/update/del_add.rs | 60 +++++ .../helpers/merge_functions.rs | 126 ++++++--- .../src/update/index_documents/helpers/mod.rs | 4 +- milli/src/update/index_documents/transform.rs | 253 +++++++++++++----- milli/src/update/mod.rs | 1 + 5 files changed, 349 insertions(+), 95 deletions(-) create mode 100644 milli/src/update/del_add.rs diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs new file mode 100644 index 000000000..e8e595837 --- /dev/null +++ b/milli/src/update/del_add.rs @@ -0,0 +1,60 @@ +use obkv::Key; + +pub type KvWriterDelAdd = obkv::KvWriter; +pub type KvReaderDelAdd<'a> = obkv::KvReader<'a, DelAdd>; + +/// DelAdd defines the new value to add in the database and old value to delete from the database. +/// +/// Its used in an OBKV to be serialized in grenad files. +#[repr(u8)] +#[derive(Clone, Copy, PartialOrd, PartialEq, Debug)] +pub enum DelAdd { + Deletion = 0, + Addition = 1, +} + +impl Key for DelAdd { + const BYTES_SIZE: usize = std::mem::size_of::(); + type BYTES = [u8; Self::BYTES_SIZE]; + + fn to_be_bytes(&self) -> Self::BYTES { + u8::to_be_bytes(*self as u8) + } + + fn from_be_bytes(array: Self::BYTES) -> Self { + match u8::from_be_bytes(array) { + 0 => Self::Deletion, + 1 => Self::Addition, + otherwise => unreachable!("DelAdd has only 2 variants, unknown variant: {}", otherwise), + } + } +} + +/// Creates a Kv> from Kv +/// +/// if deletion is `true`, the value will be inserted behind a DelAdd::Deletion key. +/// if addition is `true`, the value will be inserted behind a DelAdd::Addition key. +/// if both deletion and addition are `true, the value will be inserted in both keys. +pub fn into_del_add_obkv( + reader: obkv::KvReader, + deletion: bool, + addition: bool, + buffer: &mut Vec, +) -> Result<(), std::io::Error> { + let mut writer = obkv::KvWriter::new(buffer); + let mut value_buffer = Vec::new(); + for (key, value) in reader.iter() { + value_buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); + if deletion { + value_writer.insert(DelAdd::Deletion, value)?; + } + if addition { + value_writer.insert(DelAdd::Addition, value)?; + } + value_writer.finish()?; + writer.insert(key, &value_buffer)?; + } + + writer.finish() +} diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 90cfa0f60..6317b5610 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -6,6 +6,7 @@ use std::result::Result as StdResult; use roaring::RoaringBitmap; use crate::heed_codec::CboRoaringBitmapCodec; +use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::index_documents::transform::Operation; use crate::Result; @@ -76,55 +77,118 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result) { +pub fn merge_two_del_add_obkvs( + base: obkv::KvReaderU16, + update: obkv::KvReaderU16, + merge_additions: bool, + buffer: &mut Vec, +) { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; buffer.clear(); let mut writer = obkv::KvWriter::new(buffer); + let mut value_buffer = Vec::new(); for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) { match eob { - Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(), + Left((k, v)) => { + if merge_additions { + writer.insert(k, v).unwrap() + } else { + // If merge_additions is false, recreate an obkv keeping the deletions only. + value_buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); + let base_reader = KvReaderDelAdd::new(v); + + if let Some(deletion) = base_reader.get(DelAdd::Deletion) { + value_writer.insert(DelAdd::Deletion, deletion).unwrap(); + value_writer.finish().unwrap(); + writer.insert(k, &value_buffer).unwrap() + } + } + } + Right((k, v)) => writer.insert(k, v).unwrap(), + Both((k, base), (_, update)) => { + // merge deletions and additions. + value_buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); + let base_reader = KvReaderDelAdd::new(base); + let update_reader = KvReaderDelAdd::new(update); + + // keep newest deletion. + if let Some(deletion) = + update_reader.get(DelAdd::Deletion).or(base_reader.get(DelAdd::Deletion)) + { + value_writer.insert(DelAdd::Deletion, deletion).unwrap(); + } + + // keep base addition only if merge_additions is true. + let base_addition = + merge_additions.then(|| base_reader.get(DelAdd::Addition)).flatten(); + // keep newest addition. + if let Some(addition) = update_reader.get(DelAdd::Addition).or(base_addition) { + value_writer.insert(DelAdd::Addition, addition).unwrap(); + } + + value_writer.finish().unwrap(); + writer.insert(k, &value_buffer).unwrap() + } } } writer.finish().unwrap(); } -/// Merge all the obks in the order we see them. -pub fn merge_obkvs_and_operations<'a>( +/// Merge all the obkvs from the newest to the oldest. +fn inner_merge_del_add_obkvs<'a>( + obkvs: &[Cow<'a, [u8]>], + merge_additions: bool, +) -> Result> { + // pop the newest operation from the list. + let (newest, obkvs) = obkvs.split_last().unwrap(); + // keep the operation type for the returned value. + let newest_operation_type = newest[0]; + + // treat the newest obkv as the starting point of the merge. + let mut acc_operation_type = newest_operation_type; + let mut acc = newest[1..].to_vec(); + let mut buffer = Vec::new(); + // reverse iter from the most recent to the oldest. + for current in obkvs.into_iter().rev() { + // if in the previous iteration there was a complete deletion, + // stop the merge process. + if acc_operation_type == Operation::Deletion as u8 { + break; + } + + let newest = obkv::KvReader::new(&acc); + let oldest = obkv::KvReader::new(¤t[1..]); + merge_two_del_add_obkvs(oldest, newest, merge_additions, &mut buffer); + + // we want the result of the merge into our accumulator. + std::mem::swap(&mut acc, &mut buffer); + acc_operation_type = current[0]; + } + + acc.insert(0, newest_operation_type); + Ok(Cow::from(acc)) +} + +/// Merge all the obkvs from the newest to the oldest. +pub fn obkvs_merge_additions_and_deletions<'a>( _key: &[u8], obkvs: &[Cow<'a, [u8]>], ) -> Result> { - // [add, add, delete, add, add] - // we can ignore everything that happened before the last delete. - let starting_position = - obkvs.iter().rposition(|obkv| obkv[0] == Operation::Deletion as u8).unwrap_or(0); + inner_merge_del_add_obkvs(obkvs, true) +} - // [add, add, delete] - // if the last operation was a deletion then we simply return the deletion - if starting_position == obkvs.len() - 1 && obkvs.last().unwrap()[0] == Operation::Deletion as u8 - { - return Ok(obkvs[obkvs.len() - 1].clone()); - } - let mut buffer = Vec::new(); - - // (add, add, delete) [add, add] - // in the other case, no deletion will be encountered during the merge - let mut ret = - obkvs[starting_position..].iter().cloned().fold(Vec::new(), |mut acc, current| { - let first = obkv::KvReader::new(&acc); - let second = obkv::KvReader::new(¤t[1..]); - merge_two_obkvs(first, second, &mut buffer); - - // we want the result of the merge into our accumulator - std::mem::swap(&mut acc, &mut buffer); - acc - }); - - ret.insert(0, Operation::Addition as u8); - Ok(Cow::from(ret)) +/// Merge all the obkvs deletions from the newest to the oldest and keep only the newest additions. +pub fn obkvs_keep_last_addition_merge_deletions<'a>( + _key: &[u8], + obkvs: &[Cow<'a, [u8]>], +) -> Result> { + inner_merge_del_add_obkvs(obkvs, false) } pub fn merge_cbo_roaring_bitmaps<'a>( diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 3dc9f8172..8f70a2de2 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -14,8 +14,8 @@ pub use grenad_helpers::{ }; pub use merge_functions::{ concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string, - merge_cbo_roaring_bitmaps, merge_obkvs_and_operations, merge_roaring_bitmaps, merge_two_obkvs, - serialize_roaring_bitmap, MergeFn, + merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions, + obkvs_merge_additions_and_deletions, serialize_roaring_bitmap, MergeFn, }; use crate::MAX_WORD_LENGTH; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 7a0c811a8..614b56b30 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -7,18 +7,20 @@ use std::io::{Read, Seek}; use fxhash::FxHashMap; use heed::RoTxn; use itertools::Itertools; -use obkv::{KvReader, KvWriter}; +use obkv::{KvReader, KvReaderU16, KvWriter}; use roaring::RoaringBitmap; use serde_json::Value; use smartstring::SmartString; use super::helpers::{ - create_sorter, create_writer, keep_latest_obkv, merge_obkvs_and_operations, MergeFn, + create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions, + obkvs_merge_additions_and_deletions, MergeFn, }; use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; use crate::index::{db_name, main_key}; +use crate::update::del_add::into_del_add_obkv; use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep}; use crate::{ FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32, @@ -106,8 +108,8 @@ impl<'a, 'i> Transform<'a, 'i> { // We must choose the appropriate merge function for when two or more documents // with the same user id must be merged or fully replaced in the same batch. let merge_function = match index_documents_method { - IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv, - IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations, + IndexDocumentsMethod::ReplaceDocuments => obkvs_keep_last_addition_merge_deletions, + IndexDocumentsMethod::UpdateDocuments => obkvs_merge_additions_and_deletions, }; // We initialize the sorter with the user indexing settings. @@ -223,19 +225,21 @@ impl<'a, 'i> Transform<'a, 'i> { let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { Entry::Occupied(entry) => *entry.get() as u32, Entry::Vacant(entry) => { - // If the document was already in the db we mark it as a replaced document. - // It'll be deleted later. - if let Some(docid) = external_documents_ids.get(entry.key()) { - // If it was already in the list of replaced documents it means it was deleted - // by the remove_document method. We should starts as if it never existed. - if self.replaced_documents_ids.insert(docid) { - original_docid = Some(docid); + let docid = match external_documents_ids.get(entry.key()) { + Some(docid) => { + // If it was already in the list of replaced documents it means it was deleted + // by the remove_document method. We should starts as if it never existed. + if self.replaced_documents_ids.insert(docid) { + original_docid = Some(docid); + } + + docid } - } - let docid = self - .available_documents_ids - .next() - .ok_or(UserError::DocumentLimitReached)?; + None => self + .available_documents_ids + .next() + .ok_or(UserError::DocumentLimitReached)?, + }; entry.insert(docid as u64); docid } @@ -263,16 +267,28 @@ impl<'a, 'i> Transform<'a, 'i> { skip_insertion = true; } else { // we associate the base document with the new key, everything will get merged later. + let keep_original_version = + self.index_documents_method == IndexDocumentsMethod::UpdateDocuments; document_sorter_buffer.clear(); document_sorter_buffer.push(Operation::Addition as u8); - document_sorter_buffer.extend_from_slice(base_obkv); + into_del_add_obkv( + KvReaderU16::new(base_obkv), + true, + keep_original_version, + &mut document_sorter_buffer, + )?; self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { Some(flattened_obkv) => { // we recreate our buffer with the flattened documents document_sorter_buffer.clear(); document_sorter_buffer.push(Operation::Addition as u8); - document_sorter_buffer.extend_from_slice(&flattened_obkv); + into_del_add_obkv( + KvReaderU16::new(&flattened_obkv), + true, + keep_original_version, + &mut document_sorter_buffer, + )?; self.flattened_sorter .insert(docid.to_be_bytes(), &document_sorter_buffer)? } @@ -288,7 +304,12 @@ impl<'a, 'i> Transform<'a, 'i> { document_sorter_buffer.clear(); document_sorter_buffer.push(Operation::Addition as u8); - document_sorter_buffer.extend_from_slice(&obkv_buffer); + into_del_add_obkv( + KvReaderU16::new(&obkv_buffer), + false, + true, + &mut document_sorter_buffer, + )?; // We use the extracted/generated user id as the key for this document. self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; @@ -296,7 +317,12 @@ impl<'a, 'i> Transform<'a, 'i> { Some(flattened_obkv) => { document_sorter_buffer.clear(); document_sorter_buffer.push(Operation::Addition as u8); - document_sorter_buffer.extend_from_slice(&flattened_obkv); + into_del_add_obkv( + KvReaderU16::new(&flattened_obkv), + false, + true, + &mut document_sorter_buffer, + )?; self.flattened_sorter .insert(docid.to_be_bytes(), &document_sorter_buffer)? } @@ -354,19 +380,25 @@ impl<'a, 'i> Transform<'a, 'i> { let external_documents_ids = self.index.external_documents_ids(wtxn)?; let mut documents_deleted = 0; + let mut document_sorter_buffer = Vec::new(); for to_remove in to_remove { if should_abort() { return Err(Error::InternalError(InternalError::AbortedIndexation)); } - match self.new_external_documents_ids_builder.entry((*to_remove).into()) { + // Check if the document has been added in the current indexing process. + let deleted_from_current = match self + .new_external_documents_ids_builder + .entry((*to_remove).into()) + { // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. Entry::Occupied(entry) => { let doc_id = *entry.get() as u32; - self.original_sorter - .insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?; - self.flattened_sorter - .insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?; + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Deletion as u8); + obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap(); + self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; + self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; // we must NOT update the list of replaced_documents_ids // Either: @@ -375,21 +407,69 @@ impl<'a, 'i> Transform<'a, 'i> { // we're removing it there is nothing to do. self.new_documents_ids.remove(doc_id); entry.remove_entry(); + true } - Entry::Vacant(entry) => { - // If the document was already in the db we mark it as a `to_delete` document. - // It'll be deleted later. We don't need to push anything to the sorters. - if let Some(docid) = external_documents_ids.get(entry.key()) { - self.replaced_documents_ids.insert(docid); - } else { - // if the document is nowehere to be found, there is nothing to do and we must NOT - // increment the count of documents_deleted - continue; - } - } + Entry::Vacant(_) => false, }; - documents_deleted += 1; + // If the document was already in the db we mark it as a `to_delete` document. + // Then we push the document in sorters in deletion mode. + let deleted_from_db = match external_documents_ids.get(&to_remove) { + Some(docid) => { + self.replaced_documents_ids.insert(docid); + + // fetch the obkv document + let original_key = BEU32::new(docid); + let base_obkv = self + .index + .documents + .remap_data_type::() + .get(wtxn, &original_key)? + .ok_or(InternalError::DatabaseMissingEntry { + db_name: db_name::DOCUMENTS, + key: None, + })?; + + // push it as to delete in the original_sorter + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Deletion as u8); + into_del_add_obkv( + KvReaderU16::new(base_obkv), + true, + false, + &mut document_sorter_buffer, + )?; + self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + + // flatten it and push it as to delete in the flattened_sorter + match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { + Some(flattened_obkv) => { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Deletion as u8); + into_del_add_obkv( + KvReaderU16::new(&flattened_obkv), + true, + false, + &mut document_sorter_buffer, + )?; + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)? + } + None => self + .flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + } + + true + } + None => false, + }; + + // increase counter only if the document existed somewhere before. + if deleted_from_current || deleted_from_db { + documents_deleted += 1; + } } Ok(documents_deleted) @@ -589,9 +669,7 @@ impl<'a, 'i> Transform<'a, 'i> { let mut documents_count = 0; while let Some((key, val)) = iter.next()? { - if val[0] == Operation::Deletion as u8 { - continue; - } + // skip first byte corresponding to the operation type (Deletion or Addition). let val = &val[1..]; // send a callback to show at which step we are @@ -631,9 +709,7 @@ impl<'a, 'i> Transform<'a, 'i> { // We get rids of the `Operation` byte and skip the deleted documents as well. let mut iter = self.flattened_sorter.into_stream_merger_iter()?; while let Some((key, val)) = iter.next()? { - if val[0] == Operation::Deletion as u8 { - continue; - } + // skip first byte corresponding to the operation type (Deletion or Addition). let val = &val[1..]; writer.insert(key, val)?; } @@ -711,6 +787,7 @@ impl<'a, 'i> Transform<'a, 'i> { ); let mut obkv_buffer = Vec::new(); + let mut document_sorter_buffer = Vec::new(); for result in self.index.all_documents(wtxn)? { let (docid, obkv) = result?; @@ -725,7 +802,9 @@ impl<'a, 'i> Transform<'a, 'i> { } let buffer = obkv_writer.into_inner()?; - original_writer.insert(docid.to_be_bytes(), &buffer)?; + document_sorter_buffer.clear(); + into_del_add_obkv(KvReaderU16::new(buffer), true, true, &mut document_sorter_buffer)?; + original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; // Once we have the document. We're going to flatten it // and insert it in the flattened sorter. @@ -760,7 +839,9 @@ impl<'a, 'i> Transform<'a, 'i> { let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; writer.insert(fid, &value)?; } - flattened_writer.insert(docid.to_be_bytes(), &buffer)?; + document_sorter_buffer.clear(); + into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut document_sorter_buffer)?; + flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } // Once we have written all the documents, we extract @@ -824,38 +905,86 @@ mod test { #[test] fn merge_obkvs() { - let mut doc_0 = Vec::new(); - let mut kv_writer = KvWriter::new(&mut doc_0); + let mut additive_doc_0 = Vec::new(); + let mut deletive_doc_0 = Vec::new(); + let mut del_add_doc_0 = Vec::new(); + let mut kv_writer = KvWriter::memory(); kv_writer.insert(0_u8, [0]).unwrap(); - kv_writer.finish().unwrap(); - doc_0.insert(0, Operation::Addition as u8); + let buffer = kv_writer.into_inner().unwrap(); + into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0).unwrap(); + additive_doc_0.insert(0, Operation::Addition as u8); + into_del_add_obkv(KvReaderU16::new(&buffer), true, false, &mut deletive_doc_0).unwrap(); + deletive_doc_0.insert(0, Operation::Deletion as u8); + into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut del_add_doc_0).unwrap(); + del_add_doc_0.insert(0, Operation::Addition as u8); - let ret = merge_obkvs_and_operations(&[], &[Cow::from(doc_0.as_slice())]).unwrap(); - assert_eq!(*ret, doc_0); + let mut additive_doc_1 = Vec::new(); + let mut kv_writer = KvWriter::memory(); + kv_writer.insert(1_u8, [1]).unwrap(); + let buffer = kv_writer.into_inner().unwrap(); + into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_1).unwrap(); + additive_doc_1.insert(0, Operation::Addition as u8); - let ret = merge_obkvs_and_operations( + let mut additive_doc_0_1 = Vec::new(); + let mut kv_writer = KvWriter::memory(); + kv_writer.insert(0_u8, [0]).unwrap(); + kv_writer.insert(1_u8, [1]).unwrap(); + let buffer = kv_writer.into_inner().unwrap(); + into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0_1).unwrap(); + additive_doc_0_1.insert(0, Operation::Addition as u8); + + let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())]) + .unwrap(); + assert_eq!(*ret, additive_doc_0); + + let ret = obkvs_merge_additions_and_deletions( &[], - &[Cow::from([Operation::Deletion as u8].as_slice()), Cow::from(doc_0.as_slice())], + &[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())], ) .unwrap(); - assert_eq!(*ret, doc_0); + assert_eq!(*ret, del_add_doc_0); - let ret = merge_obkvs_and_operations( + let ret = obkvs_merge_additions_and_deletions( &[], - &[Cow::from(doc_0.as_slice()), Cow::from([Operation::Deletion as u8].as_slice())], + &[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())], ) .unwrap(); - assert_eq!(*ret, [Operation::Deletion as u8]); + assert_eq!(*ret, deletive_doc_0); - let ret = merge_obkvs_and_operations( + let ret = obkvs_merge_additions_and_deletions( &[], &[ - Cow::from([Operation::Addition as u8, 1].as_slice()), - Cow::from([Operation::Deletion as u8].as_slice()), - Cow::from(doc_0.as_slice()), + Cow::from(additive_doc_1.as_slice()), + Cow::from(deletive_doc_0.as_slice()), + Cow::from(additive_doc_0.as_slice()), ], ) .unwrap(); - assert_eq!(*ret, doc_0); + assert_eq!(*ret, del_add_doc_0); + + let ret = obkvs_merge_additions_and_deletions( + &[], + &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())], + ) + .unwrap(); + assert_eq!(*ret, additive_doc_0_1); + + let ret = obkvs_keep_last_addition_merge_deletions( + &[], + &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())], + ) + .unwrap(); + assert_eq!(*ret, additive_doc_0); + + let ret = obkvs_keep_last_addition_merge_deletions( + &[], + &[ + Cow::from(deletive_doc_0.as_slice()), + Cow::from(additive_doc_1.as_slice()), + Cow::from(additive_doc_0.as_slice()), + ], + ) + .unwrap(); + assert_eq!(*ret, del_add_doc_0); } } diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index 9982957e5..6224995a3 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -21,6 +21,7 @@ pub use self::words_prefixes_fst::WordsPrefixesFst; mod available_documents_ids; mod clear_documents; +pub(crate) mod del_add; mod delete_documents; pub(crate) mod facet; mod index_documents;