use std::fs::File; use fst::{Set, SetBuilder}; use grenad::Merger; use heed::types::Bytes; use heed::{Database, RoTxn}; use memmap2::Mmap; use roaring::RoaringBitmap; use tempfile::tempfile; use super::channel::*; use super::KvReaderDelAdd; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{CboRoaringBitmapCodec, Index, Result}; /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] pub fn merge_grenad_entries( receiver: MergerReceiver, sender: MergerSender, rtxn: &RoTxn, index: &Index, ) -> Result<()> { let mut buffer = Vec::new(); let mut documents_ids = index.documents_ids(rtxn)?; for merger_operation in receiver { match merger_operation { MergerOperation::ExactWordDocidsMerger(merger) => { let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); let _entered = span.enter(); merge_and_send_docids( merger, /// TODO do a MergerOperation::database(&Index) -> Database. index.exact_word_docids.remap_types(), rtxn, &mut buffer, sender.docids::(), |_key| Ok(()), |_key| Ok(()), )?; } MergerOperation::FidWordCountDocidsMerger(merger) => { let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); let _entered = span.enter(); merge_and_send_docids( merger, index.field_id_word_count_docids.remap_types(), rtxn, &mut buffer, sender.docids::(), |_key| Ok(()), |_key| Ok(()), )?; } MergerOperation::WordDocidsMerger(merger) => { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); let _entered = span.enter(); let mut add_words_fst = SetBuilder::new(tempfile()?)?; let mut del_words_fst = SetBuilder::new(tempfile()?)?; merge_and_send_docids( merger, index.word_docids.remap_types(), rtxn, &mut buffer, sender.docids::(), |key| add_words_fst.insert(key), |key| del_words_fst.insert(key), )?; // Move that into a dedicated function let words_fst = index.words_fst(rtxn)?; let mmap = compute_new_words_fst(add_words_fst, del_words_fst, words_fst)?; sender.main().write_words_fst(mmap).unwrap(); } MergerOperation::WordFidDocidsMerger(merger) => { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); let _entered = span.enter(); merge_and_send_docids( merger, index.word_fid_docids.remap_types(), rtxn, &mut buffer, sender.docids::(), |_key| Ok(()), |_key| Ok(()), )?; } MergerOperation::WordPairProximityDocidsMerger(merger) => { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); let _entered = span.enter(); merge_and_send_docids( merger, index.word_pair_proximity_docids.remap_types(), rtxn, &mut buffer, sender.docids::(), |_key| Ok(()), |_key| Ok(()), )?; } MergerOperation::WordPositionDocidsMerger(merger) => { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); let _entered = span.enter(); merge_and_send_docids( merger, index.word_position_docids.remap_types(), rtxn, &mut buffer, sender.docids::(), |_key| Ok(()), |_key| Ok(()), )?; } MergerOperation::InsertDocument { docid, document } => { let span = tracing::trace_span!(target: "indexing::documents::merge", "insert_document"); let _entered = span.enter(); documents_ids.insert(docid); sender.documents().uncompressed(docid, &document).unwrap(); } MergerOperation::DeleteDocument { docid } => { let span = tracing::trace_span!(target: "indexing::documents::merge", "delete_document"); let _entered = span.enter(); if !documents_ids.remove(docid) { unreachable!("Tried deleting a document that we do not know about"); } sender.documents().delete(docid).unwrap(); } } } { let span = tracing::trace_span!(target: "indexing::documents::merge", "documents_ids"); let _entered = span.enter(); // Send the documents ids unionized with the current one /// TODO return the slice of bytes directly serialize_bitmap_into_vec(&documents_ids, &mut buffer); sender.send_documents_ids(&buffer).unwrap(); } // ... Ok(()) } fn compute_new_words_fst( add_words_fst: SetBuilder, del_words_fst: SetBuilder, words_fst: Set>, ) -> Result { let add_words_fst_file = add_words_fst.into_inner()?; let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file)? }; let add_words_fst = Set::new(&add_words_fst_mmap)?; let del_words_fst_file = del_words_fst.into_inner()?; let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? }; let del_words_fst = Set::new(&del_words_fst_mmap)?; let diff = words_fst.op().add(&del_words_fst).difference(); let stream = add_words_fst.op().add(diff).union(); let mut words_fst = SetBuilder::new(tempfile()?)?; words_fst.extend_stream(stream)?; let words_fst_file = words_fst.into_inner()?; let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; Ok(words_fst_mmap) } #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] fn merge_and_send_docids( merger: Merger, database: Database, rtxn: &RoTxn<'_>, buffer: &mut Vec, word_docids_sender: DocidsSender<'_, D>, mut add_key: impl FnMut(&[u8]) -> fst::Result<()>, mut del_key: impl FnMut(&[u8]) -> fst::Result<()>, ) -> Result<()> { let mut merger_iter = merger.into_stream_merger_iter().unwrap(); while let Some((key, deladd)) = merger_iter.next().unwrap() { let current = database.get(rtxn, key)?; let deladd: &KvReaderDelAdd = deladd.into(); let del = deladd.get(DelAdd::Deletion); let add = deladd.get(DelAdd::Addition); match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); word_docids_sender.write(key, value).unwrap(); add_key(key)?; } Operation::Delete => { word_docids_sender.delete(key).unwrap(); del_key(key)?; } Operation::Ignore => (), } } Ok(()) } enum Operation { Write(RoaringBitmap), Delete, Ignore, } /// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap. fn merge_cbo_bitmaps( current: Option<&[u8]>, del: Option<&[u8]>, add: Option<&[u8]>, ) -> Result { let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; let del = del.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; let add = add.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; match (current, del, add) { (None, None, None) => Ok(Operation::Ignore), // but it's strange (None, None, Some(add)) => Ok(Operation::Write(add)), (None, Some(_del), None) => Ok(Operation::Ignore), // but it's strange (None, Some(_del), Some(add)) => Ok(Operation::Write(add)), (Some(_current), None, None) => Ok(Operation::Ignore), // but it's strange (Some(current), None, Some(add)) => Ok(Operation::Write(current | add)), (Some(current), Some(del), add) => { let output = match add { Some(add) => (current - del) | add, None => current - del, }; if output.is_empty() { Ok(Operation::Delete) } else { Ok(Operation::Write(output)) } } } } /// TODO Return the slice directly from the serialize_into method fn cbo_bitmap_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec) -> &'b [u8] { buffer.clear(); CboRoaringBitmapCodec::serialize_into(bitmap, buffer); buffer.as_slice() } /// TODO Return the slice directly from the serialize_into method fn serialize_bitmap_into_vec(bitmap: &RoaringBitmap, buffer: &mut Vec) { buffer.clear(); bitmap.serialize_into(buffer).unwrap(); // buffer.as_slice() }