rewrite the current transform to handle a new byte specifying the kind of operation it's merging

This commit is contained in:
Tamo 2023-02-08 12:53:38 +01:00
parent 9882029fa4
commit 8f64fba1ce
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69

View File

@ -12,7 +12,9 @@ use roaring::RoaringBitmap;
use serde_json::Value;
use smartstring::SmartString;
use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn};
use super::helpers::{
create_sorter, create_writer, keep_latest_obkv, merge_obkvs, merge_two_obkvs, MergeFn,
};
use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
@ -59,6 +61,12 @@ pub struct Transform<'a, 'i> {
documents_count: usize,
}
#[repr(u8)]
enum Operation {
Addition,
Deletion,
}
/// Create a mapping between the field ids found in the document batch and the one that were
/// already present in the index.
///
@ -94,7 +102,7 @@ impl<'a, 'i> Transform<'a, 'i> {
// with the same user id must be merged or fully replaced in the same batch.
let merge_function = match index_documents_method {
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
IndexDocumentsMethod::UpdateDocuments => merge_obkvs,
IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations,
};
// We initialize the sorter with the user indexing settings.
@ -161,6 +169,7 @@ impl<'a, 'i> Transform<'a, 'i> {
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
let mut obkv_buffer = Vec::new();
let mut document_sorter_buffer = Vec::new();
let mut documents_count = 0;
let mut docid_buffer: Vec<u8> = Vec::new();
let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
@ -248,26 +257,46 @@ impl<'a, 'i> Transform<'a, 'i> {
skip_insertion = true;
} else {
// we associate the base document with the new key, everything will get merged later.
self.original_sorter.insert(docid.to_be_bytes(), base_obkv)?;
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(base_obkv);
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
Some(buffer) => {
self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?
Some(flattened_obkv) => {
// we recreate our buffer with the flattened documents
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(&flattened_obkv);
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?,
None => self
.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
}
}
}
if !skip_insertion {
self.new_documents_ids.insert(docid);
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(&obkv_buffer);
// We use the extracted/generated user id as the key for this document.
self.original_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?;
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? {
Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
None => {
self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?
Some(flattened_obkv) => {
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(&flattened_obkv);
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
None => self
.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
}
}
documents_count += 1;
@ -487,6 +516,11 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut documents_count = 0;
while let Some((key, val)) = iter.next()? {
if val[0] == Operation::Deletion as u8 {
continue;
}
let val = &val[1..];
// send a callback to show at which step we are
documents_count += 1;
progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
@ -518,9 +552,18 @@ impl<'a, 'i> Transform<'a, 'i> {
self.indexer_settings.chunk_compression_level,
tempfile::tempfile()?,
);
// Once we have written all the documents into the final sorter, we write the documents
// into this writer, extract the file and reset the seek to be able to read it again.
self.flattened_sorter.write_into_stream_writer(&mut writer)?;
// Once we have written all the documents into the final sorter, we write the nested documents
// into this writer.
// We get rids of the `Operation` byte and skip the deleted documents as well.
let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
while let Some((key, val)) = iter.next()? {
if val[0] == Operation::Deletion as u8 {
continue;
}
let val = &val[1..];
writer.insert(key, val)?;
}
let mut flattened_documents = writer.into_inner()?;
flattened_documents.rewind()?;
@ -677,6 +720,39 @@ impl<'a, 'i> Transform<'a, 'i> {
}
}
/// Merge all the obks in the order we see them.
fn merge_obkvs_and_operations<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
// [add, add, delete, add, add]
// we can ignore everything that happened before the last delete.
let starting_position = obkvs
.iter()
.rev()
.position(|obkv| obkv[0] == Operation::Deletion as u8)
.map_or(0, |pos| obkvs.len() - pos);
// [add, add, delete]
// if the last operation was a deletion then we simply return the deletion
if starting_position == obkvs.len() {
return Ok(obkvs[obkvs.len() - 1].clone());
}
let mut buffer = Vec::new();
// (add, add, delete) [add, add]
// in the other case, no deletion will be encountered during the merge
Ok(obkvs[starting_position..]
.iter()
.cloned()
.reduce(|acc, current| {
let first = obkv::KvReader::new(&acc[1..]);
let second = obkv::KvReader::new(&current[1..]);
merge_two_obkvs(first, second, &mut buffer);
// TODO: do this only once at the end
buffer.insert(0, Operation::Addition as u8);
Cow::from(buffer.clone())
})
.unwrap())
}
/// Drops all the value of type `U` in vec, and reuses the allocation to create a `Vec<T>`.
///
/// The size and alignment of T and U must match.