From 27fc50c4768cae222d7ea2382fd7b19469da80d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 16 Oct 2024 16:21:19 +0200
Subject: [PATCH] Fix even more errors around the cache

---
 .../extract/searchable/extract_word_docids.rs | 113 +++++++++---------
 .../extract_word_pair_proximity_docids.rs     |  21 ++--
 .../src/update/new/extract/searchable/mod.rs  |  21 ++--
 .../extract/searchable/tokenize_document.rs   |   8 +-
 .../update/new/indexer/document_changes.rs    |   2 +
 .../update/new/indexer/document_deletion.rs   |   6 +-
 6 files changed, 78 insertions(+), 93 deletions(-)

diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs
index 5875fc147..9b7347f25 100644
--- a/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -307,14 +307,14 @@ pub struct WordDocidsExtractorData<'extractor> {
 }
 
 impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
-    type Data = RefCell<WordDocidsCachedSorters<'extractor>>;
+    type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
 
     fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
-        Ok(RefCell::new(WordDocidsCachedSorters::new_in(
+        Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(
             self.grenad_parameters,
             self.max_memory,
             extractor_alloc,
-        )))
+        ))))
     }
 
     fn process(
@@ -334,7 +334,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
             return Ok(());
         }
 
-        let mut data = data.borrow_mut();
+        let sorters = data.borrow_mut().take().unwrap();
         let WordDocidsCachedSorters {
             word_fid_docids,
             word_docids,
@@ -343,7 +343,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
             fid_word_count_docids,
             fid_word_count,
             current_docid,
-        } = &mut *data;
+        } = sorters;
 
         let spilled_word_fid_docids = word_fid_docids.spill_to_disk()?;
         let spilled_word_docids = word_docids.spill_to_disk()?;
@@ -356,14 +356,17 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
         extractor_alloc.borrow_mut().reset();
         let alloc = RefBump::new(extractor_alloc.borrow());
 
-        data.word_fid_docids = spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc));
-        data.word_docids = spilled_word_docids.reconstruct(RefBump::clone(&alloc));
-        data.exact_word_docids = spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc));
-        data.word_position_docids =
-            spilled_word_position_docids.reconstruct(RefBump::clone(&alloc));
-        data.fid_word_count_docids = spilled_fid_word_count_docids.reconstruct(alloc);
-        // data.fid_word_count = spilled_fid_word_count.reconstruct();
-        // data.current_docid = spilled_current_docid.reconstruct();
+        *data.borrow_mut() = Some(WordDocidsCachedSorters {
+            word_fid_docids: spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc)),
+            word_docids: spilled_word_docids.reconstruct(RefBump::clone(&alloc)),
+            exact_word_docids: spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc)),
+            word_position_docids: spilled_word_position_docids.reconstruct(RefBump::clone(&alloc)),
+            fid_word_count_docids: spilled_fid_word_count_docids.reconstruct(alloc),
+            // fid_word_count: spilled_fid_word_count.reconstruct(),
+            // current_docid: spilled_current_docid.reconstruct(),
+            fid_word_count,
+            current_docid,
+        });
 
         Ok(())
     }
@@ -436,7 +439,7 @@ impl WordDocidsExtractors {
                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
             let _entered = span.enter();
             let mut builder = WordDocidsMergerBuilders::new();
-            for cache in datastore.into_iter().map(|cache| cache.0.into_inner()) {
+            for cache in datastore.into_iter().flat_map(RefCell::into_inner) {
                 builder.add_sorters(cache)?;
             }
 
@@ -445,14 +448,14 @@ impl WordDocidsExtractors {
     }
 
     fn extract_document_change(
-        context: &DocumentChangeContext>,
+        context: &DocumentChangeContext<RefCell<Option<WordDocidsCachedSorters>>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {
         let index = &context.index;
         let rtxn = &context.txn;
-        let mut cached_sorter = context.data.0.borrow_mut_or_yield();
-        let cached_sorter = cached_sorter.deref_mut();
+        let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
+        let cached_sorter = cached_sorter_ref.as_mut().unwrap();
 
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
         let new_fields_ids_map = new_fields_ids_map.deref_mut();
@@ -463,16 +466,14 @@ impl WordDocidsExtractors {
         match document_change {
             DocumentChange::Deletion(inner) => {
                 let mut token_fn = |fname: &str, fid, pos, word: &str| {
-                    cached_sorter
-                        .insert_del_u32(
-                            fid,
-                            pos,
-                            word,
-                            is_exact_attribute(fname),
-                            inner.docid(),
-                            &mut buffer,
-                        )
-                        .map_err(crate::Error::from)
+                    cached_sorter.insert_del_u32(
+                        fid,
+                        pos,
+                        word,
+                        is_exact_attribute(fname),
+                        inner.docid(),
+                        &mut buffer,
+                    );
                 };
                 document_tokenizer.tokenize_document(
                     inner.current(rtxn, index, context.db_fields_ids_map)?,
@@ -482,16 +483,14 @@ impl WordDocidsExtractors {
             }
             DocumentChange::Update(inner) => {
                 let mut token_fn = |fname: &str, fid, pos, word: &str| {
-                    cached_sorter
-                        .insert_del_u32(
-                            fid,
-                            pos,
-                            word,
-                            is_exact_attribute(fname),
-                            inner.docid(),
-                            &mut buffer,
-                        )
-                        .map_err(crate::Error::from)
+                    cached_sorter.insert_del_u32(
+                        fid,
+                        pos,
+                        word,
+                        is_exact_attribute(fname),
+                        inner.docid(),
+                        &mut buffer,
+                    );
                 };
                 document_tokenizer.tokenize_document(
                     inner.current(rtxn, index, context.db_fields_ids_map)?,
@@ -500,16 +499,14 @@ impl WordDocidsExtractors {
                 )?;
 
                 let mut token_fn = |fname: &str, fid, pos, word: &str| {
-                    cached_sorter
-                        .insert_add_u32(
-                            fid,
-                            pos,
-                            word,
-                            is_exact_attribute(fname),
-                            inner.docid(),
-                            &mut buffer,
-                        )
-                        .map_err(crate::Error::from)
+                    cached_sorter.insert_add_u32(
+                        fid,
+                        pos,
+                        word,
+                        is_exact_attribute(fname),
+                        inner.docid(),
+                        &mut buffer,
+                    );
                 };
                 document_tokenizer.tokenize_document(
                     inner.new(rtxn, index, context.db_fields_ids_map)?,
@@ -519,16 +516,14 @@ impl WordDocidsExtractors {
             }
             DocumentChange::Insertion(inner) => {
                 let mut token_fn = |fname: &str, fid, pos, word: &str| {
-                    cached_sorter
-                        .insert_add_u32(
-                            fid,
-                            pos,
-                            word,
-                            is_exact_attribute(fname),
-                            inner.docid(),
-                            &mut buffer,
-                        )
-                        .map_err(crate::Error::from)
+                    cached_sorter.insert_add_u32(
+                        fid,
+                        pos,
+                        word,
+                        is_exact_attribute(fname),
+                        inner.docid(),
+                        &mut buffer,
+                    );
                 };
                 document_tokenizer.tokenize_document(
                     inner.new(),
@@ -538,7 +533,9 @@ impl WordDocidsExtractors {
             }
         }
 
-        cached_sorter.flush_fid_word_count(&mut buffer)
+        cached_sorter.flush_fid_word_count(&mut buffer);
+
+        Ok(())
     }
 
     fn attributes_to_extract<'a>(
diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
index 53e6515a9..b8821dacc 100644
--- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
@@ -28,11 +28,10 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
     }
 
     // This method is reimplemented to count the number of words in the document in each field
-    // and to store the docids of the documents that have a number of words in a given field equal to or under than MAX_COUNTED_WORDS.
+    // and to store the docids of the documents that have a number of words in a given field
+    // equal to or under MAX_COUNTED_WORDS.
     fn extract_document_change(
-        context: &DocumentChangeContext<
-            FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
-        >,
+        context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {
@@ -48,7 +47,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
         let new_fields_ids_map = &mut *new_fields_ids_map;
 
-        let mut cached_sorter = context.data.0.borrow_mut_or_yield();
+        let mut cached_sorter = context.data.borrow_mut_or_yield();
         let cached_sorter = &mut *cached_sorter;
 
         // is a vecdequeue, and will be smol, so can stay on the heap for now
@@ -109,14 +108,14 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
         del_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2);
         for ((w1, w2), prox) in del_word_pair_proximity.iter() {
             let key = build_key(*prox, w1, w2, &mut key_buffer);
-            cached_sorter.insert_del_u32(key, docid)?;
+            cached_sorter.insert_del_u32(key, docid);
         }
 
         add_word_pair_proximity.sort_unstable();
         add_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2);
         for ((w1, w2), prox) in add_word_pair_proximity.iter() {
             let key = build_key(*prox, w1, w2, &mut key_buffer);
-            cached_sorter.insert_add_u32(key, docid)?;
+            cached_sorter.insert_add_u32(key, docid);
         }
         Ok(())
     }
@@ -139,7 +138,7 @@ fn build_key<'a>(
 fn word_positions_into_word_pair_proximity(
     word_positions: &mut VecDeque<(Rc<str>, u16)>,
     word_pair_proximity: &mut impl FnMut((Rc<str>, Rc<str>), u8),
-) -> Result<()> {
+) {
     let (head_word, head_position) = word_positions.pop_front().unwrap();
     for (word, position) in word_positions.iter() {
         let prox = index_proximity(head_position as u32, *position as u32) as u8;
@@ -147,7 +146,6 @@ fn word_positions_into_word_pair_proximity(
             word_pair_proximity((head_word.clone(), word.clone()), prox);
         }
     }
-    Ok(())
 }
 
 fn process_document_tokens<'doc>(
@@ -163,17 +161,16 @@ fn process_document_tokens<'doc>(
             .front()
             .map_or(false, |(_w, p)| index_proximity(*p as u32, pos as u32) >= MAX_DISTANCE)
         {
-            word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)?;
+            word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)
         }
 
         // insert the new word.
         word_positions.push_back((Rc::from(word), pos));
-        Ok(())
     };
     document_tokenizer.tokenize_document(document, fields_ids_map, &mut token_fn)?;
 
     while !word_positions.is_empty() {
-        word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)?;
+        word_positions_into_word_pair_proximity(word_positions, word_pair_proximity);
    }
 
     Ok(())
diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs
index 8934ee892..29fda386b 100644
--- a/milli/src/update/new/extract/searchable/mod.rs
+++ b/milli/src/update/new/extract/searchable/mod.rs
@@ -11,6 +11,7 @@ pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers};
 pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
 use grenad::Merger;
 use heed::RoTxn;
+use raw_collections::alloc::RefBump;
 use rayon::iter::{ParallelBridge, ParallelIterator};
 use tokenize_document::{tokenizer_builder, DocumentTokenizer};
 
@@ -34,15 +35,10 @@ pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> {
 impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
     for SearchableExtractorData<'extractor, EX>
 {
-    type Data = FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>;
+    type Data = RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>;
 
-    fn init_data(
-        &self,
-        _extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
-    ) -> Result<Self::Data> {
-        Ok(FullySend(RefCell::new(CboCachedSorter::new(
-            // TODO use a better value
-            1_000_000.try_into().unwrap(),
+    fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
+        Ok(RefCell::new(CboCachedSorter::new_in(
             create_sorter(
                 grenad::SortAlgorithm::Stable,
                 MergeDeladdCboRoaringBitmaps,
@@ -52,13 +48,14 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
                 self.max_memory,
                 false,
             ),
-        ))))
+            extractor_alloc,
+        )))
     }
 
     fn process(
         &self,
         change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
         EX::extract_document_change(context, self.tokenizer, change)
     }
@@ -150,9 +147,7 @@ pub trait SearchableExtractor: Sized + Sync {
     }
 
     fn extract_document_change(
-        context: &DocumentChangeContext<
-            FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
-        >,
+        context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()>;
diff --git a/milli/src/update/new/extract/searchable/tokenize_document.rs b/milli/src/update/new/extract/searchable/tokenize_document.rs
index b8fd24f1b..11512e50b 100644
--- a/milli/src/update/new/extract/searchable/tokenize_document.rs
+++ b/milli/src/update/new/extract/searchable/tokenize_document.rs
@@ -26,7 +26,7 @@ impl<'a> DocumentTokenizer<'a> {
         &self,
         document: impl Document<'doc>,
         field_id_map: &mut GlobalFieldsIdsMap,
-        token_fn: &mut impl FnMut(&str, FieldId, u16, &str) -> Result<()>,
+        token_fn: &mut impl FnMut(&str, FieldId, u16, &str),
     ) -> Result<()> {
         let mut field_position = HashMap::new();
 
@@ -50,7 +50,7 @@ impl<'a> DocumentTokenizer<'a> {
                 Value::Number(n) => {
                     let token = n.to_string();
                     if let Ok(position) = (*position).try_into() {
-                        token_fn(name, field_id, position, token.as_str())?;
+                        token_fn(name, field_id, position, token.as_str());
                     }
 
                     Ok(())
@@ -74,7 +74,7 @@ impl<'a> DocumentTokenizer<'a> {
                        if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
                             *position = index;
                             if let Ok(position) = (*position).try_into() {
-                                token_fn(name, field_id, position, token)?;
+                                token_fn(name, field_id, position, token);
                             }
                         }
                     }
@@ -171,7 +171,6 @@ mod test {
     use bumpalo::Bump;
     use charabia::TokenizerBuilder;
     use meili_snap::snapshot;
-    use raw_collections::RawMap;
     use serde_json::json;
     use serde_json::value::RawValue;
 
@@ -230,7 +229,6 @@ mod test {
             &mut global_fields_ids_map,
             &mut |_fname, fid, pos, word| {
                 words.insert([fid, pos], word.to_string());
-                Ok(())
             },
         )
         .unwrap();
diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs
index 9f7af5424..7bcf2c1dc 100644
--- a/milli/src/update/new/indexer/document_changes.rs
+++ b/milli/src/update/new/indexer/document_changes.rs
@@ -106,6 +106,8 @@ unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
 
 unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
 
+unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
+
 impl<T> FullySend<T> {
     pub fn into(self) -> T {
         self.0
diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs
index a9628f419..c10c052a0 100644
--- a/milli/src/update/new/indexer/document_deletion.rs
+++ b/milli/src/update/new/indexer/document_deletion.rs
@@ -95,11 +95,7 @@ mod test {
     fn test_deletions() {
         struct DeletionWithData<'extractor> {
             deleted: RefCell<
-                hashbrown::HashSet<
-                    DocumentId,
-                    hashbrown::hash_map::DefaultHashBuilder,
-                    RefBump<'extractor>,
-                >,
+                hashbrown::HashSet<DocumentId, hashbrown::hash_map::DefaultHashBuilder, RefBump<'extractor>>,
             >,
         }
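
Note for reviewers: the recurring change in this patch is the move of the extractor data from RefCell<WordDocidsCachedSorters> to RefCell<Option<WordDocidsCachedSorters>>, so that process() can move the sorters out by value, spill them, and put a rebuilt value back. Below is a minimal, self-contained sketch of that take-and-rebuild pattern; Sorters, Spilled, spill, and reconstruct are hypothetical stand-ins, not the actual milli types.

use std::cell::RefCell;

// Hypothetical stand-ins for the cached sorters and their spilled form.
struct Sorters {
    entries: Vec<u32>,
}

struct Spilled {
    entries: Vec<u32>,
}

impl Sorters {
    // Consumes the sorters by value, as spill_to_disk() does in the patch.
    fn spill(self) -> Spilled {
        Spilled { entries: self.entries }
    }
}

impl Spilled {
    // Rebuilds a fresh Sorters value, as reconstruct() does in the patch.
    fn reconstruct(self) -> Sorters {
        Sorters { entries: self.entries }
    }
}

fn process(data: &RefCell<Option<Sorters>>) {
    // Move the sorters out of the cell, leaving None behind for the duration.
    let sorters = data.borrow_mut().take().unwrap();
    // Consume them by value; a plain RefCell<T> could only hand out &mut T here.
    let spilled = sorters.spill();
    // Put a rebuilt value back so later calls find Some(_) again.
    *data.borrow_mut() = Some(spilled.reconstruct());
}

fn main() {
    let data = RefCell::new(Some(Sorters { entries: vec![1, 2, 3] }));
    process(&data);
    assert!(data.borrow().is_some());
}

The Option layer is what makes the by-value spill/reconstruct cycle possible without replacing the RefCell itself, which is why the extractor's Data type and init_data() now wrap the sorters in Some(...).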