From 98e48371c35da0b30716d946959c7fd50e7b284d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 4 Sep 2024 12:17:13 +0200 Subject: [PATCH] Factorize some stuff --- milli/src/update/new/channel.rs | 68 ++++++++------ milli/src/update/new/indexer/mod.rs | 56 +++++++++--- milli/src/update/new/merger.rs | 135 +++++++++++++++++----------- 3 files changed, 166 insertions(+), 93 deletions(-) diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index acea02316..8888132e3 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -9,7 +9,7 @@ use super::StdResult; use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY}; use crate::update::new::KvReaderFieldId; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{DocumentId, Index}; +use crate::{CboRoaringBitmapCodec, DocumentId, Index}; /// The capacity of the channel is currently in number of messages. pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { @@ -103,7 +103,9 @@ pub struct WriterOperation { pub enum Database { WordDocids, + ExactWordDocids, WordFidDocids, + WordPositionDocids, Documents, Main, } @@ -114,7 +116,9 @@ impl WriterOperation { Database::Main => index.main.remap_types(), Database::Documents => index.documents.remap_types(), Database::WordDocids => index.word_docids.remap_types(), + Database::ExactWordDocids => index.exact_word_docids.remap_types(), Database::WordFidDocids => index.word_fid_docids.remap_types(), + Database::WordPositionDocids => index.word_position_docids.remap_types(), } } @@ -141,11 +145,7 @@ impl MergerSender { MainSender(&self.0) } - pub fn word_docids(&self) -> DocidsSender<'_, WordDocids> { - DocidsSender { sender: &self.0, _marker: PhantomData } - } - - pub fn word_fid_docids(&self) -> DocidsSender<'_, WordFidDocids> { + pub fn docids(&self) -> DocidsSender<'_, D> { DocidsSender { sender: &self.0, _marker: PhantomData } } @@ -187,21 +187,45 @@ impl MainSender<'_> { } pub enum WordDocids {} +pub enum ExactWordDocids {} pub enum WordFidDocids {} +pub enum WordPositionDocids {} pub trait DatabaseType { - fn database() -> Database; + const DATABASE: Database; + + fn new_merger_operation(merger: Merger) -> MergerOperation; } impl DatabaseType for WordDocids { - fn database() -> Database { - Database::WordDocids + const DATABASE: Database = Database::WordDocids; + + fn new_merger_operation(merger: Merger) -> MergerOperation { + MergerOperation::WordDocidsMerger(merger) + } +} + +impl DatabaseType for ExactWordDocids { + const DATABASE: Database = Database::ExactWordDocids; + + fn new_merger_operation(merger: Merger) -> MergerOperation { + MergerOperation::ExactWordDocidsMerger(merger) } } impl DatabaseType for WordFidDocids { - fn database() -> Database { - Database::WordFidDocids + const DATABASE: Database = Database::WordFidDocids; + + fn new_merger_operation(merger: Merger) -> MergerOperation { + MergerOperation::WordFidDocidsMerger(merger) + } +} + +impl DatabaseType for WordPositionDocids { + const DATABASE: Database = Database::WordPositionDocids; + + fn new_merger_operation(merger: Merger) -> MergerOperation { + MergerOperation::WordPositionDocidsMerger(merger) } } @@ -213,7 +237,7 @@ pub struct DocidsSender<'a, D> { impl DocidsSender<'_, D> { pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Write(KeyValueEntry::from_key_value(key, value)); - match self.sender.send(WriterOperation { database: D::database(), entry }) { + match self.sender.send(WriterOperation { database: D::DATABASE, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } @@ -221,7 +245,7 @@ impl DocidsSender<'_, D> { pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Delete(KeyEntry::from_key(key)); - match self.sender.send(WriterOperation { database: D::database(), entry }) { + match self.sender.send(WriterOperation { database: D::DATABASE, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } @@ -258,7 +282,9 @@ impl DocumentsSender<'_> { pub enum MergerOperation { WordDocidsMerger(Merger), + ExactWordDocidsMerger(Merger), WordFidDocidsMerger(Merger), + WordPositionDocidsMerger(Merger), InsertDocument { docid: DocumentId, document: Box }, DeleteDocument { docid: DocumentId }, } @@ -295,23 +321,11 @@ impl ExtractorSender { } } - pub fn word_docids( + pub fn send_searchable( &self, merger: Merger, ) -> StdResult<(), SendError<()>> { - let operation = MergerOperation::WordDocidsMerger(merger); - match self.0.send(operation) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } - - pub fn word_fid_docids( - &self, - merger: Merger, - ) -> StdResult<(), SendError<()>> { - let operation = MergerOperation::WordFidDocidsMerger(merger); - match self.0.send(operation) { + match self.0.send(D::new_merger_operation(merger)) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 539b6d602..3b1fc97c5 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -11,14 +11,21 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::ThreadPool; pub use update_by_function::UpdateByFunction; -use super::channel::{extractors_merger_channels, merger_writer_channel, EntryOperation}; +use super::channel::{ + extractors_merger_channels, merger_writer_channel, EntryOperation, ExactWordDocids, WordDocids, + WordFidDocids, WordPositionDocids, +}; use super::document_change::DocumentChange; -use super::extract::{SearchableExtractor, WordDocidsExtractor, WordFidDocidsExtractor}; +use super::extract::{ + ExactWordDocidsExtractor, SearchableExtractor, WordDocidsExtractor, WordFidDocidsExtractor, + WordPositionDocidsExtractor, +}; use super::merger::merge_grenad_entries; use super::StdResult; use crate::documents::{ obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY, }; +use crate::update::new::channel::{DatabaseType, ExtractorSender}; use crate::update::GrenadParameters; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; @@ -82,36 +89,43 @@ where let docid = insertion.docid(); let content = insertion.new(); extractor_sender.document_insert(docid, content.boxed()).unwrap(); - // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } Ok(()) as Result<_> })?; - // word docids - let merger = WordDocidsExtractor::run_extraction( + extract_and_send_docids::( index, &global_fields_ids_map, - /// TODO: GrenadParameters::default() should be removed in favor a passed parameter GrenadParameters::default(), document_changes.clone(), + &extractor_sender, )?; - /// TODO: manage the errors correctly - extractor_sender.word_docids(merger).unwrap(); - - // word fid docids - let merger = WordFidDocidsExtractor::run_extraction( + extract_and_send_docids::( index, &global_fields_ids_map, - /// TODO: GrenadParameters::default() should be removed in favor a passed parameter GrenadParameters::default(), document_changes.clone(), + &extractor_sender, )?; - /// TODO: manage the errors correctly - extractor_sender.word_fid_docids(merger).unwrap(); + extract_and_send_docids::( + index, + &global_fields_ids_map, + GrenadParameters::default(), + document_changes.clone(), + &extractor_sender, + )?; + + extract_and_send_docids::( + index, + &global_fields_ids_map, + GrenadParameters::default(), + document_changes.clone(), + &extractor_sender, + )?; Ok(()) as Result<_> }) @@ -148,6 +162,20 @@ where Ok(()) } +/// TODO: GrenadParameters::default() should be removed in favor a passed parameter +/// TODO: manage the errors correctly +/// TODO: we must have a single trait that also gives the extractor type +fn extract_and_send_docids( + index: &Index, + fields_ids_map: &GlobalFieldsIdsMap, + indexer: GrenadParameters, + document_changes: impl IntoParallelIterator>, + sender: &ExtractorSender, +) -> Result<()> { + let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?; + Ok(sender.send_searchable::(merger).unwrap()) +} + /// TODO move this elsewhere pub fn guess_primary_key<'a>( rtxn: &'a RoTxn<'a>, diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index c7f1a4385..976fe435f 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -1,16 +1,24 @@ +use std::fs::File; +use std::io; + use fst::set::OpBuilder; use fst::{Set, SetBuilder}; +use grenad::Merger; use heed::types::Bytes; -use heed::RoTxn; +use heed::{Database, RoTxn}; use memmap2::Mmap; use roaring::RoaringBitmap; use tempfile::tempfile; -use super::channel::{MergerReceiver, MergerSender}; +use super::channel::{ + DatabaseType, DocidsSender, ExactWordDocids, MergerReceiver, MergerSender, WordDocids, + WordFidDocids, WordPositionDocids, +}; use super::KvReaderDelAdd; use crate::index::main_key::WORDS_FST_KEY; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; +use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{CboRoaringBitmapCodec, Index, Result}; /// TODO We must return some infos/stats @@ -26,34 +34,18 @@ pub fn merge_grenad_entries( for merger_operation in receiver { match merger_operation { MergerOperation::WordDocidsMerger(merger) => { - let word_docids_sender = sender.word_docids(); - let database = index.word_docids.remap_types::(); let mut add_words_fst = SetBuilder::new(tempfile()?)?; let mut del_words_fst = SetBuilder::new(tempfile()?)?; - /// TODO manage the error correctly - let mut merger_iter = merger.into_stream_merger_iter().unwrap(); - - // TODO manage the error correctly - while let Some((key, deladd)) = merger_iter.next().unwrap() { - let current = database.get(rtxn, key)?; - let deladd: &KvReaderDelAdd = deladd.into(); - let del = deladd.get(DelAdd::Deletion); - let add = deladd.get(DelAdd::Addition); - - match merge_cbo_bitmaps(current, del, add)? { - Operation::Write(bitmap) => { - let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); - word_docids_sender.write(key, value).unwrap(); - add_words_fst.insert(key)?; - } - Operation::Delete => { - word_docids_sender.delete(key).unwrap(); - del_words_fst.insert(key)?; - } - Operation::Ignore => (), - } - } + merge_and_send_docids( + merger, + index.word_docids.remap_types(), + rtxn, + &mut buffer, + sender.docids::(), + |key| add_words_fst.insert(key), + |key| del_words_fst.insert(key), + )?; // Move that into a dedicated function let words_fst = index.words_fst(rtxn)?; @@ -66,7 +58,6 @@ pub fn merge_grenad_entries( let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? }; let del_words_fst = Set::new(&del_words_fst_mmap)?; - // TO BE IMPROVED @many let diff = words_fst.op().add(&del_words_fst).difference(); let stream = add_words_fst.op().add(diff).union(); @@ -79,31 +70,38 @@ pub fn merge_grenad_entries( let main_sender = sender.main(); main_sender.write_words_fst(&words_fst_mmap).unwrap(); } + MergerOperation::ExactWordDocidsMerger(merger) => { + merge_and_send_docids( + merger, + index.exact_word_docids.remap_types(), + rtxn, + &mut buffer, + sender.docids::(), + |_key| Ok(()), + |_key| Ok(()), + )?; + } MergerOperation::WordFidDocidsMerger(merger) => { - let word_docids_sender = sender.word_fid_docids(); - let database = index.word_fid_docids.remap_types::(); - - /// TODO manage the error correctly - let mut merger_iter = merger.into_stream_merger_iter().unwrap(); - - // TODO manage the error correctly - while let Some((key, deladd)) = merger_iter.next().unwrap() { - let current = database.get(rtxn, key)?; - let deladd: &KvReaderDelAdd = deladd.into(); - let del = deladd.get(DelAdd::Deletion); - let add = deladd.get(DelAdd::Addition); - - match merge_cbo_bitmaps(current, del, add)? { - Operation::Write(bitmap) => { - let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); - word_docids_sender.write(key, value).unwrap(); - } - Operation::Delete => { - word_docids_sender.delete(key).unwrap(); - } - Operation::Ignore => (), - } - } + merge_and_send_docids( + merger, + index.word_fid_docids.remap_types(), + rtxn, + &mut buffer, + sender.docids::(), + |_key| Ok(()), + |_key| Ok(()), + )?; + } + MergerOperation::WordPositionDocidsMerger(merger) => { + merge_and_send_docids( + merger, + index.word_position_docids.remap_types(), + rtxn, + &mut buffer, + sender.docids::(), + |_key| Ok(()), + |_key| Ok(()), + )?; } MergerOperation::InsertDocument { docid, document } => { documents_ids.insert(docid); @@ -128,6 +126,39 @@ pub fn merge_grenad_entries( Ok(()) } +fn merge_and_send_docids( + merger: Merger, + database: Database, + rtxn: &RoTxn<'_>, + buffer: &mut Vec, + word_docids_sender: DocidsSender<'_, D>, + mut add_key: impl FnMut(&[u8]) -> fst::Result<()>, + mut del_key: impl FnMut(&[u8]) -> fst::Result<()>, +) -> Result<()> { + let mut merger_iter = merger.into_stream_merger_iter().unwrap(); + while let Some((key, deladd)) = merger_iter.next().unwrap() { + let current = database.get(rtxn, key)?; + let deladd: &KvReaderDelAdd = deladd.into(); + let del = deladd.get(DelAdd::Deletion); + let add = deladd.get(DelAdd::Addition); + + match merge_cbo_bitmaps(current, del, add)? { + Operation::Write(bitmap) => { + let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); + word_docids_sender.write(key, value).unwrap(); + add_key(key)?; + } + Operation::Delete => { + word_docids_sender.delete(key).unwrap(); + del_key(key)?; + } + Operation::Ignore => (), + } + } + + Ok(()) +} + enum Operation { Write(RoaringBitmap), Delete,