diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index d94b2cc00..d5739a75e 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -5,6 +5,7 @@ use grenad::Merger; use heed::types::Bytes; use super::StdResult; +use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY}; use crate::update::new::KvReaderFieldId; use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{DocumentId, Index}; @@ -22,12 +23,14 @@ pub fn extractors_merger_channels(cap: usize) -> ExtractorsMergerChannels { ExtractorsMergerChannels { merger_receiver: MergerReceiver(receiver), deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender(sender.clone()), + extracted_documents_sender: ExtractedDocumentsSender(sender.clone()), } } pub struct ExtractorsMergerChannels { pub merger_receiver: MergerReceiver, pub deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender, + pub extracted_documents_sender: ExtractedDocumentsSender, } pub struct KeyValueEntry { @@ -95,18 +98,37 @@ impl DocumentEntry { } } -pub enum WriterOperation { - WordDocids(EntryOperation), - Document(DocumentEntry), +pub struct DocumentDeletionEntry(DocumentId); + +impl DocumentDeletionEntry { + pub fn key(&self) -> [u8; 4] { + self.0.to_be_bytes() + } +} + +pub struct WriterOperation { + database: Database, + entry: EntryOperation, +} + +pub enum Database { + WordDocids, + Documents, + Main, } impl WriterOperation { pub fn database(&self, index: &Index) -> heed::Database { - match self { - WriterOperation::WordDocids(_) => index.word_docids.remap_types(), - WriterOperation::Document(_) => index.documents.remap_types(), + match self.database { + Database::Main => index.main.remap_types(), + Database::Documents => index.documents.remap_types(), + Database::WordDocids => index.word_docids.remap_types(), } } + + pub fn entry(self) -> EntryOperation { + self.entry + } } pub struct WriterReceiver(Receiver); @@ -123,37 +145,93 @@ impl IntoIterator for WriterReceiver { pub struct MergerSender(Sender); impl MergerSender { + pub fn main(&self) -> MainSender<'_> { + MainSender(&self.0) + } + pub fn word_docids(&self) -> WordDocidsSender<'_> { WordDocidsSender(&self.0) } + + pub fn documents(&self) -> DocumentsSender<'_> { + DocumentsSender(&self.0) + } + + pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Write(KeyValueEntry::from_key_value( + DOCUMENTS_IDS_KEY.as_bytes(), + bitmap, + )); + match self.0.send(WriterOperation { database: Database::Main, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } +} + +pub struct MainSender<'a>(&'a Sender); + +impl MainSender<'_> { + pub fn write_words_fst(&self, value: &[u8]) -> StdResult<(), SendError<()>> { + let entry = + EntryOperation::Write(KeyValueEntry::from_key_value(WORDS_FST_KEY.as_bytes(), value)); + match self.0.send(WriterOperation { database: Database::Main, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Delete(KeyEntry::from_key(key)); + match self.0.send(WriterOperation { database: Database::Main, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } } pub struct WordDocidsSender<'a>(&'a Sender); impl WordDocidsSender<'_> { pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { - let operation = EntryOperation::Write(KeyValueEntry::from_key_value(key, value)); - match self.0.send(WriterOperation::WordDocids(operation)) { + let entry = EntryOperation::Write(KeyValueEntry::from_key_value(key, value)); + match self.0.send(WriterOperation { database: Database::WordDocids, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { - let operation = EntryOperation::Delete(KeyEntry::from_key(key)); - match self.0.send(WriterOperation::WordDocids(operation)) { + let entry = EntryOperation::Delete(KeyEntry::from_key(key)); + match self.0.send(WriterOperation { database: Database::WordDocids, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } } -#[derive(Clone)] -pub struct DocumentSender(Sender); +pub struct DocumentsSender<'a>(&'a Sender); -impl DocumentSender { - pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError<()>> { - match self.0.send(WriterOperation::Document(document)) { +impl DocumentsSender<'_> { + /// TODO do that efficiently + pub fn uncompressed( + &self, + docid: DocumentId, + document: &KvReaderFieldId, + ) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Write(KeyValueEntry::from_key_value( + &docid.to_be_bytes(), + document.as_bytes(), + )); + match self.0.send(WriterOperation { database: Database::Documents, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Delete(KeyEntry::from_key(&docid.to_be_bytes())); + match self.0.send(WriterOperation { database: Database::Documents, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } @@ -162,6 +240,8 @@ impl DocumentSender { pub enum MergerOperation { WordDocidsMerger(Merger), + InsertDocument { docid: DocumentId, document: Box }, + DeleteDocument { docid: DocumentId }, } pub struct MergerReceiver(Receiver); @@ -190,3 +270,26 @@ impl DeladdCboRoaringBitmapSender { } } } + +#[derive(Clone)] +pub struct ExtractedDocumentsSender(Sender); + +impl ExtractedDocumentsSender { + pub fn insert( + &self, + docid: DocumentId, + document: Box, + ) -> StdResult<(), SendError<()>> { + match self.0.send(MergerOperation::InsertDocument { docid, document }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { + match self.0.send(MergerOperation::DeleteDocument { docid }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } +} diff --git a/milli/src/update/new/extract/extract_word_docids.rs b/milli/src/update/new/extract/extract_word_docids.rs index 55f13f221..cbb28b956 100644 --- a/milli/src/update/new/extract/extract_word_docids.rs +++ b/milli/src/update/new/extract/extract_word_docids.rs @@ -1,15 +1,15 @@ use std::fs::File; use charabia::TokenizerBuilder; -use grenad::{Merger, ReaderCursor}; +use grenad::Merger; use heed::RoTxn; -use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::cache::CachedSorter; use super::tokenize_document::DocumentTokenizer; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; +use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; pub trait SearchableExtractor { fn run_extraction( diff --git a/milli/src/update/new/extract/tokenize_document.rs b/milli/src/update/new/extract/tokenize_document.rs index 1494dd4b2..ed4e6b89d 100644 --- a/milli/src/update/new/extract/tokenize_document.rs +++ b/milli/src/update/new/extract/tokenize_document.rs @@ -1,13 +1,11 @@ use std::collections::HashMap; -use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder}; -use heed::RoTxn; +use charabia::{SeparatorKind, Token, TokenKind, Tokenizer}; use serde_json::Value; use crate::update::new::KvReaderFieldId; use crate::{ - FieldId, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, LocalizedAttributesRule, - Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH, + FieldId, GlobalFieldsIdsMap, InternalError, LocalizedAttributesRule, Result, MAX_WORD_LENGTH, }; pub struct DocumentTokenizer<'a> { @@ -239,6 +237,8 @@ mod test { use serde_json::json; use super::*; + use crate::FieldsIdsMap; + #[test] fn test_tokenize_document() { let mut fields_ids_map = FieldsIdsMap::new(); diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 1b763f5f9..7a9999c28 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -13,7 +13,6 @@ pub use update_by_function::UpdateByFunction; use super::channel::{ extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels, - WriterOperation, }; use super::document_change::DocumentChange; use super::extract::{SearchableExtractor, WordDocidsExtractor}; @@ -57,8 +56,11 @@ where PI::Iter: Clone, { let (merger_sender, writer_receiver) = merger_writer_channel(100); - let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } = - extractors_merger_channels(100); + let ExtractorsMergerChannels { + merger_receiver, + deladd_cbo_roaring_bitmap_sender, + extracted_documents_sender, + } = extractors_merger_channels(100); let fields_ids_map_lock = RwLock::new(fields_ids_map); let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); @@ -68,6 +70,28 @@ where let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.in_place_scope(|_s| { let document_changes = document_changes.into_par_iter(); + + // document but we need to create a function that collects and compresses documents. + document_changes.clone().into_par_iter().try_for_each(|result| { + match result? { + DocumentChange::Deletion(deletion) => { + let docid = deletion.docid(); + extracted_documents_sender.delete(docid).unwrap(); + } + DocumentChange::Update(update) => { + let docid = update.docid(); + let content = update.new(); + extracted_documents_sender.insert(docid, content.boxed()).unwrap(); + } + DocumentChange::Insertion(insertion) => { + let docid = insertion.docid(); + let content = insertion.new(); + extracted_documents_sender.insert(docid, content.boxed()).unwrap(); + } + } + Ok(()) as Result<_> + })?; + // word docids let merger = WordDocidsExtractor::run_extraction( index, @@ -90,15 +114,15 @@ where merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index) })?; - // TODO Split this code into another function for operation in writer_receiver { let database = operation.database(index); - match operation { - WriterOperation::WordDocids(operation) => match operation { - EntryOperation::Delete(e) => database.delete(wtxn, e.entry()).map(drop)?, - EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?, - }, - WriterOperation::Document(e) => database.put(wtxn, &e.key(), e.content())?, + match operation.entry() { + EntryOperation::Delete(e) => { + if !database.delete(wtxn, e.entry())? { + unreachable!("We tried to delete an unknown key") + } + } + EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?, } } diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index e07262de8..b21f20b0f 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -1,9 +1,14 @@ +use fst::set::OpBuilder; +use fst::{Set, SetBuilder}; use heed::types::Bytes; use heed::RoTxn; +use memmap2::Mmap; use roaring::RoaringBitmap; +use tempfile::tempfile; use super::channel::{MergerReceiver, MergerSender}; use super::KvReaderDelAdd; +use crate::index::main_key::WORDS_FST_KEY; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::{CboRoaringBitmapCodec, Index, Result}; @@ -16,12 +21,15 @@ pub fn merge_grenad_entries( index: &Index, ) -> Result<()> { let mut buffer = Vec::new(); + let mut documents_ids = index.documents_ids(rtxn)?; for merger_operation in receiver { match merger_operation { MergerOperation::WordDocidsMerger(merger) => { - let sender = sender.word_docids(); + let word_docids_sender = sender.word_docids(); let database = index.word_docids.remap_types::(); + let mut add_words_fst = SetBuilder::new(tempfile()?)?; + let mut del_words_fst = SetBuilder::new(tempfile()?)?; /// TODO manage the error correctly let mut merger_iter = merger.into_stream_merger_iter().unwrap(); @@ -35,17 +43,62 @@ pub fn merge_grenad_entries( match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { - let value = cbo_serialize_into_vec(&bitmap, &mut buffer); - sender.write(key, value).unwrap(); + let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); + word_docids_sender.write(key, value).unwrap(); + add_words_fst.insert(key)?; + } + Operation::Delete => { + word_docids_sender.delete(key).unwrap(); + del_words_fst.insert(key)?; } - Operation::Delete => sender.delete(key).unwrap(), Operation::Ignore => (), } } + + // Move that into a dedicated function + let words_fst = index.words_fst(rtxn)?; + + let add_words_fst_file = add_words_fst.into_inner()?; + let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file)? }; + let add_words_fst = Set::new(&add_words_fst_mmap)?; + + let del_words_fst_file = del_words_fst.into_inner()?; + let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? }; + let del_words_fst = Set::new(&del_words_fst_mmap)?; + + // TO BE IMPROVED @many + let diff = words_fst.op().add(&del_words_fst).difference(); + let stream = add_words_fst.op().add(diff).union(); + + let mut words_fst = SetBuilder::new(tempfile()?)?; + words_fst.extend_stream(stream)?; + let words_fst_file = words_fst.into_inner()?; + let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; + + // PLEASE SEND THIS AS AN MMAP + let main_sender = sender.main(); + main_sender.write_words_fst(&words_fst_mmap).unwrap(); + } + MergerOperation::InsertDocument { docid, document } => { + documents_ids.insert(docid); + sender.documents().uncompressed(docid, &document).unwrap(); + } + MergerOperation::DeleteDocument { docid } => { + if !documents_ids.remove(docid) { + unreachable!("Tried deleting a document that we do not know about"); + } + sender.documents().delete(docid).unwrap(); } } } + // Send the documents ids unionized with the current one + /// TODO return the slice of bytes directly + serialize_bitmap_into_vec(&documents_ids, &mut buffer); + sender.send_documents_ids(&buffer).unwrap(); + + // ... + Ok(()) } @@ -86,9 +139,16 @@ fn merge_cbo_bitmaps( } } -/// Return the slice directly from the serialize_into method -fn cbo_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec) -> &'b [u8] { +/// TODO Return the slice directly from the serialize_into method +fn cbo_bitmap_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec) -> &'b [u8] { buffer.clear(); CboRoaringBitmapCodec::serialize_into(bitmap, buffer); buffer.as_slice() } + +/// TODO Return the slice directly from the serialize_into method +fn serialize_bitmap_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec) { + buffer.clear(); + bitmap.serialize_into(buffer).unwrap(); + // buffer.as_slice() +}