Remove duplication of grenad merger

This commit is contained in:
Clément Renault 2024-08-30 14:33:58 +02:00
parent 794ebcd582
commit 54f2eb4507
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 4 additions and 67 deletions

View File

@ -1,61 +0,0 @@
use std::borrow::Cow;
use std::io;
use grenad::MergeFunction;
use roaring::RoaringBitmap;
use crate::update::del_add::DelAdd;
use crate::update::new::indexer::{KvReaderDelAdd, KvWriterDelAdd};
/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions.
pub struct DelAddRoaringBitmapMerger;
impl MergeFunction for DelAddRoaringBitmapMerger {
type Error = io::Error;
fn merge<'a>(
&self,
_key: &[u8],
values: &[Cow<'a, [u8]>],
) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
if values.len() == 1 {
Ok(values[0].clone())
} else {
// Retrieve the bitmaps from both sides
let mut del_bitmaps_bytes = Vec::new();
let mut add_bitmaps_bytes = Vec::new();
for value in values {
let obkv: &KvReaderDelAdd = value.as_ref().into();
if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
del_bitmaps_bytes.push(bitmap_bytes);
}
if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) {
add_bitmaps_bytes.push(bitmap_bytes);
}
}
let mut output_deladd_obkv = KvWriterDelAdd::memory();
// Deletion
let mut buffer = Vec::new();
let mut merged = RoaringBitmap::new();
for bytes in del_bitmaps_bytes {
merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?;
}
merged.serialize_into(&mut buffer)?;
output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?;
// Addition
buffer.clear();
merged.clear();
for bytes in add_bitmaps_bytes {
merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?;
}
merged.serialize_into(&mut buffer)?;
output_deladd_obkv.insert(DelAdd::Addition, &buffer)?;
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
}
}
}

View File

@ -1,3 +0,0 @@
mod del_add_roaring_bitmap_merger;
pub use del_add_roaring_bitmap_merger::DelAddRoaringBitmapMerger;

View File

@ -2,7 +2,6 @@ mod document_change;
// mod extract;
mod channel;
mod items_pool;
mod merge;
/// TODO remove this
// mod global_fields_ids_map;
@ -38,7 +37,9 @@ mod indexer {
};
use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::{AvailableDocumentsIds, IndexDocumentsMethod};
use crate::update::{
AvailableDocumentsIds, IndexDocumentsMethod, MergeDeladdCboRoaringBitmaps,
};
use crate::{
CboRoaringBitmapCodec, DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError,
Result, UserError,
@ -428,7 +429,7 @@ mod indexer {
let sender = sender.word_docids();
let database = index.word_docids.remap_types::<Bytes, Bytes>();
let mut builder = grenad::MergerBuilder::new(merge::DelAddRoaringBitmapMerger);
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
builder.extend(cursors);
/// TODO manage the error correctly
let mut merger_iter = builder.build().into_stream_merger_iter().unwrap();