diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 54991159d..94722cd97 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -1,14 +1,20 @@ +use std::cmp::Ordering; +use std::collections::binary_heap::PeekMut; +use std::collections::BinaryHeap; use std::fs::File; use std::hash::BuildHasher; -use std::{iter, mem}; +use std::{io, iter, mem}; use bumpalo::Bump; +use grenad::ReaderCursor; use hashbrown::hash_map::RawEntryMut; use hashbrown::{DefaultHashBuilder, HashMap, HashSet}; use raw_collections::map::FrozenMap; use roaring::RoaringBitmap; use crate::update::del_add::{DelAdd, KvWriterDelAdd}; +use crate::update::new::indexer::document_changes::MostlySend; +use crate::update::new::KvReaderDelAdd; use crate::update::MergeDeladdCboRoaringBitmaps; use crate::CboRoaringBitmapCodec; @@ -75,7 +81,7 @@ impl<'extractor> CboCachedSorter<'extractor> { Self { hasher: DefaultHashBuilder::default(), caches: InnerCaches::Normal(NormalCaches { - caches: std::iter::repeat_with(|| HashMap::new_in(alloc)).take(buckets).collect(), + caches: iter::repeat_with(|| HashMap::new_in(alloc)).take(buckets).collect(), }), alloc, } @@ -131,6 +137,8 @@ impl<'extractor> CboCachedSorter<'extractor> { } } +unsafe impl MostlySend for CboCachedSorter<'_> {} + struct NormalCaches<'extractor> { caches: Vec>, @@ -323,17 +331,102 @@ pub struct FrozenCache<'a, 'extractor> { spilled: grenad::Reader, } -pub fn merge_me(frozen: Vec, f: F) -> crate::Result<()> +pub fn merge_me(frozen: Vec, mut iter: F) -> crate::Result<()> where F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> crate::Result<()>, { + let (mut maps, spilled): (Vec<_>, Vec<_>) = + frozen.into_iter().map(|FrozenCache { cache, spilled }| (cache, spilled)).collect(); + // First manage the spilled entries by looking into the HashMaps and then merge them. 
+ let mut heap = BinaryHeap::new(); + for (source_index, source) in spilled.into_iter().enumerate() { + let mut cursor = source.into_cursor()?; + if cursor.move_on_next()?.is_some() { + heap.push(Entry { cursor, source_index }); + } + } + + loop { + let mut first_entry = match heap.pop() { + Some(entry) => entry, + None => break, + }; + + let (first_key, first_value) = match first_entry.cursor.current() { + Some((key, value)) => (key, value), + None => break, + }; + + let mut output = DelAddRoaringBitmap::from_bytes(first_value)?; + while let Some(mut entry) = heap.peek_mut() { + if let Some((key, value)) = entry.cursor.current() { + if first_key == key { + let new = DelAddRoaringBitmap::from_bytes(value)?; + output.merge(new); + // When we are done with the current value of this entry, make + // it move forward and let the heap reorganize itself (on drop) + if entry.cursor.move_on_next()?.is_none() { + PeekMut::pop(entry); + } + } else { + break; + } + } + } + + // Once we merged all of the spilled bitmaps we must also + // fetch the entries from the non-spilled entries (the HashMaps). + for (map_index, map) in maps.iter_mut().enumerate() { + if first_entry.source_index != map_index { + if let Some(new) = map.get_mut(first_key) { + let new = mem::replace(new, DelAddRoaringBitmap::dummy()); + output.merge(new); + } + } + } + + // We send the merged entry outside. + (iter)(first_key, output)?; + + // Don't forget to put the first entry back into the heap. + if first_entry.cursor.move_on_next()?.is_some() { + heap.push(first_entry) + } + } // Then manage the content on the HashMap that weren't taken (mem::take). 
todo!() } +struct Entry { + cursor: ReaderCursor, + source_index: usize, +} + +impl Ord for Entry { + fn cmp(&self, other: &Entry) -> Ordering { + let skey = self.cursor.current().map(|(k, _)| k); + let okey = other.cursor.current().map(|(k, _)| k); + skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse() + } +} + +impl Eq for Entry {} + +impl PartialEq for Entry { + fn eq(&self, other: &Entry) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl PartialOrd for Entry { + fn partial_cmp(&self, other: &Entry) -> Option { + Some(self.cmp(other)) + } +} + #[derive(Debug, Clone)] pub struct DelAddRoaringBitmap { pub(crate) del: Option, @@ -341,6 +434,26 @@ pub struct DelAddRoaringBitmap { } impl DelAddRoaringBitmap { + fn from_bytes(bytes: &[u8]) -> io::Result { + let reader = KvReaderDelAdd::from_slice(bytes); + + let del = match reader.get(DelAdd::Deletion) { + Some(bytes) => CboRoaringBitmapCodec::deserialize_from(bytes).map(Some)?, + None => None, + }; + + let add = match reader.get(DelAdd::Addition) { + Some(bytes) => CboRoaringBitmapCodec::deserialize_from(bytes).map(Some)?, + None => None, + }; + + Ok(DelAddRoaringBitmap { del, add }) + } + + fn dummy() -> DelAddRoaringBitmap { + DelAddRoaringBitmap { del: None, add: None } + } + fn new_del_add_u32(n: u32) -> Self { DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), @@ -363,4 +476,21 @@ impl DelAddRoaringBitmap { fn new_add_u32(n: u32) -> Self { DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } } + + fn merge(&mut self, rhs: DelAddRoaringBitmap) { + let DelAddRoaringBitmap { del, add } = self; + let DelAddRoaringBitmap { del: ndel, add: nadd } = rhs; + + *del = match (mem::take(del), ndel) { + (None, None) => None, + (None, Some(del)) | (Some(del), None) => Some(del), + (Some(del), Some(ndel)) => Some(del | ndel), + }; + + *add = match (mem::take(add), nadd) { + (None, None) => None, + (None, Some(add)) | (Some(add), None) => Some(add), + (Some(add), 
Some(nadd)) => Some(add | nadd), + }; + } } diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 947a6e6d2..e12c9a6ac 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -31,7 +31,7 @@ pub struct FacetedExtractorData<'extractor> { impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { type Data = RefCell>; - fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result { + fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?)) } @@ -221,7 +221,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>>, + extractor_allocs: &mut ThreadLocal>, ) -> Result> { let max_memory = grenad_parameters.max_memory_by_thread(); diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index aa69a69b1..4c4374a8f 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -19,7 +19,7 @@ pub trait DocidsExtractor { grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>>, + extractor_allocs: &mut ThreadLocal>, ) -> Result>; } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index b932c9a64..cc954430d 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -17,29 +17,31 @@ use crate::update::new::indexer::document_changes::{ IndexingContext, MostlySend, RefCellExt, ThreadLocal, }; use crate::update::new::DocumentChange; +use 
crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; const MAX_COUNTED_WORDS: usize = 30; -pub struct WordDocidsCachedSorters<'indexer> { - word_fid_docids: CboCachedSorter<'indexer>, - word_docids: CboCachedSorter<'indexer>, - exact_word_docids: CboCachedSorter<'indexer>, - word_position_docids: CboCachedSorter<'indexer>, - fid_word_count_docids: CboCachedSorter<'indexer>, +pub struct WordDocidsCachedSorters<'extractor> { + word_fid_docids: CboCachedSorter<'extractor>, + word_docids: CboCachedSorter<'extractor>, + exact_word_docids: CboCachedSorter<'extractor>, + word_position_docids: CboCachedSorter<'extractor>, + fid_word_count_docids: CboCachedSorter<'extractor>, fid_word_count: HashMap, current_docid: Option, } -unsafe impl<'indexer> MostlySend for WordDocidsCachedSorters<'indexer> {} +unsafe impl<'extractor> MostlySend for WordDocidsCachedSorters<'extractor> {} -impl<'indexer> WordDocidsCachedSorters<'indexer> { - pub fn new_in(alloc: RefBump<'indexer>) -> io::Result { +impl<'extractor> WordDocidsCachedSorters<'extractor> { + /// TODO Make sure to give the same max_memory to all of them, without splitting it + pub fn new_in(alloc: &'extractor Bump) -> io::Result { Ok(Self { - word_fid_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, - word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, - exact_word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, - word_position_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, + word_fid_docids: CboCachedSorter::new_in(alloc)?, + word_docids: CboCachedSorter::new_in(alloc)?, + exact_word_docids: CboCachedSorter::new_in(alloc)?, + word_position_docids: CboCachedSorter::new_in(alloc)?, fid_word_count_docids: CboCachedSorter::new_in(alloc)?, fid_word_count: HashMap::new(), current_docid: None, @@ -220,7 +222,7 @@ pub struct WordDocidsExtractorData<'extractor> { impl<'extractor> 
Extractor<'extractor> for WordDocidsExtractorData<'extractor> { type Data = RefCell>>; - fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result { + fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?))) } @@ -231,50 +233,6 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { ) -> Result<()> { WordDocidsExtractors::extract_document_change(context, self.tokenizer, change) } - - fn spill_if_needed<'doc>( - &'doc self, - data: &'doc Self::Data, - extractor_alloc: &'extractor RefCell, - ) -> Result<()> { - if self.max_memory.map_or(true, |mm| extractor_alloc.borrow().allocated_bytes() < mm) { - return Ok(()); - } - - let sorters = data.borrow_mut().take().unwrap(); - let WordDocidsCachedSorters { - word_fid_docids, - word_docids, - exact_word_docids, - word_position_docids, - fid_word_count_docids, - // Note that fid_word_count and current_docid do not have any reference - // to any 'extractor-allocated memory so we don't have to spill it to disk - fid_word_count, - current_docid, - } = sorters; - - let spilled_word_fid_docids = word_fid_docids.spill_to_disk()?; - let spilled_word_docids = word_docids.spill_to_disk()?; - let spilled_exact_word_docids = exact_word_docids.spill_to_disk()?; - let spilled_word_position_docids = word_position_docids.spill_to_disk()?; - let spilled_fid_word_count_docids = fid_word_count_docids.spill_to_disk()?; - - extractor_alloc.borrow_mut().reset(); - - let alloc = RefBump::new(extractor_alloc.borrow()); - *data.borrow_mut() = Some(WordDocidsCachedSorters { - word_fid_docids: spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc)), - word_docids: spilled_word_docids.reconstruct(RefBump::clone(&alloc)), - exact_word_docids: spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc)), - word_position_docids: spilled_word_position_docids.reconstruct(RefBump::clone(&alloc)), - fid_word_count_docids: 
spilled_fid_word_count_docids.reconstruct(alloc), - fid_word_count, - current_docid, - }); - - Ok(()) - } } pub struct WordDocidsExtractors; @@ -283,7 +241,7 @@ impl WordDocidsExtractors { pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>>, + extractor_allocs: &mut ThreadLocal>, ) -> Result { let max_memory = grenad_parameters.max_memory_by_thread(); let index = indexing_context.index; diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs index 7bcf2c1dc..70a1eb676 100644 --- a/milli/src/update/new/indexer/document_changes.rs +++ b/milli/src/update/new/indexer/document_changes.rs @@ -260,7 +260,7 @@ pub struct DocumentChangeContext< pub doc_alloc: Bump, /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills. - pub extractor_alloc: &'extractor RefCell, + pub extractor_alloc: &'extractor Bump, /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents doc_allocs: &'doc ThreadLocal>>, @@ -283,14 +283,14 @@ impl< index: &'indexer Index, db_fields_ids_map: &'indexer FieldsIdsMap, new_fields_ids_map: &'fid RwLock, - extractor_allocs: &'extractor ThreadLocal>>, + extractor_allocs: &'extractor ThreadLocal>, doc_allocs: &'doc ThreadLocal>>, datastore: &'data ThreadLocal, fields_ids_map_store: &'doc ThreadLocal>>>, init_data: F, ) -> Result where - F: FnOnce(RefBump<'extractor>) -> Result, + F: FnOnce(&'extractor Bump) -> Result, { let doc_alloc = doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024)))); @@ -301,9 +301,7 @@ impl< let fields_ids_map = &fields_ids_map.0; let extractor_alloc = extractor_allocs.get_or_default(); - let extractor_alloc_ref = RefBump::new(extractor_alloc.0.borrow_or_yield()); - - let data = datastore.get_or_try(move || 
init_data(extractor_alloc_ref))?; + let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?; let txn = index.read_txn()?; Ok(DocumentChangeContext { @@ -323,30 +321,13 @@ impl< pub trait Extractor<'extractor>: Sync { type Data: MostlySend; - fn init_data<'doc>(&'doc self, extractor_alloc: RefBump<'extractor>) -> Result; + fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result; fn process<'doc>( &'doc self, change: DocumentChange<'doc>, context: &'doc DocumentChangeContext, ) -> Result<()>; - - /// Gives a chance to the data to spill itself on disk and reset the extractor allocator. - /// - /// The Extractor and data implementations are responsible for checking that there are no live allocations before - /// attempting to reset the extractor allocator. Failure to uphold this invariant will result in a panic. - /// - /// # Panics - /// - /// - calls `extractor_alloc.borrow_mut()` while there are outstanding allocations referencing the `extractor_alloc` - #[inline] - fn spill_if_needed<'doc>( - &'doc self, - _data: &'doc Self::Data, - _extractor_alloc: &'extractor RefCell, - ) -> Result<()> { - Ok(()) - } } pub trait DocumentChanges<'pl // lifetime of the underlying payload @@ -396,7 +377,7 @@ pub fn for_each_document_change< doc_allocs, fields_ids_map_store, }: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &'extractor mut ThreadLocal>>, + extractor_allocs: &'extractor mut ThreadLocal>, datastore: &'data ThreadLocal, ) -> Result<()> where @@ -404,7 +385,7 @@ where { // Clean up and reuse the extractor allocs for extractor_alloc in extractor_allocs.iter_mut() { - extractor_alloc.0.get_mut().reset(); + extractor_alloc.0.reset(); } let pi = document_changes.iter(); @@ -436,8 +417,6 @@ where // send back the doc_alloc in the pool context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); - extractor.spill_if_needed(&context.data, &context.extractor_alloc)?; - res }, )