Fix even more errors around the cache

Clément Renault 2024-10-16 16:21:19 +02:00
parent 495742e113
commit 27fc50c476
6 changed files with 78 additions and 93 deletions

View File

@@ -307,14 +307,14 @@ pub struct WordDocidsExtractorData<'extractor> {
}
impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
type Data = RefCell<WordDocidsCachedSorters<'extractor>>;
type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
Ok(RefCell::new(WordDocidsCachedSorters::new_in(
Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(
self.grenad_parameters,
self.max_memory,
extractor_alloc,
)))
))))
}
fn process(
@@ -334,7 +334,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
return Ok(());
}
let mut data = data.borrow_mut();
let sorters = data.borrow_mut().take().unwrap();
let WordDocidsCachedSorters {
word_fid_docids,
word_docids,
@@ -343,7 +343,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
fid_word_count_docids,
fid_word_count,
current_docid,
} = &mut *data;
} = sorters;
let spilled_word_fid_docids = word_fid_docids.spill_to_disk()?;
let spilled_word_docids = word_docids.spill_to_disk()?;
@@ -356,14 +356,17 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
extractor_alloc.borrow_mut().reset();
let alloc = RefBump::new(extractor_alloc.borrow());
data.word_fid_docids = spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc));
data.word_docids = spilled_word_docids.reconstruct(RefBump::clone(&alloc));
data.exact_word_docids = spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc));
data.word_position_docids =
spilled_word_position_docids.reconstruct(RefBump::clone(&alloc));
data.fid_word_count_docids = spilled_fid_word_count_docids.reconstruct(alloc);
// data.fid_word_count = spilled_fid_word_count.reconstruct();
// data.current_docid = spilled_current_docid.reconstruct();
*data.borrow_mut() = Some(WordDocidsCachedSorters {
word_fid_docids: spilled_word_fid_docids.reconstruct(RefBump::clone(&alloc)),
word_docids: spilled_word_docids.reconstruct(RefBump::clone(&alloc)),
exact_word_docids: spilled_exact_word_docids.reconstruct(RefBump::clone(&alloc)),
word_position_docids: spilled_word_position_docids.reconstruct(RefBump::clone(&alloc)),
fid_word_count_docids: spilled_fid_word_count_docids.reconstruct(alloc),
// fid_word_count: spilled_fid_word_count.reconstruct(),
// current_docid: spilled_current_docid.reconstruct(),
fid_word_count,
current_docid,
});
Ok(())
}
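For context, the hunk above moves the sorters out of the cell by value, spills them to disk, resets the bump allocator, and installs a reconstructed cache for the next batch. A minimal sketch of that take-and-put-back pattern on a RefCell<Option<_>>, with hypothetical Cache and SpilledCache types standing in for WordDocidsCachedSorters and its spilled form:

use std::cell::RefCell;

// Hypothetical stand-ins for WordDocidsCachedSorters and its spilled form.
struct Cache { entries: Vec<u32> }
struct SpilledCache { entries: Vec<u32> }

impl Cache {
    fn spill_to_disk(self) -> SpilledCache { SpilledCache { entries: self.entries } }
}
impl SpilledCache {
    fn reconstruct(self) -> Cache { Cache { entries: self.entries } }
}

fn spill_and_rebuild(data: &RefCell<Option<Cache>>) {
    // Move the cache out of the shared cell; spilling consumes it by value.
    let cache = data.borrow_mut().take().unwrap();
    let spilled = cache.spill_to_disk();
    // ... the real code resets the extractor's bump allocator here ...
    // Install a freshly reconstructed cache for the next batch of documents.
    *data.borrow_mut() = Some(spilled.reconstruct());
}

fn main() {
    let data = RefCell::new(Some(Cache { entries: vec![1, 2, 3] }));
    spill_and_rebuild(&data);
    assert!(data.borrow().is_some());
}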
@@ -436,7 +439,7 @@ impl WordDocidsExtractors {
tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter();
let mut builder = WordDocidsMergerBuilders::new();
for cache in datastore.into_iter().map(|cache| cache.0.into_inner()) {
for cache in datastore.into_iter().flat_map(RefCell::into_inner) {
builder.add_sorters(cache)?;
}
@@ -445,14 +448,14 @@ impl WordDocidsExtractors {
}
fn extract_document_change(
context: &DocumentChangeContext<RefCell<WordDocidsCachedSorters>>,
context: &DocumentChangeContext<RefCell<Option<WordDocidsCachedSorters>>>,
document_tokenizer: &DocumentTokenizer,
document_change: DocumentChange,
) -> Result<()> {
let index = &context.index;
let rtxn = &context.txn;
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
let cached_sorter = cached_sorter.deref_mut();
let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
let cached_sorter = cached_sorter_ref.as_mut().unwrap();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let new_fields_ids_map = new_fields_ids_map.deref_mut();
@@ -463,16 +466,14 @@ impl WordDocidsExtractors {
match document_change {
DocumentChange::Deletion(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_del_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
cached_sorter.insert_del_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
);
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map)?,
@@ -482,16 +483,14 @@ impl WordDocidsExtractors {
}
DocumentChange::Update(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_del_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
cached_sorter.insert_del_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
);
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map)?,
@@ -500,16 +499,14 @@ impl WordDocidsExtractors {
)?;
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_add_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
cached_sorter.insert_add_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
);
};
document_tokenizer.tokenize_document(
inner.new(rtxn, index, context.db_fields_ids_map)?,
@@ -519,16 +516,14 @@ impl WordDocidsExtractors {
}
DocumentChange::Insertion(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_add_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
cached_sorter.insert_add_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
);
};
document_tokenizer.tokenize_document(
inner.new(),
@@ -538,7 +533,9 @@ impl WordDocidsExtractors {
}
}
cached_sorter.flush_fid_word_count(&mut buffer)
cached_sorter.flush_fid_word_count(&mut buffer);
Ok(())
}
fn attributes_to_extract<'a>(

View File

@@ -28,11 +28,10 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
}
// This method is reimplemented to count the number of words in the document in each field
// and to store the docids of the documents that have a number of words in a given field equal to or under than MAX_COUNTED_WORDS.
// and to store the docids of the documents that have a number of words in a given field
// equal to or under MAX_COUNTED_WORDS.
fn extract_document_change(
context: &DocumentChangeContext<
FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
>,
context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
document_tokenizer: &DocumentTokenizer,
document_change: DocumentChange,
) -> Result<()> {
@@ -48,7 +47,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let new_fields_ids_map = &mut *new_fields_ids_map;
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
let mut cached_sorter = context.data.borrow_mut_or_yield();
let cached_sorter = &mut *cached_sorter;
// This is a VecDeque and will stay small, so it can remain on the heap for now.
@@ -109,14 +108,14 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
del_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2);
for ((w1, w2), prox) in del_word_pair_proximity.iter() {
let key = build_key(*prox, w1, w2, &mut key_buffer);
cached_sorter.insert_del_u32(key, docid)?;
cached_sorter.insert_del_u32(key, docid);
}
add_word_pair_proximity.sort_unstable();
add_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2);
for ((w1, w2), prox) in add_word_pair_proximity.iter() {
let key = build_key(*prox, w1, w2, &mut key_buffer);
cached_sorter.insert_add_u32(key, docid)?;
cached_sorter.insert_add_u32(key, docid);
}
Ok(())
}
@@ -139,7 +138,7 @@ fn build_key<'a>(
fn word_positions_into_word_pair_proximity(
word_positions: &mut VecDeque<(Rc<str>, u16)>,
word_pair_proximity: &mut impl FnMut((Rc<str>, Rc<str>), u8),
) -> Result<()> {
) {
let (head_word, head_position) = word_positions.pop_front().unwrap();
for (word, position) in word_positions.iter() {
let prox = index_proximity(head_position as u32, *position as u32) as u8;
@@ -147,7 +146,6 @@ fn word_positions_into_word_pair_proximity(
word_pair_proximity((head_word.clone(), word.clone()), prox);
}
}
Ok(())
}
fn process_document_tokens<'doc>(
@@ -163,17 +161,16 @@ fn process_document_tokens<'doc>(
.front()
.map_or(false, |(_w, p)| index_proximity(*p as u32, pos as u32) >= MAX_DISTANCE)
{
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)?;
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)
}
// insert the new word.
word_positions.push_back((Rc::from(word), pos));
Ok(())
};
document_tokenizer.tokenize_document(document, fields_ids_map, &mut token_fn)?;
while !word_positions.is_empty() {
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)?;
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity);
}
Ok(())
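For reference, the proximity extraction above keeps a small sliding window of recent word positions in a VecDeque: once the word at the front falls MAX_DISTANCE or more positions behind the incoming word, it is popped and paired with every word still in the window. A rough, self-contained sketch of that idea, using a plain positional distance instead of milli's index_proximity and an assumed window bound of 8:

use std::collections::VecDeque;

// Assumed bound for this sketch; milli defines its own MAX_DISTANCE.
const MAX_DISTANCE: u32 = 8;

// Pop the oldest word and pair it with everything still inside the window.
fn drain_head(window: &mut VecDeque<(String, u32)>, pair: &mut impl FnMut(&str, &str, u32)) {
    if let Some((head, head_pos)) = window.pop_front() {
        for (word, pos) in window.iter() {
            pair(&head, word, pos - head_pos);
        }
    }
}

fn main() {
    let words = [("the", 0u32), ("quick", 1), ("brown", 2), ("fox", 10)];
    let mut window: VecDeque<(String, u32)> = VecDeque::new();
    let mut pair = |a: &str, b: &str, prox: u32| println!("({a}, {b}) -> {prox}");

    for (word, pos) in words {
        // Evict words that are now too far behind the current position.
        while window.front().map_or(false, |(_, p)| pos - p >= MAX_DISTANCE) {
            drain_head(&mut window, &mut pair);
        }
        window.push_back((word.to_string(), pos));
    }
    // Flush whatever is left once the document is exhausted.
    while !window.is_empty() {
        drain_head(&mut window, &mut pair);
    }
}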

View File

@@ -11,6 +11,7 @@ pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers};
pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
use grenad::Merger;
use heed::RoTxn;
use raw_collections::alloc::RefBump;
use rayon::iter::{ParallelBridge, ParallelIterator};
use tokenize_document::{tokenizer_builder, DocumentTokenizer};
@@ -34,15 +35,10 @@ pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> {
impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
for SearchableExtractorData<'extractor, EX>
{
type Data = FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>;
type Data = RefCell<CboCachedSorter<'extractor, MergeDeladdCboRoaringBitmaps>>;
fn init_data(
&self,
_extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
) -> Result<Self::Data> {
Ok(FullySend(RefCell::new(CboCachedSorter::new(
// TODO use a better value
1_000_000.try_into().unwrap(),
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
Ok(RefCell::new(CboCachedSorter::new_in(
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
@@ -52,13 +48,14 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
self.max_memory,
false,
),
))))
extractor_alloc,
)))
}
fn process(
&self,
change: DocumentChange,
context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
EX::extract_document_change(context, self.tokenizer, change)
}
@@ -150,9 +147,7 @@ pub trait SearchableExtractor: Sized + Sync {
}
fn extract_document_change(
context: &DocumentChangeContext<
FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
>,
context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
document_tokenizer: &DocumentTokenizer,
document_change: DocumentChange,
) -> Result<()>;
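The 'extractor lifetime that now appears on CboCachedSorter reflects that the cache's backing storage comes from the per-extractor bump allocator passed to init_data, so the cache cannot outlive the arena. A minimal sketch of that shape, using plain bumpalo (with its "collections" feature) instead of the crate's RefBump wrapper; the Cache type here is hypothetical:

use bumpalo::Bump;

// Hypothetical cache whose backing storage lives in a caller-provided arena,
// mirroring how CboCachedSorter::new_in ties its data to the 'extractor bump.
struct Cache<'extractor> {
    keys: bumpalo::collections::Vec<'extractor, u32>,
}

impl<'extractor> Cache<'extractor> {
    fn new_in(alloc: &'extractor Bump) -> Self {
        Cache { keys: bumpalo::collections::Vec::new_in(alloc) }
    }
}

fn main() {
    let extractor_alloc = Bump::new();
    let mut cache = Cache::new_in(&extractor_alloc);
    cache.keys.push(42);
    // Dropping or resetting the arena would invalidate the cache,
    // which is why the process hunk rebuilds the cache after each reset.
    assert_eq!(cache.keys.len(), 1);
}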

View File

@@ -26,7 +26,7 @@ impl<'a> DocumentTokenizer<'a> {
&self,
document: impl Document<'doc>,
field_id_map: &mut GlobalFieldsIdsMap,
token_fn: &mut impl FnMut(&str, FieldId, u16, &str) -> Result<()>,
token_fn: &mut impl FnMut(&str, FieldId, u16, &str),
) -> Result<()> {
let mut field_position = HashMap::new();
@@ -50,7 +50,7 @@ impl<'a> DocumentTokenizer<'a> {
Value::Number(n) => {
let token = n.to_string();
if let Ok(position) = (*position).try_into() {
token_fn(name, field_id, position, token.as_str())?;
token_fn(name, field_id, position, token.as_str());
}
Ok(())
@@ -74,7 +74,7 @@ impl<'a> DocumentTokenizer<'a> {
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
*position = index;
if let Ok(position) = (*position).try_into() {
token_fn(name, field_id, position, token)?;
token_fn(name, field_id, position, token);
}
}
}
@@ -171,7 +171,6 @@ mod test {
use bumpalo::Bump;
use charabia::TokenizerBuilder;
use meili_snap::snapshot;
use raw_collections::RawMap;
use serde_json::json;
use serde_json::value::RawValue;
@@ -230,7 +229,6 @@ mod test {
&mut global_fields_ids_map,
&mut |_fname, fid, pos, word| {
words.insert([fid, pos], word.to_string());
Ok(())
},
)
.unwrap();

View File

@@ -106,6 +106,8 @@ unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
impl<T> FullySend<T> {
pub fn into(self) -> T {
self.0

View File

@@ -95,11 +95,7 @@ mod test {
fn test_deletions() {
struct DeletionWithData<'extractor> {
deleted: RefCell<
hashbrown::HashSet<
DocumentId,
hashbrown::hash_map::DefaultHashBuilder,
RefBump<'extractor>,
>,
hashbrown::HashSet<DocumentId, hashbrown::DefaultHashBuilder, RefBump<'extractor>>,
>,
}