Factorize some stuff

This commit is contained in:
Clément Renault 2024-09-04 12:17:13 +02:00
parent 6d74fb0229
commit 98e48371c3
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 166 additions and 93 deletions

View File

@ -9,7 +9,7 @@ use super::StdResult;
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY};
use crate::update::new::KvReaderFieldId;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{DocumentId, Index};
use crate::{CboRoaringBitmapCodec, DocumentId, Index};
/// The capacity of the channel is currently in number of messages.
pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) {
@ -103,7 +103,9 @@ pub struct WriterOperation {
pub enum Database {
WordDocids,
ExactWordDocids,
WordFidDocids,
WordPositionDocids,
Documents,
Main,
}
@ -114,7 +116,9 @@ impl WriterOperation {
Database::Main => index.main.remap_types(),
Database::Documents => index.documents.remap_types(),
Database::WordDocids => index.word_docids.remap_types(),
Database::ExactWordDocids => index.exact_word_docids.remap_types(),
Database::WordFidDocids => index.word_fid_docids.remap_types(),
Database::WordPositionDocids => index.word_position_docids.remap_types(),
}
}
@ -141,11 +145,7 @@ impl MergerSender {
MainSender(&self.0)
}
pub fn word_docids(&self) -> DocidsSender<'_, WordDocids> {
DocidsSender { sender: &self.0, _marker: PhantomData }
}
pub fn word_fid_docids(&self) -> DocidsSender<'_, WordFidDocids> {
pub fn docids<D: DatabaseType>(&self) -> DocidsSender<'_, D> {
DocidsSender { sender: &self.0, _marker: PhantomData }
}
@ -187,21 +187,45 @@ impl MainSender<'_> {
}
pub enum WordDocids {}
pub enum ExactWordDocids {}
pub enum WordFidDocids {}
pub enum WordPositionDocids {}
pub trait DatabaseType {
fn database() -> Database;
const DATABASE: Database;
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation;
}
impl DatabaseType for WordDocids {
fn database() -> Database {
Database::WordDocids
const DATABASE: Database = Database::WordDocids;
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordDocidsMerger(merger)
}
}
impl DatabaseType for ExactWordDocids {
const DATABASE: Database = Database::ExactWordDocids;
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::ExactWordDocidsMerger(merger)
}
}
impl DatabaseType for WordFidDocids {
fn database() -> Database {
Database::WordFidDocids
const DATABASE: Database = Database::WordFidDocids;
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordFidDocidsMerger(merger)
}
}
impl DatabaseType for WordPositionDocids {
const DATABASE: Database = Database::WordPositionDocids;
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordPositionDocidsMerger(merger)
}
}
@ -213,7 +237,7 @@ pub struct DocidsSender<'a, D> {
impl<D: DatabaseType> DocidsSender<'_, D> {
pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_key_value(key, value));
match self.sender.send(WriterOperation { database: D::database(), entry }) {
match self.sender.send(WriterOperation { database: D::DATABASE, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
@ -221,7 +245,7 @@ impl<D: DatabaseType> DocidsSender<'_, D> {
pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
match self.sender.send(WriterOperation { database: D::database(), entry }) {
match self.sender.send(WriterOperation { database: D::DATABASE, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
@ -258,7 +282,9 @@ impl DocumentsSender<'_> {
pub enum MergerOperation {
WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
ExactWordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordFidDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> },
DeleteDocument { docid: DocumentId },
}
@ -295,23 +321,11 @@ impl ExtractorSender {
}
}
pub fn word_docids(
pub fn send_searchable<D: DatabaseType>(
&self,
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
) -> StdResult<(), SendError<()>> {
let operation = MergerOperation::WordDocidsMerger(merger);
match self.0.send(operation) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn word_fid_docids(
&self,
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
) -> StdResult<(), SendError<()>> {
let operation = MergerOperation::WordFidDocidsMerger(merger);
match self.0.send(operation) {
match self.0.send(D::new_merger_operation(merger)) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}

View File

@ -11,14 +11,21 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::ThreadPool;
pub use update_by_function::UpdateByFunction;
use super::channel::{extractors_merger_channels, merger_writer_channel, EntryOperation};
use super::channel::{
extractors_merger_channels, merger_writer_channel, EntryOperation, ExactWordDocids, WordDocids,
WordFidDocids, WordPositionDocids,
};
use super::document_change::DocumentChange;
use super::extract::{SearchableExtractor, WordDocidsExtractor, WordFidDocidsExtractor};
use super::extract::{
ExactWordDocidsExtractor, SearchableExtractor, WordDocidsExtractor, WordFidDocidsExtractor,
WordPositionDocidsExtractor,
};
use super::merger::merge_grenad_entries;
use super::StdResult;
use crate::documents::{
obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY,
};
use crate::update::new::channel::{DatabaseType, ExtractorSender};
use crate::update::GrenadParameters;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
@ -82,36 +89,43 @@ where
let docid = insertion.docid();
let content = insertion.new();
extractor_sender.document_insert(docid, content.boxed()).unwrap();
// extracted_dictionary_sender.send(self, dictionary: &[u8]);
}
}
Ok(()) as Result<_>
})?;
// word docids
let merger = WordDocidsExtractor::run_extraction(
extract_and_send_docids::<WordDocidsExtractor, WordDocids>(
index,
&global_fields_ids_map,
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
GrenadParameters::default(),
document_changes.clone(),
&extractor_sender,
)?;
/// TODO: manage the errors correctly
extractor_sender.word_docids(merger).unwrap();
// word fid docids
let merger = WordFidDocidsExtractor::run_extraction(
extract_and_send_docids::<WordFidDocidsExtractor, WordFidDocids>(
index,
&global_fields_ids_map,
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
GrenadParameters::default(),
document_changes.clone(),
&extractor_sender,
)?;
/// TODO: manage the errors correctly
extractor_sender.word_fid_docids(merger).unwrap();
extract_and_send_docids::<ExactWordDocidsExtractor, ExactWordDocids>(
index,
&global_fields_ids_map,
GrenadParameters::default(),
document_changes.clone(),
&extractor_sender,
)?;
extract_and_send_docids::<WordPositionDocidsExtractor, WordPositionDocids>(
index,
&global_fields_ids_map,
GrenadParameters::default(),
document_changes.clone(),
&extractor_sender,
)?;
Ok(()) as Result<_>
})
@ -148,6 +162,20 @@ where
Ok(())
}
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
/// TODO: manage the errors correctly
/// TODO: we must have a single trait that also gives the extractor type
fn extract_and_send_docids<E: SearchableExtractor, D: DatabaseType>(
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
sender: &ExtractorSender,
) -> Result<()> {
let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?;
Ok(sender.send_searchable::<D>(merger).unwrap())
}
/// TODO move this elsewhere
pub fn guess_primary_key<'a>(
rtxn: &'a RoTxn<'a>,

View File

@ -1,16 +1,24 @@
use std::fs::File;
use std::io;
use fst::set::OpBuilder;
use fst::{Set, SetBuilder};
use grenad::Merger;
use heed::types::Bytes;
use heed::RoTxn;
use heed::{Database, RoTxn};
use memmap2::Mmap;
use roaring::RoaringBitmap;
use tempfile::tempfile;
use super::channel::{MergerReceiver, MergerSender};
use super::channel::{
DatabaseType, DocidsSender, ExactWordDocids, MergerReceiver, MergerSender, WordDocids,
WordFidDocids, WordPositionDocids,
};
use super::KvReaderDelAdd;
use crate::index::main_key::WORDS_FST_KEY;
use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Index, Result};
/// TODO We must return some infos/stats
@ -26,34 +34,18 @@ pub fn merge_grenad_entries(
for merger_operation in receiver {
match merger_operation {
MergerOperation::WordDocidsMerger(merger) => {
let word_docids_sender = sender.word_docids();
let database = index.word_docids.remap_types::<Bytes, Bytes>();
let mut add_words_fst = SetBuilder::new(tempfile()?)?;
let mut del_words_fst = SetBuilder::new(tempfile()?)?;
/// TODO manage the error correctly
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
// TODO manage the error correctly
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
word_docids_sender.write(key, value).unwrap();
add_words_fst.insert(key)?;
}
Operation::Delete => {
word_docids_sender.delete(key).unwrap();
del_words_fst.insert(key)?;
}
Operation::Ignore => (),
}
}
merge_and_send_docids(
merger,
index.word_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordDocids>(),
|key| add_words_fst.insert(key),
|key| del_words_fst.insert(key),
)?;
// Move that into a dedicated function
let words_fst = index.words_fst(rtxn)?;
@ -66,7 +58,6 @@ pub fn merge_grenad_entries(
let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file)? };
let del_words_fst = Set::new(&del_words_fst_mmap)?;
// TO BE IMPROVED @many
let diff = words_fst.op().add(&del_words_fst).difference();
let stream = add_words_fst.op().add(diff).union();
@ -79,31 +70,38 @@ pub fn merge_grenad_entries(
let main_sender = sender.main();
main_sender.write_words_fst(&words_fst_mmap).unwrap();
}
MergerOperation::ExactWordDocidsMerger(merger) => {
merge_and_send_docids(
merger,
index.exact_word_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<ExactWordDocids>(),
|_key| Ok(()),
|_key| Ok(()),
)?;
}
MergerOperation::WordFidDocidsMerger(merger) => {
let word_docids_sender = sender.word_fid_docids();
let database = index.word_fid_docids.remap_types::<Bytes, Bytes>();
/// TODO manage the error correctly
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
// TODO manage the error correctly
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
word_docids_sender.write(key, value).unwrap();
}
Operation::Delete => {
word_docids_sender.delete(key).unwrap();
}
Operation::Ignore => (),
}
}
merge_and_send_docids(
merger,
index.word_fid_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordFidDocids>(),
|_key| Ok(()),
|_key| Ok(()),
)?;
}
MergerOperation::WordPositionDocidsMerger(merger) => {
merge_and_send_docids(
merger,
index.word_position_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordPositionDocids>(),
|_key| Ok(()),
|_key| Ok(()),
)?;
}
MergerOperation::InsertDocument { docid, document } => {
documents_ids.insert(docid);
@ -128,6 +126,39 @@ pub fn merge_grenad_entries(
Ok(())
}
fn merge_and_send_docids<D: DatabaseType>(
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
database: Database<Bytes, Bytes>,
rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>,
word_docids_sender: DocidsSender<'_, D>,
mut add_key: impl FnMut(&[u8]) -> fst::Result<()>,
mut del_key: impl FnMut(&[u8]) -> fst::Result<()>,
) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
word_docids_sender.write(key, value).unwrap();
add_key(key)?;
}
Operation::Delete => {
word_docids_sender.delete(key).unwrap();
del_key(key)?;
}
Operation::Ignore => (),
}
}
Ok(())
}
enum Operation {
Write(RoaringBitmap),
Delete,