This commit is contained in:
Clément Renault 2024-10-29 16:23:19 +01:00
parent 31680f3014
commit f637d7e80f
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 167 additions and 474 deletions

View File

@ -8,32 +8,23 @@ use roaring::RoaringBitmap;
use super::extract::FacetKind; use super::extract::FacetKind;
use super::StdResult; use super::StdResult;
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::update::new::extract::CboCachedSorter;
use crate::update::new::KvReaderFieldId; use crate::update::new::KvReaderFieldId;
use crate::{DocumentId, Index}; use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver) {
let (sender, receiver) = crossbeam_channel::bounded(cap); let (sender, receiver) = crossbeam_channel::bounded(cap);
( (
MergerSender { ExtractorSender {
sender, sender,
send_count: Default::default(), send_count: Default::default(),
writer_contentious_count: Default::default(), writer_contentious_count: Default::default(),
merger_contentious_count: Default::default(), extractor_contentious_count: Default::default(),
}, },
WriterReceiver(receiver), WriterReceiver(receiver),
) )
} }
/// Builds a bounded channel and wraps its two halves in an
/// [`ExtractorSender`] and a [`MergerReceiver`].
///
/// The capacity `cap` is currently expressed in number of messages.
pub fn extractors_merger_channels<'extractor>(
    cap: usize,
) -> (ExtractorSender<'extractor>, MergerReceiver<'extractor>) {
    let (tx, rx) = crossbeam_channel::bounded(cap);
    let extractor_sender = ExtractorSender(tx);
    let merger_receiver = MergerReceiver(rx);
    (extractor_sender, merger_receiver)
}
pub enum KeyValueEntry { pub enum KeyValueEntry {
SmallInMemory { key_length: usize, data: Box<[u8]> }, SmallInMemory { key_length: usize, data: Box<[u8]> },
LargeOnDisk { key: Box<[u8]>, value: Mmap }, LargeOnDisk { key: Box<[u8]>, value: Mmap },
@ -200,30 +191,30 @@ impl IntoIterator for WriterReceiver {
} }
} }
pub struct MergerSender { pub struct ExtractorSender {
sender: Sender<WriterOperation>, sender: Sender<WriterOperation>,
/// The number of message we send in total in the channel. /// The number of message we sent in total in the channel.
send_count: std::cell::Cell<usize>, send_count: std::cell::Cell<usize>,
/// The number of times we sent something in a channel that was full. /// The number of times we sent something in a channel that was full.
writer_contentious_count: std::cell::Cell<usize>, writer_contentious_count: std::cell::Cell<usize>,
/// The number of times we sent something in a channel that was empty. /// The number of times we sent something in a channel that was empty.
merger_contentious_count: std::cell::Cell<usize>, extractor_contentious_count: std::cell::Cell<usize>,
} }
impl Drop for MergerSender { impl Drop for ExtractorSender {
fn drop(&mut self) { fn drop(&mut self) {
eprintln!( eprintln!(
"Merger channel stats: {} sends, {} writer contentions ({}%), {} merger contentions ({}%)", "Extractor channel stats: {} sends, {} writer contentions ({}%), {} extractor contentions ({}%)",
self.send_count.get(), self.send_count.get(),
self.writer_contentious_count.get(), self.writer_contentious_count.get(),
(self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0, (self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0,
self.merger_contentious_count.get(), self.extractor_contentious_count.get(),
(self.merger_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0 (self.extractor_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0
) )
} }
} }
impl MergerSender { impl ExtractorSender {
pub fn main(&self) -> MainSender<'_> { pub fn main(&self) -> MainSender<'_> {
MainSender(self) MainSender(self)
} }
@ -256,7 +247,7 @@ impl MergerSender {
self.writer_contentious_count.set(self.writer_contentious_count.get() + 1); self.writer_contentious_count.set(self.writer_contentious_count.get() + 1);
} }
if self.sender.is_empty() { if self.sender.is_empty() {
self.merger_contentious_count.set(self.merger_contentious_count.get() + 1); self.extractor_contentious_count.set(self.extractor_contentious_count.get() + 1);
} }
self.send_count.set(self.send_count.get() + 1); self.send_count.set(self.send_count.get() + 1);
match self.sender.send(op) { match self.sender.send(op) {
@ -266,7 +257,7 @@ impl MergerSender {
} }
} }
pub struct MainSender<'a>(&'a MergerSender); pub struct MainSender<'a>(&'a ExtractorSender);
impl MainSender<'_> { impl MainSender<'_> {
pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> { pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> {
@ -312,99 +303,37 @@ pub trait DatabaseType {
const DATABASE: Database; const DATABASE: Database;
} }
/// Associates a marker type with the `MergerOperation` variant that carries its caches.
pub trait MergerOperationType {
/// Wraps `caches` in the `MergerOperation` variant chosen by the implementing type.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor>;
}
impl DatabaseType for ExactWordDocids { impl DatabaseType for ExactWordDocids {
const DATABASE: Database = Database::ExactWordDocids; const DATABASE: Database = Database::ExactWordDocids;
} }
impl MergerOperationType for ExactWordDocids {
/// Wraps `caches` in `MergerOperation::ExactWordDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::ExactWordDocidsMerger(caches)
}
}
impl DatabaseType for FidWordCountDocids { impl DatabaseType for FidWordCountDocids {
const DATABASE: Database = Database::FidWordCountDocids; const DATABASE: Database = Database::FidWordCountDocids;
} }
impl MergerOperationType for FidWordCountDocids {
/// Wraps `caches` in `MergerOperation::FidWordCountDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::FidWordCountDocidsMerger(caches)
}
}
impl DatabaseType for WordDocids { impl DatabaseType for WordDocids {
const DATABASE: Database = Database::WordDocids; const DATABASE: Database = Database::WordDocids;
} }
impl MergerOperationType for WordDocids {
/// Wraps `caches` in `MergerOperation::WordDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordDocidsMerger(caches)
}
}
impl DatabaseType for WordFidDocids { impl DatabaseType for WordFidDocids {
const DATABASE: Database = Database::WordFidDocids; const DATABASE: Database = Database::WordFidDocids;
} }
impl MergerOperationType for WordFidDocids {
/// Wraps `caches` in `MergerOperation::WordFidDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordFidDocidsMerger(caches)
}
}
impl DatabaseType for WordPairProximityDocids { impl DatabaseType for WordPairProximityDocids {
const DATABASE: Database = Database::WordPairProximityDocids; const DATABASE: Database = Database::WordPairProximityDocids;
} }
impl MergerOperationType for WordPairProximityDocids {
/// Wraps `caches` in `MergerOperation::WordPairProximityDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordPairProximityDocidsMerger(caches)
}
}
impl DatabaseType for WordPositionDocids { impl DatabaseType for WordPositionDocids {
const DATABASE: Database = Database::WordPositionDocids; const DATABASE: Database = Database::WordPositionDocids;
} }
impl MergerOperationType for WordPositionDocids {
/// Wraps `caches` in `MergerOperation::WordPositionDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::WordPositionDocidsMerger(caches)
}
}
impl MergerOperationType for FacetDocids {
/// Wraps `caches` in `MergerOperation::FacetDocidsMerger`.
fn new_merger_operation<'extractor>(
caches: Vec<Vec<CboCachedSorter<'extractor>>>,
) -> MergerOperation<'extractor> {
MergerOperation::FacetDocidsMerger(caches)
}
}
pub trait DocidsSender { pub trait DocidsSender {
fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>>; fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>>;
fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>>; fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>>;
} }
pub struct WordDocidsSender<'a, D> { pub struct WordDocidsSender<'a, D> {
sender: &'a MergerSender, sender: &'a ExtractorSender,
_marker: PhantomData<D>, _marker: PhantomData<D>,
} }
@ -427,7 +356,7 @@ impl<D: DatabaseType> DocidsSender for WordDocidsSender<'_, D> {
} }
pub struct FacetDocidsSender<'a> { pub struct FacetDocidsSender<'a> {
sender: &'a MergerSender, sender: &'a ExtractorSender,
} }
impl DocidsSender for FacetDocidsSender<'_> { impl DocidsSender for FacetDocidsSender<'_> {
@ -461,7 +390,7 @@ impl DocidsSender for FacetDocidsSender<'_> {
} }
} }
pub struct DocumentsSender<'a>(&'a MergerSender); pub struct DocumentsSender<'a>(&'a ExtractorSender);
impl DocumentsSender<'_> { impl DocumentsSender<'_> {
/// TODO do that efficiently /// TODO do that efficiently
@ -504,86 +433,3 @@ impl DocumentsSender<'_> {
} }
} }
} }
/// Messages carried by the channel whose receiving side is `MergerReceiver`.
///
/// The `*Merger` variants each hold the caches produced by
/// `MergerOperationType::new_merger_operation` for the matching marker type.
pub enum MergerOperation<'extractor> {
/// Built by `ExactWordDocids::new_merger_operation`.
ExactWordDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
/// Built by `FidWordCountDocids::new_merger_operation`.
FidWordCountDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
/// Built by `WordDocids::new_merger_operation`.
WordDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
/// Built by `WordFidDocids::new_merger_operation`.
WordFidDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
/// Built by `WordPairProximityDocids::new_merger_operation`.
WordPairProximityDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
/// Built by `WordPositionDocids::new_merger_operation`.
WordPositionDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
/// Built by `FacetDocids::new_merger_operation`.
FacetDocidsMerger(Vec<Vec<CboCachedSorter<'extractor>>>),
/// Sent by `DocumentSender::delete`.
DeleteDocument { docid: DocumentId, external_id: String },
/// Sent by `DocumentSender::insert`.
InsertDocument { docid: DocumentId, external_id: String, document: Box<KvReaderFieldId> },
/// Sent by `DocumentSender::finish`, or when a `DocumentSender` is dropped
/// without `finish` having been called.
FinishedDocument,
}
/// Newtype around the `Receiver` of `MergerOperation`s.
pub struct MergerReceiver<'extractor>(Receiver<MergerOperation<'extractor>>);
// Consuming a `MergerReceiver` iterates over the operations of the wrapped receiver.
impl<'extractor> IntoIterator for MergerReceiver<'extractor> {
type Item = MergerOperation<'extractor>;
type IntoIter = IntoIter<Self::Item>;
// Delegates directly to the inner receiver's `into_iter`.
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
/// Newtype around the `Sender` of [`MergerOperation`]s.
pub struct ExtractorSender<'extractor>(Sender<MergerOperation<'extractor>>);

