mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 14:54:27 +01:00
Send the WordsFst by using an Mmap
This commit is contained in:
parent
98e48371c3
commit
1d59c19cd2
@ -4,12 +4,13 @@ use std::marker::PhantomData;
|
|||||||
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
|
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
|
||||||
use grenad::Merger;
|
use grenad::Merger;
|
||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
|
use memmap2::Mmap;
|
||||||
|
|
||||||
use super::StdResult;
|
use super::StdResult;
|
||||||
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY};
|
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY};
|
||||||
use crate::update::new::KvReaderFieldId;
|
use crate::update::new::KvReaderFieldId;
|
||||||
use crate::update::MergeDeladdCboRoaringBitmaps;
|
use crate::update::MergeDeladdCboRoaringBitmaps;
|
||||||
use crate::{CboRoaringBitmapCodec, DocumentId, Index};
|
use crate::{DocumentId, Index};
|
||||||
|
|
||||||
/// The capacity of the channel is currently in number of messages.
|
/// The capacity of the channel is currently in number of messages.
|
||||||
pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) {
|
pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) {
|
||||||
@ -23,26 +24,35 @@ pub fn extractors_merger_channels(cap: usize) -> (ExtractorSender, MergerReceive
|
|||||||
(ExtractorSender(sender), MergerReceiver(receiver))
|
(ExtractorSender(sender), MergerReceiver(receiver))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct KeyValueEntry {
|
pub enum KeyValueEntry {
|
||||||
key_length: usize,
|
SmallInMemory { key_length: usize, data: Box<[u8]> },
|
||||||
data: Box<[u8]>,
|
LargeOnDisk { key: Box<[u8]>, value: Mmap },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl KeyValueEntry {
|
impl KeyValueEntry {
|
||||||
pub fn from_key_value(key: &[u8], value: &[u8]) -> Self {
|
pub fn from_small_key_value(key: &[u8], value: &[u8]) -> Self {
|
||||||
let mut data = Vec::with_capacity(key.len() + value.len());
|
let mut data = Vec::with_capacity(key.len() + value.len());
|
||||||
data.extend_from_slice(key);
|
data.extend_from_slice(key);
|
||||||
data.extend_from_slice(value);
|
data.extend_from_slice(value);
|
||||||
|
KeyValueEntry::SmallInMemory { key_length: key.len(), data: data.into_boxed_slice() }
|
||||||
|
}
|
||||||
|
|
||||||
KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
|
pub fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
|
||||||
|
KeyValueEntry::LargeOnDisk { key: key.to_vec().into_boxed_slice(), value }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn key(&self) -> &[u8] {
|
pub fn key(&self) -> &[u8] {
|
||||||
&self.data.as_ref()[..self.key_length]
|
match self {
|
||||||
|
KeyValueEntry::SmallInMemory { key_length, data } => &data.as_ref()[..*key_length],
|
||||||
|
KeyValueEntry::LargeOnDisk { key, value: _ } => key.as_ref(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn value(&self) -> &[u8] {
|
pub fn value(&self) -> &[u8] {
|
||||||
&self.data.as_ref()[self.key_length..]
|
match self {
|
||||||
|
KeyValueEntry::SmallInMemory { key_length, data } => &data.as_ref()[*key_length..],
|
||||||
|
KeyValueEntry::LargeOnDisk { key: _, value } => value.as_ref(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,7 +164,7 @@ impl MergerSender {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> {
|
pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_key_value(
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(
|
||||||
DOCUMENTS_IDS_KEY.as_bytes(),
|
DOCUMENTS_IDS_KEY.as_bytes(),
|
||||||
bitmap,
|
bitmap,
|
||||||
));
|
));
|
||||||
@ -168,9 +178,11 @@ impl MergerSender {
|
|||||||
pub struct MainSender<'a>(&'a Sender<WriterOperation>);
|
pub struct MainSender<'a>(&'a Sender<WriterOperation>);
|
||||||
|
|
||||||
impl MainSender<'_> {
|
impl MainSender<'_> {
|
||||||
pub fn write_words_fst(&self, value: &[u8]) -> StdResult<(), SendError<()>> {
|
pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> {
|
||||||
let entry =
|
let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(
|
||||||
EntryOperation::Write(KeyValueEntry::from_key_value(WORDS_FST_KEY.as_bytes(), value));
|
WORDS_FST_KEY.as_bytes(),
|
||||||
|
value,
|
||||||
|
));
|
||||||
match self.0.send(WriterOperation { database: Database::Main, entry }) {
|
match self.0.send(WriterOperation { database: Database::Main, entry }) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(SendError(_)) => Err(SendError(())),
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
@ -236,7 +248,7 @@ pub struct DocidsSender<'a, D> {
|
|||||||
|
|
||||||
impl<D: DatabaseType> DocidsSender<'_, D> {
|
impl<D: DatabaseType> DocidsSender<'_, D> {
|
||||||
pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_key_value(key, value));
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
|
||||||
match self.sender.send(WriterOperation { database: D::DATABASE, entry }) {
|
match self.sender.send(WriterOperation { database: D::DATABASE, entry }) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(SendError(_)) => Err(SendError(())),
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
@ -261,7 +273,7 @@ impl DocumentsSender<'_> {
|
|||||||
docid: DocumentId,
|
docid: DocumentId,
|
||||||
document: &KvReaderFieldId,
|
document: &KvReaderFieldId,
|
||||||
) -> StdResult<(), SendError<()>> {
|
) -> StdResult<(), SendError<()>> {
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_key_value(
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(
|
||||||
&docid.to_be_bytes(),
|
&docid.to_be_bytes(),
|
||||||
document.as_bytes(),
|
document.as_bytes(),
|
||||||
));
|
));
|
||||||
|
@ -49,26 +49,8 @@ pub fn merge_grenad_entries(
|
|||||||
|
|
||||||
// Move that into a dedicated function
|
// Move that into a dedicated function
|
||||||
let words_fst = index.words_fst(rtxn)?;
|
let words_fst = index.words_fst(rtxn)?;
|
||||||
|
let mmap = compute_new_words_fst(add_words_fst, del_words_fst, words_fst)?;
|
||||||
let add_words_fst_file = add_words_fst.into_inner()?;
|
sender.main().write_words_fst(mmap).unwrap();
|
||||||
let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file)? };
|
|
||||||
let add_words_fst = Set::new(&add_words_fst_mmap)?;
|
|
||||||
|
|
||||||
let del_words_fst_file = del_words_fst.into_inner()?;
|
|
||||||
let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? };
|
|
||||||
let del_words_fst = Set::new(&del_words_fst_mmap)?;
|
|
||||||
|
|
||||||
let diff = words_fst.op().add(&del_words_fst).difference();
|
|
||||||
let stream = add_words_fst.op().add(diff).union();
|
|
||||||
|
|
||||||
let mut words_fst = SetBuilder::new(tempfile()?)?;
|
|
||||||
words_fst.extend_stream(stream)?;
|
|
||||||
let words_fst_file = words_fst.into_inner()?;
|
|
||||||
let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
|
|
||||||
|
|
||||||
// PLEASE SEND THIS AS AN MMAP
|
|
||||||
let main_sender = sender.main();
|
|
||||||
main_sender.write_words_fst(&words_fst_mmap).unwrap();
|
|
||||||
}
|
}
|
||||||
MergerOperation::ExactWordDocidsMerger(merger) => {
|
MergerOperation::ExactWordDocidsMerger(merger) => {
|
||||||
merge_and_send_docids(
|
merge_and_send_docids(
|
||||||
@ -126,6 +108,30 @@ pub fn merge_grenad_entries(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn compute_new_words_fst(
|
||||||
|
add_words_fst: SetBuilder<File>,
|
||||||
|
del_words_fst: SetBuilder<File>,
|
||||||
|
words_fst: Set<std::borrow::Cow<'_, [u8]>>,
|
||||||
|
) -> Result<Mmap> {
|
||||||
|
let add_words_fst_file = add_words_fst.into_inner()?;
|
||||||
|
let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file)? };
|
||||||
|
let add_words_fst = Set::new(&add_words_fst_mmap)?;
|
||||||
|
|
||||||
|
let del_words_fst_file = del_words_fst.into_inner()?;
|
||||||
|
let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? };
|
||||||
|
let del_words_fst = Set::new(&del_words_fst_mmap)?;
|
||||||
|
|
||||||
|
let diff = words_fst.op().add(&del_words_fst).difference();
|
||||||
|
let stream = add_words_fst.op().add(diff).union();
|
||||||
|
|
||||||
|
let mut words_fst = SetBuilder::new(tempfile()?)?;
|
||||||
|
words_fst.extend_stream(stream)?;
|
||||||
|
let words_fst_file = words_fst.into_inner()?;
|
||||||
|
let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
|
||||||
|
|
||||||
|
Ok(words_fst_mmap)
|
||||||
|
}
|
||||||
|
|
||||||
fn merge_and_send_docids<D: DatabaseType>(
|
fn merge_and_send_docids<D: DatabaseType>(
|
||||||
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
|
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
|
||||||
database: Database<Bytes, Bytes>,
|
database: Database<Bytes, Bytes>,
|
||||||
|
Loading…
Reference in New Issue
Block a user