From 895ab2906c1a2478d5550f3c6763a91967561f47 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 16 Feb 2023 18:42:47 +0100 Subject: [PATCH] apply review suggestions --- .../helpers/merge_functions.rs | 36 ++++++++++++++ .../src/update/index_documents/helpers/mod.rs | 4 +- milli/src/update/index_documents/transform.rs | 49 ++++++------------- 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 6e3aa7ec7..7b8891a7a 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -6,6 +6,7 @@ use roaring::RoaringBitmap; use super::read_u32_ne_bytes; use crate::heed_codec::CboRoaringBitmapCodec; +use crate::update::index_documents::transform::Operation; use crate::Result; pub type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> Result>; @@ -73,6 +74,41 @@ pub fn merge_two_obkvs(base: obkv::KvReaderU16, update: obkv::KvReaderU16, buffe writer.finish().unwrap(); } +/// Merge all the obks in the order we see them. +pub fn merge_obkvs_and_operations<'a>( + _key: &[u8], + obkvs: &[Cow<'a, [u8]>], +) -> Result> { + // [add, add, delete, add, add] + // we can ignore everything that happened before the last delete. + let starting_position = + obkvs.iter().rposition(|obkv| obkv[0] == Operation::Deletion as u8).unwrap_or(0); + + // [add, add, delete] + // if the last operation was a deletion then we simply return the deletion + if starting_position == obkvs.len() - 1 && obkvs.last().unwrap()[0] == Operation::Deletion as u8 + { + return Ok(obkvs[obkvs.len() - 1].clone()); + } + let mut buffer = Vec::new(); + + // (add, add, delete) [add, add] + // in the other case, no deletion will be encountered during the merge + let mut ret = + obkvs[starting_position..].iter().cloned().fold(Vec::new(), |mut acc, current| { + let first = obkv::KvReader::new(&acc); + let second = obkv::KvReader::new(¤t[1..]); + merge_two_obkvs(first, second, &mut buffer); + + // we want the result of the merge into our accumulator + std::mem::swap(&mut acc, &mut buffer); + acc + }); + + ret.insert(0, Operation::Addition as u8); + Ok(Cow::from(ret)) +} + pub fn merge_cbo_roaring_bitmaps<'a>( _key: &[u8], values: &[Cow<'a, [u8]>], diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 3a25851e4..ce6a2abe9 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -14,8 +14,8 @@ pub use grenad_helpers::{ }; pub use merge_functions::{ concat_u32s_array, keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, - merge_roaring_bitmaps, merge_two_obkvs, roaring_bitmap_from_u32s_array, - serialize_roaring_bitmap, MergeFn, + merge_obkvs_and_operations, merge_roaring_bitmaps, merge_two_obkvs, + roaring_bitmap_from_u32s_array, serialize_roaring_bitmap, MergeFn, }; use crate::MAX_WORD_LENGTH; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 0624db468..6097278a7 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -12,7 +12,9 @@ use roaring::RoaringBitmap; use serde_json::Value; use smartstring::SmartString; -use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_two_obkvs, MergeFn}; +use super::helpers::{ + create_sorter, create_writer, keep_latest_obkv, merge_obkvs_and_operations, MergeFn, +}; use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; @@ -66,7 +68,7 @@ pub struct Transform<'a, 'i> { /// This enum is specific to the grenad sorter stored in the transform. /// It's used as the first byte of the grenads and tells you if the document id was an addition or a deletion. #[repr(u8)] -enum Operation { +pub enum Operation { Addition, Deletion, } @@ -327,7 +329,16 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(documents_count) } - /// The counter part of `read_documents` that removes documents that may have been inserted into the transform previously. + /// The counter part of `read_documents` that removes documents either from the transform or the database. + /// It can be called before, after or in between two calls of the `read_documents`. + /// + /// It needs to update all the internal datastructure in the transform. + /// - If the document is coming from the database -> it's marked as a to_delete document + /// - If the document to remove was inserted by the `read_documents` method before AND was present in the db, + /// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it. + /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, + /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. + /// - If the document to remove was not present in either the db or the transform we do nothing. pub fn remove_documents( &mut self, mut to_remove: Vec, @@ -783,38 +794,6 @@ impl<'a, 'i> Transform<'a, 'i> { } } -/// Merge all the obks in the order we see them. -fn merge_obkvs_and_operations<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result> { - // [add, add, delete, add, add] - // we can ignore everything that happened before the last delete. - let starting_position = - obkvs.iter().rposition(|obkv| obkv[0] == Operation::Deletion as u8).unwrap_or(0); - - // [add, add, delete] - // if the last operation was a deletion then we simply return the deletion - if starting_position == obkvs.len() - 1 && obkvs.last().unwrap()[0] == Operation::Deletion as u8 - { - return Ok(obkvs[obkvs.len() - 1].clone()); - } - let mut buffer = Vec::new(); - - // (add, add, delete) [add, add] - // in the other case, no deletion will be encountered during the merge - let mut ret = - obkvs[starting_position..].iter().cloned().fold(Vec::new(), |mut acc, current| { - let first = obkv::KvReader::new(&acc); - let second = obkv::KvReader::new(¤t[1..]); - merge_two_obkvs(first, second, &mut buffer); - - // we want the result of the merge into our accumulator - std::mem::swap(&mut acc, &mut buffer); - acc - }); - - ret.insert(0, Operation::Addition as u8); - Ok(Cow::from(ret)) -} - /// Drops all the value of type `U` in vec, and reuses the allocation to create a `Vec`. /// /// The size and alignment of T and U must match.