impl<'extractor> ExtractorSender<'extractor> {
    /// Returns a [`DocumentSender`] that borrows the wrapped channel sender.
    pub fn document_sender(&self) -> DocumentSender<'_, 'extractor> {
        let channel = &self.0;
        DocumentSender(Some(channel))
    }

    /// Wraps `caches` in the operation selected by `D` and sends it on the channel.
    ///
    /// # Errors
    ///
    /// Returns `SendError(())` when the send fails; the operation that could
    /// not be delivered is discarded.
    pub fn send_searchable<D: MergerOperationType>(
        &self,
        caches: Vec<Vec<CboCachedSorter<'extractor>>>,
    ) -> StdResult<(), SendError<()>> {
        let operation = D::new_merger_operation(caches);
        self.0.send(operation).map_err(|SendError(_)| SendError(()))
    }
}
/// Borrowed handle used to send document operations on the merger channel.
///
/// The inner `Option` is emptied by [`DocumentSender::finish`].
pub struct DocumentSender<'a, 'extractor>(Option<&'a Sender<MergerOperation<'extractor>>>);

impl DocumentSender<'_, '_> {
    /// Sends a `MergerOperation::InsertDocument` built from the arguments.
    ///
    /// # Errors
    ///
    /// Returns `SendError(())` when the send fails.
    ///
    /// # Panics
    ///
    /// Panics if the inner sender is `None`.
    pub fn insert(
        &self,
        docid: DocumentId,
        external_id: String,
        document: Box<KvReaderFieldId>,
    ) -> StdResult<(), SendError<()>> {
        let channel = self.0.unwrap();
        let operation = MergerOperation::InsertDocument { docid, external_id, document };
        channel.send(operation).map_err(|SendError(_)| SendError(()))
    }

