Improve the cache again

This commit is contained in:
Clément Renault 2024-10-16 15:03:53 +02:00
parent f095477c0c
commit 05a015b27c
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 135 additions and 78 deletions

View File

@ -3,6 +3,7 @@ use std::fmt::Write as _;
use bumpalo::Bump; use bumpalo::Bump;
use grenad::{MergeFunction, Sorter}; use grenad::{MergeFunction, Sorter};
use hashbrown::hash_map::RawEntryMut;
use raw_collections::alloc::{RefBump, RefBytes}; use raw_collections::alloc::{RefBump, RefBytes};
use roaring::bitmap::Statistics; use roaring::bitmap::Statistics;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
@ -12,17 +13,16 @@ use crate::CboRoaringBitmapCodec;
const KEY_SIZE: usize = 12; const KEY_SIZE: usize = 12;
#[derive(Debug)] // #[derive(Debug)]
pub struct CboCachedSorter<'extractor, MF> { pub struct CboCachedSorter<'extractor, MF> {
cache: Option< cache: hashbrown::HashMap<
hashbrown::HashMap< // TODO check the size of it
// TODO check the size of it RefBytes<'extractor>,
RefBytes<'extractor>, DelAddRoaringBitmap,
DelAddRoaringBitmap, hashbrown::DefaultHashBuilder,
hashbrown::DefaultHashBuilder, RefBump<'extractor>,
RefBump<'extractor>,
>,
>, >,
alloc: RefBump<'extractor>,
sorter: Sorter<MF>, sorter: Sorter<MF>,
deladd_buffer: Vec<u8>, deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>, cbo_buffer: Vec<u8>,
@ -34,7 +34,8 @@ impl<'extractor, MF> CboCachedSorter<'extractor, MF> {
/// TODO may add the capacity /// TODO may add the capacity
pub fn new_in(sorter: Sorter<MF>, alloc: RefBump<'extractor>) -> Self { pub fn new_in(sorter: Sorter<MF>, alloc: RefBump<'extractor>) -> Self {
CboCachedSorter { CboCachedSorter {
cache: Some(hashbrown::HashMap::new_in(alloc)), cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
alloc,
sorter, sorter,
deladd_buffer: Vec::new(), deladd_buffer: Vec::new(),
cbo_buffer: Vec::new(), cbo_buffer: Vec::new(),
@ -45,103 +46,85 @@ impl<'extractor, MF> CboCachedSorter<'extractor, MF> {
} }
impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { pub fn insert_del_u32(&mut self, key: &[u8], n: u32) {
match self.cache.unwrap().get_mut(key) { match self.cache.raw_entry_mut().from_key(key) {
Some(DelAddRoaringBitmap { del, add: _ }) => { RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del, add: _ } = entry.get_mut();
del.get_or_insert_with(RoaringBitmap::default).insert(n); del.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
None => { RawEntryMut::Vacant(entry) => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del_u32(n); let alloc = RefBump::clone(&self.alloc);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
self.write_entry(key, deladd)?; entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_u32(n));
}
} }
} }
Ok(())
} }
pub fn insert_del( pub fn insert_del(&mut self, key: &[u8], bitmap: RoaringBitmap) {
&mut self, match self.cache.raw_entry_mut().from_key(key) {
key: &[u8], RawEntryMut::Occupied(mut entry) => {
bitmap: RoaringBitmap, let DelAddRoaringBitmap { del, add: _ } = entry.get_mut();
) -> grenad::Result<(), MF::Error> {
match self.cache.unwrap().get_mut(key) {
Some(DelAddRoaringBitmap { del, add: _ }) => {
*del.get_or_insert_with(RoaringBitmap::default) |= bitmap; *del.get_or_insert_with(RoaringBitmap::default) |= bitmap;
} }
None => { RawEntryMut::Vacant(entry) => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del(bitmap); let alloc = RefBump::clone(&self.alloc);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
self.write_entry(key, deladd)?; entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del(bitmap));
}
} }
} }
Ok(())
} }
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { pub fn insert_add_u32(&mut self, key: &[u8], n: u32) {
match self.cache.unwrap().get_mut(key) { match self.cache.raw_entry_mut().from_key(key) {
Some(DelAddRoaringBitmap { del: _, add }) => { RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del: _, add } = entry.get_mut();
add.get_or_insert_with(RoaringBitmap::default).insert(n); add.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
None => { RawEntryMut::Vacant(entry) => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_add_u32(n); let alloc = RefBump::clone(&self.alloc);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
self.write_entry(key, deladd)?; entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add_u32(n));
}
} }
} }
Ok(())
} }
pub fn insert_add( pub fn insert_add(&mut self, key: &[u8], bitmap: RoaringBitmap) {
&mut self, match self.cache.raw_entry_mut().from_key(key) {
key: &[u8], RawEntryMut::Occupied(mut entry) => {
bitmap: RoaringBitmap, let DelAddRoaringBitmap { del: _, add } = entry.get_mut();
) -> grenad::Result<(), MF::Error> {
match self.cache.unwrap().get_mut(key) {
Some(DelAddRoaringBitmap { del: _, add }) => {
*add.get_or_insert_with(RoaringBitmap::default) |= bitmap; *add.get_or_insert_with(RoaringBitmap::default) |= bitmap;
} }
None => { RawEntryMut::Vacant(entry) => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_add(bitmap); let alloc = RefBump::clone(&self.alloc);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
self.write_entry(key, deladd)?; entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add(bitmap));
}
} }
} }
Ok(())
} }
pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) {
match self.cache.unwrap().get_mut(key) { match self.cache.raw_entry_mut().from_key(key) {
Some(DelAddRoaringBitmap { del, add }) => { RawEntryMut::Occupied(mut entry) => {
let DelAddRoaringBitmap { del, add } = entry.get_mut();
del.get_or_insert_with(RoaringBitmap::default).insert(n); del.get_or_insert_with(RoaringBitmap::default).insert(n);
add.get_or_insert_with(RoaringBitmap::default).insert(n); add.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
None => { RawEntryMut::Vacant(entry) => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del_add_u32(n); let alloc = RefBump::clone(&self.alloc);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
self.write_entry(key, deladd)?; entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_add_u32(n));
}
} }
} }
Ok(())
} }
fn write_entry<A: AsRef<[u8]>>( fn write_entry<A: AsRef<[u8]>>(
@ -182,23 +165,25 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
self.sorter.insert(key, val) self.sorter.insert(key, val)
} }
pub fn spill_to_disk(&mut self, bump: &'extractor RefCell<Bump>) -> std::io::Result<()> { pub fn spill_to_disk(self) -> std::io::Result<SpilledCache<MF>> {
let cache = self.cache.take().unwrap(); let Self {
cache,
alloc: _,
sorter,
deladd_buffer,
cbo_buffer,
total_insertions,
fitted_in_key,
} = self;
/// I want to spill to disk for real /// I want to spill to disk for real
drop(cache); drop(cache);
bump.borrow_mut().reset(); Ok(SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key })
let alloc = RefBump::new(bump.borrow());
self.cache = Some(hashbrown::HashMap::new_in(alloc));
Ok(())
} }
pub fn into_sorter(self) -> grenad::Result<Sorter<MF>, MF::Error> { pub fn into_sorter(self) -> grenad::Result<Sorter<MF>, MF::Error> {
let Self { cache, sorter, total_insertions, fitted_in_key, .. } = self; let Self { cache, sorter, total_insertions, fitted_in_key, .. } = self;
let cache = cache.unwrap();
let mut all_n_containers = Vec::new(); let mut all_n_containers = Vec::new();
let mut all_n_array_containers = Vec::new(); let mut all_n_array_containers = Vec::new();
@ -260,6 +245,34 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
} }
} }
pub struct SpilledCache<MF> {
sorter: Sorter<MF>,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
total_insertions: usize,
fitted_in_key: usize,
}
impl<MF> SpilledCache<MF> {
pub fn reconstruct<'extractor>(
self,
alloc: RefBump<'extractor>,
) -> CboCachedSorter<'extractor, MF> {
let SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key } =
self;
CboCachedSorter {
cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
alloc,
sorter,
deladd_buffer,
cbo_buffer,
total_insertions,
fitted_in_key,
}
}
}
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct Stats { struct Stats {
pub len: usize, pub len: usize,

View File

@ -7,6 +7,7 @@ use std::ops::DerefMut as _;
use bumpalo::Bump; use bumpalo::Bump;
use grenad::{Merger, MergerBuilder}; use grenad::{Merger, MergerBuilder};
use heed::RoTxn; use heed::RoTxn;
use raw_collections::alloc::RefBump;
use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::cache::CboCachedSorter;
@ -332,6 +333,49 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
) -> Result<()> { ) -> Result<()> {
WordDocidsExtractors::extract_document_change(context, self.tokenizer, change) WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)
} }
fn spill_if_needed<'doc>(
&'doc self,
data: &'doc Self::Data,
extractor_alloc: &'extractor RefCell<Bump>,
) -> Result<()> {
if self.max_memory.map_or(true, |mm| extractor_alloc.borrow().allocated_bytes() < mm) {
return Ok(());
}
let mut data = data.0.borrow_mut();
let WordDocidsCachedSorters {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
fid_word_count,
current_docid,
} = &mut *data;
let spilled_word_fid_docids = word_fid_docids.spill_to_disk()?;
let spilled_word_docids = word_docids.spill_to_disk()?;
let spilled_exact_word_docids = exact_word_docids.spill_to_disk()?;
let spilled_word_position_docids = word_position_docids.spill_to_disk()?;
let spilled_fid_word_count_docids = fid_word_count_docids.spill_to_disk()?;
// let spilled_fid_word_count = fid_word_count.spill_to_disk()?;
// let spilled_current_docid = current_docid.spill_to_disk()?;
extractor_alloc.borrow_mut().reset();
let alloc = RefBump::new(extractor_alloc.borrow());
data.word_fid_docids = spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc));
data.word_docids = spilled_word_docids.reconstruct(RefBump::clone(&alloc));
data.exact_word_docids = spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc));
data.word_position_docids =
spilled_word_position_docids.reconstruct(RefBump::clone(&alloc));
data.fid_word_count_docids = spilled_fid_word_count_docids.reconstruct(alloc);
// data.fid_word_count = spilled_fid_word_count.reconstruct();
// data.current_docid = spilled_current_docid.reconstruct();
Ok(())
}
} }
pub struct WordDocidsExtractors; pub struct WordDocidsExtractors;

View File

@ -432,7 +432,7 @@ where
// send back the doc_alloc in the pool // send back the doc_alloc in the pool
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
extractor.spill_if_needed(&context.data, &context.extractor_alloc); extractor.spill_if_needed(&context.data, &context.extractor_alloc)?;
res res
}, },