diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs
index df449d943..87c82698f 100644
--- a/milli/src/update/new/channel.rs
+++ b/milli/src/update/new/channel.rs
@@ -1,4 +1,5 @@
 use std::marker::PhantomData;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
 use heed::types::Bytes;
@@ -194,22 +195,22 @@ impl IntoIterator for WriterReceiver {
 pub struct ExtractorSender {
     sender: Sender<WriterOperation>,
     /// The number of message we sent in total in the channel.
-    send_count: std::cell::Cell<usize>,
+    send_count: AtomicUsize,
     /// The number of times we sent something in a channel that was full.
-    writer_contentious_count: std::cell::Cell<usize>,
+    writer_contentious_count: AtomicUsize,
     /// The number of times we sent something in a channel that was empty.
-    extractor_contentious_count: std::cell::Cell<usize>,
+    extractor_contentious_count: AtomicUsize,
 }
 
 impl Drop for ExtractorSender {
     fn drop(&mut self) {
         eprintln!(
             "Extractor channel stats: {} sends, {} writer contentions ({}%), {} extractor contentions ({}%)",
-            self.send_count.get(),
-            self.writer_contentious_count.get(),
-            (self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0,
-            self.extractor_contentious_count.get(),
-            (self.extractor_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0
+            self.send_count.load(Ordering::SeqCst),
+            self.writer_contentious_count.load(Ordering::SeqCst),
+            (self.writer_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0,
+            self.extractor_contentious_count.load(Ordering::SeqCst),
+            (self.extractor_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0
         )
     }
 }
@@ -244,12 +245,12 @@ impl ExtractorSender {
 
     fn send(&self, op: WriterOperation) -> StdResult<(), SendError<()>> {
         if self.sender.is_full() {
-            self.writer_contentious_count.set(self.writer_contentious_count.get() + 1);
+            self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
         }
         if self.sender.is_empty() {
-            self.extractor_contentious_count.set(self.extractor_contentious_count.get() + 1);
+            self.extractor_contentious_count.fetch_add(1, Ordering::SeqCst);
         }
-        self.send_count.set(self.send_count.get() + 1);
+        self.send_count.fetch_add(1, Ordering::SeqCst);
         match self.sender.send(op) {
             Ok(()) => Ok(()),
             Err(SendError(_)) => Err(SendError(())),
diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs
index e492cb798..b1d250755 100644
--- a/milli/src/update/new/extract/cache.rs
+++ b/milli/src/update/new/extract/cache.rs
@@ -66,10 +66,10 @@ use crate::{CboRoaringBitmapCodec, Result};
 /// A cache that stores bytes keys associated to CboDelAddRoaringBitmaps.
 ///
 /// Internally balances the content over `N` buckets for future merging.
-pub struct CboCachedSorter<'extractor> {
+pub struct BalancedCaches<'extractor> {
     hasher: DefaultHashBuilder,
     alloc: &'extractor Bump,
-    max_memory: usize,
+    max_memory: Option<usize>,
     caches: InnerCaches<'extractor>,
 }
 
@@ -78,8 +78,8 @@ enum InnerCaches<'extractor> {
     Spilling(SpillingCaches<'extractor>),
 }
 
-impl<'extractor> CboCachedSorter<'extractor> {
-    pub fn new_in(buckets: usize, max_memory: usize, alloc: &'extractor Bump) -> Self {
+impl<'extractor> BalancedCaches<'extractor> {
+    pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
         Self {
             hasher: DefaultHashBuilder::default(),
             max_memory,
@@ -98,7 +98,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     }
 
     pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> Result<()> {
-        if self.alloc.allocated_bytes() >= self.max_memory {
+        if self.max_memory.map_or(false, |mm| self.alloc.allocated_bytes() >= mm) {
             self.start_spilling()?;
         }
 
@@ -115,7 +115,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     }
 
     pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> Result<()> {
-        if self.alloc.allocated_bytes() >= self.max_memory {
+        if self.max_memory.map_or(false, |mm| self.alloc.allocated_bytes() >= mm) {
             self.start_spilling()?;
         }
 
@@ -134,7 +134,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     /// Make sure the cache is no longer allocating data
     /// and writes every new and unknow entry to disk.
     fn start_spilling(&mut self) -> Result<()> {
-        let CboCachedSorter { hasher: _, alloc: _, max_memory: _, caches } = self;
+        let BalancedCaches { hasher: _, alloc: _, max_memory: _, caches } = self;
 
         if let InnerCaches::Normal(normal_caches) = caches {
             let dummy = NormalCaches { caches: Vec::new() };
@@ -173,7 +173,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     }
 }
 
-unsafe impl MostlySend for CboCachedSorter<'_> {}
+unsafe impl MostlySend for BalancedCaches<'_> {}
 
 struct NormalCaches<'extractor> {
     caches:
@@ -371,9 +371,9 @@ pub struct FrozenCache<'a, 'extractor> {
 }
 
 pub fn transpose_and_freeze_caches<'a, 'extractor>(
-    caches: &'a mut [CboCachedSorter<'extractor>],
+    caches: &'a mut [BalancedCaches<'extractor>],
 ) -> Result<Vec<Vec<FrozenCache<'a, 'extractor>>>> {
-    let width = caches.get(0).map(CboCachedSorter::buckets).unwrap_or(0);
+    let width = caches.get(0).map(BalancedCaches::buckets).unwrap_or(0);
     let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect();
 
     for thread_cache in caches {
diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs
index cce2f4c0e..8e2be81f2 100644
--- a/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -6,7 +6,7 @@ use bumpalo::Bump;
 use heed::RoTxn;
 use serde_json::Value;
 
-use super::super::cache::CboCachedSorter;
+use super::super::cache::BalancedCaches;
 use super::facet_document::extract_document_facets;
 use super::FacetKind;
 use crate::facet::value_encoding::f64_into_bytes;
@@ -22,14 +22,18 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
 pub struct FacetedExtractorData<'a> {
     attributes_to_extract: &'a [&'a str],
     grenad_parameters: GrenadParameters,
-    max_memory: Option<usize>,
+    buckets: usize,
 }
 
 impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
-    type Data = RefCell<CboCachedSorter<'extractor>>;
+    type Data = RefCell<BalancedCaches<'extractor>>;
 
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
+        Ok(RefCell::new(BalancedCaches::new_in(
+            self.buckets,
+            self.grenad_parameters.max_memory,
+            extractor_alloc,
+        )))
     }
 
     fn process(
@@ -45,7 +49,7 @@ pub struct FacetedDocidsExtractor;
 
 impl FacetedDocidsExtractor {
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
+        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
         attributes_to_extract: &[&str],
         document_change: DocumentChange,
     ) -> Result<()> {
@@ -62,7 +66,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_del_u32,
+                    BalancedCaches::insert_del_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -79,7 +83,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_del_u32,
+                    BalancedCaches::insert_del_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -96,7 +100,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_add_u32,
+                    BalancedCaches::insert_add_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -113,7 +117,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_add_u32,
+                    BalancedCaches::insert_add_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -126,8 +130,8 @@ impl FacetedDocidsExtractor {
 
     fn facet_fn_with_options<'extractor>(
         doc_alloc: &Bump,
-        cached_sorter: &mut CboCachedSorter<'extractor>,
-        cache_fn: impl Fn(&mut CboCachedSorter<'extractor>, &[u8], u32) -> Result<()>,
+        cached_sorter: &mut BalancedCaches<'extractor>,
+        cache_fn: impl Fn(&mut BalancedCaches<'extractor>, &[u8], u32) -> Result<()>,
         docid: DocumentId,
         fid: FieldId,
         value: &Value,
@@ -219,7 +223,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>> {
+    ) -> Result<Vec<BalancedCaches<'extractor>>> {
         let max_memory = grenad_parameters.max_memory_by_thread();
 
         let index = indexing_context.index;
@@ -238,7 +242,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
             let extractor = FacetedExtractorData {
                 attributes_to_extract: &attributes_to_extract,
                 grenad_parameters,
-                max_memory,
+                buckets: rayon::max_num_threads(),
             };
             for_each_document_change(
                 document_changes,
diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs
index aa71752eb..a3c0384e0 100644
--- a/milli/src/update/new/extract/mod.rs
+++ b/milli/src/update/new/extract/mod.rs
@@ -3,7 +3,7 @@ mod faceted;
 mod searchable;
 
 use bumpalo::Bump;
-pub use cache::{merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap};
+pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
 pub use faceted::*;
 pub use searchable::*;
 
@@ -17,7 +17,7 @@ pub trait DocidsExtractor {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>>;
+    ) -> Result<Vec<BalancedCaches<'extractor>>>;
 }
 
 /// TODO move in permissive json pointer
diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs
index 8e292d8ed..a2a590ba0 100644
--- a/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -1,15 +1,14 @@
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::fs::File;
-use std::io;
 use std::ops::DerefMut as _;
 
 use bumpalo::Bump;
-use grenad::{Merger, MergerBuilder};
+use grenad::MergerBuilder;
 use heed::RoTxn;
 
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
-use crate::update::new::extract::cache::CboCachedSorter;
+use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::extract::perm_json_p::contained_in;
 use crate::update::new::indexer::document_changes::{
     for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
@@ -21,30 +20,30 @@ use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_
 
 const MAX_COUNTED_WORDS: usize = 30;
 
-pub struct WordDocidsCachedSorters<'extractor> {
-    word_fid_docids: CboCachedSorter<'extractor>,
-    word_docids: CboCachedSorter<'extractor>,
-    exact_word_docids: CboCachedSorter<'extractor>,
-    word_position_docids: CboCachedSorter<'extractor>,
-    fid_word_count_docids: CboCachedSorter<'extractor>,
+pub struct WordDocidsBalancedCaches<'extractor> {
+    word_fid_docids: BalancedCaches<'extractor>,
+    word_docids: BalancedCaches<'extractor>,
+    exact_word_docids: BalancedCaches<'extractor>,
+    word_position_docids: BalancedCaches<'extractor>,
+    fid_word_count_docids: BalancedCaches<'extractor>,
     fid_word_count: HashMap<FieldId, (usize, usize)>,
     current_docid: Option<DocumentId>,
 }
 
-unsafe impl<'extractor> MostlySend for WordDocidsCachedSorters<'extractor> {}
+unsafe impl<'extractor> MostlySend for WordDocidsBalancedCaches<'extractor> {}
 
-impl<'extractor> WordDocidsCachedSorters<'extractor> {
+impl<'extractor> WordDocidsBalancedCaches<'extractor> {
     /// TODO Make sure to give the same max_memory to all of them, without splitting it
-    pub fn new_in(alloc: &'extractor Bump) -> io::Result<Self> {
-        Ok(Self {
-            word_fid_docids: CboCachedSorter::new_in(alloc)?,
-            word_docids: CboCachedSorter::new_in(alloc)?,
-            exact_word_docids: CboCachedSorter::new_in(alloc)?,
-            word_position_docids: CboCachedSorter::new_in(alloc)?,
-            fid_word_count_docids: CboCachedSorter::new_in(alloc)?,
+    pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
+        Self {
+            word_fid_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            word_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            exact_word_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            word_position_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            fid_word_count_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
             fid_word_count: HashMap::new(),
             current_docid: None,
-        })
+        }
     }
 
     fn insert_add_u32(
@@ -148,35 +147,27 @@ impl<'extractor> WordDocidsCachedSorters<'extractor> {
     }
 }
 
-struct WordDocidsMergerBuilders {
-    word_fid_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    exact_word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    word_position_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    fid_word_count_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-}
-
 pub struct WordDocidsMergers<'extractor> {
-    pub word_docids: Vec<CboCachedSorter<'extractor>>,
-    pub word_fid_docids: Vec<CboCachedSorter<'extractor>>,
-    pub exact_word_docids: Vec<CboCachedSorter<'extractor>>,
-    pub word_position_docids: Vec<CboCachedSorter<'extractor>>,
-    pub fid_word_count_docids: Vec<CboCachedSorter<'extractor>>,
+    pub word_docids: Vec<BalancedCaches<'extractor>>,
+    pub word_fid_docids: Vec<BalancedCaches<'extractor>>,
+    pub exact_word_docids: Vec<BalancedCaches<'extractor>>,
+    pub word_position_docids: Vec<BalancedCaches<'extractor>>,
+    pub fid_word_count_docids: Vec<BalancedCaches<'extractor>>,
 }
 
-impl<'extractor> WordDocidsMergerBuilders<'extractor> {
+impl<'extractor> WordDocidsMergers<'extractor> {
     fn new() -> Self {
         Self {
-            word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
+            word_docids: Vec::new(),
+            word_fid_docids: Vec::new(),
+            exact_word_docids: Vec::new(),
+            word_position_docids: Vec::new(),
+            fid_word_count_docids: Vec::new(),
         }
     }
 
-    fn add_sorters(&mut self, other: WordDocidsCachedSorters) -> Result<()> {
-        let WordDocidsCachedSorters {
+    fn push(&mut self, other: WordDocidsBalancedCaches<'extractor>) -> Result<()> {
+        let WordDocidsBalancedCaches {
             word_docids,
             word_fid_docids,
             exact_word_docids,
@@ -186,43 +177,31 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> {
             current_docid: _,
         } = other;
 
-        let word_docids_entries = word_docids.into_unordered_entries()?;
-        let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?;
-        let exact_word_docids_entries = exact_word_docids.into_unordered_entries()?;
-        let word_position_docids_entries = word_position_docids.into_unordered_entries()?;
-        let fid_word_count_docids_entries = fid_word_count_docids.into_unordered_entries()?;
-
-        self.word_docids.push(word_docids_entries);
-        self.word_fid_docids.push(word_fid_docids_entries);
-        self.exact_word_docids.push(exact_word_docids_entries);
-        self.word_position_docids.push(word_position_docids_entries);
-        self.fid_word_count_docids.push(fid_word_count_docids_entries);
+        self.word_docids.push(word_docids);
+        self.word_fid_docids.push(word_fid_docids);
+        self.exact_word_docids.push(exact_word_docids);
+        self.word_position_docids.push(word_position_docids);
+        self.fid_word_count_docids.push(fid_word_count_docids);
 
         Ok(())
     }
-
-    fn build(self) -> WordDocidsMergers<'extractor> {
-        WordDocidsMergers {
-            word_docids: self.word_docids.build(),
-            word_fid_docids: self.word_fid_docids.build(),
-            exact_word_docids: self.exact_word_docids.build(),
-            word_position_docids: self.word_position_docids.build(),
-            fid_word_count_docids: self.fid_word_count_docids.build(),
-        }
-    }
 }
 
-pub struct WordDocidsExtractorData<'extractor> {
-    tokenizer: &'extractor DocumentTokenizer<'extractor>,
+pub struct WordDocidsExtractorData<'a> {
+    tokenizer: &'a DocumentTokenizer<'a>,
     grenad_parameters: GrenadParameters,
-    max_memory: Option<usize>,
+    buckets: usize,
 }
 
-impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
-    type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
+impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> {
+    type Data = RefCell<Option<WordDocidsBalancedCaches<'extractor>>>;
 
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?)))
+        Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in(
+            self.buckets,
+            self.grenad_parameters.max_memory,
+            extractor_alloc,
+        ))))
     }
 
     fn process(
@@ -243,10 +222,9 @@ impl WordDocidsExtractors {
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
     ) -> Result<WordDocidsMergers<'extractor>> {
-        let max_memory = grenad_parameters.max_memory_by_thread();
         let index = indexing_context.index;
-
         let rtxn = index.read_txn()?;
+
         let stop_words = index.stop_words(&rtxn)?;
         let allowed_separators = index.allowed_separators(&rtxn)?;
         let allowed_separators: Option<Vec<_>> =
@@ -284,7 +262,7 @@ impl WordDocidsExtractors {
             let extractor = WordDocidsExtractorData {
                 tokenizer: &document_tokenizer,
                 grenad_parameters,
-                max_memory,
+                buckets: rayon::max_num_threads(),
             };
 
             for_each_document_change(
@@ -296,21 +274,16 @@ impl WordDocidsExtractors {
             )?;
         }
 
-        {
-            let span =
-                tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
-            let _entered = span.enter();
-            let mut builder = WordDocidsMergerBuilders::new();
-            for cache in datastore.into_iter().flat_map(RefCell::into_inner) {
-                builder.add_sorters(cache)?;
-            }
-
-            Ok(builder.build())
+        let mut merger = WordDocidsMergers::new();
+        for cache in datastore.into_iter().flat_map(RefCell::into_inner) {
+            merger.push(cache)?;
         }
+
+        Ok(merger)
     }
 
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<Option<WordDocidsCachedSorters>>>,
+        context: &DocumentChangeContext<RefCell<Option<WordDocidsBalancedCaches>>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {
diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
index f66802f26..56c781396 100644
--- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
@@ -8,7 +8,7 @@ use super::tokenize_document::DocumentTokenizer;
 use super::SearchableExtractor;
 use crate::proximity::{index_proximity, MAX_DISTANCE};
 use crate::update::new::document::Document;
-use crate::update::new::extract::cache::CboCachedSorter;
+use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend, RefCellExt};
 use crate::update::new::DocumentChange;
 use crate::update::MergeDeladdCboRoaringBitmaps;
@@ -32,7 +32,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
     // and to store the docids of the documents that have a number of words in a given field
     // equal to or under than MAX_COUNTED_WORDS.
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
+        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {
diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs
index b5dc1f19d..87602af94 100644
--- a/milli/src/update/new/extract/searchable/mod.rs
+++ b/milli/src/update/new/extract/searchable/mod.rs
@@ -12,7 +12,7 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
 use heed::RoTxn;
 use tokenize_document::{tokenizer_builder, DocumentTokenizer};
 
-use super::cache::CboCachedSorter;
+use super::cache::BalancedCaches;
 use super::DocidsExtractor;
 use crate::update::new::indexer::document_changes::{
     for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
@@ -25,17 +25,21 @@ use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
 pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
     tokenizer: &'a DocumentTokenizer<'a>,
     grenad_parameters: GrenadParameters,
-    max_memory: Option<usize>,
+    buckets: usize,
     _ex: PhantomData<EX>,
 }
 
 impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
     for SearchableExtractorData<'a, EX>
 {
-    type Data = RefCell<CboCachedSorter<'extractor>>;
+    type Data = RefCell<BalancedCaches<'extractor>>;
 
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
+        Ok(RefCell::new(BalancedCaches::new_in(
+            self.buckets,
+            self.grenad_parameters.max_memory,
+            extractor_alloc,
+        )))
     }
 
     fn process(
@@ -53,7 +57,7 @@ pub trait SearchableExtractor: Sized + Sync {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>> {
+    ) -> Result<Vec<BalancedCaches<'extractor>>> {
         let max_memory = grenad_parameters.max_memory_by_thread();
 
         let rtxn = indexing_context.index.read_txn()?;
@@ -87,7 +91,7 @@ pub trait SearchableExtractor: Sized + Sync {
             let extractor_data: SearchableExtractorData<EX> = SearchableExtractorData {
                 tokenizer: &document_tokenizer,
                 grenad_parameters,
-                max_memory,
+                buckets: rayon::max_num_threads(),
                 _ex: PhantomData,
             };
 
@@ -110,7 +114,7 @@ pub trait SearchableExtractor: Sized + Sync {
     }
 
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
+        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()>;
@@ -127,7 +131,7 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>> {
+    ) -> Result<Vec<BalancedCaches<'extractor>>> {
         Self::run_extraction(
             grenad_parameters,
             document_changes,
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index 0b999bd30..0d1efa1d1 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -42,11 +42,11 @@ mod document_operation;
 mod partial_dump;
 mod update_by_function;
 
-struct DocumentExtractor<'a, 'extractor> {
-    document_sender: &'a DocumentSender<'a, 'extractor>,
+struct DocumentExtractor<'a> {
+    documents_sender: &'a DocumentsSender<'a>,
 }
 
-impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor> {
+impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
     type Data = FullySend<()>;
 
     fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
@@ -70,7 +70,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor>
             match change {
                 DocumentChange::Deletion(deletion) => {
                     let docid = deletion.docid();
-                    self.document_sender.delete(docid, external_docid).unwrap();
+                    self.documents_sender.delete(docid, external_docid).unwrap();
                 }
                 /// TODO: change NONE by SOME(vector) when implemented
                 DocumentChange::Update(update) => {
@@ -79,14 +79,14 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor>
                         update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
                     let content =
                         write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                    self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                    self.documents_sender.uncompressed(docid, external_docid, &content).unwrap();
                 }
                 DocumentChange::Insertion(insertion) => {
                     let docid = insertion.docid();
                     let content = insertion.new();
                     let content =
                         write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                    self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                    self.documents_sender.uncompressed(docid, external_docid, &content).unwrap();
                     // extracted_dictionary_sender.send(self, dictionary: &[u8]);
                 }
             }
@@ -137,7 +137,7 @@ where
         // document but we need to create a function that collects and compresses documents.
         let document_sender = extractor_sender.documents();
-        let document_extractor = DocumentExtractor { document_sender: &document_sender };
+        let document_extractor = DocumentExtractor { documents_sender: &document_sender };
         let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
         for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
@@ -180,6 +180,7 @@ where
                 // TODO Word Docids Merger
                 // extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
                 {
+                    let rtxn = index.read_txn()?;
                     let words_fst = index.words_fst(&rtxn)?;
                     let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
                     let prefix_settings = index.prefix_settings(&rtxn)?;
diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs
index 92a33985b..a80c6c096 100644
--- a/milli/src/update/new/merger.rs
+++ b/milli/src/update/new/merger.rs
@@ -9,7 +9,7 @@ use roaring::RoaringBitmap;
 
 use super::channel::*;
 use super::extract::{
-    merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap, FacetKind,
+    merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
 };
 use super::word_fst_builder::PrefixDelta;
 use super::DocumentChange;
@@ -73,8 +73,8 @@ impl GeoExtractor {
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-fn merge_and_send_docids<'extractor>(
-    mut caches: Vec<CboCachedSorter<'extractor>>,
+pub fn merge_and_send_docids<'extractor>(
+    mut caches: Vec<BalancedCaches<'extractor>>,
     database: Database<Bytes, Bytes>,
     index: &Index,
     docids_sender: impl DocidsSender + Sync,
@@ -104,8 +104,8 @@ fn merge_and_send_docids<'extractor>(
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-fn merge_and_send_facet_docids<'extractor>(
-    mut caches: Vec<CboCachedSorter<'extractor>>,
+pub fn merge_and_send_facet_docids<'extractor>(
+    mut caches: Vec<BalancedCaches<'extractor>>,
     database: FacetDatabases,
     index: &Index,
     docids_sender: impl DocidsSender + Sync,
@@ -134,12 +134,12 @@ fn merge_and_send_facet_docids<'extractor>(
     })
 }
 
-struct FacetDatabases<'a> {
+pub struct FacetDatabases<'a> {
     index: &'a Index,
 }
 
 impl<'a> FacetDatabases<'a> {
-    fn new(index: &'a Index) -> Self {
+    pub fn new(index: &'a Index) -> Self {
         Self { index }
     }
 
@@ -168,7 +168,7 @@ pub struct FacetFieldIdsDelta {
 }
 
 impl FacetFieldIdsDelta {
-    fn new() -> Self {
+    pub fn new() -> Self {
        Self {
            modified_facet_string_ids: HashSet::new(),
            modified_facet_number_ids: HashSet::new(),
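
The channel.rs hunk above swaps `Cell<usize>` for `AtomicUsize` so the contention counters can be updated through a shared `&self` from multiple extractor threads. The following is a minimal standalone sketch of that pattern, not part of the patch: `CountingSender` and the channel capacity are invented for illustration, while `is_full`/`is_empty`/`fetch_add` are the real crossbeam-channel and std atomics APIs.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_channel::{bounded, SendError, Sender};

// Hypothetical wrapper mirroring ExtractorSender's bookkeeping.
struct CountingSender<T> {
    sender: Sender<T>,
    send_count: AtomicUsize,
    writer_contentious_count: AtomicUsize,
    extractor_contentious_count: AtomicUsize,
}

impl<T> CountingSender<T> {
    fn send(&self, msg: T) -> Result<(), SendError<T>> {
        // A full channel means the consumer is the bottleneck;
        // an empty one means the producers are.
        if self.sender.is_full() {
            self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
        }
        if self.sender.is_empty() {
            self.extractor_contentious_count.fetch_add(1, Ordering::SeqCst);
        }
        self.send_count.fetch_add(1, Ordering::SeqCst);
        self.sender.send(msg)
    }
}

fn main() {
    let (sender, receiver) = bounded(2);
    let counting = CountingSender {
        sender,
        send_count: AtomicUsize::new(0),
        writer_contentious_count: AtomicUsize::new(0),
        extractor_contentious_count: AtomicUsize::new(0),
    };
    for n in 0..2 {
        counting.send(n).unwrap();
    }
    drop(receiver);
    println!("{} sends", counting.send_count.load(Ordering::SeqCst));
}
```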
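Likewise, the cache.rs hunks make the spill threshold optional: `BalancedCaches::new_in` now takes `max_memory: Option<usize>`, and `insert_del_u32`/`insert_add_u32` only trigger `start_spilling` when a limit is actually configured. A sketch of just that trigger, under the assumption of an invented `Cache` type; `Bump::allocated_bytes` is bumpalo's real API:

```rust
use bumpalo::Bump;

// Illustrative stand-in for BalancedCaches' memory accounting.
struct Cache<'a> {
    alloc: &'a Bump,
    max_memory: Option<usize>,
}

impl<'a> Cache<'a> {
    fn must_spill(&self) -> bool {
        // map_or(false, ...) keeps the old `>=` comparison but makes
        // a `None` limit mean "never spill".
        self.max_memory.map_or(false, |mm| self.alloc.allocated_bytes() >= mm)
    }
}

fn main() {
    let alloc = Bump::new();
    // Allocate 1 KiB in the bump so the accounting has something to see.
    let _bytes = alloc.alloc_slice_fill_copy(1024, 0u8);

    let unlimited = Cache { alloc: &alloc, max_memory: None };
    let limited = Cache { alloc: &alloc, max_memory: Some(512) };

    assert!(!unlimited.must_spill()); // no limit configured
    assert!(limited.must_spill()); // 1024 bytes >= 512-byte limit
}
```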