Make into_del_add_obkv parameters more human readable

This commit is contained in:
ManyTheFish 2023-11-20 10:53:40 +01:00
parent 39cbb499c2
commit d3575fb028
2 changed files with 68 additions and 32 deletions

View File

@ -32,13 +32,12 @@ impl Key for DelAdd {
/// Creates a Kv<K, Kv<DelAdd, value>> from Kv<K, value> /// Creates a Kv<K, Kv<DelAdd, value>> from Kv<K, value>
/// ///
/// if deletion is `true`, the value will be inserted behind a DelAdd::Deletion key. /// Deletion: put all the values under DelAdd::Deletion
/// if addition is `true`, the value will be inserted behind a DelAdd::Addition key. /// Addition: put all the values under DelAdd::Addition,
/// if both deletion and addition are `true`, the value will be inserted in both keys. /// DeletionAndAddition: put all the values under DelAdd::Deletion and DelAdd::Addition,
pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>( pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
reader: obkv::KvReader<K>, reader: obkv::KvReader<K>,
deletion: bool, operation: DelAddOperation,
addition: bool,
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
) -> Result<(), std::io::Error> { ) -> Result<(), std::io::Error> {
let mut writer = obkv::KvWriter::new(buffer); let mut writer = obkv::KvWriter::new(buffer);
@ -46,21 +45,27 @@ pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
for (key, value) in reader.iter() { for (key, value) in reader.iter() {
value_buffer.clear(); value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
if deletion { if matches!(operation, DelAddOperation::Deletion | DelAddOperation::DeletionAndAddition) {
value_writer.insert(DelAdd::Deletion, value)?; value_writer.insert(DelAdd::Deletion, value)?;
} }
if addition { if matches!(operation, DelAddOperation::Addition | DelAddOperation::DeletionAndAddition) {
value_writer.insert(DelAdd::Addition, value)?; value_writer.insert(DelAdd::Addition, value)?;
} }
value_writer.finish()?; value_writer.finish()?;
if !value_buffer.is_empty() {
writer.insert(key, &value_buffer)?; writer.insert(key, &value_buffer)?;
} }
}
writer.finish() writer.finish()
} }
/// Enum controlling the side of the DelAdd obkv in which the provided value will be written.
#[derive(Debug, Clone, Copy)]
pub enum DelAddOperation {
Deletion,
Addition,
DeletionAndAddition,
}
/// Creates a Kv<K, Kv<DelAdd, value>> from two Kv<K, value> /// Creates a Kv<K, Kv<DelAdd, value>> from two Kv<K, value>
/// ///
/// putting each deletion obkv's keys under a DelAdd::Deletion /// putting each deletion obkv's keys under a DelAdd::Deletion

View File

