diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 8888132e3..e9a795bf5 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -4,12 +4,13 @@ use std::marker::PhantomData; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use grenad::Merger; use heed::types::Bytes; +use memmap2::Mmap; use super::StdResult; use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY}; use crate::update::new::KvReaderFieldId; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{CboRoaringBitmapCodec, DocumentId, Index}; +use crate::{DocumentId, Index}; /// The capacity of the channel is currently in number of messages. pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { @@ -23,26 +24,35 @@ pub fn extractors_merger_channels(cap: usize) -> (ExtractorSender, MergerReceive (ExtractorSender(sender), MergerReceiver(receiver)) } -pub struct KeyValueEntry { - key_length: usize, - data: Box<[u8]>, +pub enum KeyValueEntry { + SmallInMemory { key_length: usize, data: Box<[u8]> }, + LargeOnDisk { key: Box<[u8]>, value: Mmap }, } impl KeyValueEntry { - pub fn from_key_value(key: &[u8], value: &[u8]) -> Self { + pub fn from_small_key_value(key: &[u8], value: &[u8]) -> Self { let mut data = Vec::with_capacity(key.len() + value.len()); data.extend_from_slice(key); data.extend_from_slice(value); + KeyValueEntry::SmallInMemory { key_length: key.len(), data: data.into_boxed_slice() } + } - KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() } + pub fn from_large_key_value(key: &[u8], value: Mmap) -> Self { + KeyValueEntry::LargeOnDisk { key: key.to_vec().into_boxed_slice(), value } } pub fn key(&self) -> &[u8] { - &self.data.as_ref()[..self.key_length] + match self { + KeyValueEntry::SmallInMemory { key_length, data } => &data.as_ref()[..*key_length], + KeyValueEntry::LargeOnDisk { key, value: _ } => key.as_ref(), + } } pub fn value(&self) -> &[u8] { - &self.data.as_ref()[self.key_length..] + match self { + KeyValueEntry::SmallInMemory { key_length, data } => &data.as_ref()[*key_length..], + KeyValueEntry::LargeOnDisk { key: _, value } => value.as_ref(), + } } } @@ -154,7 +164,7 @@ impl MergerSender { } pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> { - let entry = EntryOperation::Write(KeyValueEntry::from_key_value( + let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value( DOCUMENTS_IDS_KEY.as_bytes(), bitmap, )); @@ -168,9 +178,11 @@ impl MergerSender { pub struct MainSender<'a>(&'a Sender); impl MainSender<'_> { - pub fn write_words_fst(&self, value: &[u8]) -> StdResult<(), SendError<()>> { - let entry = - EntryOperation::Write(KeyValueEntry::from_key_value(WORDS_FST_KEY.as_bytes(), value)); + pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value( + WORDS_FST_KEY.as_bytes(), + value, + )); match self.0.send(WriterOperation { database: Database::Main, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), @@ -236,7 +248,7 @@ pub struct DocidsSender<'a, D> { impl DocidsSender<'_, D> { pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { - let entry = EntryOperation::Write(KeyValueEntry::from_key_value(key, value)); + let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); match self.sender.send(WriterOperation { database: D::DATABASE, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), @@ -261,7 +273,7 @@ impl DocumentsSender<'_> { docid: DocumentId, document: &KvReaderFieldId, ) -> StdResult<(), SendError<()>> { - let entry = EntryOperation::Write(KeyValueEntry::from_key_value( + let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value( &docid.to_be_bytes(), document.as_bytes(), )); diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 976fe435f..35449b475 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -49,26 +49,8 @@ pub fn merge_grenad_entries( // Move that into a dedicated function let words_fst = index.words_fst(rtxn)?; - - let add_words_fst_file = add_words_fst.into_inner()?; - let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file)? }; - let add_words_fst = Set::new(&add_words_fst_mmap)?; - - let del_words_fst_file = del_words_fst.into_inner()?; - let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? }; - let del_words_fst = Set::new(&del_words_fst_mmap)?; - - let diff = words_fst.op().add(&del_words_fst).difference(); - let stream = add_words_fst.op().add(diff).union(); - - let mut words_fst = SetBuilder::new(tempfile()?)?; - words_fst.extend_stream(stream)?; - let words_fst_file = words_fst.into_inner()?; - let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; - - // PLEASE SEND THIS AS AN MMAP - let main_sender = sender.main(); - main_sender.write_words_fst(&words_fst_mmap).unwrap(); + let mmap = compute_new_words_fst(add_words_fst, del_words_fst, words_fst)?; + sender.main().write_words_fst(mmap).unwrap(); } MergerOperation::ExactWordDocidsMerger(merger) => { merge_and_send_docids( @@ -126,6 +108,30 @@ pub fn merge_grenad_entries( Ok(()) } +fn compute_new_words_fst( + add_words_fst: SetBuilder, + del_words_fst: SetBuilder, + words_fst: Set>, +) -> Result { + let add_words_fst_file = add_words_fst.into_inner()?; + let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file)? }; + let add_words_fst = Set::new(&add_words_fst_mmap)?; + + let del_words_fst_file = del_words_fst.into_inner()?; + let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? }; + let del_words_fst = Set::new(&del_words_fst_mmap)?; + + let diff = words_fst.op().add(&del_words_fst).difference(); + let stream = add_words_fst.op().add(diff).union(); + + let mut words_fst = SetBuilder::new(tempfile()?)?; + words_fst.extend_stream(stream)?; + let words_fst_file = words_fst.into_inner()?; + let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; + + Ok(words_fst_mmap) +} + fn merge_and_send_docids( merger: Merger, database: Database,