diff --git a/Cargo.lock b/Cargo.lock index 1e3d71c9c..920f67a8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4435,7 +4435,7 @@ dependencies = [ [[package]] name = "raw-collections" version = "0.1.0" -source = "git+https://github.com/dureuill/raw-collections.git#147dfe8eee739f2638c921c83e7d64ca1d47dcb2" +source = "git+https://github.com/dureuill/raw-collections.git#4ab9619207632c20f4e0c2e126d9d909cc58ef65" dependencies = [ "allocator-api2", "bumpalo", diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 5a4bc6b92..54991159d 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -1,14 +1,15 @@ use std::fs::File; use std::hash::BuildHasher; -use std::io::{self, BufReader, BufWriter, Read as _, Seek, Write as _}; -use std::vec; +use std::{iter, mem}; use bumpalo::Bump; use hashbrown::hash_map::RawEntryMut; -use hashbrown::{DefaultHashBuilder, HashMap}; +use hashbrown::{DefaultHashBuilder, HashMap, HashSet}; +use raw_collections::map::FrozenMap; use roaring::RoaringBitmap; -use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; +use crate::update::del_add::{DelAdd, KvWriterDelAdd}; +use crate::update::MergeDeladdCboRoaringBitmaps; use crate::CboRoaringBitmapCodec; // # How the Merge Algorithm works @@ -55,6 +56,9 @@ use crate::CboRoaringBitmapCodec; // For now we can use a grenad sorter for spilling even thought I think // it's not the most efficient way (too many files open, sorting entries). +/// A cache that stores bytes keys associated to CboDelAddRoaringBitmaps. +/// +/// Internally balances the content over `N` buckets for future merging. pub struct CboCachedSorter<'extractor> { hasher: DefaultHashBuilder, alloc: &'extractor Bump, @@ -84,7 +88,7 @@ impl<'extractor> CboCachedSorter<'extractor> { } } - pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> { + pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), crate::Error> { let buckets = self.buckets(); match &mut self.caches { InnerCaches::Normal(normal) => { @@ -97,7 +101,7 @@ impl<'extractor> CboCachedSorter<'extractor> { } } - pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> { + pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), crate::Error> { let buckets = self.buckets(); match &mut self.caches { InnerCaches::Normal(normal) => { @@ -110,7 +114,19 @@ impl<'extractor> CboCachedSorter<'extractor> { } } - pub fn freeze(&mut self) -> grenad::Result<()> { + pub fn spill_only(&mut self) -> grenad::Result<()> { + let CboCachedSorter { hasher: _, alloc: _, caches } = self; + + if let InnerCaches::Normal(normal_caches) = caches { + let dummy = NormalCaches { caches: Vec::new() }; + let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy); + *caches = InnerCaches::Spilling(SpillingCaches::from_cache_maps(cache_maps)); + } + + Ok(()) + } + + pub fn freeze(&mut self) -> Vec<()> { todo!() } } @@ -132,7 +148,7 @@ impl<'extractor> NormalCaches<'extractor> { let hash = compute_bytes_hash(hasher, key); let bucket = compute_bucket_from_hash(buckets, hash); - match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k == key) { + match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n); } @@ -156,7 +172,7 @@ impl<'extractor> NormalCaches<'extractor> { ) { let hash = compute_bytes_hash(hasher, key); let bucket = compute_bucket_from_hash(buckets, hash); - match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k == key) { + match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n); } @@ -174,23 +190,41 @@ impl<'extractor> NormalCaches<'extractor> { struct SpillingCaches<'extractor> { caches: Vec>, - // TODO it must be a grenad Sorter with a DelAddCboRoaringBitmapCodec - spilled_entries: Vec, + spilled_entries: Vec>, deladd_buffer: Vec, cbo_buffer: Vec, } impl<'extractor> SpillingCaches<'extractor> { + fn from_cache_maps( + caches: Vec< + HashMap<&'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>, + >, + ) -> SpillingCaches<'extractor> { + SpillingCaches { + spilled_entries: iter::repeat_with(|| { + let mut builder = grenad::SorterBuilder::new(MergeDeladdCboRoaringBitmaps); + builder.allow_realloc(false); + builder.build() + }) + .take(caches.len()) + .collect(), + caches, + deladd_buffer: Vec::new(), + cbo_buffer: Vec::new(), + } + } + pub fn insert_del_u32( &mut self, hasher: &DefaultHashBuilder, buckets: usize, key: &[u8], n: u32, - ) -> io::Result<()> { + ) -> grenad::Result<(), crate::Error> { let hash = compute_bytes_hash(hasher, key); let bucket = compute_bucket_from_hash(buckets, hash); - match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) { + match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n); Ok(()) @@ -214,10 +248,10 @@ impl<'extractor> SpillingCaches<'extractor> { buckets: usize, key: &[u8], n: u32, - ) -> io::Result<()> { + ) -> grenad::Result<(), crate::Error> { let hash = compute_bytes_hash(hasher, key); let bucket = compute_bucket_from_hash(buckets, hash); - match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) { + match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n); Ok(()) @@ -250,12 +284,12 @@ fn compute_bucket_from_hash(buckets: usize, hash: u64) -> usize { } fn spill_entry_to_disk( - spilled_entries: &mut UnorderedEntries, + spilled_entries: &mut grenad::Sorter, deladd_buffer: &mut Vec, cbo_buffer: &mut Vec, key: &[u8], deladd: DelAddRoaringBitmap, -) -> io::Result<()> { +) -> grenad::Result<(), crate::Error> { deladd_buffer.clear(); let mut value_writer = KvWriterDelAdd::new(deladd_buffer); match deladd { @@ -281,95 +315,23 @@ fn spill_entry_to_disk( DelAddRoaringBitmap { del: None, add: None } => return Ok(()), } let bytes = value_writer.into_inner().unwrap(); - spilled_entries.push(key, bytes) + spilled_entries.insert(key, bytes) } -pub struct UnorderedEntries { - entry_sizes: Vec<(u32, u32)>, - file: BufWriter, +pub struct FrozenCache<'a, 'extractor> { + cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder>, + spilled: grenad::Reader, } -impl UnorderedEntries { - fn new(file: File) -> Self { - UnorderedEntries { entry_sizes: Vec::new(), file: BufWriter::new(file) } - } +pub fn merge_me(frozen: Vec, f: F) -> crate::Result<()> +where + F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> crate::Result<()>, +{ + // First manage the spilled entries by looking into the HashMaps and then merge them. - /// Pushes a new tuple of key/value into a file. - /// - /// If the function fails you must not continue to use this struct and rather drop it. - /// - /// # Panics - /// - /// - Panics if the key or value length is larger than 2^32 bytes. - fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> { - let key_len = key.len().try_into().unwrap(); - let value_len = value.len().try_into().unwrap(); + // Then manage the content on the HashMap that weren't taken (mem::take). - self.file.write_all(key)?; - self.file.write_all(value)?; - - self.entry_sizes.push((key_len, value_len)); - - Ok(()) - } - - fn into_iter_bitmap(self) -> io::Result { - let Self { entry_sizes, file } = self; - - let mut file = file.into_inner().map_err(|e| e.into_error())?; - file.rewind()?; - - Ok(UnorderedEntriesIntoIter { - entry_sizes: entry_sizes.into_iter(), - file: BufReader::new(file), - buffer: Vec::new(), - }) - } -} - -pub struct UnorderedEntriesIntoIter { - entry_sizes: vec::IntoIter<(u32, u32)>, - file: BufReader, - buffer: Vec, -} - -impl UnorderedEntriesIntoIter { - fn next_ref(&mut self) -> io::Result> { - match self.entry_sizes.next() { - Some((key_len, value_len)) => { - let key_len = key_len as usize; - let value_len = value_len as usize; - let total_len = key_len + value_len; - - self.buffer.resize(total_len, 0); - let buffer = &mut self.buffer[..total_len]; - - self.file.read_exact(buffer)?; - let buffer = &self.buffer[..total_len]; - - Ok(Some(buffer.split_at(key_len))) - } - None => Ok(None), - } - } - - pub fn next_deladd_bitmap(&mut self) -> io::Result> { - match self.next_ref()? { - Some((key, value_bytes)) => { - let reader = KvReaderDelAdd::from_slice(value_bytes); - let del = match reader.get(DelAdd::Deletion) { - Some(del_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(del_bytes)?), - None => None, - }; - let add = match reader.get(DelAdd::Addition) { - Some(add_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(add_bytes)?), - None => None, - }; - Ok(Some((key, DelAddRoaringBitmap { del, add }))) - } - None => Ok(None), - } - } + todo!() } #[derive(Debug, Clone)]