Bring grenad back to spill

Clément Renault 2024-10-28 14:09:15 +01:00
parent 2444ddbd3d
commit 93d639ead1
2 changed files with 63 additions and 101 deletions
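
Editor's note: the gist of the change is that each cache bucket now spills into a grenad::Sorter instead of the hand-rolled UnorderedEntries spill file. A grenad sorter buffers unsorted inserts in memory, dumps sorted chunks to temporary files when full, and merges duplicate keys on read. Below is a minimal sketch of that pattern, assuming grenad's MergeFunction trait as this commit uses it; ConcatMerge is a stand-in for illustration, where the commit itself plugs in MergeDeladdCboRoaringBitmaps, which unions the del/add roaring bitmaps of duplicate keys:

```rust
use std::borrow::Cow;
use std::convert::Infallible;

use grenad::MergeFunction;

// Stand-in merge function that concatenates the values of duplicate keys.
struct ConcatMerge;

impl MergeFunction for ConcatMerge {
    type Error = Infallible;

    fn merge<'a>(
        &self,
        _key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> Result<Cow<'a, [u8]>, Self::Error> {
        Ok(Cow::Owned(values.concat()))
    }
}

fn main() -> grenad::Result<(), Infallible> {
    let mut builder = grenad::SorterBuilder::new(ConcatMerge);
    builder.allow_realloc(false); // the same setting this commit uses
    let mut sorter = builder.build();

    // Inserts may arrive unsorted and duplicated: the sorter buffers them
    // in memory and spills sorted chunks to temporary files when full.
    sorter.insert(b"word-docids", [1u8])?;
    sorter.insert(b"word-docids", [2u8])?;

    // Reading back merges the chunks and resolves duplicates via `merge`.
    let mut iter = sorter.into_stream_merger_iter()?;
    while let Some((key, value)) = iter.next()? {
        println!("{key:?} => {value:?}");
    }
    Ok(())
}
```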

Cargo.lock (generated)

@@ -4435,7 +4435,7 @@ dependencies = [
 [[package]]
 name = "raw-collections"
 version = "0.1.0"
-source = "git+https://github.com/dureuill/raw-collections.git#147dfe8eee739f2638c921c83e7d64ca1d47dcb2"
+source = "git+https://github.com/dureuill/raw-collections.git#4ab9619207632c20f4e0c2e126d9d909cc58ef65"
 dependencies = [
  "allocator-api2",
  "bumpalo",


@@ -1,14 +1,15 @@
 use std::fs::File;
 use std::hash::BuildHasher;
-use std::io::{self, BufReader, BufWriter, Read as _, Seek, Write as _};
-use std::vec;
+use std::{iter, mem};
 
 use bumpalo::Bump;
 use hashbrown::hash_map::RawEntryMut;
-use hashbrown::{DefaultHashBuilder, HashMap};
+use hashbrown::{DefaultHashBuilder, HashMap, HashSet};
+use raw_collections::map::FrozenMap;
 use roaring::RoaringBitmap;
 
-use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
+use crate::update::del_add::{DelAdd, KvWriterDelAdd};
+use crate::update::MergeDeladdCboRoaringBitmaps;
 use crate::CboRoaringBitmapCodec;
 
 // # How the Merge Algorithm works
@@ -55,6 +56,9 @@ use crate::CboRoaringBitmapCodec;
 // For now we can use a grenad sorter for spilling even thought I think
 // it's not the most efficient way (too many files open, sorting entries).
 
+/// A cache that stores bytes keys associated to CboDelAddRoaringBitmaps.
+///
+/// Internally balances the content over `N` buckets for future merging.
 pub struct CboCachedSorter<'extractor> {
     hasher: DefaultHashBuilder,
     alloc: &'extractor Bump,
@@ -84,7 +88,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
         }
     }
 
-    pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> {
+    pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), crate::Error> {
         let buckets = self.buckets();
         match &mut self.caches {
             InnerCaches::Normal(normal) => {
@@ -97,7 +101,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
         }
     }
 
-    pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> {
+    pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), crate::Error> {
         let buckets = self.buckets();
         match &mut self.caches {
            InnerCaches::Normal(normal) => {
@@ -110,7 +114,19 @@ impl<'extractor> CboCachedSorter<'extractor> {
         }
     }
 
-    pub fn freeze(&mut self) -> grenad::Result<()> {
+    pub fn spill_only(&mut self) -> grenad::Result<()> {
+        let CboCachedSorter { hasher: _, alloc: _, caches } = self;
+        if let InnerCaches::Normal(normal_caches) = caches {
+            let dummy = NormalCaches { caches: Vec::new() };
+            let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy);
+            *caches = InnerCaches::Spilling(SpillingCaches::from_cache_maps(cache_maps));
+        }
+        Ok(())
+    }
+
+    pub fn freeze(&mut self) -> Vec<()> {
         todo!()
     }
 }
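
Editor's note on the spill_only hunk above: the caches live behind &mut self, so the code cannot simply move the NormalCaches out of the enum. It swaps a cheap dummy in with mem::replace to take ownership, then overwrites the enum with its Spilling variant; presumably a dummy is used rather than mem::take because NormalCaches has no Default impl. The pattern in miniature, with illustrative types that are not from the codebase:

```rust
use std::mem;

enum Caches {
    Normal(Vec<&'static str>),
    Spilling(Vec<&'static str>),
}

// Transition Normal -> Spilling through an `&mut` reference: swap a cheap
// dummy into place to take ownership of the inner data, then overwrite the
// enum with the new variant built from the moved-out data.
fn spill_only(caches: &mut Caches) {
    if let Caches::Normal(maps) = caches {
        let maps = mem::replace(maps, Vec::new());
        *caches = Caches::Spilling(maps);
    }
}

fn main() {
    let mut caches = Caches::Normal(vec!["bucket-0", "bucket-1"]);
    spill_only(&mut caches);
    assert!(matches!(caches, Caches::Spilling(maps) if maps.len() == 2));
}
```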
@@ -132,7 +148,7 @@ impl<'extractor> NormalCaches<'extractor> {
         let hash = compute_bytes_hash(hasher, key);
         let bucket = compute_bucket_from_hash(buckets, hash);
 
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k == key) {
+        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
             RawEntryMut::Occupied(mut entry) => {
                 entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
             }
@@ -156,7 +172,7 @@ impl<'extractor> NormalCaches<'extractor> {
     ) {
         let hash = compute_bytes_hash(hasher, key);
         let bucket = compute_bucket_from_hash(buckets, hash);
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k == key) {
+        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
             RawEntryMut::Occupied(mut entry) => {
                 entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
             }
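
Editor's note: both NormalCaches hunks make the same small change. hashbrown's raw_entry_mut().from_hash hands its predicate a &K, which here is &&[u8], so |&k| destructures one level and compares &[u8] with &[u8] directly. A self-contained illustration of the raw-entry idiom, using a hypothetical toy map rather than the commit's code:

```rust
use std::hash::{BuildHasher, Hash, Hasher};

use hashbrown::hash_map::RawEntryMut;
use hashbrown::{DefaultHashBuilder, HashMap};

// Hash a byte key exactly the way the map's own hasher would.
fn compute_hash(hasher: &DefaultHashBuilder, key: &[u8]) -> u64 {
    let mut state = hasher.build_hasher();
    key.hash(&mut state);
    state.finish()
}

fn main() {
    let hasher = DefaultHashBuilder::default();
    let mut map: HashMap<&[u8], u32> = HashMap::with_hasher(hasher.clone());

    let key: &[u8] = b"doc";
    let hash = compute_hash(&hasher, key);

    // The predicate receives `&K` (here `&&[u8]`); `|&k|` destructures one
    // level so the comparison is `&[u8] == &[u8]`, as in this commit.
    match map.raw_entry_mut().from_hash(hash, |&k| k == key) {
        RawEntryMut::Occupied(mut entry) => *entry.get_mut() += 1,
        RawEntryMut::Vacant(entry) => {
            // We already computed the hash, so insert without rehashing.
            entry.insert_hashed_nocheck(hash, key, 1);
        }
    }

    assert_eq!(map[key], 1);
}
```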
@@ -174,23 +190,41 @@ impl<'extractor> NormalCaches<'extractor> {
 struct SpillingCaches<'extractor> {
     caches:
         Vec<HashMap<&'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>>,
-    // TODO it must be a grenad Sorter with a DelAddCboRoaringBitmapCodec
-    spilled_entries: Vec<UnorderedEntries>,
+    spilled_entries: Vec<grenad::Sorter<MergeDeladdCboRoaringBitmaps>>,
     deladd_buffer: Vec<u8>,
     cbo_buffer: Vec<u8>,
 }
 
 impl<'extractor> SpillingCaches<'extractor> {
+    fn from_cache_maps(
+        caches: Vec<
+            HashMap<&'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>,
+        >,
+    ) -> SpillingCaches<'extractor> {
+        SpillingCaches {
+            spilled_entries: iter::repeat_with(|| {
+                let mut builder = grenad::SorterBuilder::new(MergeDeladdCboRoaringBitmaps);
+                builder.allow_realloc(false);
+                builder.build()
+            })
+            .take(caches.len())
+            .collect(),
+            caches,
+            deladd_buffer: Vec::new(),
+            cbo_buffer: Vec::new(),
+        }
+    }
+
     pub fn insert_del_u32(
         &mut self,
         hasher: &DefaultHashBuilder,
         buckets: usize,
         key: &[u8],
         n: u32,
-    ) -> io::Result<()> {
+    ) -> grenad::Result<(), crate::Error> {
         let hash = compute_bytes_hash(hasher, key);
         let bucket = compute_bucket_from_hash(buckets, hash);
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) {
+        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
             RawEntryMut::Occupied(mut entry) => {
                 entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
                 Ok(())
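
Editor's note on from_cache_maps: it builds one grenad sorter per bucket with iter::repeat_with rather than vec![x; n], presumably because grenad::Sorter is not Clone and each bucket needs its own independent sorter; allow_realloc(false) appears intended to keep each sorter's in-memory buffer bounded so it spills to disk instead of growing. A tiny sketch of the repeat_with idiom, with plain Vecs standing in for sorters:

```rust
use std::iter;

fn main() {
    // `repeat_with` invokes the closure once per element, so every bucket
    // gets its own freshly built value; `vec![x; n]` would instead require
    // `Clone` and is not an option for file-backed types like sorters.
    let buckets = 4;
    let spilled: Vec<Vec<u8>> = iter::repeat_with(Vec::new).take(buckets).collect();

    assert_eq!(spilled.len(), buckets);
    assert!(spilled.iter().all(|bucket| bucket.is_empty()));
}
```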
@@ -214,10 +248,10 @@ impl<'extractor> SpillingCaches<'extractor> {
         buckets: usize,
         key: &[u8],
         n: u32,
-    ) -> io::Result<()> {
+    ) -> grenad::Result<(), crate::Error> {
         let hash = compute_bytes_hash(hasher, key);
         let bucket = compute_bucket_from_hash(buckets, hash);
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) {
+        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
             RawEntryMut::Occupied(mut entry) => {
                 entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
                 Ok(())
@@ -250,12 +284,12 @@ fn compute_bucket_from_hash(buckets: usize, hash: u64) -> usize {
 }
 
 fn spill_entry_to_disk(
-    spilled_entries: &mut UnorderedEntries,
+    spilled_entries: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
     deladd_buffer: &mut Vec<u8>,
     cbo_buffer: &mut Vec<u8>,
     key: &[u8],
     deladd: DelAddRoaringBitmap,
-) -> io::Result<()> {
+) -> grenad::Result<(), crate::Error> {
     deladd_buffer.clear();
     let mut value_writer = KvWriterDelAdd::new(deladd_buffer);
     match deladd {
@@ -281,95 +315,23 @@ fn spill_entry_to_disk(
         DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
     }
 
     let bytes = value_writer.into_inner().unwrap();
-    spilled_entries.push(key, bytes)
+    spilled_entries.insert(key, bytes)
 }
 
-pub struct UnorderedEntries {
-    entry_sizes: Vec<(u32, u32)>,
-    file: BufWriter<File>,
+pub struct FrozenCache<'a, 'extractor> {
+    cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder>,
+    spilled: grenad::Reader<File>,
 }
 
-impl UnorderedEntries {
-    fn new(file: File) -> Self {
-        UnorderedEntries { entry_sizes: Vec::new(), file: BufWriter::new(file) }
-    }
+pub fn merge_me<F>(frozen: Vec<FrozenCache>, f: F) -> crate::Result<()>
+where
+    F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> crate::Result<()>,
+{
+    // First manage the spilled entries by looking into the HashMaps and then merge them.
 
-    /// Pushes a new tuple of key/value into a file.
-    ///
-    /// If the function fails you must not continue to use this struct and rather drop it.
-    ///
-    /// # Panics
-    ///
-    /// - Panics if the key or value length is larger than 2^32 bytes.
-    fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
-        let key_len = key.len().try_into().unwrap();
-        let value_len = value.len().try_into().unwrap();
+    // Then manage the content on the HashMap that weren't taken (mem::take).
 
-        self.file.write_all(key)?;
-        self.file.write_all(value)?;
-        self.entry_sizes.push((key_len, value_len));
-
-        Ok(())
-    }
-
-    fn into_iter_bitmap(self) -> io::Result<UnorderedEntriesIntoIter> {
-        let Self { entry_sizes, file } = self;
-        let mut file = file.into_inner().map_err(|e| e.into_error())?;
-        file.rewind()?;
-
-        Ok(UnorderedEntriesIntoIter {
-            entry_sizes: entry_sizes.into_iter(),
-            file: BufReader::new(file),
-            buffer: Vec::new(),
-        })
-    }
-}
-
-pub struct UnorderedEntriesIntoIter {
-    entry_sizes: vec::IntoIter<(u32, u32)>,
-    file: BufReader<File>,
-    buffer: Vec<u8>,
-}
-
-impl UnorderedEntriesIntoIter {
-    fn next_ref(&mut self) -> io::Result<Option<(&[u8], &[u8])>> {
-        match self.entry_sizes.next() {
-            Some((key_len, value_len)) => {
-                let key_len = key_len as usize;
-                let value_len = value_len as usize;
-                let total_len = key_len + value_len;
-
-                self.buffer.resize(total_len, 0);
-                let buffer = &mut self.buffer[..total_len];
-                self.file.read_exact(buffer)?;
-                let buffer = &self.buffer[..total_len];
-
-                Ok(Some(buffer.split_at(key_len)))
-            }
-            None => Ok(None),
-        }
-    }
-
-    pub fn next_deladd_bitmap(&mut self) -> io::Result<Option<(&[u8], DelAddRoaringBitmap)>> {
-        match self.next_ref()? {
-            Some((key, value_bytes)) => {
-                let reader = KvReaderDelAdd::from_slice(value_bytes);
-
-                let del = match reader.get(DelAdd::Deletion) {
-                    Some(del_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(del_bytes)?),
-                    None => None,
-                };
-
-                let add = match reader.get(DelAdd::Addition) {
-                    Some(add_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(add_bytes)?),
-                    None => None,
-                };
-
-                Ok(Some((key, DelAddRoaringBitmap { del, add })))
-            }
-            None => Ok(None),
-        }
-    }
+    todo!()
 }
 
 #[derive(Debug, Clone)]
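
Editor's note: merge_me is still a todo!() guarded by two guiding comments. Following those comments, one plausible shape, simplified to plain byte values and std HashMaps instead of FrozenMap and DelAddRoaringBitmap (merge_caches, in_memory, and f are hypothetical names, and the cursor calls follow grenad's Reader API as used elsewhere in milli), might be:

```rust
use std::collections::HashMap;
use std::fs::File;

// Hypothetical, simplified shape for the final merge.
fn merge_caches<F>(
    spilled: Vec<grenad::Reader<File>>,
    mut in_memory: HashMap<Vec<u8>, Vec<u8>>,
    mut f: F,
) -> grenad::Result<()>
where
    F: FnMut(&[u8], &[u8]),
{
    // First manage the spilled entries: stream every reader back and fold
    // its values into the in-memory map, combining duplicates (the real
    // code would union the del/add roaring bitmaps here).
    for reader in spilled {
        let mut cursor = reader.into_cursor()?;
        while let Some((key, bytes)) = cursor.move_on_next()? {
            in_memory
                .entry(key.to_vec())
                .and_modify(|value| value.extend_from_slice(bytes))
                .or_insert_with(|| bytes.to_vec());
        }
    }

    // Then hand the entries that never spilled, plus the merged ones,
    // to the caller-provided function.
    for (key, value) in in_memory {
        f(&key, &value);
    }

    Ok(())
}
```

The real implementation must additionally merge across buckets and respect the borrowed keys of the frozen maps, so treat this sketch as directional only.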