diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 232eba594..2db996aa8 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -3,6 +3,7 @@ use std::fmt::Write as _; use bumpalo::Bump; use grenad::{MergeFunction, Sorter}; +use hashbrown::hash_map::RawEntryMut; use raw_collections::alloc::{RefBump, RefBytes}; use roaring::bitmap::Statistics; use roaring::RoaringBitmap; @@ -12,17 +13,16 @@ use crate::CboRoaringBitmapCodec; const KEY_SIZE: usize = 12; -#[derive(Debug)] +// #[derive(Debug)] pub struct CboCachedSorter<'extractor, MF> { - cache: Option< - hashbrown::HashMap< - // TODO check the size of it - RefBytes<'extractor>, - DelAddRoaringBitmap, - hashbrown::DefaultHashBuilder, - RefBump<'extractor>, - >, + cache: hashbrown::HashMap< + // TODO check the size of it + RefBytes<'extractor>, + DelAddRoaringBitmap, + hashbrown::DefaultHashBuilder, + RefBump<'extractor>, >, + alloc: RefBump<'extractor>, sorter: Sorter, deladd_buffer: Vec, cbo_buffer: Vec, @@ -34,7 +34,8 @@ impl<'extractor, MF> CboCachedSorter<'extractor, MF> { /// TODO may add the capacity pub fn new_in(sorter: Sorter, alloc: RefBump<'extractor>) -> Self { CboCachedSorter { - cache: Some(hashbrown::HashMap::new_in(alloc)), + cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)), + alloc, sorter, deladd_buffer: Vec::new(), cbo_buffer: Vec::new(), @@ -45,103 +46,85 @@ impl<'extractor, MF> CboCachedSorter<'extractor, MF> { } impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { - pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { - match self.cache.unwrap().get_mut(key) { - Some(DelAddRoaringBitmap { del, add: _ }) => { + pub fn insert_del_u32(&mut self, key: &[u8], n: u32) { + match self.cache.raw_entry_mut().from_key(key) { + RawEntryMut::Occupied(mut entry) => { + let DelAddRoaringBitmap { del, add: _ } = entry.get_mut(); del.get_or_insert_with(RoaringBitmap::default).insert(n); } - None => { + RawEntryMut::Vacant(entry) => { self.total_insertions += 1; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let value = DelAddRoaringBitmap::new_del_u32(n); - if let Some((key, deladd)) = self.cache.push(key.into(), value) { - self.write_entry(key, deladd)?; - } + let alloc = RefBump::clone(&self.alloc); + let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); + entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_u32(n)); } } - - Ok(()) } - pub fn insert_del( - &mut self, - key: &[u8], - bitmap: RoaringBitmap, - ) -> grenad::Result<(), MF::Error> { - match self.cache.unwrap().get_mut(key) { - Some(DelAddRoaringBitmap { del, add: _ }) => { + pub fn insert_del(&mut self, key: &[u8], bitmap: RoaringBitmap) { + match self.cache.raw_entry_mut().from_key(key) { + RawEntryMut::Occupied(mut entry) => { + let DelAddRoaringBitmap { del, add: _ } = entry.get_mut(); *del.get_or_insert_with(RoaringBitmap::default) |= bitmap; } - None => { + RawEntryMut::Vacant(entry) => { self.total_insertions += 1; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let value = DelAddRoaringBitmap::new_del(bitmap); - if let Some((key, deladd)) = self.cache.push(key.into(), value) { - self.write_entry(key, deladd)?; - } + let alloc = RefBump::clone(&self.alloc); + let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); + entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del(bitmap)); } } - - Ok(()) } - pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { - match self.cache.unwrap().get_mut(key) { - Some(DelAddRoaringBitmap { del: _, add }) => { + pub fn insert_add_u32(&mut self, key: &[u8], n: u32) { + match self.cache.raw_entry_mut().from_key(key) { + RawEntryMut::Occupied(mut entry) => { + let DelAddRoaringBitmap { del: _, add } = entry.get_mut(); add.get_or_insert_with(RoaringBitmap::default).insert(n); } - None => { + RawEntryMut::Vacant(entry) => { self.total_insertions += 1; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let value = DelAddRoaringBitmap::new_add_u32(n); - if let Some((key, deladd)) = self.cache.push(key.into(), value) { - self.write_entry(key, deladd)?; - } + let alloc = RefBump::clone(&self.alloc); + let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); + entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add_u32(n)); } } - - Ok(()) } - pub fn insert_add( - &mut self, - key: &[u8], - bitmap: RoaringBitmap, - ) -> grenad::Result<(), MF::Error> { - match self.cache.unwrap().get_mut(key) { - Some(DelAddRoaringBitmap { del: _, add }) => { + pub fn insert_add(&mut self, key: &[u8], bitmap: RoaringBitmap) { + match self.cache.raw_entry_mut().from_key(key) { + RawEntryMut::Occupied(mut entry) => { + let DelAddRoaringBitmap { del: _, add } = entry.get_mut(); *add.get_or_insert_with(RoaringBitmap::default) |= bitmap; } - None => { + RawEntryMut::Vacant(entry) => { self.total_insertions += 1; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let value = DelAddRoaringBitmap::new_add(bitmap); - if let Some((key, deladd)) = self.cache.push(key.into(), value) { - self.write_entry(key, deladd)?; - } + let alloc = RefBump::clone(&self.alloc); + let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); + entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add(bitmap)); } } - - Ok(()) } - pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { - match self.cache.unwrap().get_mut(key) { - Some(DelAddRoaringBitmap { del, add }) => { + pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) { + match self.cache.raw_entry_mut().from_key(key) { + RawEntryMut::Occupied(mut entry) => { + let DelAddRoaringBitmap { del, add } = entry.get_mut(); del.get_or_insert_with(RoaringBitmap::default).insert(n); add.get_or_insert_with(RoaringBitmap::default).insert(n); } - None => { + RawEntryMut::Vacant(entry) => { self.total_insertions += 1; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; - let value = DelAddRoaringBitmap::new_del_add_u32(n); - if let Some((key, deladd)) = self.cache.push(key.into(), value) { - self.write_entry(key, deladd)?; - } + let alloc = RefBump::clone(&self.alloc); + let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); + entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_add_u32(n)); } } - - Ok(()) } fn write_entry>( @@ -182,23 +165,25 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { self.sorter.insert(key, val) } - pub fn spill_to_disk(&mut self, bump: &'extractor RefCell) -> std::io::Result<()> { - let cache = self.cache.take().unwrap(); + pub fn spill_to_disk(self) -> std::io::Result> { + let Self { + cache, + alloc: _, + sorter, + deladd_buffer, + cbo_buffer, + total_insertions, + fitted_in_key, + } = self; /// I want to spill to disk for real drop(cache); - bump.borrow_mut().reset(); - - let alloc = RefBump::new(bump.borrow()); - self.cache = Some(hashbrown::HashMap::new_in(alloc)); - - Ok(()) + Ok(SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key }) } pub fn into_sorter(self) -> grenad::Result, MF::Error> { let Self { cache, sorter, total_insertions, fitted_in_key, .. } = self; - let cache = cache.unwrap(); let mut all_n_containers = Vec::new(); let mut all_n_array_containers = Vec::new(); @@ -260,6 +245,34 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { } } +pub struct SpilledCache { + sorter: Sorter, + deladd_buffer: Vec, + cbo_buffer: Vec, + total_insertions: usize, + fitted_in_key: usize, +} + +impl SpilledCache { + pub fn reconstruct<'extractor>( + self, + alloc: RefBump<'extractor>, + ) -> CboCachedSorter<'extractor, MF> { + let SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key } = + self; + + CboCachedSorter { + cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)), + alloc, + sorter, + deladd_buffer, + cbo_buffer, + total_insertions, + fitted_in_key, + } + } +} + #[derive(Default, Debug)] struct Stats { pub len: usize, diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index c76ab49d0..ef75b5f57 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -7,6 +7,7 @@ use std::ops::DerefMut as _; use bumpalo::Bump; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; +use raw_collections::alloc::RefBump; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::update::new::extract::cache::CboCachedSorter; @@ -332,6 +333,49 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { ) -> Result<()> { WordDocidsExtractors::extract_document_change(context, self.tokenizer, change) } + + fn spill_if_needed<'doc>( + &'doc self, + data: &'doc Self::Data, + extractor_alloc: &'extractor RefCell, + ) -> Result<()> { + if self.max_memory.map_or(true, |mm| extractor_alloc.borrow().allocated_bytes() < mm) { + return Ok(()); + } + + let mut data = data.0.borrow_mut(); + let WordDocidsCachedSorters { + word_fid_docids, + word_docids, + exact_word_docids, + word_position_docids, + fid_word_count_docids, + fid_word_count, + current_docid, + } = &mut *data; + + let spilled_word_fid_docids = word_fid_docids.spill_to_disk()?; + let spilled_word_docids = word_docids.spill_to_disk()?; + let spilled_exact_word_docids = exact_word_docids.spill_to_disk()?; + let spilled_word_position_docids = word_position_docids.spill_to_disk()?; + let spilled_fid_word_count_docids = fid_word_count_docids.spill_to_disk()?; + // let spilled_fid_word_count = fid_word_count.spill_to_disk()?; + // let spilled_current_docid = current_docid.spill_to_disk()?; + + extractor_alloc.borrow_mut().reset(); + + let alloc = RefBump::new(extractor_alloc.borrow()); + data.word_fid_docids = spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc)); + data.word_docids = spilled_word_docids.reconstruct(RefBump::clone(&alloc)); + data.exact_word_docids = spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc)); + data.word_position_docids = + spilled_word_position_docids.reconstruct(RefBump::clone(&alloc)); + data.fid_word_count_docids = spilled_fid_word_count_docids.reconstruct(alloc); + // data.fid_word_count = spilled_fid_word_count.reconstruct(); + // data.current_docid = spilled_current_docid.reconstruct(); + + Ok(()) + } } pub struct WordDocidsExtractors; diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs index 183ba8b56..96a7681d0 100644 --- a/milli/src/update/new/indexer/document_changes.rs +++ b/milli/src/update/new/indexer/document_changes.rs @@ -432,7 +432,7 @@ where // send back the doc_alloc in the pool context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); - extractor.spill_if_needed(&context.data, &context.extractor_alloc); + extractor.spill_if_needed(&context.data, &context.extractor_alloc)?; res },