From 31680f3014899e078f8a8c5b403e9548ee7b1fc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 29 Oct 2024 14:38:52 +0100 Subject: [PATCH] WIP --- milli/src/update/new/channel.rs | 92 ++++++++------- milli/src/update/new/extract/cache.rs | 6 +- .../new/extract/faceted/extract_facets.rs | 38 ++----- milli/src/update/new/extract/mod.rs | 13 +-- .../extract/searchable/extract_word_docids.rs | 17 ++- .../src/update/new/extract/searchable/mod.rs | 46 ++------ milli/src/update/new/indexer/mod.rs | 24 ++-- milli/src/update/new/merger.rs | 106 ++++++++---------- 8 files changed, 151 insertions(+), 191 deletions(-) diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 8226046e6..39b1bec17 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -1,8 +1,6 @@ -use std::fs::File; use std::marker::PhantomData; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; -use grenad::Merger; use heed::types::Bytes; use memmap2::Mmap; use roaring::RoaringBitmap; @@ -10,8 +8,8 @@ use roaring::RoaringBitmap; use super::extract::FacetKind; use super::StdResult; use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; +use crate::update::new::extract::CboCachedSorter; use crate::update::new::KvReaderFieldId; -use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{DocumentId, Index}; /// The capacity of the channel is currently in number of messages. @@ -29,7 +27,9 @@ pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { } /// The capacity of the channel is currently in number of messages. -pub fn extractors_merger_channels(cap: usize) -> (ExtractorSender, MergerReceiver) { +pub fn extractors_merger_channels<'extractor>( + cap: usize, +) -> (ExtractorSender<'extractor>, MergerReceiver<'extractor>) { let (sender, receiver) = crossbeam_channel::bounded(cap); (ExtractorSender(sender), MergerReceiver(receiver)) } @@ -313,7 +313,9 @@ pub trait DatabaseType { } pub trait MergerOperationType { - fn new_merger_operation(merger: Merger) -> MergerOperation; + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor>; } impl DatabaseType for ExactWordDocids { @@ -321,8 +323,10 @@ impl DatabaseType for ExactWordDocids { } impl MergerOperationType for ExactWordDocids { - fn new_merger_operation(merger: Merger) -> MergerOperation { - MergerOperation::ExactWordDocidsMerger(merger) + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor> { + MergerOperation::ExactWordDocidsMerger(caches) } } @@ -331,8 +335,10 @@ impl DatabaseType for FidWordCountDocids { } impl MergerOperationType for FidWordCountDocids { - fn new_merger_operation(merger: Merger) -> MergerOperation { - MergerOperation::FidWordCountDocidsMerger(merger) + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor> { + MergerOperation::FidWordCountDocidsMerger(caches) } } @@ -341,8 +347,10 @@ impl DatabaseType for WordDocids { } impl MergerOperationType for WordDocids { - fn new_merger_operation(merger: Merger) -> MergerOperation { - MergerOperation::WordDocidsMerger(merger) + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor> { + MergerOperation::WordDocidsMerger(caches) } } @@ -351,8 +359,10 @@ impl DatabaseType for WordFidDocids { } impl MergerOperationType for WordFidDocids { - fn new_merger_operation(merger: Merger) -> MergerOperation { - MergerOperation::WordFidDocidsMerger(merger) + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor> { + MergerOperation::WordFidDocidsMerger(caches) } } @@ -361,8 +371,10 @@ impl DatabaseType for WordPairProximityDocids { } impl MergerOperationType for WordPairProximityDocids { - fn new_merger_operation(merger: Merger) -> MergerOperation { - MergerOperation::WordPairProximityDocidsMerger(merger) + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor> { + MergerOperation::WordPairProximityDocidsMerger(caches) } } @@ -371,14 +383,18 @@ impl DatabaseType for WordPositionDocids { } impl MergerOperationType for WordPositionDocids { - fn new_merger_operation(merger: Merger) -> MergerOperation { - MergerOperation::WordPositionDocidsMerger(merger) + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor> { + MergerOperation::WordPositionDocidsMerger(caches) } } impl MergerOperationType for FacetDocids { - fn new_merger_operation(merger: Merger) -> MergerOperation { - MergerOperation::FacetDocidsMerger(merger) + fn new_merger_operation<'extractor>( + caches: Vec>>, + ) -> MergerOperation<'extractor> { + MergerOperation::FacetDocidsMerger(caches) } } @@ -489,23 +505,23 @@ impl DocumentsSender<'_> { } } -pub enum MergerOperation { - ExactWordDocidsMerger(Merger), - FidWordCountDocidsMerger(Merger), - WordDocidsMerger(Merger), - WordFidDocidsMerger(Merger), - WordPairProximityDocidsMerger(Merger), - WordPositionDocidsMerger(Merger), - FacetDocidsMerger(Merger), +pub enum MergerOperation<'extractor> { + ExactWordDocidsMerger(Vec>>), + FidWordCountDocidsMerger(Vec>>), + WordDocidsMerger(Vec>>), + WordFidDocidsMerger(Vec>>), + WordPairProximityDocidsMerger(Vec>>), + WordPositionDocidsMerger(Vec>>), + FacetDocidsMerger(Vec>>), DeleteDocument { docid: DocumentId, external_id: String }, InsertDocument { docid: DocumentId, external_id: String, document: Box }, FinishedDocument, } -pub struct MergerReceiver(Receiver); +pub struct MergerReceiver<'extractor>(Receiver>); -impl IntoIterator for MergerReceiver { - type Item = MergerOperation; +impl<'extractor> IntoIterator for MergerReceiver<'extractor> { + type Item = MergerOperation<'extractor>; type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { @@ -513,27 +529,27 @@ impl IntoIterator for MergerReceiver { } } -pub struct ExtractorSender(Sender); +pub struct ExtractorSender<'extractor>(Sender>); -impl ExtractorSender { - pub fn document_sender(&self) -> DocumentSender<'_> { +impl<'extractor> ExtractorSender<'extractor> { + pub fn document_sender(&self) -> DocumentSender<'_, 'extractor> { DocumentSender(Some(&self.0)) } pub fn send_searchable( &self, - merger: Merger, + caches: Vec>>, ) -> StdResult<(), SendError<()>> { - match self.0.send(D::new_merger_operation(merger)) { + match self.0.send(D::new_merger_operation(caches)) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } } -pub struct DocumentSender<'a>(Option<&'a Sender>); +pub struct DocumentSender<'a, 'extractor>(Option<&'a Sender>>); -impl DocumentSender<'_> { +impl DocumentSender<'_, '_> { pub fn insert( &self, docid: DocumentId, @@ -564,7 +580,7 @@ impl DocumentSender<'_> { } } -impl Drop for DocumentSender<'_> { +impl Drop for DocumentSender<'_, '_> { fn drop(&mut self) { if let Some(sender) = self.0.take() { let _ = sender.send(MergerOperation::FinishedDocument); diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index f34d049f9..2941ed94b 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -392,7 +392,7 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>( /// # Panics /// /// - If the bucket IDs in these frozen caches are not exactly the same. -pub fn merge_caches(frozen: Vec, mut iter: F) -> Result<()> +pub fn merge_caches(frozen: Vec, mut f: F) -> Result<()> where F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, { @@ -455,7 +455,7 @@ where } // We send the merged entry outside. - (iter)(first_key, output)?; + (f)(first_key, output)?; // Don't forget to put the first entry back into the heap. if first_entry.cursor.move_on_next()?.is_some() { @@ -478,7 +478,7 @@ where } // We send the merged entry outside. - (iter)(key, output)?; + (f)(key, output)?; } } } diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 794eb90be..cce2f4c0e 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,12 +1,9 @@ use std::cell::RefCell; use std::collections::HashSet; -use std::fs::File; use std::ops::DerefMut as _; use bumpalo::Bump; -use grenad::Merger; use heed::RoTxn; -use raw_collections::alloc::RefBump; use serde_json::Value; use super::super::cache::CboCachedSorter; @@ -19,16 +16,16 @@ use crate::update::new::indexer::document_changes::{ IndexingContext, RefCellExt, ThreadLocal, }; use crate::update::new::DocumentChange; -use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; +use crate::update::GrenadParameters; use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; -pub struct FacetedExtractorData<'extractor> { - attributes_to_extract: &'extractor [&'extractor str], +pub struct FacetedExtractorData<'a> { + attributes_to_extract: &'a [&'a str], grenad_parameters: GrenadParameters, max_memory: Option, } -impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { +impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> { type Data = RefCell>; fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { @@ -217,12 +214,12 @@ fn truncate_str(s: &str) -> &str { impl DocidsExtractor for FacetedDocidsExtractor { #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>( grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>, - ) -> Result> { + extractor_allocs: &'extractor mut ThreadLocal>, + ) -> Result>> { let max_memory = grenad_parameters.max_memory_by_thread(); let index = indexing_context.index; @@ -251,26 +248,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { &datastore, )?; } - { - let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); - let span = - tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); - let _entered = span.enter(); - let readers: Vec<_> = datastore - .into_iter() - // .par_bridge() // T is !Send - .map(|cached_sorter| { - let cached_sorter = cached_sorter.into_inner(); - let sorter = cached_sorter.into_sorter()?; - sorter.into_reader_cursors() - }) - .collect(); - - for reader in readers { - builder.extend(reader?); - } - Ok(builder.build()) - } + Ok(datastore.into_iter().map(RefCell::into_inner).collect()) } } diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index 4c4374a8f..aa71752eb 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -2,25 +2,22 @@ mod cache; mod faceted; mod searchable; -use std::cell::RefCell; -use std::fs::File; - use bumpalo::Bump; +pub use cache::{merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap}; pub use faceted::*; -use grenad::Merger; pub use searchable::*; use super::indexer::document_changes::{DocumentChanges, FullySend, IndexingContext, ThreadLocal}; -use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; +use crate::update::GrenadParameters; use crate::Result; pub trait DocidsExtractor { - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>( grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>, - ) -> Result>; + extractor_allocs: &'extractor mut ThreadLocal>, + ) -> Result>>; } /// TODO move in permissive json pointer diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 79602503a..a271f26dd 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -7,7 +7,6 @@ use std::ops::DerefMut as _; use bumpalo::Bump; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; -use raw_collections::alloc::RefBump; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::update::new::extract::cache::CboCachedSorter; @@ -157,15 +156,15 @@ struct WordDocidsMergerBuilders { fid_word_count_docids: MergerBuilder, } -pub struct WordDocidsMergers { - pub word_fid_docids: Merger, - pub word_docids: Merger, - pub exact_word_docids: Merger, - pub word_position_docids: Merger, - pub fid_word_count_docids: Merger, +pub struct WordDocidsMergers<'extractor> { + pub word_fid_docids: Vec>>, + pub word_docids: Vec>>, + pub exact_word_docids: Vec>>, + pub word_position_docids: Vec>>, + pub fid_word_count_docids: Vec>>, } -impl WordDocidsMergerBuilders { +impl<'extractor> WordDocidsMergerBuilders<'extractor> { fn new() -> Self { Self { word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), @@ -202,7 +201,7 @@ impl WordDocidsMergerBuilders { Ok(()) } - fn build(self) -> WordDocidsMergers { + fn build(self) -> WordDocidsMergers<'extractor> { WordDocidsMergers { word_fid_docids: self.word_fid_docids.build(), word_docids: self.word_docids.build(), diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 5f5c995b1..b5dc1f19d 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -9,9 +9,7 @@ use std::marker::PhantomData; use bumpalo::Bump; pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; -use grenad::Merger; use heed::RoTxn; -use raw_collections::alloc::RefBump; use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; @@ -21,18 +19,18 @@ use crate::update::new::indexer::document_changes::{ IndexingContext, ThreadLocal, }; use crate::update::new::DocumentChange; -use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; +use crate::update::GrenadParameters; use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE}; -pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> { - tokenizer: &'extractor DocumentTokenizer<'extractor>, +pub struct SearchableExtractorData<'a, EX: SearchableExtractor> { + tokenizer: &'a DocumentTokenizer<'a>, grenad_parameters: GrenadParameters, max_memory: Option, _ex: PhantomData, } -impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> - for SearchableExtractorData<'extractor, EX> +impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> + for SearchableExtractorData<'a, EX> { type Data = RefCell>; @@ -50,12 +48,12 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> } pub trait SearchableExtractor: Sized + Sync { - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>( grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>, - ) -> Result> { + extractor_allocs: &'extractor mut ThreadLocal>, + ) -> Result>> { let max_memory = grenad_parameters.max_memory_by_thread(); let rtxn = indexing_context.index.read_txn()?; @@ -107,28 +105,8 @@ pub trait SearchableExtractor: Sized + Sync { &datastore, )?; } - { - let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); - let span = - tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); - let _entered = span.enter(); - let readers: Vec<_> = datastore - .into_iter() - // .par_bridge() // T is !Send - .map(|cache_sorter| { - let cached_sorter = cache_sorter.into_inner(); - let sorter = cached_sorter.into_sorter()?; - sorter.into_reader_cursors() - }) - .collect(); - - for reader in readers { - builder.extend(reader?); - } - - Ok(builder.build()) - } + Ok(datastore.into_iter().map(RefCell::into_inner).collect()) } fn extract_document_change( @@ -144,12 +122,12 @@ pub trait SearchableExtractor: Sized + Sync { } impl DocidsExtractor for T { - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>( grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>, - ) -> Result> { + extractor_allocs: &'extractor mut ThreadLocal>, + ) -> Result>> { Self::run_extraction( grenad_parameters, document_changes, diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 8cb874f19..15686d0bf 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -20,7 +20,7 @@ use super::channel::*; use super::document::write_to_obkv; use super::document_change::DocumentChange; use super::extract::*; -use super::merger::{merge_grenad_entries, FacetFieldIdsDelta}; +use super::merger::{merge_caches_entries, FacetFieldIdsDelta}; use super::word_fst_builder::PrefixDelta; use super::words_prefix_docids::{ compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, @@ -33,7 +33,7 @@ use crate::update::new::channel::ExtractorSender; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; use crate::update::settings::InnerIndexSettings; use crate::update::{FacetsUpdateBulk, GrenadParameters}; -use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; pub(crate) mod de; pub mod document_changes; @@ -42,11 +42,11 @@ mod document_operation; mod partial_dump; mod update_by_function; -struct DocumentExtractor<'a> { - document_sender: &'a DocumentSender<'a>, +struct DocumentExtractor<'a, 'extractor> { + document_sender: &'a DocumentSender<'a, 'extractor>, } -impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { +impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor> { type Data = FullySend<()>; fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result { @@ -179,6 +179,7 @@ where word_position_docids, fid_word_count_docids, } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?; + extractor_sender.send_searchable::(word_docids).unwrap(); extractor_sender.send_searchable::(word_fid_docids).unwrap(); extractor_sender.send_searchable::(exact_word_docids).unwrap(); @@ -201,7 +202,7 @@ where grenad_parameters, document_changes, indexing_context, - &mut extractor_allocs, + &mut extractor_allocs, &extractor_sender, )?; } @@ -239,7 +240,7 @@ where tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge"); let _entered = span.enter(); let rtxn = index.read_txn().unwrap(); - merge_grenad_entries( + merge_caches_entries( merger_receiver, merger_sender, &rtxn, @@ -352,6 +353,7 @@ fn extract_and_send_docids< 'fid, 'indexer, 'index, + 'extractor, DC: DocumentChanges<'pl>, E: DocidsExtractor, D: MergerOperationType, @@ -359,12 +361,12 @@ fn extract_and_send_docids< grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index>, - extractor_allocs: &mut ThreadLocal>, - sender: &ExtractorSender, + extractor_allocs: &'extractor mut ThreadLocal>, + sender: &ExtractorSender<'extractor>, ) -> Result<()> { - let merger = + let caches = E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?; - sender.send_searchable::(merger).unwrap(); + sender.send_searchable::(caches).unwrap(); Ok(()) } diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 6183beb63..eae54b8b5 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -6,13 +6,16 @@ use grenad::Merger; use hashbrown::HashSet; use heed::types::Bytes; use heed::{Database, RoTxn}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use roaring::RoaringBitmap; use super::channel::*; -use super::extract::FacetKind; +use super::extract::{ + merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap, FacetKind, +}; use super::word_fst_builder::{PrefixData, PrefixDelta}; use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId}; -use crate::update::del_add::DelAdd; +use crate::update::del_add::{DelAdd, DelAddOperation}; use crate::update::new::channel::MergerOperation; use crate::update::new::word_fst_builder::WordFstBuilder; use crate::update::MergeDeladdCboRoaringBitmaps; @@ -20,7 +23,7 @@ use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] -pub fn merge_grenad_entries( +pub fn merge_caches_entries( receiver: MergerReceiver, sender: MergerSender, rtxn: &RoTxn, @@ -34,12 +37,12 @@ pub fn merge_grenad_entries( for merger_operation in receiver { match merger_operation { - MergerOperation::ExactWordDocidsMerger(merger) => { + MergerOperation::ExactWordDocidsMerger(caches) => { let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); let _entered = span.enter(); merge_and_send_docids( - merger, + caches, /// TODO do a MergerOperation::database(&Index) -> Database. index.exact_word_docids.remap_types(), rtxn, @@ -192,8 +195,6 @@ pub fn merge_grenad_entries( sender.send_documents_ids(documents_ids).unwrap(); } - // ... - Ok(merger_result) } @@ -254,69 +255,61 @@ impl GeoExtractor { } #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] -fn merge_and_send_docids( - merger: Merger, +fn merge_and_send_docids<'extractor>( + mut caches: Vec>>, database: Database, rtxn: &RoTxn<'_>, buffer: &mut Vec, docids_sender: impl DocidsSender, mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>, ) -> Result<()> { - let mut merger_iter = merger.into_stream_merger_iter().unwrap(); - while let Some((key, deladd)) = merger_iter.next().unwrap() { - let current = database.get(rtxn, key)?; - let deladd: &KvReaderDelAdd = deladd.into(); - let del = deladd.get(DelAdd::Deletion); - let add = deladd.get(DelAdd::Addition); - - match merge_cbo_bitmaps(current, del, add)? { - Operation::Write(bitmap) => { - let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); - docids_sender.write(key, value).unwrap(); - register_key(DelAdd::Addition, key)?; + transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { + merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { + let current = database.get(rtxn, key)?; + match merge_cbo_bitmaps(current, del, add)? { + Operation::Write(bitmap) => { + let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); + docids_sender.write(key, value).unwrap(); + register_key(DelAdd::Addition, key) + } + Operation::Delete => { + docids_sender.delete(key).unwrap(); + register_key(DelAdd::Deletion, key) + } + Operation::Ignore => Ok(()), } - Operation::Delete => { - docids_sender.delete(key).unwrap(); - register_key(DelAdd::Deletion, key)?; - } - Operation::Ignore => (), - } - } - - Ok(()) + }) + }) } #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] -fn merge_and_send_facet_docids( - merger: Merger, +fn merge_and_send_facet_docids<'extractor>( + mut caches: Vec>>, database: FacetDatabases, rtxn: &RoTxn<'_>, buffer: &mut Vec, docids_sender: impl DocidsSender, facet_field_ids_delta: &mut FacetFieldIdsDelta, ) -> Result<()> { - let mut merger_iter = merger.into_stream_merger_iter().unwrap(); - while let Some((key, deladd)) = merger_iter.next().unwrap() { - let current = database.get_cbo_roaring_bytes_value(rtxn, key)?; - let deladd: &KvReaderDelAdd = deladd.into(); - let del = deladd.get(DelAdd::Deletion); - let add = deladd.get(DelAdd::Addition); - - match merge_cbo_bitmaps(current, del, add)? { - Operation::Write(bitmap) => { - facet_field_ids_delta.register_from_key(key); - let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); - docids_sender.write(key, value).unwrap(); + transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { + merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { + let current = database.get_cbo_roaring_bytes_value(rtxn, key)?; + match merge_cbo_bitmaps(current, del, add)? { + Operation::Write(bitmap) => { + facet_field_ids_delta.register_from_key(key); + let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); + docids_sender.write(key, value).unwrap(); + Ok(()) + } + Operation::Delete => { + facet_field_ids_delta.register_from_key(key); + docids_sender.delete(key).unwrap(); + Ok(()) + } + Operation::Ignore => Ok(()), } - Operation::Delete => { - facet_field_ids_delta.register_from_key(key); - docids_sender.delete(key).unwrap(); - } - Operation::Ignore => (), - } - } - - Ok(()) + }) + }) } struct FacetDatabases<'a> { @@ -409,13 +402,10 @@ enum Operation { /// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap. fn merge_cbo_bitmaps( current: Option<&[u8]>, - del: Option<&[u8]>, - add: Option<&[u8]>, + del: Option, + add: Option, ) -> Result { let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; - let del = del.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; - let add = add.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; - match (current, del, add) { (None, None, None) => Ok(Operation::Ignore), // but it's strange (None, None, Some(add)) => Ok(Operation::Write(add)),