Update extract_facet_number_docids to support deladd obkvs

This commit is contained in:
Clément Renault 2023-10-18 17:40:13 +02:00 committed by Louis Dureuil
parent a82dee21e0
commit fcd3a1434d
No known key found for this signature in database
4 changed files with 57 additions and 15 deletions

View File

@ -60,12 +60,16 @@ impl CboRoaringBitmapCodec {
/// if the merged values length is under the threshold, values are directly
/// serialized in the buffer else a RoaringBitmap is created from the
/// values and is serialized in the buffer.
pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> {
pub fn merge_into<I, A>(slices: I, buffer: &mut Vec<u8>) -> io::Result<()>
where
I: IntoIterator<Item = A>,
A: AsRef<[u8]>,
{
let mut roaring = RoaringBitmap::new();
let mut vec = Vec::new();
for bytes in slices {
if bytes.len() <= THRESHOLD * size_of::<u32>() {
if bytes.as_ref().len() <= THRESHOLD * size_of::<u32>() {
let mut reader = bytes.as_ref();
while let Ok(integer) = reader.read_u32::<NativeEndian>() {
vec.push(integer);
@ -85,7 +89,7 @@ impl CboRoaringBitmapCodec {
}
} else {
// We can unwrap safely because the vector is sorted upper.
let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter()).unwrap();
let roaring = RoaringBitmap::from_sorted_iter(vec).unwrap();
roaring.serialize_into(buffer)?;
}
} else {

View File

@ -4,11 +4,12 @@ use std::io::{self, BufReader};
use heed::{BytesDecode, BytesEncode};
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
};
use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
};
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
use crate::Result;
/// Extracts the facet number and the documents ids where this facet number appear.
@ -17,8 +18,7 @@ use crate::Result;
/// documents ids from the given chunk of docid facet number positions.
#[logging_timer::time]
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
// TODO Reader<Key, Obkv<DelAdd, ()>>
docid_fid_facet_number: grenad::Reader<R>,
fid_docid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
@ -27,24 +27,30 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
let mut facet_number_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
// TODO We must modify the merger to do unions of Del and Add separately
merge_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
);
let mut cursor = docid_fid_facet_number.into_cursor()?;
// TODO the value is a Obkv<DelAdd, ()> and must be taken into account
while let Some((key_bytes, _)) = cursor.move_on_next()? {
let mut buffer = Vec::new();
let mut cursor = fid_docid_facet_number.into_cursor()?;
while let Some((key_bytes, deladd_obkv_bytes)) = cursor.move_on_next()? {
let (field_id, document_id, number) =
FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();
let key = FacetGroupKey { field_id, level: 0, left_bound: number };
let key_bytes = FacetGroupKeyCodec::<OrderedF64Codec>::bytes_encode(&key).unwrap();
// TODO We must put a Obkv<DelAdd, RoaringBitmap>
facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?;
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() {
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
}
obkv.finish()?;
facet_number_docids_sorter.insert(key_bytes, &buffer)?;
}
sorter_into_reader(facet_number_docids_sorter, indexer)

View File

@ -205,3 +205,34 @@ pub fn merge_cbo_roaring_bitmaps<'a>(
Ok(Cow::from(vec))
}
}
pub fn merge_deladd_cbo_roaring_bitmaps<'a>(
_key: &[u8],
values: &[Cow<'a, [u8]>],
) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 {
Ok(values[0].clone())
} else {
// Retrieve the bitmaps from both sides
let mut del_bitmaps_bytes = Vec::new();
let mut add_bitmaps_bytes = Vec::new();
for value in values {
let obkv = KvReaderDelAdd::new(value);
if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
del_bitmaps_bytes.push(bitmap_bytes);
}
if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) {
add_bitmaps_bytes.push(bitmap_bytes);
}
}
let mut output_deladd_obkv = KvWriterDelAdd::memory();
let mut buffer = Vec::new();
CboRoaringBitmapCodec::merge_into(del_bitmaps_bytes, &mut buffer)?;
output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?;
buffer.clear();
CboRoaringBitmapCodec::merge_into(add_bitmaps_bytes, &mut buffer)?;
output_deladd_obkv.insert(DelAdd::Addition, &buffer)?;
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
}
}

View File

@ -14,8 +14,9 @@ pub use grenad_helpers::{
};
pub use merge_functions::{
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions,
obkvs_merge_additions_and_deletions, serialize_roaring_bitmap, MergeFn,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_roaring_bitmaps,
obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions,
serialize_roaring_bitmap, MergeFn,
};
use crate::MAX_WORD_LENGTH;