    /// Sends a `MergerOperation::DeleteDocument` built from the arguments.
    ///
    /// # Errors
    ///
    /// Returns `SendError(())` when the send fails.
    ///
    /// # Panics
    ///
    /// Panics if the inner sender is `None`.
    pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> {
        let channel = self.0.unwrap();
        let operation = MergerOperation::DeleteDocument { docid, external_id };
        channel.send(operation).map_err(|SendError(_)| SendError(()))
    }

    /// Consumes the handle and sends `MergerOperation::FinishedDocument`.
    ///
    /// The inner sender is taken out first, leaving `None` behind.
    ///
    /// # Errors
    ///
    /// Returns `SendError(())` when the send fails.
    pub fn finish(mut self) -> StdResult<(), SendError<()>> {
        let channel = self.0.take().unwrap();
        channel.send(MergerOperation::FinishedDocument).map_err(|SendError(_)| SendError(()))
    }
}
impl Drop for DocumentSender<'_, '_> {
    /// Sends `MergerOperation::FinishedDocument` if the inner sender is still present.
    fn drop(&mut self) {
        // Nothing to do when the sender has already been taken out.
        let Some(channel) = self.0.take() else {
            return;
        };
        // A failed send is deliberately ignored.
        let _ = channel.send(MergerOperation::FinishedDocument);
    }
}

View File

