Correctly spill into a grenad sorter

This commit is contained in:
Clément Renault 2024-10-17 10:31:29 +02:00
parent 287b1c51db
commit b602f73f0d
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -63,22 +63,6 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
}
}
pub fn insert_del(&mut self, key: &[u8], bitmap: RoaringBitmap) {
match self.cache.raw_entry_mut().from_key(key) {
RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del, add: _ } = entry.get_mut();
*del.get_or_insert_with(RoaringBitmap::default) |= bitmap;
}
RawEntryMut::Vacant(entry) => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let alloc = RefBump::clone(&self.alloc);
let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del(bitmap));
}
}
}
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) {
match self.cache.raw_entry_mut().from_key(key) {
RawEntryMut::Occupied(mut entry) => {
@ -95,96 +79,69 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
}
}
pub fn insert_add(&mut self, key: &[u8], bitmap: RoaringBitmap) {
match self.cache.raw_entry_mut().from_key(key) {
RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del: _, add } = entry.get_mut();
*add.get_or_insert_with(RoaringBitmap::default) |= bitmap;
}
RawEntryMut::Vacant(entry) => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let alloc = RefBump::clone(&self.alloc);
let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add(bitmap));
}
}
}
pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) {
match self.cache.raw_entry_mut().from_key(key) {
RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del, add } = entry.get_mut();
del.get_or_insert_with(RoaringBitmap::default).insert(n);
add.get_or_insert_with(RoaringBitmap::default).insert(n);
}
RawEntryMut::Vacant(entry) => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let alloc = RefBump::clone(&self.alloc);
let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_add_u32(n));
}
}
}
fn write_entry<A: AsRef<[u8]>>(
&mut self,
sorter: &mut Sorter<MF>,
deladd_buffer: &mut Vec<u8>,
cbo_buffer: &mut Vec<u8>,
key: A,
deladd: DelAddRoaringBitmap,
) -> grenad::Result<(), MF::Error> {
/// TODO we must create a serialization trait to correctly serialize bitmaps
self.deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer);
deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(deladd_buffer);
match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
}
let bytes = value_writer.into_inner().unwrap();
self.sorter.insert(key, bytes)
sorter.insert(key, bytes)
}
pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> grenad::Result<(), MF::Error> {
self.sorter.insert(key, val)
}
pub fn spill_to_disk(self) -> std::io::Result<SpilledCache<MF>> {
pub fn spill_to_disk(self) -> grenad::Result<SpilledCache<MF>, MF::Error> {
let Self {
cache,
alloc: _,
sorter,
deladd_buffer,
cbo_buffer,
mut sorter,
mut deladd_buffer,
mut cbo_buffer,
total_insertions,
fitted_in_key,
} = self;
/// I want to spill to disk for real
drop(cache);
for (key, deladd) in cache {
Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?;
}
Ok(SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key })
}
pub fn into_sorter(self) -> grenad::Result<Sorter<MF>, MF::Error> {
let Self { cache, sorter, total_insertions, fitted_in_key, .. } = self;
let Self {
cache,
alloc: _,
mut sorter,
mut deladd_buffer,
mut cbo_buffer,
total_insertions,
fitted_in_key,
} = self;
let mut all_n_containers = Vec::new();
let mut all_n_array_containers = Vec::new();
@ -214,8 +171,7 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
}
for (key, deladd) in cache {
// self.write_entry(key, deladd)?;
todo!("spill into the sorter")
Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?;
}
let mut output = String::new();