Finished the merging of all spilled entries

This commit is contained in:
Clément Renault 2024-10-28 16:15:13 +01:00
parent 93d639ead1
commit 498f51c7b3
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
5 changed files with 160 additions and 93 deletions

View File

@ -1,14 +1,20 @@
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::fs::File; use std::fs::File;
use std::hash::BuildHasher; use std::hash::BuildHasher;
use std::{iter, mem}; use std::{io, iter, mem};
use bumpalo::Bump; use bumpalo::Bump;
use grenad::ReaderCursor;
use hashbrown::hash_map::RawEntryMut; use hashbrown::hash_map::RawEntryMut;
use hashbrown::{DefaultHashBuilder, HashMap, HashSet}; use hashbrown::{DefaultHashBuilder, HashMap, HashSet};
use raw_collections::map::FrozenMap; use raw_collections::map::FrozenMap;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::new::indexer::document_changes::MostlySend;
use crate::update::new::KvReaderDelAdd;
use crate::update::MergeDeladdCboRoaringBitmaps; use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::CboRoaringBitmapCodec; use crate::CboRoaringBitmapCodec;
@ -75,7 +81,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
Self { Self {
hasher: DefaultHashBuilder::default(), hasher: DefaultHashBuilder::default(),
caches: InnerCaches::Normal(NormalCaches { caches: InnerCaches::Normal(NormalCaches {
caches: std::iter::repeat_with(|| HashMap::new_in(alloc)).take(buckets).collect(), caches: iter::repeat_with(|| HashMap::new_in(alloc)).take(buckets).collect(),
}), }),
alloc, alloc,
} }
@ -131,6 +137,8 @@ impl<'extractor> CboCachedSorter<'extractor> {
} }
} }
// NOTE(review): this `unsafe impl` carries no `SAFETY` justification and the
// contract of `MostlySend` is not visible from here — TODO state which
// invariant `CboCachedSorter` upholds to make this marker sound.
unsafe impl MostlySend for CboCachedSorter<'_> {}
struct NormalCaches<'extractor> { struct NormalCaches<'extractor> {
caches: caches:
Vec<HashMap<&'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>>, Vec<HashMap<&'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>>,
@ -323,17 +331,102 @@ pub struct FrozenCache<'a, 'extractor> {
spilled: grenad::Reader<File>, spilled: grenad::Reader<File>,
} }
pub fn merge_me<F>(frozen: Vec<FrozenCache>, f: F) -> crate::Result<()> pub fn merge_me<F>(frozen: Vec<FrozenCache>, mut iter: F) -> crate::Result<()>
where where
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> crate::Result<()>, F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> crate::Result<()>,
{ {
let (mut maps, spilled): (Vec<_>, Vec<_>) =
frozen.into_iter().map(|FrozenCache { cache, spilled }| (cache, spilled)).collect();
// First manage the spilled entries by looking into the HashMaps and then merge them. // First manage the spilled entries by looking into the HashMaps and then merge them.
let mut heap = BinaryHeap::new();
for (source_index, source) in spilled.into_iter().enumerate() {
let mut cursor = source.into_cursor()?;
if cursor.move_on_next()?.is_some() {
heap.push(Entry { cursor, source_index });
}
}
loop {
let mut first_entry = match heap.pop() {
Some(entry) => entry,
None => break,
};
let (first_key, first_value) = match first_entry.cursor.current() {
Some((key, value)) => (key, value),
None => break,
};
let mut output = DelAddRoaringBitmap::from_bytes(first_value)?;
while let Some(mut entry) = heap.peek_mut() {
if let Some((key, _value)) = entry.cursor.current() {
if first_key == key {
let new = DelAddRoaringBitmap::from_bytes(first_value)?;
output.merge(new);
// When we are done we the current value of this entry move make
// it move forward and let the heap reorganize itself (on drop)
if entry.cursor.move_on_next()?.is_none() {
PeekMut::pop(entry);
}
} else {
break;
}
}
}
// Once we merged all of the spilled bitmaps we must also
// fetch the entries from the non-spilled entries (the HashMaps).
for (map_index, map) in maps.iter_mut().enumerate() {
if first_entry.source_index != map_index {
if let Some(new) = map.get_mut(first_key) {
let new = mem::replace(new, DelAddRoaringBitmap::dummy());
output.merge(new);
}
}
}
// We send the merged entry outside.
(iter)(first_key, output)?;
// Don't forget to put the first entry back into the heap.
if first_entry.cursor.move_on_next()?.is_some() {
heap.push(first_entry)
}
}
// Then manage the content on the HashMap that weren't taken (mem::take). // Then manage the content on the HashMap that weren't taken (mem::take).
todo!() todo!()
} }
/// An element of the merge heap: a cursor over one spilled reader together
/// with the index of the source it was created from.
struct Entry<R> {
// Cursor whose current key drives the ordering of the entry in the heap.
cursor: ReaderCursor<R>,
// Position of the originating source; also used as the ordering tie-breaker.
source_index: usize,
}
impl<R> Ord for Entry<R> {
    /// Compares two entries by the current key of their cursors, then by
    /// their source index, and reverses the result.
    ///
    /// The reversal makes the entry with the smallest key the greatest one,
    /// which is the one a max-heap such as `BinaryHeap` yields first.
    fn cmp(&self, other: &Entry<R>) -> Ordering {
        let lhs_key = self.cursor.current().map(|(key, _)| key);
        let rhs_key = other.cursor.current().map(|(key, _)| key);
        let forward = match lhs_key.cmp(&rhs_key) {
            Ordering::Equal => self.source_index.cmp(&other.source_index),
            unequal => unequal,
        };
        forward.reverse()
    }
}
impl<R> Eq for Entry<R> {}

