From 8f64fba1cebe968036ec562ecc79f77eec9a396c Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 8 Feb 2023 12:53:38 +0100 Subject: [PATCH] rewrite the current transform to handle a new byte specifying the kind of operation it's merging --- milli/src/update/index_documents/transform.rs | 102 +++++++++++++++--- 1 file changed, 89 insertions(+), 13 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 9e07e78ad..fe8f06b6c 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -12,7 +12,9 @@ use roaring::RoaringBitmap; use serde_json::Value; use smartstring::SmartString; -use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn}; +use super::helpers::{ + create_sorter, create_writer, keep_latest_obkv, merge_obkvs, merge_two_obkvs, MergeFn, +}; use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; @@ -59,6 +61,12 @@ pub struct Transform<'a, 'i> { documents_count: usize, } +#[repr(u8)] +enum Operation { + Addition, + Deletion, +} + /// Create a mapping between the field ids found in the document batch and the one that were /// already present in the index. /// @@ -94,7 +102,7 @@ impl<'a, 'i> Transform<'a, 'i> { // with the same user id must be merged or fully replaced in the same batch. let merge_function = match index_documents_method { IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv, - IndexDocumentsMethod::UpdateDocuments => merge_obkvs, + IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations, }; // We initialize the sorter with the user indexing settings. @@ -161,6 +169,7 @@ impl<'a, 'i> Transform<'a, 'i> { self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; let mut obkv_buffer = Vec::new(); + let mut document_sorter_buffer = Vec::new(); let mut documents_count = 0; let mut docid_buffer: Vec = Vec::new(); let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); @@ -248,26 +257,46 @@ impl<'a, 'i> Transform<'a, 'i> { skip_insertion = true; } else { // we associate the base document with the new key, everything will get merged later. - self.original_sorter.insert(docid.to_be_bytes(), base_obkv)?; + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(base_obkv); + self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(buffer) => { - self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)? + Some(flattened_obkv) => { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(&flattened_obkv); + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)? } - None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?, + None => self + .flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)?, } } } if !skip_insertion { self.new_documents_ids.insert(docid); + + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(&obkv_buffer); // We use the extracted/generated user id as the key for this document. - self.original_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?; + self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { - Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?, - None => { - self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())? + Some(flattened_obkv) => { + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(&flattened_obkv); + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)? } + None => self + .flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)?, } } documents_count += 1; @@ -487,6 +516,11 @@ impl<'a, 'i> Transform<'a, 'i> { let mut documents_count = 0; while let Some((key, val)) = iter.next()? { + if val[0] == Operation::Deletion as u8 { + continue; + } + let val = &val[1..]; + // send a callback to show at which step we are documents_count += 1; progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { @@ -518,9 +552,18 @@ impl<'a, 'i> Transform<'a, 'i> { self.indexer_settings.chunk_compression_level, tempfile::tempfile()?, ); - // Once we have written all the documents into the final sorter, we write the documents - // into this writer, extract the file and reset the seek to be able to read it again. - self.flattened_sorter.write_into_stream_writer(&mut writer)?; + + // Once we have written all the documents into the final sorter, we write the nested documents + // into this writer. + // We get rids of the `Operation` byte and skip the deleted documents as well. + let mut iter = self.flattened_sorter.into_stream_merger_iter()?; + while let Some((key, val)) = iter.next()? { + if val[0] == Operation::Deletion as u8 { + continue; + } + let val = &val[1..]; + writer.insert(key, val)?; + } let mut flattened_documents = writer.into_inner()?; flattened_documents.rewind()?; @@ -677,6 +720,39 @@ impl<'a, 'i> Transform<'a, 'i> { } } +/// Merge all the obks in the order we see them. +fn merge_obkvs_and_operations<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result> { + // [add, add, delete, add, add] + // we can ignore everything that happened before the last delete. + let starting_position = obkvs + .iter() + .rev() + .position(|obkv| obkv[0] == Operation::Deletion as u8) + .map_or(0, |pos| obkvs.len() - pos); + + // [add, add, delete] + // if the last operation was a deletion then we simply return the deletion + if starting_position == obkvs.len() { + return Ok(obkvs[obkvs.len() - 1].clone()); + } + let mut buffer = Vec::new(); + + // (add, add, delete) [add, add] + // in the other case, no deletion will be encountered during the merge + Ok(obkvs[starting_position..] + .iter() + .cloned() + .reduce(|acc, current| { + let first = obkv::KvReader::new(&acc[1..]); + let second = obkv::KvReader::new(¤t[1..]); + merge_two_obkvs(first, second, &mut buffer); + // TODO: do this only once at the end + buffer.insert(0, Operation::Addition as u8); + Cow::from(buffer.clone()) + }) + .unwrap()) +} + /// Drops all the value of type `U` in vec, and reuses the allocation to create a `Vec`. /// /// The size and alignment of T and U must match.