mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-09 22:48:54 +01:00
Extract and write the documents and words fst in the database
This commit is contained in:
parent
52d32b4ee9
commit
27b4cab857
@ -5,6 +5,7 @@ use grenad::Merger;
|
|||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
|
|
||||||
use super::StdResult;
|
use super::StdResult;
|
||||||
|
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY};
|
||||||
use crate::update::new::KvReaderFieldId;
|
use crate::update::new::KvReaderFieldId;
|
||||||
use crate::update::MergeDeladdCboRoaringBitmaps;
|
use crate::update::MergeDeladdCboRoaringBitmaps;
|
||||||
use crate::{DocumentId, Index};
|
use crate::{DocumentId, Index};
|
||||||
@ -22,12 +23,14 @@ pub fn extractors_merger_channels(cap: usize) -> ExtractorsMergerChannels {
|
|||||||
ExtractorsMergerChannels {
|
ExtractorsMergerChannels {
|
||||||
merger_receiver: MergerReceiver(receiver),
|
merger_receiver: MergerReceiver(receiver),
|
||||||
deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender(sender.clone()),
|
deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender(sender.clone()),
|
||||||
|
extracted_documents_sender: ExtractedDocumentsSender(sender.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ExtractorsMergerChannels {
|
pub struct ExtractorsMergerChannels {
|
||||||
pub merger_receiver: MergerReceiver,
|
pub merger_receiver: MergerReceiver,
|
||||||
pub deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender,
|
pub deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender,
|
||||||
|
pub extracted_documents_sender: ExtractedDocumentsSender,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct KeyValueEntry {
|
pub struct KeyValueEntry {
|
||||||
@ -95,18 +98,37 @@ impl DocumentEntry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum WriterOperation {
|
pub struct DocumentDeletionEntry(DocumentId);
|
||||||
WordDocids(EntryOperation),
|
|
||||||
Document(DocumentEntry),
|
impl DocumentDeletionEntry {
|
||||||
|
pub fn key(&self) -> [u8; 4] {
|
||||||
|
self.0.to_be_bytes()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WriterOperation {
|
||||||
|
database: Database,
|
||||||
|
entry: EntryOperation,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum Database {
|
||||||
|
WordDocids,
|
||||||
|
Documents,
|
||||||
|
Main,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WriterOperation {
|
impl WriterOperation {
|
||||||
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
|
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
|
||||||
match self {
|
match self.database {
|
||||||
WriterOperation::WordDocids(_) => index.word_docids.remap_types(),
|
Database::Main => index.main.remap_types(),
|
||||||
WriterOperation::Document(_) => index.documents.remap_types(),
|
Database::Documents => index.documents.remap_types(),
|
||||||
|
Database::WordDocids => index.word_docids.remap_types(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn entry(self) -> EntryOperation {
|
||||||
|
self.entry
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WriterReceiver(Receiver<WriterOperation>);
|
pub struct WriterReceiver(Receiver<WriterOperation>);
|
||||||
@ -123,37 +145,93 @@ impl IntoIterator for WriterReceiver {
|
|||||||
pub struct MergerSender(Sender<WriterOperation>);
|
pub struct MergerSender(Sender<WriterOperation>);
|
||||||
|
|
||||||
impl MergerSender {
|
impl MergerSender {
|
||||||
|
pub fn main(&self) -> MainSender<'_> {
|
||||||
|
MainSender(&self.0)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn word_docids(&self) -> WordDocidsSender<'_> {
|
pub fn word_docids(&self) -> WordDocidsSender<'_> {
|
||||||
WordDocidsSender(&self.0)
|
WordDocidsSender(&self.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn documents(&self) -> DocumentsSender<'_> {
|
||||||
|
DocumentsSender(&self.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Write(KeyValueEntry::from_key_value(
|
||||||
|
DOCUMENTS_IDS_KEY.as_bytes(),
|
||||||
|
bitmap,
|
||||||
|
));
|
||||||
|
match self.0.send(WriterOperation { database: Database::Main, entry }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MainSender<'a>(&'a Sender<WriterOperation>);
|
||||||
|
|
||||||
|
impl MainSender<'_> {
|
||||||
|
pub fn write_words_fst(&self, value: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry =
|
||||||
|
EntryOperation::Write(KeyValueEntry::from_key_value(WORDS_FST_KEY.as_bytes(), value));
|
||||||
|
match self.0.send(WriterOperation { database: Database::Main, entry }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
|
||||||
|
match self.0.send(WriterOperation { database: Database::Main, entry }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WordDocidsSender<'a>(&'a Sender<WriterOperation>);
|
pub struct WordDocidsSender<'a>(&'a Sender<WriterOperation>);
|
||||||
|
|
||||||
impl WordDocidsSender<'_> {
|
impl WordDocidsSender<'_> {
|
||||||
pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
let operation = EntryOperation::Write(KeyValueEntry::from_key_value(key, value));
|
let entry = EntryOperation::Write(KeyValueEntry::from_key_value(key, value));
|
||||||
match self.0.send(WriterOperation::WordDocids(operation)) {
|
match self.0.send(WriterOperation { database: Database::WordDocids, entry }) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(SendError(_)) => Err(SendError(())),
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
let operation = EntryOperation::Delete(KeyEntry::from_key(key));
|
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
|
||||||
match self.0.send(WriterOperation::WordDocids(operation)) {
|
match self.0.send(WriterOperation { database: Database::WordDocids, entry }) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(SendError(_)) => Err(SendError(())),
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
pub struct DocumentsSender<'a>(&'a Sender<WriterOperation>);
|
||||||
pub struct DocumentSender(Sender<WriterOperation>);
|
|
||||||
|
|
||||||
impl DocumentSender {
|
impl DocumentsSender<'_> {
|
||||||
pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError<()>> {
|
/// TODO do that efficiently
|
||||||
match self.0.send(WriterOperation::Document(document)) {
|
pub fn uncompressed(
|
||||||
|
&self,
|
||||||
|
docid: DocumentId,
|
||||||
|
document: &KvReaderFieldId,
|
||||||
|
) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Write(KeyValueEntry::from_key_value(
|
||||||
|
&docid.to_be_bytes(),
|
||||||
|
document.as_bytes(),
|
||||||
|
));
|
||||||
|
match self.0.send(WriterOperation { database: Database::Documents, entry }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Delete(KeyEntry::from_key(&docid.to_be_bytes()));
|
||||||
|
match self.0.send(WriterOperation { database: Database::Documents, entry }) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(SendError(_)) => Err(SendError(())),
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
}
|
}
|
||||||
@ -162,6 +240,8 @@ impl DocumentSender {
|
|||||||
|
|
||||||
pub enum MergerOperation {
|
pub enum MergerOperation {
|
||||||
WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
|
WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
|
||||||
|
InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> },
|
||||||
|
DeleteDocument { docid: DocumentId },
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MergerReceiver(Receiver<MergerOperation>);
|
pub struct MergerReceiver(Receiver<MergerOperation>);
|
||||||
@ -190,3 +270,26 @@ impl DeladdCboRoaringBitmapSender {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ExtractedDocumentsSender(Sender<MergerOperation>);
|
||||||
|
|
||||||
|
impl ExtractedDocumentsSender {
|
||||||
|
pub fn insert(
|
||||||
|
&self,
|
||||||
|
docid: DocumentId,
|
||||||
|
document: Box<KvReaderFieldId>,
|
||||||
|
) -> StdResult<(), SendError<()>> {
|
||||||
|
match self.0.send(MergerOperation::InsertDocument { docid, document }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
|
||||||
|
match self.0.send(MergerOperation::DeleteDocument { docid }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,15 +1,15 @@
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
|
|
||||||
use charabia::TokenizerBuilder;
|
use charabia::TokenizerBuilder;
|
||||||
use grenad::{Merger, ReaderCursor};
|
use grenad::Merger;
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
|
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||||
|
|
||||||
use super::cache::CachedSorter;
|
use super::cache::CachedSorter;
|
||||||
use super::tokenize_document::DocumentTokenizer;
|
use super::tokenize_document::DocumentTokenizer;
|
||||||
use crate::update::new::{DocumentChange, ItemsPool};
|
use crate::update::new::{DocumentChange, ItemsPool};
|
||||||
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
|
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
|
||||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
||||||
|
|
||||||
pub trait SearchableExtractor {
|
pub trait SearchableExtractor {
|
||||||
fn run_extraction(
|
fn run_extraction(
|
||||||
|
@ -1,13 +1,11 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
|
use charabia::{SeparatorKind, Token, TokenKind, Tokenizer};
|
||||||
use heed::RoTxn;
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::update::new::KvReaderFieldId;
|
use crate::update::new::KvReaderFieldId;
|
||||||
use crate::{
|
use crate::{
|
||||||
FieldId, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, LocalizedAttributesRule,
|
FieldId, GlobalFieldsIdsMap, InternalError, LocalizedAttributesRule, Result, MAX_WORD_LENGTH,
|
||||||
Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct DocumentTokenizer<'a> {
|
pub struct DocumentTokenizer<'a> {
|
||||||
@ -239,6 +237,8 @@ mod test {
|
|||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::FieldsIdsMap;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_tokenize_document() {
|
fn test_tokenize_document() {
|
||||||
let mut fields_ids_map = FieldsIdsMap::new();
|
let mut fields_ids_map = FieldsIdsMap::new();
|
||||||
|
@ -13,7 +13,6 @@ pub use update_by_function::UpdateByFunction;
|
|||||||
|
|
||||||
use super::channel::{
|
use super::channel::{
|
||||||
extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels,
|
extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels,
|
||||||
WriterOperation,
|
|
||||||
};
|
};
|
||||||
use super::document_change::DocumentChange;
|
use super::document_change::DocumentChange;
|
||||||
use super::extract::{SearchableExtractor, WordDocidsExtractor};
|
use super::extract::{SearchableExtractor, WordDocidsExtractor};
|
||||||
@ -57,8 +56,11 @@ where
|
|||||||
PI::Iter: Clone,
|
PI::Iter: Clone,
|
||||||
{
|
{
|
||||||
let (merger_sender, writer_receiver) = merger_writer_channel(100);
|
let (merger_sender, writer_receiver) = merger_writer_channel(100);
|
||||||
let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } =
|
let ExtractorsMergerChannels {
|
||||||
extractors_merger_channels(100);
|
merger_receiver,
|
||||||
|
deladd_cbo_roaring_bitmap_sender,
|
||||||
|
extracted_documents_sender,
|
||||||
|
} = extractors_merger_channels(100);
|
||||||
|
|
||||||
let fields_ids_map_lock = RwLock::new(fields_ids_map);
|
let fields_ids_map_lock = RwLock::new(fields_ids_map);
|
||||||
let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
|
let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
|
||||||
@ -68,6 +70,28 @@ where
|
|||||||
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
|
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
|
||||||
pool.in_place_scope(|_s| {
|
pool.in_place_scope(|_s| {
|
||||||
let document_changes = document_changes.into_par_iter();
|
let document_changes = document_changes.into_par_iter();
|
||||||
|
|
||||||
|
// document but we need to create a function that collects and compresses documents.
|
||||||
|
document_changes.clone().into_par_iter().try_for_each(|result| {
|
||||||
|
match result? {
|
||||||
|
DocumentChange::Deletion(deletion) => {
|
||||||
|
let docid = deletion.docid();
|
||||||
|
extracted_documents_sender.delete(docid).unwrap();
|
||||||
|
}
|
||||||
|
DocumentChange::Update(update) => {
|
||||||
|
let docid = update.docid();
|
||||||
|
let content = update.new();
|
||||||
|
extracted_documents_sender.insert(docid, content.boxed()).unwrap();
|
||||||
|
}
|
||||||
|
DocumentChange::Insertion(insertion) => {
|
||||||
|
let docid = insertion.docid();
|
||||||
|
let content = insertion.new();
|
||||||
|
extracted_documents_sender.insert(docid, content.boxed()).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(()) as Result<_>
|
||||||
|
})?;
|
||||||
|
|
||||||
// word docids
|
// word docids
|
||||||
let merger = WordDocidsExtractor::run_extraction(
|
let merger = WordDocidsExtractor::run_extraction(
|
||||||
index,
|
index,
|
||||||
@ -90,15 +114,15 @@ where
|
|||||||
merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index)
|
merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// TODO Split this code into another function
|
|
||||||
for operation in writer_receiver {
|
for operation in writer_receiver {
|
||||||
let database = operation.database(index);
|
let database = operation.database(index);
|
||||||
match operation {
|
match operation.entry() {
|
||||||
WriterOperation::WordDocids(operation) => match operation {
|
EntryOperation::Delete(e) => {
|
||||||
EntryOperation::Delete(e) => database.delete(wtxn, e.entry()).map(drop)?,
|
if !database.delete(wtxn, e.entry())? {
|
||||||
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
|
unreachable!("We tried to delete an unknown key")
|
||||||
},
|
}
|
||||||
WriterOperation::Document(e) => database.put(wtxn, &e.key(), e.content())?,
|
}
|
||||||
|
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,9 +1,14 @@
|
|||||||
|
use fst::set::OpBuilder;
|
||||||
|
use fst::{Set, SetBuilder};
|
||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
|
use memmap2::Mmap;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
use tempfile::tempfile;
|
||||||
|
|
||||||
use super::channel::{MergerReceiver, MergerSender};
|
use super::channel::{MergerReceiver, MergerSender};
|
||||||
use super::KvReaderDelAdd;
|
use super::KvReaderDelAdd;
|
||||||
|
use crate::index::main_key::WORDS_FST_KEY;
|
||||||
use crate::update::del_add::DelAdd;
|
use crate::update::del_add::DelAdd;
|
||||||
use crate::update::new::channel::MergerOperation;
|
use crate::update::new::channel::MergerOperation;
|
||||||
use crate::{CboRoaringBitmapCodec, Index, Result};
|
use crate::{CboRoaringBitmapCodec, Index, Result};
|
||||||
@ -16,12 +21,15 @@ pub fn merge_grenad_entries(
|
|||||||
index: &Index,
|
index: &Index,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
|
let mut documents_ids = index.documents_ids(rtxn)?;
|
||||||
|
|
||||||
for merger_operation in receiver {
|
for merger_operation in receiver {
|
||||||
match merger_operation {
|
match merger_operation {
|
||||||
MergerOperation::WordDocidsMerger(merger) => {
|
MergerOperation::WordDocidsMerger(merger) => {
|
||||||
let sender = sender.word_docids();
|
let word_docids_sender = sender.word_docids();
|
||||||
let database = index.word_docids.remap_types::<Bytes, Bytes>();
|
let database = index.word_docids.remap_types::<Bytes, Bytes>();
|
||||||
|
let mut add_words_fst = SetBuilder::new(tempfile()?)?;
|
||||||
|
let mut del_words_fst = SetBuilder::new(tempfile()?)?;
|
||||||
|
|
||||||
/// TODO manage the error correctly
|
/// TODO manage the error correctly
|
||||||
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
|
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
|
||||||
@ -35,17 +43,62 @@ pub fn merge_grenad_entries(
|
|||||||
|
|
||||||
match merge_cbo_bitmaps(current, del, add)? {
|
match merge_cbo_bitmaps(current, del, add)? {
|
||||||
Operation::Write(bitmap) => {
|
Operation::Write(bitmap) => {
|
||||||
let value = cbo_serialize_into_vec(&bitmap, &mut buffer);
|
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
|
||||||
sender.write(key, value).unwrap();
|
word_docids_sender.write(key, value).unwrap();
|
||||||
|
add_words_fst.insert(key)?;
|
||||||
|
}
|
||||||
|
Operation::Delete => {
|
||||||
|
word_docids_sender.delete(key).unwrap();
|
||||||
|
del_words_fst.insert(key)?;
|
||||||
}
|
}
|
||||||
Operation::Delete => sender.delete(key).unwrap(),
|
|
||||||
Operation::Ignore => (),
|
Operation::Ignore => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Move that into a dedicated function
|
||||||
|
let words_fst = index.words_fst(rtxn)?;
|
||||||
|
|
||||||
|
let add_words_fst_file = add_words_fst.into_inner()?;
|
||||||
|
let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file)? };
|
||||||
|
let add_words_fst = Set::new(&add_words_fst_mmap)?;
|
||||||
|
|
||||||
|
let del_words_fst_file = del_words_fst.into_inner()?;
|
||||||
|
let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? };
|
||||||
|
let del_words_fst = Set::new(&del_words_fst_mmap)?;
|
||||||
|
|
||||||
|
// TO BE IMPROVED @many
|
||||||
|
let diff = words_fst.op().add(&del_words_fst).difference();
|
||||||
|
let stream = add_words_fst.op().add(diff).union();
|
||||||
|
|
||||||
|
let mut words_fst = SetBuilder::new(tempfile()?)?;
|
||||||
|
words_fst.extend_stream(stream)?;
|
||||||
|
let words_fst_file = words_fst.into_inner()?;
|
||||||
|
let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
|
||||||
|
|
||||||
|
// PLEASE SEND THIS AS AN MMAP
|
||||||
|
let main_sender = sender.main();
|
||||||
|
main_sender.write_words_fst(&words_fst_mmap).unwrap();
|
||||||
|
}
|
||||||
|
MergerOperation::InsertDocument { docid, document } => {
|
||||||
|
documents_ids.insert(docid);
|
||||||
|
sender.documents().uncompressed(docid, &document).unwrap();
|
||||||
|
}
|
||||||
|
MergerOperation::DeleteDocument { docid } => {
|
||||||
|
if !documents_ids.remove(docid) {
|
||||||
|
unreachable!("Tried deleting a document that we do not know about");
|
||||||
|
}
|
||||||
|
sender.documents().delete(docid).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send the documents ids unionized with the current one
|
||||||
|
/// TODO return the slice of bytes directly
|
||||||
|
serialize_bitmap_into_vec(&documents_ids, &mut buffer);
|
||||||
|
sender.send_documents_ids(&buffer).unwrap();
|
||||||
|
|
||||||
|
// ...
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,9 +139,16 @@ fn merge_cbo_bitmaps(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the slice directly from the serialize_into method
|
/// TODO Return the slice directly from the serialize_into method
|
||||||
fn cbo_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec<u8>) -> &'b [u8] {
|
fn cbo_bitmap_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec<u8>) -> &'b [u8] {
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
CboRoaringBitmapCodec::serialize_into(bitmap, buffer);
|
CboRoaringBitmapCodec::serialize_into(bitmap, buffer);
|
||||||
buffer.as_slice()
|
buffer.as_slice()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TODO Return the slice directly from the serialize_into method
|
||||||
|
fn serialize_bitmap_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec<u8>) {
|
||||||
|
buffer.clear();
|
||||||
|
bitmap.serialize_into(buffer).unwrap();
|
||||||
|
// buffer.as_slice()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user