Rewrite the cache to fill multiple caches

This commit is contained in:
Clément Renault 2024-10-24 17:40:02 +02:00
parent 437940d053
commit b7e106b34a
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 182 additions and 125 deletions

View File

@ -1,8 +1,11 @@
use std::borrow::BorrowMut;
use std::fs::File;
use std::hash::BuildHasher;
use std::io::{self, BufReader, BufWriter, Read as _, Seek, Write as _};
use std::vec;
use hashbrown::hash_map::RawEntryMut;
use hashbrown::{DefaultHashBuilder, HashMap};
use raw_collections::alloc::{RefBump, RefBytes};
use roaring::RoaringBitmap;
use tempfile::tempfile;
@ -11,22 +14,6 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::new::indexer::document_changes::MostlySend;
use crate::CboRoaringBitmapCodec;
const KEY_SIZE: usize = 12;
// #[derive(Debug)]
pub struct CboCachedSorter<'extractor> {
cache: hashbrown::HashMap<
RefBytes<'extractor>,
DelAddRoaringBitmap,
hashbrown::DefaultHashBuilder,
RefBump<'extractor>,
>,
alloc: RefBump<'extractor>,
spilled_entries: UnorderedEntries,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
}
// # How the Merge Algorithm works
//
// Each extractor create #Threads caches and balances the entries
@ -71,80 +58,205 @@ pub struct CboCachedSorter<'extractor> {
// For now we can use a grenad sorter for spilling even thought I think
// it's not the most efficient way (too many files open, sorting entries).
impl<'extractor> CboCachedSorter<'extractor> {
/// TODO may add the capacity
pub fn new_in(alloc: RefBump<'extractor>) -> io::Result<Self> {
Ok(CboCachedSorter {
cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
alloc,
spilled_entries: tempfile().map(UnorderedEntries::new)?,
deladd_buffer: Vec::new(),
cbo_buffer: Vec::new(),
})
}
pub struct CboCachedSorter<'extractor> {
hasher: DefaultHashBuilder,
alloc: RefBump<'extractor>,
caches: InnerCaches<'extractor>,
}
enum InnerCaches<'extractor> {
Normal(NormalCaches<'extractor>),
Spilling(SpillingCaches<'extractor>),
}
impl<'extractor> CboCachedSorter<'extractor> {
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) {
match self.cache.raw_entry_mut().from_key(key) {
pub fn new_in(buckets: usize, alloc: RefBump<'extractor>) -> Self {
Self {
hasher: DefaultHashBuilder::default(),
caches: InnerCaches::Normal(NormalCaches {
caches: std::iter::repeat_with(|| RefBump::clone(&alloc))
.map(HashMap::new_in)
.take(buckets)
.collect(),
}),
alloc,
}
}
fn buckets(&self) -> usize {
match &self.caches {
InnerCaches::Normal(caches) => caches.caches.len(),
InnerCaches::Spilling(caches) => caches.caches.len(),
}
}
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> {
let buckets = self.buckets();
match &mut self.caches {
InnerCaches::Normal(normal) => {
normal.insert_del_u32(&self.hasher, &self.alloc, buckets, key, n);
Ok(())
}
InnerCaches::Spilling(spilling) => {
spilling.insert_del_u32(&self.hasher, buckets, key, n)
}
}
}
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> io::Result<()> {
let buckets = self.buckets();
match &mut self.caches {
InnerCaches::Normal(normal) => {
normal.insert_add_u32(&self.hasher, &self.alloc, buckets, key, n);
Ok(())
}
InnerCaches::Spilling(spilling) => {
spilling.insert_add_u32(&self.hasher, buckets, key, n)
}
}
}
pub fn freeze(&mut self) -> grenad::Result<()> {
todo!()
}
}
struct NormalCaches<'extractor> {
caches: Vec<
HashMap<RefBytes<'extractor>, DelAddRoaringBitmap, DefaultHashBuilder, RefBump<'extractor>>,
>,
}
impl<'extractor> NormalCaches<'extractor> {
pub fn insert_del_u32(
&mut self,
hasher: &DefaultHashBuilder,
alloc: &RefBump<'extractor>,
buckets: usize,
key: &[u8],
n: u32,
) {
let hash = compute_bytes_hash(hasher, key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) {
RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del, add: _ } = entry.get_mut();
del.get_or_insert_with(RoaringBitmap::default).insert(n);
entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
}
RawEntryMut::Vacant(entry) => {
let alloc = RefBump::clone(&self.alloc);
let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_u32(n));
let alloc = RefBump::clone(&alloc);
let key = RefBump::map(alloc, |a| a.alloc_slice_copy(key));
entry.insert_hashed_nocheck(
hash,
RefBytes(key),
DelAddRoaringBitmap::new_del_u32(n),
);
}
}
}
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) {
match self.cache.raw_entry_mut().from_key(key) {
pub fn insert_add_u32(
&mut self,
hasher: &DefaultHashBuilder,
alloc: &RefBump<'extractor>,
buckets: usize,
key: &[u8],
n: u32,
) {
let hash = compute_bytes_hash(hasher, key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) {
RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del: _, add } = entry.get_mut();
add.get_or_insert_with(RoaringBitmap::default).insert(n);
entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
}
RawEntryMut::Vacant(entry) => {
let alloc = RefBump::clone(&self.alloc);
let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add_u32(n));
let alloc = RefBump::clone(&alloc);
let key = RefBump::map(alloc, |a| a.alloc_slice_copy(key));
entry.insert_hashed_nocheck(
hash,
RefBytes(key),
DelAddRoaringBitmap::new_add_u32(n),
);
}
}
}
}
pub fn spill_to_disk(self) -> io::Result<SpilledCache> {
let Self { cache, alloc: _, mut spilled_entries, mut deladd_buffer, mut cbo_buffer } = self;
struct SpillingCaches<'extractor> {
caches: Vec<
HashMap<RefBytes<'extractor>, DelAddRoaringBitmap, DefaultHashBuilder, RefBump<'extractor>>,
>,
// TODO it must be a grenad Sorter with a DelAddCboRoaringBitmapCodec
spilled_entries: Vec<UnorderedEntries>,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
}
for (key, deladd) in cache {
impl<'extractor> SpillingCaches<'extractor> {
pub fn insert_del_u32(
&mut self,
hasher: &DefaultHashBuilder,
buckets: usize,
key: &[u8],
n: u32,
) -> io::Result<()> {
let hash = compute_bytes_hash(hasher, key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
Ok(())
}
RawEntryMut::Vacant(_entry) => {
let deladd = DelAddRoaringBitmap::new_del_u32(n);
spill_entry_to_disk(
&mut spilled_entries,
&mut deladd_buffer,
&mut cbo_buffer,
&key,
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
key,
deladd,
)?;
)
}
}
}
Ok(SpilledCache { spilled_entries, deladd_buffer, cbo_buffer })
pub fn insert_add_u32(
&mut self,
hasher: &DefaultHashBuilder,
buckets: usize,
key: &[u8],
n: u32,
) -> io::Result<()> {
let hash = compute_bytes_hash(hasher, key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |k| k.as_ref() == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
Ok(())
}
// TODO Do not spill to disk if not necessary
pub fn into_unordered_entries(self) -> io::Result<UnorderedEntriesIntoIter> {
let Self { cache, alloc: _, mut spilled_entries, mut cbo_buffer, mut deladd_buffer } = self;
for (key, deladd) in cache {
RawEntryMut::Vacant(_entry) => {
let deladd = DelAddRoaringBitmap::new_add_u32(n);
spill_entry_to_disk(
&mut spilled_entries,
&mut deladd_buffer,
&mut cbo_buffer,
&key,
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
key,
deladd,
)?;
)
}
}
}
}
spilled_entries.into_iter_bitmap()
}
fn compute_bytes_hash<S: BuildHasher>(hash_builder: &S, key: &[u8]) -> u64 {
use std::hash::{Hash, Hasher};
let mut state = hash_builder.build_hasher();
key.hash(&mut state);
state.finish()
}
fn compute_bucket_from_hash(buckets: usize, hash: u64) -> usize {
hash as usize % buckets
}
fn spill_entry_to_disk(
@ -182,27 +294,6 @@ fn spill_entry_to_disk(
spilled_entries.push(key, bytes)
}
pub struct SpilledCache {
spilled_entries: UnorderedEntries,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
}
impl SpilledCache {
pub fn reconstruct(self, alloc: RefBump<'_>) -> CboCachedSorter<'_> {
let SpilledCache { spilled_entries, deladd_buffer, cbo_buffer } = self;
CboCachedSorter {
cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
alloc,
spilled_entries,
deladd_buffer,
cbo_buffer,
}
}
}
unsafe impl<'extractor> MostlySend for CboCachedSorter<'extractor> {}
pub struct UnorderedEntries {
entry_sizes: Vec<(u32, u32)>,
file: BufWriter<File>,
@ -291,40 +382,6 @@ impl UnorderedEntriesIntoIter {
}
}
#[derive(Default, Debug)]
struct Stats {
pub len: usize,
pub average: f32,
pub mean: u32,
pub min: u32,
pub max: u32,
}
impl Stats {
fn from_slice(slice: &mut [u32]) -> Stats {
slice.sort_unstable();
Self::from_sorted_slice(slice)
}
fn from_slice_p99(slice: &mut [u32]) -> Stats {
slice.sort_unstable();
let new_len = slice.len() - (slice.len() as f32 / 100.0) as usize;
match slice.get(..new_len) {
Some(slice) => Self::from_sorted_slice(slice),
None => Stats::default(),
}
}
fn from_sorted_slice(slice: &[u32]) -> Stats {
let sum: f64 = slice.iter().map(|i| *i as f64).sum();
let average = (sum / slice.len() as f64) as f32;
let mean = *slice.len().checked_div(2).and_then(|middle| slice.get(middle)).unwrap_or(&0);
let min = *slice.first().unwrap_or(&0);
let max = *slice.last().unwrap_or(&0);
Stats { len: slice.len(), average, mean, min, max }
}
}
#[derive(Debug, Clone)]
pub struct DelAddRoaringBitmap {
pub(crate) del: Option<RoaringBitmap>,

View File

@ -221,7 +221,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc))))
Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?)))
}
fn process(