This commit is contained in:
Clément Renault 2024-10-29 14:38:52 +01:00
parent 82f6e3f3b9
commit 31680f3014
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
8 changed files with 151 additions and 191 deletions

View File

@ -1,8 +1,6 @@
use std::fs::File;
use std::marker::PhantomData;
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use grenad::Merger;
use heed::types::Bytes;
use memmap2::Mmap;
use roaring::RoaringBitmap;
@ -10,8 +8,8 @@ use roaring::RoaringBitmap;
use super::extract::FacetKind;
use super::StdResult;
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::update::new::extract::CboCachedSorter;
use crate::update::new::KvReaderFieldId;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages.
@ -29,7 +27,9 @@ pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) {
}
/// The capacity of the channel is currently in number of messages.
pub fn extractors_merger_channels(cap: usize) -> (ExtractorSender, MergerReceiver) {
/// Builds the bounded channel linking the extractors to the merger.
///
/// The capacity is currently expressed in number of messages. Returns the
/// sending half wrapped in an [`ExtractorSender`] and the receiving half
/// wrapped in a [`MergerReceiver`].
pub fn extractors_merger_channels<'extractor>(
cap: usize,
) -> (ExtractorSender<'extractor>, MergerReceiver<'extractor>) {
    let (tx, rx) = crossbeam_channel::bounded(cap);
    (ExtractorSender(tx), MergerReceiver(rx))
}
@ -313,7 +313,9 @@ pub trait DatabaseType {
}
pub trait MergerOperationType {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation;
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor>;
}
impl DatabaseType for ExactWordDocids {
@ -321,8 +323,10 @@ impl DatabaseType for ExactWordDocids {
}
impl MergerOperationType for ExactWordDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::ExactWordDocidsMerger(merger)
/// Wraps the extracted caches into `MergerOperation::ExactWordDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::ExactWordDocidsMerger(caches)
}
}
@ -331,8 +335,10 @@ impl DatabaseType for FidWordCountDocids {
}
impl MergerOperationType for FidWordCountDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::FidWordCountDocidsMerger(merger)
/// Wraps the extracted caches into `MergerOperation::FidWordCountDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::FidWordCountDocidsMerger(caches)
}
}
@ -341,8 +347,10 @@ impl DatabaseType for WordDocids {
}
impl MergerOperationType for WordDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordDocidsMerger(merger)
/// Wraps the extracted caches into `MergerOperation::WordDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordDocidsMerger(caches)
}
}
@ -351,8 +359,10 @@ impl DatabaseType for WordFidDocids {
}
impl MergerOperationType for WordFidDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordFidDocidsMerger(merger)
/// Wraps the extracted caches into `MergerOperation::WordFidDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordFidDocidsMerger(caches)
}
}
@ -361,8 +371,10 @@ impl DatabaseType for WordPairProximityDocids {
}
impl MergerOperationType for WordPairProximityDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordPairProximityDocidsMerger(merger)
/// Wraps the extracted caches into `MergerOperation::WordPairProximityDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordPairProximityDocidsMerger(caches)
}
}
@ -371,14 +383,18 @@ impl DatabaseType for WordPositionDocids {
}
impl MergerOperationType for WordPositionDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordPositionDocidsMerger(merger)
/// Wraps the extracted caches into `MergerOperation::WordPositionDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordPositionDocidsMerger(caches)
}
}
impl MergerOperationType for FacetDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::FacetDocidsMerger(merger)
/// Wraps the extracted caches into `MergerOperation::FacetDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::FacetDocidsMerger(caches)
}
}
@ -489,23 +505,23 @@ impl DocumentsSender<'_> {
}
}
pub enum MergerOperation {
ExactWordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
FidWordCountDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordFidDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordPairProximityDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
FacetDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
/// Messages sent from the extractor side to the merger.
///
/// The `*Merger` variants carry the extracted caches for one database
/// (`Vec<Vec<CboCachedSorter>>` — presumably one inner `Vec` per extraction
/// thread; confirm the exact nesting against the extractors).
pub enum MergerOperation<'extractor> {
ExactWordDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
FidWordCountDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
WordDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
WordFidDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
WordPairProximityDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
WordPositionDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
FacetDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
// A document removal, identified both internally and externally.
DeleteDocument { docid: DocumentId, external_id: String },
// A document insertion carrying the serialized field-id -> value map.
InsertDocument { docid: DocumentId, external_id: String, document: Box<KvReaderFieldId> },
// End-of-stream marker, emitted when the `DocumentSender` is dropped.
FinishedDocument,
}
pub struct MergerReceiver(Receiver<MergerOperation>);
/// Receiving half of the extractors -> merger channel; iterate it to drain operations.
pub struct MergerReceiver<'extractor>(Receiver<MergerOperation<'extractor>>);
impl IntoIterator for MergerReceiver {
type Item = MergerOperation;
impl<'extractor> IntoIterator for MergerReceiver<'extractor> {
type Item = MergerOperation<'extractor>;
type IntoIter = IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
@ -513,27 +529,27 @@ impl IntoIterator for MergerReceiver {
}
}
pub struct ExtractorSender(Sender<MergerOperation>);
/// Sending half of the extractors -> merger channel.
pub struct ExtractorSender<'extractor>(Sender<MergerOperation<'extractor>>);
impl ExtractorSender {
pub fn document_sender(&self) -> DocumentSender<'_> {
impl<'extractor> ExtractorSender<'extractor> {
/// Returns a [`DocumentSender`] borrowing this sender's underlying channel,
/// used to stream per-document operations.
pub fn document_sender(&self) -> DocumentSender<'_, 'extractor> {
DocumentSender(Some(&self.0))
}
pub fn send_searchable<D: MergerOperationType>(
&self,
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> StdResult<(), SendError<()>> {
match self.0.send(D::new_merger_operation(merger)) {
match self.0.send(D::new_merger_operation(caches)) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
pub struct DocumentSender<'a>(Option<&'a Sender<MergerOperation>>);
/// Channel handle for document operations; the `Option` is taken on drop so that
/// a `FinishedDocument` marker is sent exactly once.
pub struct DocumentSender<'a, 'extractor>(Option<&'a Sender<MergerOperation<'extractor>>>);
impl DocumentSender<'_> {
impl DocumentSender<'_, '_> {
pub fn insert(
&self,
docid: DocumentId,
@ -564,7 +580,7 @@ impl DocumentSender<'_> {
}
}
impl Drop for DocumentSender<'_> {
impl Drop for DocumentSender<'_, '_> {
fn drop(&mut self) {
if let Some(sender) = self.0.take() {
let _ = sender.send(MergerOperation::FinishedDocument);

View File

@ -392,7 +392,7 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>(
/// # Panics
///
/// - If the bucket IDs in these frozen caches are not exactly the same.
pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut iter: F) -> Result<()>
pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
where
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
{
@ -455,7 +455,7 @@ where
}
// We send the merged entry outside.
(iter)(first_key, output)?;
(f)(first_key, output)?;
// Don't forget to put the first entry back into the heap.
if first_entry.cursor.move_on_next()?.is_some() {
@ -478,7 +478,7 @@ where
}
// We send the merged entry outside.
(iter)(key, output)?;
(f)(key, output)?;
}
}
}

View File

@ -1,12 +1,9 @@
use std::cell::RefCell;
use std::collections::HashSet;
use std::fs::File;
use std::ops::DerefMut as _;
use bumpalo::Bump;
use grenad::Merger;
use heed::RoTxn;
use raw_collections::alloc::RefBump;
use serde_json::Value;
use super::super::cache::CboCachedSorter;
@ -19,16 +16,16 @@ use crate::update::new::indexer::document_changes::{
IndexingContext, RefCellExt, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::update::GrenadParameters;
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedExtractorData<'extractor> {
attributes_to_extract: &'extractor [&'extractor str],
/// Per-run configuration for the faceted docids extractor.
pub struct FacetedExtractorData<'a> {
// Names of the attributes to extract facet values from.
attributes_to_extract: &'a [&'a str],
grenad_parameters: GrenadParameters,
// Optional per-thread memory budget — presumably derived from
// `GrenadParameters::max_memory_by_thread`; confirm against the caller.
max_memory: Option<usize>,
}
impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
type Data = RefCell<CboCachedSorter<'extractor>>;
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
@ -217,12 +214,12 @@ fn truncate_str(s: &str) -> &str {
impl DocidsExtractor for FacetedDocidsExtractor {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<CboCachedSorter<'extractor>>> {
let max_memory = grenad_parameters.max_memory_by_thread();
let index = indexing_context.index;
@ -251,26 +248,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
&datastore,
)?;
}
{
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
let span =
tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter();
let readers: Vec<_> = datastore
.into_iter()
// .par_bridge() // T is !Send
.map(|cached_sorter| {
let cached_sorter = cached_sorter.into_inner();
let sorter = cached_sorter.into_sorter()?;
sorter.into_reader_cursors()
})
.collect();
for reader in readers {
builder.extend(reader?);
}
Ok(builder.build())
}
Ok(datastore.into_iter().map(RefCell::into_inner).collect())
}
}

