From 495742e113dd7c8532041eb94ca27c416d1b778a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 16 Oct 2024 15:57:06 +0200 Subject: [PATCH] Fix more errors around the cache --- milli/src/update/new/extract/cache.rs | 3 + .../new/extract/faceted/extract_facets.rs | 65 +++++------ .../extract/searchable/extract_word_docids.rs | 101 ++++++++---------- .../update/new/indexer/document_changes.rs | 2 + 4 files changed, 79 insertions(+), 92 deletions(-) diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 2db996aa8..1701f09fc 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -9,6 +9,7 @@ use roaring::bitmap::Statistics; use roaring::RoaringBitmap; use crate::update::del_add::{DelAdd, KvWriterDelAdd}; +use crate::update::new::indexer::document_changes::MostlySend; use crate::CboRoaringBitmapCodec; const KEY_SIZE: usize = 12; @@ -273,6 +274,8 @@ impl SpilledCache { } } +unsafe impl<'extractor, MF: Send> MostlySend for CboCachedSorter<'extractor, MF> {} + #[derive(Default, Debug)] struct Stats { pub len: usize, diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 9f3ed18d8..76e358ac7 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -7,6 +7,7 @@ use std::ops::DerefMut as _; use bumpalo::Bump; use grenad::{MergeFunction, Merger}; use heed::RoTxn; +use raw_collections::alloc::RefBump; use rayon::iter::{ParallelBridge as _, ParallelIterator as _}; use serde_json::Value; @@ -30,15 +31,10 @@ pub struct FacetedExtractorData<'extractor> { } impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { - type Data = FullySend>>; + type Data = RefCell>; - fn init_data( - &self, - _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, - ) -> Result { - Ok(FullySend(RefCell::new(CboCachedSorter::new( - // TODO use a better value - 1_000_000.try_into().unwrap(), + fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result { + Ok(RefCell::new(CboCachedSorter::new_in( create_sorter( grenad::SortAlgorithm::Stable, MergeDeladdCboRoaringBitmaps, @@ -51,13 +47,14 @@ impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { // 2. it creates correctness issues if it causes to yield a borrow-mut wielding task false, ), - )))) + extractor_alloc, + ))) } fn process( &self, change: DocumentChange, - context: &crate::update::new::indexer::document_changes::DocumentChangeContext, + context: &DocumentChangeContext, ) -> Result<()> { FacetedDocidsExtractor::extract_document_change(context, self.attributes_to_extract, change) } @@ -67,16 +64,14 @@ pub struct FacetedDocidsExtractor; impl FacetedDocidsExtractor { fn extract_document_change( - context: &DocumentChangeContext< - FullySend>>, - >, + context: &DocumentChangeContext>>, attributes_to_extract: &[&str], document_change: DocumentChange, ) -> Result<()> { let index = &context.index; let rtxn = &context.txn; let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); - let mut cached_sorter = context.data.0.borrow_mut_or_yield(); + let mut cached_sorter = context.data.borrow_mut_or_yield(); match document_change { DocumentChange::Deletion(inner) => extract_document_facets( attributes_to_extract, @@ -90,7 +85,8 @@ impl FacetedDocidsExtractor { inner.docid(), fid, value, - ) + ); + Ok(()) }, ), DocumentChange::Update(inner) => { @@ -106,7 +102,8 @@ impl FacetedDocidsExtractor { inner.docid(), fid, value, - ) + ); + Ok(()) }, )?; @@ -122,7 +119,8 @@ impl FacetedDocidsExtractor { inner.docid(), fid, value, - ) + ); + Ok(()) }, ) } @@ -138,31 +136,27 @@ impl FacetedDocidsExtractor { inner.docid(), fid, value, - ) + ); + Ok(()) }, ), } } - fn facet_fn_with_options( + fn facet_fn_with_options<'extractor, MF>( doc_alloc: &Bump, - cached_sorter: &mut CboCachedSorter, - cache_fn: impl Fn(&mut CboCachedSorter, &[u8], u32) -> grenad::Result<(), MF::Error>, + cached_sorter: &mut CboCachedSorter<'extractor, MF>, + cache_fn: impl Fn(&mut CboCachedSorter<'extractor, MF>, &[u8], u32), docid: DocumentId, fid: FieldId, value: &Value, - ) -> Result<()> - where - MF: MergeFunction, - MF::Error: Debug, - grenad::Error: Into, - { + ) { let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc); // Exists // key: fid buffer.push(FacetKind::Exists as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)?; + cache_fn(cached_sorter, &buffer, docid); match value { // Number @@ -177,10 +171,7 @@ impl FacetedDocidsExtractor { buffer.push(0); // level 0 buffer.extend_from_slice(&ordered); buffer.extend_from_slice(&n.to_be_bytes()); - - cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) - } else { - Ok(()) + cache_fn(cached_sorter, &buffer, docid); } } // String @@ -193,7 +184,7 @@ impl FacetedDocidsExtractor { buffer.extend_from_slice(&fid.to_be_bytes()); buffer.push(0); // level 0 buffer.extend_from_slice(truncated.as_bytes()); - cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid); } // Null // key: fid @@ -201,7 +192,7 @@ impl FacetedDocidsExtractor { buffer.clear(); buffer.push(FacetKind::Null as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid); } // Empty // key: fid @@ -209,17 +200,17 @@ impl FacetedDocidsExtractor { buffer.clear(); buffer.push(FacetKind::Empty as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid); } Value::Object(o) if o.is_empty() => { buffer.clear(); buffer.push(FacetKind::Empty as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid); } // Otherwise, do nothing /// TODO: What about Value::Bool? - _ => Ok(()), + _ => (), } } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index ef75b5f57..5875fc147 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -14,7 +14,7 @@ use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::indexer::document_changes::{ for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, - IndexingContext, RefCellExt, ThreadLocal, + IndexingContext, MostlySend, RefCellExt, ThreadLocal, }; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -22,26 +22,27 @@ use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_ const MAX_COUNTED_WORDS: usize = 30; -pub struct WordDocidsCachedSorters { - word_fid_docids: CboCachedSorter, - word_docids: CboCachedSorter, - exact_word_docids: CboCachedSorter, - word_position_docids: CboCachedSorter, - fid_word_count_docids: CboCachedSorter, +pub struct WordDocidsCachedSorters<'indexer> { + word_fid_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, + word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, + exact_word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, + word_position_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, + fid_word_count_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, fid_word_count: HashMap, current_docid: Option, } -impl WordDocidsCachedSorters { - pub fn new( +unsafe impl<'indexer> MostlySend for WordDocidsCachedSorters<'indexer> {} + +impl<'indexer> WordDocidsCachedSorters<'indexer> { + pub fn new_in( indexer: GrenadParameters, max_memory: Option, - capacity: NonZero, + alloc: RefBump<'indexer>, ) -> Self { let max_memory = max_memory.map(|max_memory| max_memory / 4); - let word_fid_docids = CboCachedSorter::new( - capacity, + let word_fid_docids = CboCachedSorter::new_in( create_sorter( grenad::SortAlgorithm::Stable, MergeDeladdCboRoaringBitmaps, @@ -51,9 +52,9 @@ impl WordDocidsCachedSorters { max_memory, false, ), + RefBump::clone(&alloc), ); - let word_docids = CboCachedSorter::new( - capacity, + let word_docids = CboCachedSorter::new_in( create_sorter( grenad::SortAlgorithm::Stable, MergeDeladdCboRoaringBitmaps, @@ -63,9 +64,9 @@ impl WordDocidsCachedSorters { max_memory, false, ), + RefBump::clone(&alloc), ); - let exact_word_docids = CboCachedSorter::new( - capacity, + let exact_word_docids = CboCachedSorter::new_in( create_sorter( grenad::SortAlgorithm::Stable, MergeDeladdCboRoaringBitmaps, @@ -75,9 +76,9 @@ impl WordDocidsCachedSorters { max_memory, false, ), + RefBump::clone(&alloc), ); - let word_position_docids = CboCachedSorter::new( - capacity, + let word_position_docids = CboCachedSorter::new_in( create_sorter( grenad::SortAlgorithm::Stable, MergeDeladdCboRoaringBitmaps, @@ -87,9 +88,9 @@ impl WordDocidsCachedSorters { max_memory, false, ), + RefBump::clone(&alloc), ); - let fid_word_count_docids = CboCachedSorter::new( - capacity, + let fid_word_count_docids = CboCachedSorter::new_in( create_sorter( grenad::SortAlgorithm::Stable, MergeDeladdCboRoaringBitmaps, @@ -99,6 +100,7 @@ impl WordDocidsCachedSorters { max_memory, false, ), + alloc, ); Self { @@ -120,29 +122,29 @@ impl WordDocidsCachedSorters { exact: bool, docid: u32, buffer: &mut Vec, - ) -> Result<()> { + ) { let key = word.as_bytes(); if exact { - self.exact_word_docids.insert_add_u32(key, docid)?; + self.exact_word_docids.insert_add_u32(key, docid); } else { - self.word_docids.insert_add_u32(key, docid)?; + self.word_docids.insert_add_u32(key, docid); } buffer.clear(); buffer.extend_from_slice(word.as_bytes()); buffer.push(0); buffer.extend_from_slice(&field_id.to_be_bytes()); - self.word_fid_docids.insert_add_u32(buffer, docid)?; + self.word_fid_docids.insert_add_u32(buffer, docid); let position = bucketed_position(position); buffer.clear(); buffer.extend_from_slice(word.as_bytes()); buffer.push(0); buffer.extend_from_slice(&position.to_be_bytes()); - self.word_position_docids.insert_add_u32(buffer, docid)?; + self.word_position_docids.insert_add_u32(buffer, docid); if self.current_docid.map_or(false, |id| docid != id) { - self.flush_fid_word_count(buffer)?; + self.flush_fid_word_count(buffer); } self.fid_word_count @@ -150,8 +152,6 @@ impl WordDocidsCachedSorters { .and_modify(|(_current_count, new_count)| *new_count += 1) .or_insert((0, 1)); self.current_docid = Some(docid); - - Ok(()) } fn insert_del_u32( @@ -162,61 +162,56 @@ impl WordDocidsCachedSorters { exact: bool, docid: u32, buffer: &mut Vec, - ) -> Result<()> { + ) { let key = word.as_bytes(); if exact { - self.exact_word_docids.insert_del_u32(key, docid)?; + self.exact_word_docids.insert_del_u32(key, docid); } else { - self.word_docids.insert_del_u32(key, docid)?; + self.word_docids.insert_del_u32(key, docid); } buffer.clear(); buffer.extend_from_slice(word.as_bytes()); buffer.push(0); buffer.extend_from_slice(&field_id.to_be_bytes()); - self.word_fid_docids.insert_del_u32(buffer, docid)?; + self.word_fid_docids.insert_del_u32(buffer, docid); let position = bucketed_position(position); buffer.clear(); buffer.extend_from_slice(word.as_bytes()); buffer.push(0); buffer.extend_from_slice(&position.to_be_bytes()); - self.word_position_docids.insert_del_u32(buffer, docid)?; + self.word_position_docids.insert_del_u32(buffer, docid); if self.current_docid.map_or(false, |id| docid != id) { - self.flush_fid_word_count(buffer)?; + self.flush_fid_word_count(buffer); } self.fid_word_count .entry(field_id) .and_modify(|(current_count, _new_count)| *current_count += 1) .or_insert((1, 0)); - self.current_docid = Some(docid); - Ok(()) + self.current_docid = Some(docid); } - fn flush_fid_word_count(&mut self, buffer: &mut Vec) -> Result<()> { + fn flush_fid_word_count(&mut self, buffer: &mut Vec) { for (fid, (current_count, new_count)) in self.fid_word_count.drain() { if current_count != new_count { if current_count <= MAX_COUNTED_WORDS { buffer.clear(); buffer.extend_from_slice(&fid.to_be_bytes()); buffer.push(current_count as u8); - self.fid_word_count_docids - .insert_del_u32(buffer, self.current_docid.unwrap())?; + self.fid_word_count_docids.insert_del_u32(buffer, self.current_docid.unwrap()); } if new_count <= MAX_COUNTED_WORDS { buffer.clear(); buffer.extend_from_slice(&fid.to_be_bytes()); buffer.push(new_count as u8); - self.fid_word_count_docids - .insert_add_u32(buffer, self.current_docid.unwrap())?; + self.fid_word_count_docids.insert_add_u32(buffer, self.current_docid.unwrap()); } } } - - Ok(()) } } @@ -312,24 +307,20 @@ pub struct WordDocidsExtractorData<'extractor> { } impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { - type Data = FullySend>; + type Data = RefCell>; - fn init_data( - &self, - _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, - ) -> Result { - Ok(FullySend(RefCell::new(WordDocidsCachedSorters::new( + fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result { + Ok(RefCell::new(WordDocidsCachedSorters::new_in( self.grenad_parameters, self.max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - )))) + extractor_alloc, + ))) } fn process( &self, change: DocumentChange, - context: &crate::update::new::indexer::document_changes::DocumentChangeContext, + context: &DocumentChangeContext, ) -> Result<()> { WordDocidsExtractors::extract_document_change(context, self.tokenizer, change) } @@ -343,7 +334,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { return Ok(()); } - let mut data = data.0.borrow_mut(); + let mut data = data.borrow_mut(); let WordDocidsCachedSorters { word_fid_docids, word_docids, @@ -454,7 +445,7 @@ impl WordDocidsExtractors { } fn extract_document_change( - context: &DocumentChangeContext>>, + context: &DocumentChangeContext>, document_tokenizer: &DocumentTokenizer, document_change: DocumentChange, ) -> Result<()> { diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs index 96a7681d0..9f7af5424 100644 --- a/milli/src/update/new/indexer/document_changes.rs +++ b/milli/src/update/new/indexer/document_changes.rs @@ -104,6 +104,8 @@ pub struct FullySend(pub T); // SAFETY: a type **fully** send is always mostly send as well. unsafe impl MostlySend for FullySend where T: Send {} +unsafe impl MostlySend for RefCell where T: MostlySend {} + impl FullySend { pub fn into(self) -> T { self.0