@ -371,16 +371,14 @@ pub struct FrozenCache<'a, 'extractor> {
} }
pub fn transpose_and_freeze_caches<'a, 'extractor>( pub fn transpose_and_freeze_caches<'a, 'extractor>(
caches: &'a mut [Vec<CboCachedSorter<'extractor>>], caches: &'a mut [CboCachedSorter<'extractor>],
) -> Result<Vec<Vec<FrozenCache<'a, 'extractor>>>> { ) -> Result<Vec<Vec<FrozenCache<'a, 'extractor>>>> {
let width = caches.get(0).map(Vec::len).unwrap_or(0); let width = caches.get(0).map(CboCachedSorter::buckets).unwrap_or(0);
let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect(); let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect();
for thread_caches in caches { for thread_cache in caches {
for cache in thread_caches.iter_mut() { for frozen in thread_cache.freeze()? {
for frozen in cache.freeze()? { bucket_caches[frozen.bucket].push(frozen);
bucket_caches[frozen.bucket].push(frozen);
}
} }
} }

View File

@ -157,18 +157,18 @@ struct WordDocidsMergerBuilders {
} }
pub struct WordDocidsMergers<'extractor> { pub struct WordDocidsMergers<'extractor> {
pub word_fid_docids: Vec<Vec<CboCachedSorter<'extractor>>>, pub word_docids: Vec<CboCachedSorter<'extractor>>,
pub word_docids: Vec<Vec<CboCachedSorter<'extractor>>>, pub word_fid_docids: Vec<CboCachedSorter<'extractor>>,
pub exact_word_docids: Vec<Vec<CboCachedSorter<'extractor>>>, pub exact_word_docids: Vec<CboCachedSorter<'extractor>>,
pub word_position_docids: Vec<Vec<CboCachedSorter<'extractor>>>, pub word_position_docids: Vec<CboCachedSorter<'extractor>>,
pub fid_word_count_docids: Vec<Vec<CboCachedSorter<'extractor>>>, pub fid_word_count_docids: Vec<CboCachedSorter<'extractor>>,
} }
impl<'extractor> WordDocidsMergerBuilders<'extractor> { impl<'extractor> WordDocidsMergerBuilders<'extractor> {
fn new() -> Self { fn new() -> Self {
Self { Self {
word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
@ -177,8 +177,8 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> {
fn add_sorters(&mut self, other: WordDocidsCachedSorters) -> Result<()> { fn add_sorters(&mut self, other: WordDocidsCachedSorters) -> Result<()> {
let WordDocidsCachedSorters { let WordDocidsCachedSorters {
word_fid_docids,
word_docids, word_docids,
word_fid_docids,
exact_word_docids, exact_word_docids,
word_position_docids, word_position_docids,
fid_word_count_docids, fid_word_count_docids,
@ -186,14 +186,14 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> {
current_docid: _, current_docid: _,
} = other; } = other;
let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?;
let word_docids_entries = word_docids.into_unordered_entries()?; let word_docids_entries = word_docids.into_unordered_entries()?;
let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?;
let exact_word_docids_entries = exact_word_docids.into_unordered_entries()?; let exact_word_docids_entries = exact_word_docids.into_unordered_entries()?;
let word_position_docids_entries = word_position_docids.into_unordered_entries()?; let word_position_docids_entries = word_position_docids.into_unordered_entries()?;
let fid_word_count_docids_entries = fid_word_count_docids.into_unordered_entries()?; let fid_word_count_docids_entries = fid_word_count_docids.into_unordered_entries()?;
self.word_fid_docids.push(word_fid_docids_entries);
self.word_docids.push(word_docids_entries); self.word_docids.push(word_docids_entries);
self.word_fid_docids.push(word_fid_docids_entries);
self.exact_word_docids.push(exact_word_docids_entries); self.exact_word_docids.push(exact_word_docids_entries);
self.word_position_docids.push(word_position_docids_entries); self.word_position_docids.push(word_position_docids_entries);
self.fid_word_count_docids.push(fid_word_count_docids_entries); self.fid_word_count_docids.push(fid_word_count_docids_entries);
@ -203,8 +203,8 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> {
fn build(self) -> WordDocidsMergers<'extractor> { fn build(self) -> WordDocidsMergers<'extractor> {
WordDocidsMergers { WordDocidsMergers {
word_fid_docids: self.word_fid_docids.build(),
word_docids: self.word_docids.build(), word_docids: self.word_docids.build(),
word_fid_docids: self.word_fid_docids.build(),
exact_word_docids: self.exact_word_docids.build(), exact_word_docids: self.exact_word_docids.build(),
word_position_docids: self.word_position_docids.build(), word_position_docids: self.word_position_docids.build(),
fid_word_count_docids: self.fid_word_count_docids.build(), fid_word_count_docids: self.fid_word_count_docids.build(),
@ -237,12 +237,12 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
pub struct WordDocidsExtractors; pub struct WordDocidsExtractors;
impl WordDocidsExtractors { impl WordDocidsExtractors {
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>, indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<WordDocidsMergers> { ) -> Result<WordDocidsMergers<'extractor>> {
let max_memory = grenad_parameters.max_memory_by_thread(); let max_memory = grenad_parameters.max_memory_by_thread();
let index = indexing_context.index; let index = indexing_context.index;

View File

@ -15,6 +15,7 @@ use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{FieldId, GlobalFieldsIdsMap, Index, Result}; use crate::{FieldId, GlobalFieldsIdsMap, Index, Result};
pub struct WordPairProximityDocidsExtractor; pub struct WordPairProximityDocidsExtractor;
impl SearchableExtractor for WordPairProximityDocidsExtractor { impl SearchableExtractor for WordPairProximityDocidsExtractor {
fn attributes_to_extract<'a>( fn attributes_to_extract<'a>(
rtxn: &'a RoTxn, rtxn: &'a RoTxn,

View File

@ -1,4 +1,3 @@
use std::cell::RefCell;
use std::sync::RwLock; use std::sync::RwLock;
use std::thread::{self, Builder}; use std::thread::{self, Builder};
@ -20,7 +19,7 @@ use super::channel::*;
use super::document::write_to_obkv; use super::document::write_to_obkv;
use super::document_change::DocumentChange; use super::document_change::DocumentChange;
use super::extract::*; use super::extract::*;
use super::merger::{merge_caches_entries, FacetFieldIdsDelta}; use super::merger::{FacetDatabases, FacetFieldIdsDelta};
use super::word_fst_builder::PrefixDelta; use super::word_fst_builder::PrefixDelta;
use super::words_prefix_docids::{ use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
@ -29,8 +28,9 @@ use super::{StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::proximity::ProximityPrecision; use crate::proximity::ProximityPrecision;
use crate::update::new::channel::ExtractorSender; use crate::update::new::word_fst_builder::{PrefixData, WordFstBuilder};
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids};
use crate::update::settings::InnerIndexSettings; use crate::update::settings::InnerIndexSettings;
use crate::update::{FacetsUpdateBulk, GrenadParameters}; use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
@ -111,10 +111,8 @@ pub fn index<'pl, 'indexer, 'index, DC>(
where where
DC: DocumentChanges<'pl>, DC: DocumentChanges<'pl>,
{ {
let (merger_sender, writer_receiver) = merger_writer_channel(10_000); // TODO find a better channel limit
// This channel acts as a rendezvous point to ensure that we are one task ahead let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
let (extractor_sender, merger_receiver) = extractors_merger_channels(4);
let new_fields_ids_map = RwLock::new(new_fields_ids_map); let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads()); let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
@ -138,12 +136,12 @@ where
let _entered = span.enter(); let _entered = span.enter();
// document but we need to create a function that collects and compresses documents. // document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.document_sender(); let document_sender = extractor_sender.documents();
let document_extractor = DocumentExtractor { document_sender: &document_sender}; let document_extractor = DocumentExtractor { document_sender: &document_sender };
let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?; for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
document_sender.finish().unwrap(); // document_sender.finish().unwrap();
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let max_memory = TEN_GIB / dbg!(rayon::current_num_threads()); let max_memory = TEN_GIB / dbg!(rayon::current_num_threads());
@ -155,16 +153,15 @@ where
{ {
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter(); let _entered = span.enter();
extract_and_send_docids::<
_, let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
FacetedDocidsExtractor, let caches = FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
FacetDocids, merge_and_send_facet_docids(
>( caches,
grenad_parameters, FacetDatabases::new(index),
document_changes, &index,
indexing_context, extractor_sender.facet_docids(),
&mut extractor_allocs, &mut facet_field_ids_delta,
&extractor_sender,
)?; )?;
} }
@ -173,18 +170,88 @@ where
let _entered = span.enter(); let _entered = span.enter();
let WordDocidsMergers { let WordDocidsMergers {
word_fid_docids,
word_docids, word_docids,
word_fid_docids,
exact_word_docids, exact_word_docids,
word_position_docids, word_position_docids,
fid_word_count_docids, fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?; } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap(); // TODO Word Docids Merger
extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap(); // extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap(); {
extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap(); let words_fst = index.words_fst(&rtxn)?;
extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap(); let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(&rtxn)?;
word_fst_builder.with_prefix_settings(prefix_settings);
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
&index,
extractor_sender.docids::<WordDocids>(),
|deladd, key| word_fst_builder.register_word(deladd, key),
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
let _entered = span.enter();
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
extractor_sender.main().write_words_fst(word_fst_mmap).unwrap();
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
extractor_sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap();
// merger_result.prefix_delta = Some(prefix_delta);
}
}
}
// Word Fid Docids Merging
// extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
merge_and_send_docids(
word_fid_docids,
index.word_fid_docids.remap_types(),
&index,
extractor_sender.docids::<WordFidDocids>(),
|_, _key| Ok(()),
)?;
// Exact Word Docids Merging
// extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
merge_and_send_docids(
exact_word_docids,
index.exact_word_docids.remap_types(),
&index,
extractor_sender.docids::<ExactWordDocids>(),
|_, _key| Ok(()),
)?;
// Word Position Docids Merging
// extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
merge_and_send_docids(
word_position_docids,
index.word_position_docids.remap_types(),
&index,
extractor_sender.docids::<WordPositionDocids>(),
|_, _key| Ok(()),
)?;
// Fid Word Count Docids Merging
// extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
merge_and_send_docids(
fid_word_count_docids,
index.field_id_word_count_docids.remap_types(),
&index,
extractor_sender.docids::<FidWordCountDocids>(),
|_, _key| Ok(()),
)?;
} }
// run the proximity extraction only if the precision is by word // run the proximity extraction only if the precision is by word
@ -194,16 +261,13 @@ where
if proximity_precision == ProximityPrecision::ByWord { if proximity_precision == ProximityPrecision::ByWord {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter(); let _entered = span.enter();
extract_and_send_docids::< let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
_, merge_and_send_docids(
WordPairProximityDocidsExtractor, caches,
WordPairProximityDocids, index.word_pair_proximity_docids.remap_types(),
>( &index,
grenad_parameters, extractor_sender.docids::<WordPairProximityDocids>(),
document_changes, |_, _| Ok(()),
indexing_context,
&mut extractor_allocs,
&extractor_sender,
)?; )?;
} }
@ -232,22 +296,7 @@ where
})?; })?;
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let indexer_span = tracing::Span::current(); let indexer_span = tracing::Span::current();
// TODO manage the errors correctly
let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || {
let span =
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge");
let _entered = span.enter();
let rtxn = index.read_txn().unwrap();
merge_caches_entries(
merger_receiver,
merger_sender,
&rtxn,
index,
global_fields_ids_map,
)
})?;
for operation in writer_receiver { for operation in writer_receiver {
let database = operation.database(index); let database = operation.database(index);
@ -263,15 +312,14 @@ where
/// TODO handle the panicking threads /// TODO handle the panicking threads
handle.join().unwrap()?; handle.join().unwrap()?;
let merger_result = merger_thread.join().unwrap()?;
if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta { // if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta {
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; // compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
} // }
if let Some(prefix_delta) = merger_result.prefix_delta { // if let Some(prefix_delta) = merger_result.prefix_delta {
compute_prefix_database(index, wtxn, prefix_delta)?; // compute_prefix_database(index, wtxn, prefix_delta)?;
} // }
Ok(()) as Result<_> Ok(()) as Result<_>
})?; })?;
@ -345,31 +393,6 @@ fn compute_facet_level_database(
Ok(()) Ok(())
} }
/// Runs the extractor `E` over the document changes and sends the resulting
/// caches through `sender` as the operation selected by `D`.
///
/// # Panics
///
/// Panics if sending the caches on the channel fails.
///
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
/// TODO: manage the errors correctly
/// TODO: we must have a single trait that also gives the extractor type
fn extract_and_send_docids<'pl, 'fid, 'indexer, 'index, 'extractor, DC, E, D>(
    grenad_parameters: GrenadParameters,
    document_changes: &DC,
    indexing_context: IndexingContext<'fid, 'indexer, 'index>,
    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
    sender: &ExtractorSender<'extractor>,
) -> Result<()>
where
    DC: DocumentChanges<'pl>,
    E: DocidsExtractor,
    D: MergerOperationType,
{
    let extracted =
        E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?;
    sender.send_searchable::<D>(extracted).unwrap();
    Ok(())
}
/// Returns the primary key that has already been set for this index or the /// Returns the primary key that has already been set for this index or the
/// one we will guess by searching for the first key that contains "id" as a substring, /// one we will guess by searching for the first key that contains "id" as a substring,
/// and whether the primary key changed /// and whether the primary key changed

View File

@ -1,8 +1,6 @@
use std::fs::File;
use std::io::{self}; use std::io::{self};
use bincode::ErrorKind; use bincode::ErrorKind;
use grenad::Merger;
use hashbrown::HashSet; use hashbrown::HashSet;
use heed::types::Bytes; use heed::types::Bytes;
use heed::{Database, RoTxn}; use heed::{Database, RoTxn};
@ -13,191 +11,11 @@ use super::channel::*;
use super::extract::{ use super::extract::{
merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap, FacetKind, merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap, FacetKind,
}; };
use super::word_fst_builder::{PrefixData, PrefixDelta}; use super::word_fst_builder::PrefixDelta;
use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId}; use super::DocumentChange;
use crate::update::del_add::{DelAdd, DelAddOperation}; use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::new::word_fst_builder::WordFstBuilder;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result}; use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result};
/// Drains `receiver` and handles every `MergerOperation` it yields, forwarding the
/// resulting writes and deletions through `sender`.
///
/// Docids operations are merged against the matching database of `index`, read
/// through `rtxn`. Document insertions and deletions update an in-memory copy of
/// the documents ids, which is sent once the receiver stops yielding operations.
///
/// Returns a `MergerResult` whose `prefix_delta` is set by the word docids
/// operation (only when prefix data was built) and whose `facet_field_ids_delta`
/// is set by the facet docids operation.
///
/// # Errors
///
/// Propagates, through `?`, the errors of the index reads and of the merge helpers.
///
/// # Panics
///
/// - The results of the `sender` calls are unwrapped.
/// - Deleting a document id that is not in the documents ids hits `unreachable!`.
/// - NOTE(review): the geo branches still contain `todo!()` placeholders, which
///   are reached whenever a geo extractor is present.
///
/// TODO We must return some infos/stats
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
pub fn merge_caches_entries(
receiver: MergerReceiver,
sender: MergerSender,
rtxn: &RoTxn,
index: &Index,
global_fields_ids_map: GlobalFieldsIdsMap<'_>,
) -> Result<MergerResult> {
// Scratch buffer handed to every merge helper below.
let mut buffer: Vec<u8> = Vec::new();
// Start from the documents ids currently stored in the index.
let mut documents_ids = index.documents_ids(rtxn)?;
// Used as an `Option` below (`as_mut()` + `if let Some`).
// NOTE(review): presumably `None` means there is nothing geo-related to extract; confirm in `GeoExtractor::new`.
let mut geo_extractor = GeoExtractor::new(rtxn, index)?;
let mut merger_result = MergerResult::default();
// Handle the operations one by one until the receiver stops yielding.
for merger_operation in receiver {
match merger_operation {
// Merge the caches against `exact_word_docids`; keys are not registered anywhere.
MergerOperation::ExactWordDocidsMerger(caches) => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
merge_and_send_docids(
caches,
/// TODO do a MergerOperation::database(&Index) -> Database<Bytes, Bytes>.
index.exact_word_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<ExactWordDocids>(),
|_, _key| Ok(()),
)?;
}
// Same as above, against `field_id_word_count_docids`.
MergerOperation::FidWordCountDocidsMerger(merger) => {
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.field_id_word_count_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<FidWordCountDocids>(),
|_, _key| Ok(()),
)?;
}
// Word docids additionally feed a words FST builder, so the FST can be rebuilt afterwards.
MergerOperation::WordDocidsMerger(merger) => {
// The builder starts from the words FST currently stored in the index
// and is configured with the index prefix settings.
let words_fst = index.words_fst(rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(rtxn)?;
word_fst_builder.with_prefix_settings(prefix_settings);
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
// The callback hands every key, tagged as addition or deletion, to the FST builder.
merge_and_send_docids(
merger,
index.word_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordDocids>(),
|deladd, key| word_fst_builder.register_word(deladd, key),
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
let _entered = span.enter();
// Build the new words FST and send it; the prefixes FST is only sent,
// and the prefix delta only recorded, when the builder returned prefix data.
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, rtxn)?;
sender.main().write_words_fst(word_fst_mmap).unwrap();
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap();
merger_result.prefix_delta = Some(prefix_delta);
}
}
}
// Merge against `word_fid_docids`; keys are not registered anywhere.
MergerOperation::WordFidDocidsMerger(merger) => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.word_fid_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordFidDocids>(),
|_, _key| Ok(()),
)?;
}
// Merge against `word_pair_proximity_docids`; keys are not registered anywhere.
MergerOperation::WordPairProximityDocidsMerger(merger) => {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.word_pair_proximity_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordPositionDocids>(),
|_, _key| Ok(()),
)?;
}
// Merge against `word_position_docids`; keys are not registered anywhere.
MergerOperation::WordPositionDocidsMerger(merger) => {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.word_position_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordPositionDocids>(),
|_, _key| Ok(()),
)?;
}
// Record the new document id and forward the document itself.
MergerOperation::InsertDocument { docid, external_id, document } => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "insert_document");
let _entered = span.enter();
documents_ids.insert(docid);
sender.documents().uncompressed(docid, external_id.clone(), &document).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
// Whether a version of the document is already stored decides between
// an update and an insertion.
let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let current: Option<&KvReaderFieldId> = current.map(Into::into);
// NOTE(review): both payloads are `todo!()` placeholders, so this panics when reached.
let change = match current {
Some(current) => DocumentChange::Update(todo!()),
None => DocumentChange::Insertion(todo!()),
};
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
}
// Forget the document id and forward the deletion.
MergerOperation::DeleteDocument { docid, external_id } => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "delete_document");
let _entered = span.enter();
// Removing an id that is not in the set is treated as a bug.
if !documents_ids.remove(docid) {
unreachable!("Tried deleting a document that we do not know about");
}
sender.documents().delete(docid, external_id.clone()).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
// NOTE(review): the second argument is a `todo!()` placeholder, so this panics when reached.
let change = DocumentChange::Deletion(Deletion::create(docid, todo!()));
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
}
MergerOperation::FinishedDocument => {
// Nothing is done here yet; the intent is to send the rtree.
}
// Facet docids are merged against the facet databases; the delta filled by
// the helper is stored in the result.
MergerOperation::FacetDocidsMerger(merger) => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
let _entered = span.enter();
let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
merge_and_send_facet_docids(
merger,
FacetDatabases::new(index),
rtxn,
&mut buffer,
sender.facet_docids(),
&mut facet_field_ids_delta,
)?;
merger_result.facet_field_ids_delta = Some(facet_field_ids_delta);
}
}
}
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "documents_ids");
let _entered = span.enter();
// Send the documents ids: the ones read from the index at the start, updated
// with the insertions and deletions seen above.
sender.send_documents_ids(documents_ids).unwrap();
}
Ok(merger_result)
}
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct MergerResult { pub struct MergerResult {
/// The delta of the prefixes /// The delta of the prefixes
@ -256,25 +74,28 @@ impl GeoExtractor {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_docids<'extractor>( fn merge_and_send_docids<'extractor>(
mut caches: Vec<Vec<CboCachedSorter<'extractor>>>, mut caches: Vec<CboCachedSorter<'extractor>>,
database: Database<Bytes, Bytes>, database: Database<Bytes, Bytes>,
rtxn: &RoTxn<'_>, index: &Index,
buffer: &mut Vec<u8>, docids_sender: impl DocidsSender + Sync,
docids_sender: impl DocidsSender,
mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>, mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>,
) -> Result<()> { ) -> Result<()> {
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
let rtxn = index.read_txn()?;
let mut buffer = Vec::new();
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get(rtxn, key)?; let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
docids_sender.write(key, value).unwrap(); docids_sender.write(key, value).unwrap();
register_key(DelAdd::Addition, key) // register_key(DelAdd::Addition, key)
Ok(())
} }
Operation::Delete => { Operation::Delete => {
docids_sender.delete(key).unwrap(); docids_sender.delete(key).unwrap();
register_key(DelAdd::Deletion, key) // register_key(DelAdd::Deletion, key)
Ok(())
} }
Operation::Ignore => Ok(()), Operation::Ignore => Ok(()),
} }
@ -284,25 +105,26 @@ fn merge_and_send_docids<'extractor>(
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_facet_docids<'extractor>( fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<Vec<CboCachedSorter<'extractor>>>, mut caches: Vec<CboCachedSorter<'extractor>>,
database: FacetDatabases, database: FacetDatabases,
rtxn: &RoTxn<'_>, index: &Index,
buffer: &mut Vec<u8>, docids_sender: impl DocidsSender + Sync,
docids_sender: impl DocidsSender,
facet_field_ids_delta: &mut FacetFieldIdsDelta, facet_field_ids_delta: &mut FacetFieldIdsDelta,
) -> Result<()> { ) -> Result<()> {
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
let rtxn = index.read_txn()?;
let mut buffer = Vec::new();
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(rtxn, key)?; let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key); // facet_field_ids_delta.register_from_key(key);
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
docids_sender.write(key, value).unwrap(); docids_sender.write(key, value).unwrap();
Ok(()) Ok(())
} }
Operation::Delete => { Operation::Delete => {
facet_field_ids_delta.register_from_key(key); // facet_field_ids_delta.register_from_key(key);
docids_sender.delete(key).unwrap(); docids_sender.delete(key).unwrap();
Ok(()) Ok(())
} }

View File

@ -1,4 +1,7 @@
pub use document_change::{Deletion, DocumentChange, Insertion, Update}; pub use document_change::{Deletion, DocumentChange, Insertion, Update};
pub use merger::{
merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,
};
pub use top_level_map::{CowStr, TopLevelMap}; pub use top_level_map::{CowStr, TopLevelMap};
use super::del_add::DelAdd; use super::del_add::DelAdd;