View File

@ -2,25 +2,22 @@ mod cache;
mod faceted;
mod searchable;
use std::cell::RefCell;
use std::fs::File;
use bumpalo::Bump;
pub use cache::{merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap};
pub use faceted::*;
use grenad::Merger;
pub use searchable::*;
use super::indexer::document_changes::{DocumentChanges, FullySend, IndexingContext, ThreadLocal};
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::update::GrenadParameters;
use crate::Result;
pub trait DocidsExtractor {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>;
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<CboCachedSorter<'extractor>>>;
}
/// TODO move in permissive json pointer

View File

@ -7,7 +7,6 @@ use std::ops::DerefMut as _;
use bumpalo::Bump;
use grenad::{Merger, MergerBuilder};
use heed::RoTxn;
use raw_collections::alloc::RefBump;
use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
use crate::update::new::extract::cache::CboCachedSorter;
@ -157,15 +156,15 @@ struct WordDocidsMergerBuilders {
fid_word_count_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
}
pub struct WordDocidsMergers {
pub word_fid_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub exact_word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub word_position_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub fid_word_count_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
/// Extracted caches produced by the word-docids extraction, grouped by the
/// target database each set of caches will be merged into.
pub struct WordDocidsMergers<'extractor> {
pub word_fid_docids: Vec<Vec<CboCachedSorter<'extractor>>>,
pub word_docids: Vec<Vec<CboCachedSorter<'extractor>>>,
pub exact_word_docids: Vec<Vec<CboCachedSorter<'extractor>>>,
pub word_position_docids: Vec<Vec<CboCachedSorter<'extractor>>>,
pub fid_word_count_docids: Vec<Vec<CboCachedSorter<'extractor>>>,
}
impl WordDocidsMergerBuilders {
impl<'extractor> WordDocidsMergerBuilders<'extractor> {
fn new() -> Self {
Self {
word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
@ -202,7 +201,7 @@ impl WordDocidsMergerBuilders {
Ok(())
}
fn build(self) -> WordDocidsMergers {
fn build(self) -> WordDocidsMergers<'extractor> {
WordDocidsMergers {
word_fid_docids: self.word_fid_docids.build(),
word_docids: self.word_docids.build(),

View File

@ -9,9 +9,7 @@ use std::marker::PhantomData;
use bumpalo::Bump;
pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers};
pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
use grenad::Merger;
use heed::RoTxn;
use raw_collections::alloc::RefBump;
use tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::cache::CboCachedSorter;
@ -21,18 +19,18 @@ use crate::update::new::indexer::document_changes::{
IndexingContext, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::update::GrenadParameters;
use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> {
tokenizer: &'extractor DocumentTokenizer<'extractor>,
/// Per-run configuration shared by the searchable-field extractors.
pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
tokenizer: &'a DocumentTokenizer<'a>,
grenad_parameters: GrenadParameters,
// Optional per-thread memory budget — presumably from
// `max_memory_by_thread`; confirm against the caller.
max_memory: Option<usize>,
// Ties the concrete extractor type `EX` to this struct without storing a value.
_ex: PhantomData<EX>,
}
impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
for SearchableExtractorData<'extractor, EX>
impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
for SearchableExtractorData<'a, EX>
{
type Data = RefCell<CboCachedSorter<'extractor>>;
@ -50,12 +48,12 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
}
pub trait SearchableExtractor: Sized + Sync {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<CboCachedSorter<'extractor>>> {
let max_memory = grenad_parameters.max_memory_by_thread();
let rtxn = indexing_context.index.read_txn()?;
@ -107,28 +105,8 @@ pub trait SearchableExtractor: Sized + Sync {
&datastore,
)?;
}
{
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
let span =
tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter();
let readers: Vec<_> = datastore
.into_iter()
// .par_bridge() // T is !Send
.map(|cache_sorter| {
let cached_sorter = cache_sorter.into_inner();
let sorter = cached_sorter.into_sorter()?;
sorter.into_reader_cursors()
})
.collect();
for reader in readers {
builder.extend(reader?);
}
Ok(builder.build())
}
Ok(datastore.into_iter().map(RefCell::into_inner).collect())
}
fn extract_document_change(
@ -144,12 +122,12 @@ pub trait SearchableExtractor: Sized + Sync {
}
impl<T: SearchableExtractor> DocidsExtractor for T {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<CboCachedSorter<'extractor>>> {
Self::run_extraction(
grenad_parameters,
document_changes,

View File

@ -20,7 +20,7 @@ use super::channel::*;
use super::document::write_to_obkv;
use super::document_change::DocumentChange;
use super::extract::*;
use super::merger::{merge_grenad_entries, FacetFieldIdsDelta};
use super::merger::{merge_caches_entries, FacetFieldIdsDelta};
use super::word_fst_builder::PrefixDelta;
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
@ -33,7 +33,7 @@ use crate::update::new::channel::ExtractorSender;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::settings::InnerIndexSettings;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
pub(crate) mod de;
pub mod document_changes;
@ -42,11 +42,11 @@ mod document_operation;
mod partial_dump;
mod update_by_function;
struct DocumentExtractor<'a> {
document_sender: &'a DocumentSender<'a>,
/// Extractor that forwards document changes over the document channel.
struct DocumentExtractor<'a, 'extractor> {
document_sender: &'a DocumentSender<'a, 'extractor>,
}
impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor> {
type Data = FullySend<()>;
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
@ -179,6 +179,7 @@ where
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
@ -239,7 +240,7 @@ where
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge");
let _entered = span.enter();
let rtxn = index.read_txn().unwrap();
merge_grenad_entries(
merge_caches_entries(
merger_receiver,
merger_sender,
&rtxn,
@ -352,6 +353,7 @@ fn extract_and_send_docids<
'fid,
'indexer,
'index,
'extractor,
DC: DocumentChanges<'pl>,
E: DocidsExtractor,
D: MergerOperationType,
@ -359,12 +361,12 @@ fn extract_and_send_docids<
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
sender: &ExtractorSender,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
sender: &ExtractorSender<'extractor>,
) -> Result<()> {
let merger =
let caches =
E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?;
sender.send_searchable::<D>(merger).unwrap();
sender.send_searchable::<D>(caches).unwrap();
Ok(())
}

View File

@ -6,13 +6,16 @@ use grenad::Merger;
use hashbrown::HashSet;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;
use super::channel::*;
use super::extract::FacetKind;
use super::extract::{
merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap, FacetKind,
};
use super::word_fst_builder::{PrefixData, PrefixDelta};
use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId};
use crate::update::del_add::DelAdd;
use crate::update::del_add::{DelAdd, DelAddOperation};
use crate::update::new::channel::MergerOperation;
use crate::update::new::word_fst_builder::WordFstBuilder;
use crate::update::MergeDeladdCboRoaringBitmaps;
@ -20,7 +23,7 @@ use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap,
/// TODO We must return some infos/stats
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
pub fn merge_grenad_entries(
pub fn merge_caches_entries(
receiver: MergerReceiver,
sender: MergerSender,
rtxn: &RoTxn,
@ -34,12 +37,12 @@ pub fn merge_grenad_entries(
for merger_operation in receiver {
match merger_operation {
MergerOperation::ExactWordDocidsMerger(merger) => {
MergerOperation::ExactWordDocidsMerger(caches) => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
caches,
/// TODO do a MergerOperation::database(&Index) -> Database<Bytes, Bytes>.
index.exact_word_docids.remap_types(),
rtxn,
@ -192,8 +195,6 @@ pub fn merge_grenad_entries(
sender.send_documents_ids(documents_ids).unwrap();
}
// ...
Ok(merger_result)
}
@ -254,70 +255,62 @@ impl GeoExtractor {
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_docids(
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
fn merge_and_send_docids<'extractor>(
mut caches: Vec<Vec<CboCachedSorter<'extractor>>>,
database: Database<Bytes, Bytes>,
rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender,
mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>,
) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
while let Some((key, deladd)) = merger_iter.next().unwrap() {
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(key, value).unwrap();
register_key(DelAdd::Addition, key)?;
register_key(DelAdd::Addition, key)
}
Operation::Delete => {
docids_sender.delete(key).unwrap();
register_key(DelAdd::Deletion, key)?;
register_key(DelAdd::Deletion, key)
}
Operation::Ignore => (),
Operation::Ignore => Ok(()),
}
}
Ok(())
})
})
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_facet_docids(
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<Vec<CboCachedSorter<'extractor>>>,
database: FacetDatabases,
rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender,
facet_field_ids_delta: &mut FacetFieldIdsDelta,
) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
while let Some((key, deladd)) = merger_iter.next().unwrap() {
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key);
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(key, value).unwrap();
Ok(())
}
Operation::Delete => {
facet_field_ids_delta.register_from_key(key);
docids_sender.delete(key).unwrap();
}
Operation::Ignore => (),
}
}
Ok(())
}
Operation::Ignore => Ok(()),
}
})
})
}
struct FacetDatabases<'a> {
index: &'a Index,
@ -409,13 +402,10 @@ enum Operation {
/// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap.
fn merge_cbo_bitmaps(
current: Option<&[u8]>,
del: Option<&[u8]>,
add: Option<&[u8]>,
del: Option<RoaringBitmap>,
add: Option<RoaringBitmap>,
) -> Result<Operation> {
let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
let del = del.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
let add = add.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
match (current, del, add) {
(None, None, None) => Ok(Operation::Ignore), // but it's strange
(None, None, Some(add)) => Ok(Operation::Write(add)),