@ -21,7 +21,7 @@ use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key}; use crate::index::{db_name, main_key};
use crate::update::del_add::{into_del_add_obkv, DelAdd, KvReaderDelAdd}; use crate::update::del_add::{into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd};
use crate::update::index_documents::GrenadParameters; use crate::update::index_documents::GrenadParameters;
use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep}; use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
use crate::{ use crate::{
@ -265,8 +265,12 @@ impl<'a, 'i> Transform<'a, 'i> {
skip_insertion = true; skip_insertion = true;
} else { } else {
// we associate the base document with the new key, everything will get merged later. // we associate the base document with the new key, everything will get merged later.
let keep_original_version = let deladd_operation = match self.index_documents_method {
self.index_documents_method == IndexDocumentsMethod::UpdateDocuments; IndexDocumentsMethod::UpdateDocuments => {
DelAddOperation::DeletionAndAddition
}
IndexDocumentsMethod::ReplaceDocuments => DelAddOperation::Deletion,
};
document_sorter_key_buffer.clear(); document_sorter_key_buffer.clear();
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
@ -274,8 +278,7 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(base_obkv), KvReaderU16::new(base_obkv),
true, deladd_operation,
keep_original_version,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)?; )?;
self.original_sorter self.original_sorter
@ -287,8 +290,7 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&flattened_obkv), KvReaderU16::new(&flattened_obkv),
true, deladd_operation,
keep_original_version,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)?; )?;
} }
@ -307,8 +309,7 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&obkv_buffer), KvReaderU16::new(&obkv_buffer),
false, DelAddOperation::Addition,
true,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)?; )?;
// We use the extracted/generated user id as the key for this document. // We use the extracted/generated user id as the key for this document.
@ -321,8 +322,7 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&obkv), KvReaderU16::new(&obkv),
false, DelAddOperation::Addition,
true,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)? )?
} }
@ -517,7 +517,11 @@ impl<'a, 'i> Transform<'a, 'i> {
// push it as to delete in the original_sorter // push it as to delete in the original_sorter
document_sorter_value_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Deletion as u8); document_sorter_value_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(KvReaderU16::new(base_obkv), true, false, document_sorter_value_buffer)?; into_del_add_obkv(
KvReaderU16::new(base_obkv),
DelAddOperation::Deletion,
document_sorter_value_buffer,
)?;
self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
// flatten it and push it as to delete in the flattened_sorter // flatten it and push it as to delete in the flattened_sorter
@ -526,7 +530,11 @@ impl<'a, 'i> Transform<'a, 'i> {
// we recreate our buffer with the flattened documents // we recreate our buffer with the flattened documents
document_sorter_value_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Deletion as u8); document_sorter_value_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(KvReaderU16::new(&obkv), true, false, document_sorter_value_buffer)?; into_del_add_obkv(
KvReaderU16::new(&obkv),
DelAddOperation::Deletion,
document_sorter_value_buffer,
)?;
} }
self.flattened_sorter self.flattened_sorter
.insert(internal_docid.to_be_bytes(), &document_sorter_value_buffer)?; .insert(internal_docid.to_be_bytes(), &document_sorter_value_buffer)?;
@ -869,8 +877,7 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.clear(); document_sorter_value_buffer.clear();
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(buffer), KvReaderU16::new(buffer),
false, DelAddOperation::Addition,
true,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)?; )?;
original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
@ -911,8 +918,7 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.clear(); document_sorter_value_buffer.clear();
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&buffer), KvReaderU16::new(&buffer),
false, DelAddOperation::Addition,
true,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)?; )?;
flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
@ -986,18 +992,38 @@ mod test {
let mut kv_writer = KvWriter::memory(); let mut kv_writer = KvWriter::memory();
kv_writer.insert(0_u8, [0]).unwrap(); kv_writer.insert(0_u8, [0]).unwrap();
let buffer = kv_writer.into_inner().unwrap(); let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0).unwrap(); into_del_add_obkv(
KvReaderU16::new(&buffer),
DelAddOperation::Addition,
&mut additive_doc_0,
)
.unwrap();
additive_doc_0.insert(0, Operation::Addition as u8); additive_doc_0.insert(0, Operation::Addition as u8);
into_del_add_obkv(KvReaderU16::new(&buffer), true, false, &mut deletive_doc_0).unwrap(); into_del_add_obkv(
KvReaderU16::new(&buffer),
DelAddOperation::Deletion,
&mut deletive_doc_0,
)
.unwrap();
deletive_doc_0.insert(0, Operation::Deletion as u8); deletive_doc_0.insert(0, Operation::Deletion as u8);
into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut del_add_doc_0).unwrap(); into_del_add_obkv(
KvReaderU16::new(&buffer),
DelAddOperation::DeletionAndAddition,
&mut del_add_doc_0,
)
.unwrap();
del_add_doc_0.insert(0, Operation::Addition as u8); del_add_doc_0.insert(0, Operation::Addition as u8);
let mut additive_doc_1 = Vec::new(); let mut additive_doc_1 = Vec::new();
let mut kv_writer = KvWriter::memory(); let mut kv_writer = KvWriter::memory();
kv_writer.insert(1_u8, [1]).unwrap(); kv_writer.insert(1_u8, [1]).unwrap();
let buffer = kv_writer.into_inner().unwrap(); let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_1).unwrap(); into_del_add_obkv(
KvReaderU16::new(&buffer),
DelAddOperation::Addition,
&mut additive_doc_1,
)
.unwrap();
additive_doc_1.insert(0, Operation::Addition as u8); additive_doc_1.insert(0, Operation::Addition as u8);
let mut additive_doc_0_1 = Vec::new(); let mut additive_doc_0_1 = Vec::new();
@ -1005,7 +1031,12 @@ mod test {
kv_writer.insert(0_u8, [0]).unwrap(); kv_writer.insert(0_u8, [0]).unwrap();
kv_writer.insert(1_u8, [1]).unwrap(); kv_writer.insert(1_u8, [1]).unwrap();
let buffer = kv_writer.into_inner().unwrap(); let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0_1).unwrap(); into_del_add_obkv(
KvReaderU16::new(&buffer),
DelAddOperation::Addition,
&mut additive_doc_0_1,
)
.unwrap();
additive_doc_0_1.insert(0, Operation::Addition as u8); additive_doc_0_1.insert(0, Operation::Addition as u8);
let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())]) let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())])