From b602f73f0d40f35d19e321321c04d23f759653bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 17 Oct 2024 10:31:29 +0200 Subject: [PATCH] Correctly spill into a grenad sorter --- milli/src/update/new/extract/cache.rs | 114 ++++++++------------------ 1 file changed, 35 insertions(+), 79 deletions(-) diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 1701f09fc..200ca7be4 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -63,22 +63,6 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { } } - pub fn insert_del(&mut self, key: &[u8], bitmap: RoaringBitmap) { - match self.cache.raw_entry_mut().from_key(key) { - RawEntryMut::Occupied(mut entry) => { - let DelAddRoaringBitmap { del, add: _ } = entry.get_mut(); - *del.get_or_insert_with(RoaringBitmap::default) |= bitmap; - } - RawEntryMut::Vacant(entry) => { - self.total_insertions += 1; - self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let alloc = RefBump::clone(&self.alloc); - let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); - entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del(bitmap)); - } - } - } - pub fn insert_add_u32(&mut self, key: &[u8], n: u32) { match self.cache.raw_entry_mut().from_key(key) { RawEntryMut::Occupied(mut entry) => { @@ -95,96 +79,69 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { } } - pub fn insert_add(&mut self, key: &[u8], bitmap: RoaringBitmap) { - match self.cache.raw_entry_mut().from_key(key) { - RawEntryMut::Occupied(mut entry) => { - let DelAddRoaringBitmap { del: _, add } = entry.get_mut(); - *add.get_or_insert_with(RoaringBitmap::default) |= bitmap; - } - RawEntryMut::Vacant(entry) => { - self.total_insertions += 1; - self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let alloc = RefBump::clone(&self.alloc); - let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); - entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add(bitmap)); - } - } - } - - pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) { - match self.cache.raw_entry_mut().from_key(key) { - RawEntryMut::Occupied(mut entry) => { - let DelAddRoaringBitmap { del, add } = entry.get_mut(); - del.get_or_insert_with(RoaringBitmap::default).insert(n); - add.get_or_insert_with(RoaringBitmap::default).insert(n); - } - RawEntryMut::Vacant(entry) => { - self.total_insertions += 1; - self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let alloc = RefBump::clone(&self.alloc); - let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); - entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_add_u32(n)); - } - } - } - fn write_entry>( - &mut self, + sorter: &mut Sorter, + deladd_buffer: &mut Vec, + cbo_buffer: &mut Vec, key: A, deladd: DelAddRoaringBitmap, ) -> grenad::Result<(), MF::Error> { - /// TODO we must create a serialization trait to correctly serialize bitmaps - self.deladd_buffer.clear(); - let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer); + deladd_buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(deladd_buffer); match deladd { DelAddRoaringBitmap { del: Some(del), add: None } => { - self.cbo_buffer.clear(); - CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer); - value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; + cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer); + value_writer.insert(DelAdd::Deletion, &cbo_buffer)?; } DelAddRoaringBitmap { del: None, add: Some(add) } => { - self.cbo_buffer.clear(); - CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer); - value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; + cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer); + value_writer.insert(DelAdd::Addition, &cbo_buffer)?; } DelAddRoaringBitmap { del: Some(del), add: Some(add) } => { - self.cbo_buffer.clear(); - CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer); - value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; + cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer); + value_writer.insert(DelAdd::Deletion, &cbo_buffer)?; - self.cbo_buffer.clear(); - CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer); - value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; + cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer); + value_writer.insert(DelAdd::Addition, &cbo_buffer)?; } DelAddRoaringBitmap { del: None, add: None } => return Ok(()), } let bytes = value_writer.into_inner().unwrap(); - self.sorter.insert(key, bytes) + sorter.insert(key, bytes) } - pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> grenad::Result<(), MF::Error> { - self.sorter.insert(key, val) - } - - pub fn spill_to_disk(self) -> std::io::Result> { + pub fn spill_to_disk(self) -> grenad::Result, MF::Error> { let Self { cache, alloc: _, - sorter, - deladd_buffer, - cbo_buffer, + mut sorter, + mut deladd_buffer, + mut cbo_buffer, total_insertions, fitted_in_key, } = self; - /// I want to spill to disk for real - drop(cache); + for (key, deladd) in cache { + Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?; + } Ok(SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key }) } pub fn into_sorter(self) -> grenad::Result, MF::Error> { - let Self { cache, sorter, total_insertions, fitted_in_key, .. } = self; + let Self { + cache, + alloc: _, + mut sorter, + mut deladd_buffer, + mut cbo_buffer, + total_insertions, + fitted_in_key, + } = self; let mut all_n_containers = Vec::new(); let mut all_n_array_containers = Vec::new(); @@ -214,8 +171,7 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { } for (key, deladd) in cache { - // self.write_entry(key, deladd)?; - todo!("spill into the sorter") + Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?; } let mut output = String::new();