diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 0f643e764..7152ec727 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -1,8 +1,11 @@ +use std::borrow::BorrowMut; use std::fs::File; +use std::hash::BuildHasher; use std::io::{self, BufReader, BufWriter, Read as _, Seek, Write as _}; use std::vec; use hashbrown::hash_map::RawEntryMut; +use hashbrown::{DefaultHashBuilder, HashMap}; use raw_collections::alloc::{RefBump, RefBytes}; use roaring::RoaringBitmap; use tempfile::tempfile; @@ -11,22 +14,6 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::new::indexer::document_changes::MostlySend; use crate::CboRoaringBitmapCodec; -const KEY_SIZE: usize = 12; - -// #[derive(Debug)] -pub struct CboCachedSorter<'extractor> { - cache: hashbrown::HashMap< - RefBytes<'extractor>, - DelAddRoaringBitmap, - hashbrown::DefaultHashBuilder, - RefBump<'extractor>, - >, - alloc: RefBump<'extractor>, - spilled_entries: UnorderedEntries, - deladd_buffer: Vec, - cbo_buffer: Vec, -} - // # How the Merge Algorithm works // // Each extractor create #Threads caches and balances the entries @@ -71,80 +58,205 @@ pub struct CboCachedSorter<'extractor> { // For now we can use a grenad sorter for spilling even thought I think // it's not the most efficient way (too many files open, sorting entries). -impl<'extractor> CboCachedSorter<'extractor> { - /// TODO may add the capacity - pub fn new_in(alloc: RefBump<'extractor>) -> io::Result { - Ok(CboCachedSorter { - cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)), - alloc, - spilled_entries: tempfile().map(UnorderedEntries::new)?, - deladd_buffer: Vec::new(), - cbo_buffer: Vec::new(), - }) - } +pub struct CboCachedSorter<'extractor> { + hasher: DefaultHashBuilder, + alloc: RefBump<'extractor>, + caches: InnerCaches<'extractor>, +} + +enum InnerCaches<'extractor> { + Normal(NormalCaches<'extractor>), + Spilling(SpillingCaches<'extractor>), } impl<'extractor> CboCachedSorter<'extractor> { - pub fn insert_del_u32(&mut self, key: &[u8], n: u32) { - match self.cache.raw_entry_mut().from_key(key) { + pub fn new_in(buckets: usize, alloc: RefBump<'extractor>) -> Self { + Self { + hasher: DefaultHashBuilder::default(), + caches: InnerCaches::Normal(NormalCaches { + caches: std::iter::repeat_with(|| RefBump::clone(&alloc)) + .map(HashMap::new_in) + .take(buckets) + .collect(), + }), + alloc, + } + } + + fn buckets(&self) -> usize { + match &self.caches { + InnerCaches::Normal(caches) => caches.caches.len(), + InnerCaches::Spilling(caches) => caches.caches.len(), + } + } + + pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> { + let buckets = self.buckets(); + match &mut self.caches { + InnerCaches::Normal(normal) => { + normal.insert_del_u32(&self.hasher, &self.alloc, buckets, key, n); + Ok(()) + } + InnerCaches::Spilling(spilling) => { + spilling.insert_del_u32(&self.hasher, buckets, key, n) + } + } + } + + pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> { + let buckets = self.buckets(); + match &mut self.caches { + InnerCaches::Normal(normal) => { + normal.insert_add_u32(&self.hasher, &self.alloc, buckets, key, n); + Ok(()) + } + InnerCaches::Spilling(spilling) => { + spilling.insert_add_u32(&self.hasher, buckets, key, n) + } + } + } + + pub fn freeze(&mut self) -> grenad::Result<()> { + todo!() + } +} + +struct NormalCaches<'extractor> { + caches: Vec< + HashMap, DelAddRoaringBitmap, DefaultHashBuilder, RefBump<'extractor>>, + >, +} + +impl<'extractor> NormalCaches<'extractor> { + pub fn insert_del_u32( + &mut self, + hasher: &DefaultHashBuilder, + alloc: &RefBump<'extractor>, + buckets: usize, + key: &[u8], + n: u32, + ) { + let hash = compute_bytes_hash(hasher, key); + let bucket = compute_bucket_from_hash(buckets, hash); + + match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) { RawEntryMut::Occupied(mut entry) => { - let DelAddRoaringBitmap { del, add: _ } = entry.get_mut(); - del.get_or_insert_with(RoaringBitmap::default).insert(n); + entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n); } RawEntryMut::Vacant(entry) => { - let alloc = RefBump::clone(&self.alloc); - let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); - entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_u32(n)); + let alloc = RefBump::clone(&alloc); + let key = RefBump::map(alloc, |a| a.alloc_slice_copy(key)); + entry.insert_hashed_nocheck( + hash, + RefBytes(key), + DelAddRoaringBitmap::new_del_u32(n), + ); } } } - pub fn insert_add_u32(&mut self, key: &[u8], n: u32) { - match self.cache.raw_entry_mut().from_key(key) { + pub fn insert_add_u32( + &mut self, + hasher: &DefaultHashBuilder, + alloc: &RefBump<'extractor>, + buckets: usize, + key: &[u8], + n: u32, + ) { + let hash = compute_bytes_hash(hasher, key); + let bucket = compute_bucket_from_hash(buckets, hash); + match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) { RawEntryMut::Occupied(mut entry) => { - let DelAddRoaringBitmap { del: _, add } = entry.get_mut(); - add.get_or_insert_with(RoaringBitmap::default).insert(n); + entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n); } RawEntryMut::Vacant(entry) => { - let alloc = RefBump::clone(&self.alloc); - let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); - entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add_u32(n)); + let alloc = RefBump::clone(&alloc); + let key = RefBump::map(alloc, |a| a.alloc_slice_copy(key)); + entry.insert_hashed_nocheck( + hash, + RefBytes(key), + DelAddRoaringBitmap::new_add_u32(n), + ); + } + } + } +} + +struct SpillingCaches<'extractor> { + caches: Vec< + HashMap, DelAddRoaringBitmap, DefaultHashBuilder, RefBump<'extractor>>, + >, + // TODO it must be a grenad Sorter with a DelAddCboRoaringBitmapCodec + spilled_entries: Vec, + deladd_buffer: Vec, + cbo_buffer: Vec, +} + +impl<'extractor> SpillingCaches<'extractor> { + pub fn insert_del_u32( + &mut self, + hasher: &DefaultHashBuilder, + buckets: usize, + key: &[u8], + n: u32, + ) -> io::Result<()> { + let hash = compute_bytes_hash(hasher, key); + let bucket = compute_bucket_from_hash(buckets, hash); + match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) { + RawEntryMut::Occupied(mut entry) => { + entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n); + Ok(()) + } + RawEntryMut::Vacant(_entry) => { + let deladd = DelAddRoaringBitmap::new_del_u32(n); + spill_entry_to_disk( + &mut self.spilled_entries[bucket], + &mut self.deladd_buffer, + &mut self.cbo_buffer, + key, + deladd, + ) } } } - pub fn spill_to_disk(self) -> io::Result { - let Self { cache, alloc: _, mut spilled_entries, mut deladd_buffer, mut cbo_buffer } = self; - - for (key, deladd) in cache { - spill_entry_to_disk( - &mut spilled_entries, - &mut deladd_buffer, - &mut cbo_buffer, - &key, - deladd, - )?; + pub fn insert_add_u32( + &mut self, + hasher: &DefaultHashBuilder, + buckets: usize, + key: &[u8], + n: u32, + ) -> io::Result<()> { + let hash = compute_bytes_hash(hasher, key); + let bucket = compute_bucket_from_hash(buckets, hash); + match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) { + RawEntryMut::Occupied(mut entry) => { + entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n); + Ok(()) + } + RawEntryMut::Vacant(_entry) => { + let deladd = DelAddRoaringBitmap::new_add_u32(n); + spill_entry_to_disk( + &mut self.spilled_entries[bucket], + &mut self.deladd_buffer, + &mut self.cbo_buffer, + key, + deladd, + ) + } } - - Ok(SpilledCache { spilled_entries, deladd_buffer, cbo_buffer }) } +} - // TODO Do not spill to disk if not necessary - pub fn into_unordered_entries(self) -> io::Result { - let Self { cache, alloc: _, mut spilled_entries, mut cbo_buffer, mut deladd_buffer } = self; +fn compute_bytes_hash(hash_builder: &S, key: &[u8]) -> u64 { + use std::hash::{Hash, Hasher}; + let mut state = hash_builder.build_hasher(); + key.hash(&mut state); + state.finish() +} - for (key, deladd) in cache { - spill_entry_to_disk( - &mut spilled_entries, - &mut deladd_buffer, - &mut cbo_buffer, - &key, - deladd, - )?; - } - - spilled_entries.into_iter_bitmap() - } +fn compute_bucket_from_hash(buckets: usize, hash: u64) -> usize { + hash as usize % buckets } fn spill_entry_to_disk( @@ -182,27 +294,6 @@ fn spill_entry_to_disk( spilled_entries.push(key, bytes) } -pub struct SpilledCache { - spilled_entries: UnorderedEntries, - deladd_buffer: Vec, - cbo_buffer: Vec, -} - -impl SpilledCache { - pub fn reconstruct(self, alloc: RefBump<'_>) -> CboCachedSorter<'_> { - let SpilledCache { spilled_entries, deladd_buffer, cbo_buffer } = self; - CboCachedSorter { - cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)), - alloc, - spilled_entries, - deladd_buffer, - cbo_buffer, - } - } -} - -unsafe impl<'extractor> MostlySend for CboCachedSorter<'extractor> {} - pub struct UnorderedEntries { entry_sizes: Vec<(u32, u32)>, file: BufWriter, @@ -291,40 +382,6 @@ impl UnorderedEntriesIntoIter { } } -#[derive(Default, Debug)] -struct Stats { - pub len: usize, - pub average: f32, - pub mean: u32, - pub min: u32, - pub max: u32, -} - -impl Stats { - fn from_slice(slice: &mut [u32]) -> Stats { - slice.sort_unstable(); - Self::from_sorted_slice(slice) - } - - fn from_slice_p99(slice: &mut [u32]) -> Stats { - slice.sort_unstable(); - let new_len = slice.len() - (slice.len() as f32 / 100.0) as usize; - match slice.get(..new_len) { - Some(slice) => Self::from_sorted_slice(slice), - None => Stats::default(), - } - } - - fn from_sorted_slice(slice: &[u32]) -> Stats { - let sum: f64 = slice.iter().map(|i| *i as f64).sum(); - let average = (sum / slice.len() as f64) as f32; - let mean = *slice.len().checked_div(2).and_then(|middle| slice.get(middle)).unwrap_or(&0); - let min = *slice.first().unwrap_or(&0); - let max = *slice.last().unwrap_or(&0); - Stats { len: slice.len(), average, mean, min, max } - } -} - #[derive(Debug, Clone)] pub struct DelAddRoaringBitmap { pub(crate) del: Option, diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 7c850cebb..b932c9a64 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -221,7 +221,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { type Data = RefCell>>; fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result { - Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)))) + Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?))) } fn process(