impl<R> PartialEq for Entry<R> {
    /// Two entries are equal exactly when `Ord::cmp` reports them as equal,
    /// which keeps equality consistent with the ordering.
    fn eq(&self, other: &Entry<R>) -> bool {
        matches!(self.cmp(other), Ordering::Equal)
    }
}
impl<R> PartialOrd for Entry<R> {
    /// Delegates to the total order defined by `Ord`, so the result is
    /// always `Some`.
    fn partial_cmp(&self, other: &Entry<R>) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct DelAddRoaringBitmap { pub struct DelAddRoaringBitmap {
pub(crate) del: Option<RoaringBitmap>, pub(crate) del: Option<RoaringBitmap>,
@ -341,6 +434,26 @@ pub struct DelAddRoaringBitmap {
} }
impl DelAddRoaringBitmap { impl DelAddRoaringBitmap {
/// Decodes `bytes` into a `DelAddRoaringBitmap`.
///
/// `bytes` is read through a `KvReaderDelAdd`; the `Deletion` and `Addition`
/// entries are each decoded with `CboRoaringBitmapCodec` when present and
/// left as `None` otherwise.
///
/// # Errors
///
/// Returns the I/O error reported by the codec when an entry cannot be
/// decoded. The deletion side is decoded first.
fn from_bytes(bytes: &[u8]) -> io::Result<DelAddRoaringBitmap> {
    let reader = KvReaderDelAdd::from_slice(bytes);

    let del = reader
        .get(DelAdd::Deletion)
        .map(CboRoaringBitmapCodec::deserialize_from)
        .transpose()?;
    let add = reader
        .get(DelAdd::Addition)
        .map(CboRoaringBitmapCodec::deserialize_from)
        .transpose()?;

    Ok(DelAddRoaringBitmap { del, add })
}
/// Builds a placeholder value holding neither a deletion nor an addition
/// bitmap.
fn dummy() -> DelAddRoaringBitmap {
    let (del, add) = (None, None);
    DelAddRoaringBitmap { del, add }
}
fn new_del_add_u32(n: u32) -> Self { fn new_del_add_u32(n: u32) -> Self {
DelAddRoaringBitmap { DelAddRoaringBitmap {
del: Some(RoaringBitmap::from([n])), del: Some(RoaringBitmap::from([n])),
@ -363,4 +476,21 @@ impl DelAddRoaringBitmap {
fn new_add_u32(n: u32) -> Self { fn new_add_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
} }
/// Merges `rhs` into `self`, side by side.
///
/// For both the deletion and the addition side: when the two values hold a
/// bitmap the result is their union, when only one does it is kept as is,
/// and when neither does the side stays `None`.
fn merge(&mut self, rhs: DelAddRoaringBitmap) {
    let DelAddRoaringBitmap { del: rhs_del, add: rhs_add } = rhs;

    self.del = match (self.del.take(), rhs_del) {
        (Some(ours), Some(theirs)) => Some(ours | theirs),
        (ours, theirs) => ours.or(theirs),
    };

    self.add = match (self.add.take(), rhs_add) {
        (Some(ours), Some(theirs)) => Some(ours | theirs),
        (ours, theirs) => ours.or(theirs),
    };
}
} }

View File

@ -31,7 +31,7 @@ pub struct FacetedExtractorData<'extractor> {
impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
type Data = RefCell<CboCachedSorter<'extractor>>; type Data = RefCell<CboCachedSorter<'extractor>>;
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> { fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?)) Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
} }
@ -221,7 +221,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>, indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>, extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
let max_memory = grenad_parameters.max_memory_by_thread(); let max_memory = grenad_parameters.max_memory_by_thread();

View File

@ -19,7 +19,7 @@ pub trait DocidsExtractor {
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>, indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>, extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>; ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>;
} }

View File

@ -17,29 +17,31 @@ use crate::update::new::indexer::document_changes::{
IndexingContext, MostlySend, RefCellExt, ThreadLocal, IndexingContext, MostlySend, RefCellExt, ThreadLocal,
}; };
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
const MAX_COUNTED_WORDS: usize = 30; const MAX_COUNTED_WORDS: usize = 30;
pub struct WordDocidsCachedSorters<'indexer> { pub struct WordDocidsCachedSorters<'extractor> {
word_fid_docids: CboCachedSorter<'indexer>, word_fid_docids: CboCachedSorter<'extractor>,
word_docids: CboCachedSorter<'indexer>, word_docids: CboCachedSorter<'extractor>,
exact_word_docids: CboCachedSorter<'indexer>, exact_word_docids: CboCachedSorter<'extractor>,
word_position_docids: CboCachedSorter<'indexer>, word_position_docids: CboCachedSorter<'extractor>,
fid_word_count_docids: CboCachedSorter<'indexer>, fid_word_count_docids: CboCachedSorter<'extractor>,
fid_word_count: HashMap<FieldId, (usize, usize)>, fid_word_count: HashMap<FieldId, (usize, usize)>,
current_docid: Option<DocumentId>, current_docid: Option<DocumentId>,
} }
unsafe impl<'indexer> MostlySend for WordDocidsCachedSorters<'indexer> {} unsafe impl<'extractor> MostlySend for WordDocidsCachedSorters<'extractor> {}
impl<'indexer> WordDocidsCachedSorters<'indexer> { impl<'extractor> WordDocidsCachedSorters<'extractor> {
pub fn new_in(alloc: RefBump<'indexer>) -> io::Result<Self> { /// TODO Make sure to give the same max_memory to all of them, without splitting it
pub fn new_in(alloc: &'extractor Bump) -> io::Result<Self> {
Ok(Self { Ok(Self {
word_fid_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, word_fid_docids: CboCachedSorter::new_in(alloc)?,
word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, word_docids: CboCachedSorter::new_in(alloc)?,
exact_word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, exact_word_docids: CboCachedSorter::new_in(alloc)?,
word_position_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?, word_position_docids: CboCachedSorter::new_in(alloc)?,
fid_word_count_docids: CboCachedSorter::new_in(alloc)?, fid_word_count_docids: CboCachedSorter::new_in(alloc)?,
fid_word_count: HashMap::new(), fid_word_count: HashMap::new(),
current_docid: None, current_docid: None,
@ -220,7 +222,7 @@ pub struct WordDocidsExtractorData<'extractor> {
impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>; type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> { fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?))) Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?)))
} }
@ -231,50 +233,6 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
) -> Result<()> { ) -> Result<()> {
WordDocidsExtractors::extract_document_change(context, self.tokenizer, change) WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)
} }
/// Spills every cached sorter to disk and resets the extractor allocator
/// once the allocator holds at least `max_memory` bytes.
///
/// Does nothing when `max_memory` is `None` or when the allocated bytes are
/// still below the limit.
///
/// # Panics
///
/// Panics if `data` currently holds `None` (the sorters are `unwrap`ped), or
/// if `extractor_alloc` is already borrowed when it is mutably borrowed for
/// the reset.
fn spill_if_needed<'doc>(
&'doc self,
data: &'doc Self::Data,
extractor_alloc: &'extractor RefCell<Bump>,
) -> Result<()> {
// Nothing to do without a limit or while we are still under it.
if self.max_memory.map_or(true, |mm| extractor_alloc.borrow().allocated_bytes() < mm) {
return Ok(());
}
// Move the sorters out of the cell, leaving `None` behind until they are rebuilt.
let sorters = data.borrow_mut().take().unwrap();
let WordDocidsCachedSorters {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
// Note that fid_word_count and current_docid do not have any reference
// to any 'extractor-allocated memory so we don't have to spill it to disk
fid_word_count,
current_docid,
} = sorters;
// Every sorter is spilled before the allocator is reset below.
let spilled_word_fid_docids = word_fid_docids.spill_to_disk()?;
let spilled_word_docids = word_docids.spill_to_disk()?;
let spilled_exact_word_docids = exact_word_docids.spill_to_disk()?;
let spilled_word_position_docids = word_position_docids.spill_to_disk()?;
let spilled_fid_word_count_docids = fid_word_count_docids.spill_to_disk()?;
extractor_alloc.borrow_mut().reset();
// Rebuild the sorters from their spilled form on top of the freshly reset allocator.
let alloc = RefBump::new(extractor_alloc.borrow());
*data.borrow_mut() = Some(WordDocidsCachedSorters {
word_fid_docids: spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc)),
word_docids: spilled_word_docids.reconstruct(RefBump::clone(&alloc)),
exact_word_docids: spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc)),
word_position_docids: spilled_word_position_docids.reconstruct(RefBump::clone(&alloc)),
fid_word_count_docids: spilled_fid_word_count_docids.reconstruct(alloc),
fid_word_count,
current_docid,
});
Ok(())
}
} }
pub struct WordDocidsExtractors; pub struct WordDocidsExtractors;
@ -283,7 +241,7 @@ impl WordDocidsExtractors {
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>, indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>, extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<WordDocidsMergers> { ) -> Result<WordDocidsMergers> {
let max_memory = grenad_parameters.max_memory_by_thread(); let max_memory = grenad_parameters.max_memory_by_thread();
let index = indexing_context.index; let index = indexing_context.index;

View File

@ -260,7 +260,7 @@ pub struct DocumentChangeContext<
pub doc_alloc: Bump, pub doc_alloc: Bump,
/// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills. /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
pub extractor_alloc: &'extractor RefCell<Bump>, pub extractor_alloc: &'extractor Bump,
/// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>, doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
@ -283,14 +283,14 @@ impl<
index: &'indexer Index, index: &'indexer Index,
db_fields_ids_map: &'indexer FieldsIdsMap, db_fields_ids_map: &'indexer FieldsIdsMap,
new_fields_ids_map: &'fid RwLock<FieldsIdsMap>, new_fields_ids_map: &'fid RwLock<FieldsIdsMap>,
extractor_allocs: &'extractor ThreadLocal<FullySend<RefCell<Bump>>>, extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>, doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
datastore: &'data ThreadLocal<T>, datastore: &'data ThreadLocal<T>,
fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>, fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
init_data: F, init_data: F,
) -> Result<Self> ) -> Result<Self>
where where
F: FnOnce(RefBump<'extractor>) -> Result<T>, F: FnOnce(&'extractor Bump) -> Result<T>,
{ {
let doc_alloc = let doc_alloc =
doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024)))); doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024))));
@ -301,9 +301,7 @@ impl<
let fields_ids_map = &fields_ids_map.0; let fields_ids_map = &fields_ids_map.0;
let extractor_alloc = extractor_allocs.get_or_default(); let extractor_alloc = extractor_allocs.get_or_default();
let extractor_alloc_ref = RefBump::new(extractor_alloc.0.borrow_or_yield()); let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
let data = datastore.get_or_try(move || init_data(extractor_alloc_ref))?;
let txn = index.read_txn()?; let txn = index.read_txn()?;
Ok(DocumentChangeContext { Ok(DocumentChangeContext {
@ -323,30 +321,13 @@ impl<
pub trait Extractor<'extractor>: Sync { pub trait Extractor<'extractor>: Sync {
type Data: MostlySend; type Data: MostlySend;
fn init_data<'doc>(&'doc self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data>; fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
fn process<'doc>( fn process<'doc>(
&'doc self, &'doc self,
change: DocumentChange<'doc>, change: DocumentChange<'doc>,
context: &'doc DocumentChangeContext<Self::Data>, context: &'doc DocumentChangeContext<Self::Data>,
) -> Result<()>; ) -> Result<()>;
/// Gives a chance to the data to spill itself on disk and reset the extractor allocator.
///
/// The default implementation does nothing and returns `Ok(())`.
///
/// The Extractor and data implementations are responsible for checking that there are no live allocations before
/// attempting to reset the extractor allocator. Failure to uphold this invariant will result in a panic.
///
/// # Panics
///
/// - calls `extractor_alloc.borrow_mut()` while there are outstanding allocations referencing the `extractor_alloc`
#[inline]
fn spill_if_needed<'doc>(
&'doc self,
_data: &'doc Self::Data,
_extractor_alloc: &'extractor RefCell<Bump>,
) -> Result<()> {
Ok(())
}
} }
pub trait DocumentChanges<'pl // lifetime of the underlying payload pub trait DocumentChanges<'pl // lifetime of the underlying payload
@ -396,7 +377,7 @@ pub fn for_each_document_change<
doc_allocs, doc_allocs,
fields_ids_map_store, fields_ids_map_store,
}: IndexingContext<'fid, 'indexer, 'index>, }: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<RefCell<Bump>>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
datastore: &'data ThreadLocal<EX::Data>, datastore: &'data ThreadLocal<EX::Data>,
) -> Result<()> ) -> Result<()>
where where
@ -404,7 +385,7 @@ where
{ {
// Clean up and reuse the extractor allocs // Clean up and reuse the extractor allocs
for extractor_alloc in extractor_allocs.iter_mut() { for extractor_alloc in extractor_allocs.iter_mut() {
extractor_alloc.0.get_mut().reset(); extractor_alloc.0.reset();
} }
let pi = document_changes.iter(); let pi = document_changes.iter();
@ -436,8 +417,6 @@ where
// send back the doc_alloc in the pool // send back the doc_alloc in the pool
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
extractor.spill_if_needed(&context.data, &context.extractor_alloc)?;
res res
}, },
) )