diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 39b1bec17..df449d943 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -8,32 +8,23 @@ use roaring::RoaringBitmap; use super::extract::FacetKind; use super::StdResult; use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; -use crate::update::new::extract::CboCachedSorter; use crate::update::new::KvReaderFieldId; use crate::{DocumentId, Index}; /// The capacity of the channel is currently in number of messages. -pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { +pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver) { let (sender, receiver) = crossbeam_channel::bounded(cap); ( - MergerSender { + ExtractorSender { sender, send_count: Default::default(), writer_contentious_count: Default::default(), - merger_contentious_count: Default::default(), + extractor_contentious_count: Default::default(), }, WriterReceiver(receiver), ) } -/// The capacity of the channel is currently in number of messages. -pub fn extractors_merger_channels<'extractor>( - cap: usize, -) -> (ExtractorSender<'extractor>, MergerReceiver<'extractor>) { - let (sender, receiver) = crossbeam_channel::bounded(cap); - (ExtractorSender(sender), MergerReceiver(receiver)) -} - pub enum KeyValueEntry { SmallInMemory { key_length: usize, data: Box<[u8]> }, LargeOnDisk { key: Box<[u8]>, value: Mmap }, @@ -200,30 +191,30 @@ impl IntoIterator for WriterReceiver { } } -pub struct MergerSender { +pub struct ExtractorSender { sender: Sender, - /// The number of message we send in total in the channel. + /// The number of message we sent in total in the channel. send_count: std::cell::Cell, /// The number of times we sent something in a channel that was full. writer_contentious_count: std::cell::Cell, /// The number of times we sent something in a channel that was empty. - merger_contentious_count: std::cell::Cell, + extractor_contentious_count: std::cell::Cell, } -impl Drop for MergerSender { +impl Drop for ExtractorSender { fn drop(&mut self) { eprintln!( - "Merger channel stats: {} sends, {} writer contentions ({}%), {} merger contentions ({}%)", + "Extractor channel stats: {} sends, {} writer contentions ({}%), {} extractor contentions ({}%)", self.send_count.get(), self.writer_contentious_count.get(), (self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0, - self.merger_contentious_count.get(), - (self.merger_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0 + self.extractor_contentious_count.get(), + (self.extractor_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0 ) } } -impl MergerSender { +impl ExtractorSender { pub fn main(&self) -> MainSender<'_> { MainSender(self) } @@ -256,7 +247,7 @@ impl MergerSender { self.writer_contentious_count.set(self.writer_contentious_count.get() + 1); } if self.sender.is_empty() { - self.merger_contentious_count.set(self.merger_contentious_count.get() + 1); + self.extractor_contentious_count.set(self.extractor_contentious_count.get() + 1); } self.send_count.set(self.send_count.get() + 1); match self.sender.send(op) { @@ -266,7 +257,7 @@ impl MergerSender { } } -pub struct MainSender<'a>(&'a MergerSender); +pub struct MainSender<'a>(&'a ExtractorSender); impl MainSender<'_> { pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> { @@ -312,99 +303,37 @@ pub trait DatabaseType { const DATABASE: Database; } -pub trait MergerOperationType { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor>; -} - impl DatabaseType for ExactWordDocids { const DATABASE: Database = Database::ExactWordDocids; } -impl MergerOperationType for ExactWordDocids { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor> { - MergerOperation::ExactWordDocidsMerger(caches) - } -} - impl DatabaseType for FidWordCountDocids { const DATABASE: Database = Database::FidWordCountDocids; } -impl MergerOperationType for FidWordCountDocids { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor> { - MergerOperation::FidWordCountDocidsMerger(caches) - } -} - impl DatabaseType for WordDocids { const DATABASE: Database = Database::WordDocids; } -impl MergerOperationType for WordDocids { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor> { - MergerOperation::WordDocidsMerger(caches) - } -} - impl DatabaseType for WordFidDocids { const DATABASE: Database = Database::WordFidDocids; } -impl MergerOperationType for WordFidDocids { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor> { - MergerOperation::WordFidDocidsMerger(caches) - } -} - impl DatabaseType for WordPairProximityDocids { const DATABASE: Database = Database::WordPairProximityDocids; } -impl MergerOperationType for WordPairProximityDocids { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor> { - MergerOperation::WordPairProximityDocidsMerger(caches) - } -} - impl DatabaseType for WordPositionDocids { const DATABASE: Database = Database::WordPositionDocids; } -impl MergerOperationType for WordPositionDocids { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor> { - MergerOperation::WordPositionDocidsMerger(caches) - } -} - -impl MergerOperationType for FacetDocids { - fn new_merger_operation<'extractor>( - caches: Vec>>, - ) -> MergerOperation<'extractor> { - MergerOperation::FacetDocidsMerger(caches) - } -} - pub trait DocidsSender { fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>>; fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>>; } pub struct WordDocidsSender<'a, D> { - sender: &'a MergerSender, + sender: &'a ExtractorSender, _marker: PhantomData, } @@ -427,7 +356,7 @@ impl DocidsSender for WordDocidsSender<'_, D> { } pub struct FacetDocidsSender<'a> { - sender: &'a MergerSender, + sender: &'a ExtractorSender, } impl DocidsSender for FacetDocidsSender<'_> { @@ -461,7 +390,7 @@ impl DocidsSender for FacetDocidsSender<'_> { } } -pub struct DocumentsSender<'a>(&'a MergerSender); +pub struct DocumentsSender<'a>(&'a ExtractorSender); impl DocumentsSender<'_> { /// TODO do that efficiently @@ -504,86 +433,3 @@ impl DocumentsSender<'_> { } } } - -pub enum MergerOperation<'extractor> { - ExactWordDocidsMerger(Vec>>), - FidWordCountDocidsMerger(Vec>>), - WordDocidsMerger(Vec>>), - WordFidDocidsMerger(Vec>>), - WordPairProximityDocidsMerger(Vec>>), - WordPositionDocidsMerger(Vec>>), - FacetDocidsMerger(Vec>>), - DeleteDocument { docid: DocumentId, external_id: String }, - InsertDocument { docid: DocumentId, external_id: String, document: Box }, - FinishedDocument, -} - -pub struct MergerReceiver<'extractor>(Receiver>); - -impl<'extractor> IntoIterator for MergerReceiver<'extractor> { - type Item = MergerOperation<'extractor>; - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -pub struct ExtractorSender<'extractor>(Sender>); - -impl<'extractor> ExtractorSender<'extractor> { - pub fn document_sender(&self) -> DocumentSender<'_, 'extractor> { - DocumentSender(Some(&self.0)) - } - - pub fn send_searchable( - &self, - caches: Vec>>, - ) -> StdResult<(), SendError<()>> { - match self.0.send(D::new_merger_operation(caches)) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } -} - -pub struct DocumentSender<'a, 'extractor>(Option<&'a Sender>>); - -impl DocumentSender<'_, '_> { - pub fn insert( - &self, - docid: DocumentId, - external_id: String, - document: Box, - ) -> StdResult<(), SendError<()>> { - let sender = self.0.unwrap(); - match sender.send(MergerOperation::InsertDocument { docid, external_id, document }) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } - - pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> { - let sender = self.0.unwrap(); - match sender.send(MergerOperation::DeleteDocument { docid, external_id }) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } - - pub fn finish(mut self) -> StdResult<(), SendError<()>> { - let sender = self.0.take().unwrap(); - match sender.send(MergerOperation::FinishedDocument) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } -} - -impl Drop for DocumentSender<'_, '_> { - fn drop(&mut self) { - if let Some(sender) = self.0.take() { - let _ = sender.send(MergerOperation::FinishedDocument); - } - } -} diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 2941ed94b..e492cb798 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -371,16 +371,14 @@ pub struct FrozenCache<'a, 'extractor> { } pub fn transpose_and_freeze_caches<'a, 'extractor>( - caches: &'a mut [Vec>], + caches: &'a mut [CboCachedSorter<'extractor>], ) -> Result>>> { - let width = caches.get(0).map(Vec::len).unwrap_or(0); + let width = caches.get(0).map(CboCachedSorter::buckets).unwrap_or(0); let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect(); - for thread_caches in caches { - for cache in thread_caches.iter_mut() { - for frozen in cache.freeze()? { - bucket_caches[frozen.bucket].push(frozen); - } + for thread_cache in caches { + for frozen in thread_cache.freeze()? { + bucket_caches[frozen.bucket].push(frozen); } } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index a271f26dd..8e292d8ed 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -157,18 +157,18 @@ struct WordDocidsMergerBuilders { } pub struct WordDocidsMergers<'extractor> { - pub word_fid_docids: Vec>>, - pub word_docids: Vec>>, - pub exact_word_docids: Vec>>, - pub word_position_docids: Vec>>, - pub fid_word_count_docids: Vec>>, + pub word_docids: Vec>, + pub word_fid_docids: Vec>, + pub exact_word_docids: Vec>, + pub word_position_docids: Vec>, + pub fid_word_count_docids: Vec>, } impl<'extractor> WordDocidsMergerBuilders<'extractor> { fn new() -> Self { Self { - word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), + word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), @@ -177,8 +177,8 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> { fn add_sorters(&mut self, other: WordDocidsCachedSorters) -> Result<()> { let WordDocidsCachedSorters { - word_fid_docids, word_docids, + word_fid_docids, exact_word_docids, word_position_docids, fid_word_count_docids, @@ -186,14 +186,14 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> { current_docid: _, } = other; - let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?; let word_docids_entries = word_docids.into_unordered_entries()?; + let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?; let exact_word_docids_entries = exact_word_docids.into_unordered_entries()?; let word_position_docids_entries = word_position_docids.into_unordered_entries()?; let fid_word_count_docids_entries = fid_word_count_docids.into_unordered_entries()?; - self.word_fid_docids.push(word_fid_docids_entries); self.word_docids.push(word_docids_entries); + self.word_fid_docids.push(word_fid_docids_entries); self.exact_word_docids.push(exact_word_docids_entries); self.word_position_docids.push(word_position_docids_entries); self.fid_word_count_docids.push(fid_word_count_docids_entries); @@ -203,8 +203,8 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> { fn build(self) -> WordDocidsMergers<'extractor> { WordDocidsMergers { - word_fid_docids: self.word_fid_docids.build(), word_docids: self.word_docids.build(), + word_fid_docids: self.word_fid_docids.build(), exact_word_docids: self.exact_word_docids.build(), word_position_docids: self.word_position_docids.build(), fid_word_count_docids: self.fid_word_count_docids.build(), @@ -237,12 +237,12 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { pub struct WordDocidsExtractors; impl WordDocidsExtractors { - pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>( grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>, - ) -> Result { + extractor_allocs: &'extractor mut ThreadLocal>, + ) -> Result> { let max_memory = grenad_parameters.max_memory_by_thread(); let index = indexing_context.index; diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index dd2a2a0de..f66802f26 100644 --- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -15,6 +15,7 @@ use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{FieldId, GlobalFieldsIdsMap, Index, Result}; pub struct WordPairProximityDocidsExtractor; + impl SearchableExtractor for WordPairProximityDocidsExtractor { fn attributes_to_extract<'a>( rtxn: &'a RoTxn, diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 15686d0bf..0b999bd30 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::sync::RwLock; use std::thread::{self, Builder}; @@ -20,7 +19,7 @@ use super::channel::*; use super::document::write_to_obkv; use super::document_change::DocumentChange; use super::extract::*; -use super::merger::{merge_caches_entries, FacetFieldIdsDelta}; +use super::merger::{FacetDatabases, FacetFieldIdsDelta}; use super::word_fst_builder::PrefixDelta; use super::words_prefix_docids::{ compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, @@ -29,8 +28,9 @@ use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; use crate::proximity::ProximityPrecision; -use crate::update::new::channel::ExtractorSender; +use crate::update::new::word_fst_builder::{PrefixData, WordFstBuilder}; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; +use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids}; use crate::update::settings::InnerIndexSettings; use crate::update::{FacetsUpdateBulk, GrenadParameters}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; @@ -111,10 +111,8 @@ pub fn index<'pl, 'indexer, 'index, DC>( where DC: DocumentChanges<'pl>, { - let (merger_sender, writer_receiver) = merger_writer_channel(10_000); - // This channel acts as a rendezvous point to ensure that we are one task ahead - let (extractor_sender, merger_receiver) = extractors_merger_channels(4); - + // TODO find a better channel limit + let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000); let new_fields_ids_map = RwLock::new(new_fields_ids_map); let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads()); @@ -138,12 +136,12 @@ where let _entered = span.enter(); // document but we need to create a function that collects and compresses documents. - let document_sender = extractor_sender.document_sender(); - let document_extractor = DocumentExtractor { document_sender: &document_sender}; + let document_sender = extractor_sender.documents(); + let document_extractor = DocumentExtractor { document_sender: &document_sender }; let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?; - document_sender.finish().unwrap(); + // document_sender.finish().unwrap(); const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; let max_memory = TEN_GIB / dbg!(rayon::current_num_threads()); @@ -155,16 +153,15 @@ where { let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let _entered = span.enter(); - extract_and_send_docids::< - _, - FacetedDocidsExtractor, - FacetDocids, - >( - grenad_parameters, - document_changes, - indexing_context, - &mut extractor_allocs, - &extractor_sender, + + let mut facet_field_ids_delta = FacetFieldIdsDelta::new(); + let caches = FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?; + merge_and_send_facet_docids( + caches, + FacetDatabases::new(index), + &index, + extractor_sender.facet_docids(), + &mut facet_field_ids_delta, )?; } @@ -173,18 +170,88 @@ where let _entered = span.enter(); let WordDocidsMergers { - word_fid_docids, word_docids, + word_fid_docids, exact_word_docids, word_position_docids, fid_word_count_docids, } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?; - extractor_sender.send_searchable::(word_docids).unwrap(); - extractor_sender.send_searchable::(word_fid_docids).unwrap(); - extractor_sender.send_searchable::(exact_word_docids).unwrap(); - extractor_sender.send_searchable::(word_position_docids).unwrap(); - extractor_sender.send_searchable::(fid_word_count_docids).unwrap(); + // TODO Word Docids Merger + // extractor_sender.send_searchable::(word_docids).unwrap(); + { + let words_fst = index.words_fst(&rtxn)?; + let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; + let prefix_settings = index.prefix_settings(&rtxn)?; + word_fst_builder.with_prefix_settings(prefix_settings); + + { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); + let _entered = span.enter(); + + merge_and_send_docids( + word_docids, + index.word_docids.remap_types(), + &index, + extractor_sender.docids::(), + |deladd, key| word_fst_builder.register_word(deladd, key), + )?; + } + + { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "words_fst"); + let _entered = span.enter(); + + let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; + extractor_sender.main().write_words_fst(word_fst_mmap).unwrap(); + if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { + extractor_sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap(); + // merger_result.prefix_delta = Some(prefix_delta); + } + } + } + + // Word Fid Docids Merging + // extractor_sender.send_searchable::(word_fid_docids).unwrap(); + merge_and_send_docids( + word_fid_docids, + index.word_fid_docids.remap_types(), + &index, + extractor_sender.docids::(), + |_, _key| Ok(()), + )?; + + // Exact Word Docids Merging + // extractor_sender.send_searchable::(exact_word_docids).unwrap(); + merge_and_send_docids( + exact_word_docids, + index.exact_word_docids.remap_types(), + &index, + extractor_sender.docids::(), + |_, _key| Ok(()), + )?; + + // Word Position Docids Merging + // extractor_sender.send_searchable::(word_position_docids).unwrap(); + merge_and_send_docids( + word_position_docids, + index.word_position_docids.remap_types(), + &index, + extractor_sender.docids::(), + |_, _key| Ok(()), + )?; + + // Fid Word Count Docids Merging + // extractor_sender.send_searchable::(fid_word_count_docids).unwrap(); + merge_and_send_docids( + fid_word_count_docids, + index.field_id_word_count_docids.remap_types(), + &index, + extractor_sender.docids::(), + |_, _key| Ok(()), + )?; } // run the proximity extraction only if the precision is by word @@ -194,16 +261,13 @@ where if proximity_precision == ProximityPrecision::ByWord { let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let _entered = span.enter(); - extract_and_send_docids::< - _, - WordPairProximityDocidsExtractor, - WordPairProximityDocids, - >( - grenad_parameters, - document_changes, - indexing_context, - &mut extractor_allocs, - &extractor_sender, + let caches = ::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?; + merge_and_send_docids( + caches, + index.word_pair_proximity_docids.remap_types(), + &index, + extractor_sender.docids::(), + |_, _| Ok(()), )?; } @@ -232,22 +296,7 @@ where })?; let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); - let indexer_span = tracing::Span::current(); - // TODO manage the errors correctly - let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || { - let span = - tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge"); - let _entered = span.enter(); - let rtxn = index.read_txn().unwrap(); - merge_caches_entries( - merger_receiver, - merger_sender, - &rtxn, - index, - global_fields_ids_map, - ) - })?; for operation in writer_receiver { let database = operation.database(index); @@ -263,15 +312,14 @@ where /// TODO handle the panicking threads handle.join().unwrap()?; - let merger_result = merger_thread.join().unwrap()?; - if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta { - compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; - } + // if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta { + // compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; + // } - if let Some(prefix_delta) = merger_result.prefix_delta { - compute_prefix_database(index, wtxn, prefix_delta)?; - } + // if let Some(prefix_delta) = merger_result.prefix_delta { + // compute_prefix_database(index, wtxn, prefix_delta)?; + // } Ok(()) as Result<_> })?; @@ -345,31 +393,6 @@ fn compute_facet_level_database( Ok(()) } -/// TODO: GrenadParameters::default() should be removed in favor a passed parameter -/// TODO: manage the errors correctly -/// TODO: we must have a single trait that also gives the extractor type -fn extract_and_send_docids< - 'pl, - 'fid, - 'indexer, - 'index, - 'extractor, - DC: DocumentChanges<'pl>, - E: DocidsExtractor, - D: MergerOperationType, ->( - grenad_parameters: GrenadParameters, - document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &'extractor mut ThreadLocal>, - sender: &ExtractorSender<'extractor>, -) -> Result<()> { - let caches = - E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?; - sender.send_searchable::(caches).unwrap(); - Ok(()) -} - /// Returns the primary key that has already been set for this index or the /// one we will guess by searching for the first key that contains "id" as a substring, /// and whether the primary key changed diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index eae54b8b5..92a33985b 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -1,8 +1,6 @@ -use std::fs::File; use std::io::{self}; use bincode::ErrorKind; -use grenad::Merger; use hashbrown::HashSet; use heed::types::Bytes; use heed::{Database, RoTxn}; @@ -13,191 +11,11 @@ use super::channel::*; use super::extract::{ merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap, FacetKind, }; -use super::word_fst_builder::{PrefixData, PrefixDelta}; -use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId}; -use crate::update::del_add::{DelAdd, DelAddOperation}; -use crate::update::new::channel::MergerOperation; -use crate::update::new::word_fst_builder::WordFstBuilder; -use crate::update::MergeDeladdCboRoaringBitmaps; +use super::word_fst_builder::PrefixDelta; +use super::DocumentChange; +use crate::update::del_add::DelAdd; use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result}; -/// TODO We must return some infos/stats -#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] -pub fn merge_caches_entries( - receiver: MergerReceiver, - sender: MergerSender, - rtxn: &RoTxn, - index: &Index, - global_fields_ids_map: GlobalFieldsIdsMap<'_>, -) -> Result { - let mut buffer: Vec = Vec::new(); - let mut documents_ids = index.documents_ids(rtxn)?; - let mut geo_extractor = GeoExtractor::new(rtxn, index)?; - let mut merger_result = MergerResult::default(); - - for merger_operation in receiver { - match merger_operation { - MergerOperation::ExactWordDocidsMerger(caches) => { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); - let _entered = span.enter(); - merge_and_send_docids( - caches, - /// TODO do a MergerOperation::database(&Index) -> Database. - index.exact_word_docids.remap_types(), - rtxn, - &mut buffer, - sender.docids::(), - |_, _key| Ok(()), - )?; - } - MergerOperation::FidWordCountDocidsMerger(merger) => { - let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); - let _entered = span.enter(); - merge_and_send_docids( - merger, - index.field_id_word_count_docids.remap_types(), - rtxn, - &mut buffer, - sender.docids::(), - |_, _key| Ok(()), - )?; - } - MergerOperation::WordDocidsMerger(merger) => { - let words_fst = index.words_fst(rtxn)?; - let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; - let prefix_settings = index.prefix_settings(rtxn)?; - word_fst_builder.with_prefix_settings(prefix_settings); - - { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); - let _entered = span.enter(); - - merge_and_send_docids( - merger, - index.word_docids.remap_types(), - rtxn, - &mut buffer, - sender.docids::(), - |deladd, key| word_fst_builder.register_word(deladd, key), - )?; - } - - { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "words_fst"); - let _entered = span.enter(); - - let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, rtxn)?; - sender.main().write_words_fst(word_fst_mmap).unwrap(); - if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { - sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap(); - merger_result.prefix_delta = Some(prefix_delta); - } - } - } - MergerOperation::WordFidDocidsMerger(merger) => { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); - let _entered = span.enter(); - merge_and_send_docids( - merger, - index.word_fid_docids.remap_types(), - rtxn, - &mut buffer, - sender.docids::(), - |_, _key| Ok(()), - )?; - } - MergerOperation::WordPairProximityDocidsMerger(merger) => { - let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); - let _entered = span.enter(); - merge_and_send_docids( - merger, - index.word_pair_proximity_docids.remap_types(), - rtxn, - &mut buffer, - sender.docids::(), - |_, _key| Ok(()), - )?; - } - MergerOperation::WordPositionDocidsMerger(merger) => { - let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); - let _entered = span.enter(); - merge_and_send_docids( - merger, - index.word_position_docids.remap_types(), - rtxn, - &mut buffer, - sender.docids::(), - |_, _key| Ok(()), - )?; - } - MergerOperation::InsertDocument { docid, external_id, document } => { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "insert_document"); - let _entered = span.enter(); - documents_ids.insert(docid); - sender.documents().uncompressed(docid, external_id.clone(), &document).unwrap(); - - if let Some(geo_extractor) = geo_extractor.as_mut() { - let current = index.documents.remap_data_type::().get(rtxn, &docid)?; - let current: Option<&KvReaderFieldId> = current.map(Into::into); - let change = match current { - Some(current) => DocumentChange::Update(todo!()), - None => DocumentChange::Insertion(todo!()), - }; - geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; - } - } - MergerOperation::DeleteDocument { docid, external_id } => { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "delete_document"); - let _entered = span.enter(); - if !documents_ids.remove(docid) { - unreachable!("Tried deleting a document that we do not know about"); - } - sender.documents().delete(docid, external_id.clone()).unwrap(); - - if let Some(geo_extractor) = geo_extractor.as_mut() { - let change = DocumentChange::Deletion(Deletion::create(docid, todo!())); - geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; - } - } - MergerOperation::FinishedDocument => { - // send the rtree - } - MergerOperation::FacetDocidsMerger(merger) => { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "facet_docids"); - let _entered = span.enter(); - let mut facet_field_ids_delta = FacetFieldIdsDelta::new(); - merge_and_send_facet_docids( - merger, - FacetDatabases::new(index), - rtxn, - &mut buffer, - sender.facet_docids(), - &mut facet_field_ids_delta, - )?; - - merger_result.facet_field_ids_delta = Some(facet_field_ids_delta); - } - } - } - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "documents_ids"); - let _entered = span.enter(); - - // Send the documents ids unionized with the current one - sender.send_documents_ids(documents_ids).unwrap(); - } - - Ok(merger_result) -} - #[derive(Default, Debug)] pub struct MergerResult { /// The delta of the prefixes @@ -256,25 +74,28 @@ impl GeoExtractor { #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] fn merge_and_send_docids<'extractor>( - mut caches: Vec>>, + mut caches: Vec>, database: Database, - rtxn: &RoTxn<'_>, - buffer: &mut Vec, - docids_sender: impl DocidsSender, + index: &Index, + docids_sender: impl DocidsSender + Sync, mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>, ) -> Result<()> { transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { + let rtxn = index.read_txn()?; + let mut buffer = Vec::new(); merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { - let current = database.get(rtxn, key)?; + let current = database.get(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { - let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); + let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); docids_sender.write(key, value).unwrap(); - register_key(DelAdd::Addition, key) + // register_key(DelAdd::Addition, key) + Ok(()) } Operation::Delete => { docids_sender.delete(key).unwrap(); - register_key(DelAdd::Deletion, key) + // register_key(DelAdd::Deletion, key) + Ok(()) } Operation::Ignore => Ok(()), } @@ -284,25 +105,26 @@ fn merge_and_send_docids<'extractor>( #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] fn merge_and_send_facet_docids<'extractor>( - mut caches: Vec>>, + mut caches: Vec>, database: FacetDatabases, - rtxn: &RoTxn<'_>, - buffer: &mut Vec, - docids_sender: impl DocidsSender, + index: &Index, + docids_sender: impl DocidsSender + Sync, facet_field_ids_delta: &mut FacetFieldIdsDelta, ) -> Result<()> { transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { + let rtxn = index.read_txn()?; + let mut buffer = Vec::new(); merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { - let current = database.get_cbo_roaring_bytes_value(rtxn, key)?; + let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { - facet_field_ids_delta.register_from_key(key); - let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); + // facet_field_ids_delta.register_from_key(key); + let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); docids_sender.write(key, value).unwrap(); Ok(()) } Operation::Delete => { - facet_field_ids_delta.register_from_key(key); + // facet_field_ids_delta.register_from_key(key); docids_sender.delete(key).unwrap(); Ok(()) } diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 37ccc75cd..2d4daee7a 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -1,4 +1,7 @@ pub use document_change::{Deletion, DocumentChange, Insertion, Update}; +pub use merger::{ + merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta, +}; pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd;