No more errors

Clément Renault 2024-10-29 18:23:40 +01:00
parent f637d7e80f
commit e41c9c3997
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
9 changed files with 125 additions and 142 deletions

View File

@@ -1,4 +1,5 @@
 use std::marker::PhantomData;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
 use heed::types::Bytes;
@@ -194,22 +195,22 @@ impl IntoIterator for WriterReceiver {
 pub struct ExtractorSender {
     sender: Sender<WriterOperation>,
     /// The number of messages we sent in total in the channel.
-    send_count: std::cell::Cell<usize>,
+    send_count: AtomicUsize,
     /// The number of times we sent something in a channel that was full.
-    writer_contentious_count: std::cell::Cell<usize>,
+    writer_contentious_count: AtomicUsize,
     /// The number of times we sent something in a channel that was empty.
-    extractor_contentious_count: std::cell::Cell<usize>,
+    extractor_contentious_count: AtomicUsize,
 }
 impl Drop for ExtractorSender {
     fn drop(&mut self) {
         eprintln!(
             "Extractor channel stats: {} sends, {} writer contentions ({}%), {} extractor contentions ({}%)",
-            self.send_count.get(),
-            self.writer_contentious_count.get(),
-            (self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0,
-            self.extractor_contentious_count.get(),
-            (self.extractor_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0
+            self.send_count.load(Ordering::SeqCst),
+            self.writer_contentious_count.load(Ordering::SeqCst),
+            (self.writer_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0,
+            self.extractor_contentious_count.load(Ordering::SeqCst),
+            (self.extractor_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0
         )
     }
 }
@@ -244,12 +245,12 @@ impl ExtractorSender {
     fn send(&self, op: WriterOperation) -> StdResult<(), SendError<()>> {
         if self.sender.is_full() {
-            self.writer_contentious_count.set(self.writer_contentious_count.get() + 1);
+            self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
         }
         if self.sender.is_empty() {
-            self.extractor_contentious_count.set(self.extractor_contentious_count.get() + 1);
+            self.extractor_contentious_count.fetch_add(1, Ordering::SeqCst);
         }
-        self.send_count.set(self.send_count.get() + 1);
+        self.send_count.fetch_add(1, Ordering::SeqCst);
         match self.sender.send(op) {
             Ok(()) => Ok(()),
             Err(SendError(_)) => Err(SendError(())),
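
Note: the switch from Cell<usize> to AtomicUsize above is what allows these counters to be bumped through a shared reference from several threads (Cell is not Sync). A minimal standalone sketch of the same pattern, with hypothetical names:

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Hypothetical, simplified version of the counters above.
    struct ChannelStats {
        send_count: AtomicUsize,
        writer_contentious_count: AtomicUsize,
    }

    impl ChannelStats {
        // Takes `&self`, not `&mut self`: atomics permit shared mutation,
        // so the stats can be updated from any thread holding a reference.
        fn record_send(&self, channel_was_full: bool) {
            if channel_was_full {
                self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
            }
            self.send_count.fetch_add(1, Ordering::SeqCst);
        }
    }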

View File

@@ -66,10 +66,10 @@ use crate::{CboRoaringBitmapCodec, Result};
 /// A cache that stores byte keys associated with CboDelAddRoaringBitmaps.
 ///
 /// Internally balances the content over `N` buckets for future merging.
-pub struct CboCachedSorter<'extractor> {
+pub struct BalancedCaches<'extractor> {
     hasher: DefaultHashBuilder,
     alloc: &'extractor Bump,
-    max_memory: usize,
+    max_memory: Option<usize>,
     caches: InnerCaches<'extractor>,
 }
@@ -78,8 +78,8 @@ enum InnerCaches<'extractor> {
     Spilling(SpillingCaches<'extractor>),
 }
-impl<'extractor> CboCachedSorter<'extractor> {
-    pub fn new_in(buckets: usize, max_memory: usize, alloc: &'extractor Bump) -> Self {
+impl<'extractor> BalancedCaches<'extractor> {
+    pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
         Self {
             hasher: DefaultHashBuilder::default(),
             max_memory,
@@ -98,7 +98,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     }
     pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> Result<()> {
-        if self.alloc.allocated_bytes() >= self.max_memory {
+        if self.max_memory.map_or(false, |mm| self.alloc.allocated_bytes() >= mm) {
             self.start_spilling()?;
         }
@@ -115,7 +115,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     }
     pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> Result<()> {
-        if self.alloc.allocated_bytes() >= self.max_memory {
+        if self.max_memory.map_or(false, |mm| self.alloc.allocated_bytes() >= mm) {
             self.start_spilling()?;
         }
@@ -134,7 +134,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     /// Make sure the cache is no longer allocating data
     /// and writes every new and unknown entry to disk.
     fn start_spilling(&mut self) -> Result<()> {
-        let CboCachedSorter { hasher: _, alloc: _, max_memory: _, caches } = self;
+        let BalancedCaches { hasher: _, alloc: _, max_memory: _, caches } = self;
         if let InnerCaches::Normal(normal_caches) = caches {
             let dummy = NormalCaches { caches: Vec::new() };
@@ -173,7 +173,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
     }
 }
-unsafe impl MostlySend for CboCachedSorter<'_> {}
+unsafe impl MostlySend for BalancedCaches<'_> {}
 struct NormalCaches<'extractor> {
     caches:
@@ -371,9 +371,9 @@ pub struct FrozenCache<'a, 'extractor> {
 }
 pub fn transpose_and_freeze_caches<'a, 'extractor>(
-    caches: &'a mut [CboCachedSorter<'extractor>],
+    caches: &'a mut [BalancedCaches<'extractor>],
 ) -> Result<Vec<Vec<FrozenCache<'a, 'extractor>>>> {
-    let width = caches.get(0).map(CboCachedSorter::buckets).unwrap_or(0);
+    let width = caches.get(0).map(BalancedCaches::buckets).unwrap_or(0);
     let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect();
     for thread_cache in caches {
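
Note: max_memory becoming Option<usize> means None now disables spilling entirely rather than forcing a sentinel limit. A minimal sketch of the same map_or check, with a hypothetical helper name:

    // Hypothetical standalone version of the spilling decision above:
    // with `None` the cache never spills; with `Some(limit)` it spills
    // once the bump allocator has handed out at least `limit` bytes.
    fn must_spill(allocated_bytes: usize, max_memory: Option<usize>) -> bool {
        max_memory.map_or(false, |limit| allocated_bytes >= limit)
    }

    fn main() {
        assert!(!must_spill(2048, None));      // no limit configured: never spill
        assert!(must_spill(2048, Some(1024))); // over the limit: spill to disk
    }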

View File

@@ -6,7 +6,7 @@ use bumpalo::Bump;
 use heed::RoTxn;
 use serde_json::Value;
-use super::super::cache::CboCachedSorter;
+use super::super::cache::BalancedCaches;
 use super::facet_document::extract_document_facets;
 use super::FacetKind;
 use crate::facet::value_encoding::f64_into_bytes;
@@ -22,14 +22,18 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
 pub struct FacetedExtractorData<'a> {
     attributes_to_extract: &'a [&'a str],
     grenad_parameters: GrenadParameters,
-    max_memory: Option<usize>,
+    buckets: usize,
 }
 impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
-    type Data = RefCell<CboCachedSorter<'extractor>>;
+    type Data = RefCell<BalancedCaches<'extractor>>;
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
+        Ok(RefCell::new(BalancedCaches::new_in(
+            self.buckets,
+            self.grenad_parameters.max_memory,
+            extractor_alloc,
+        )))
     }
     fn process(
@@ -45,7 +49,7 @@ pub struct FacetedDocidsExtractor;
 impl FacetedDocidsExtractor {
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
+        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
         attributes_to_extract: &[&str],
         document_change: DocumentChange,
     ) -> Result<()> {
@@ -62,7 +66,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_del_u32,
+                    BalancedCaches::insert_del_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -79,7 +83,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_del_u32,
+                    BalancedCaches::insert_del_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -96,7 +100,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_add_u32,
+                    BalancedCaches::insert_add_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -113,7 +117,7 @@ impl FacetedDocidsExtractor {
                 Self::facet_fn_with_options(
                     &context.doc_alloc,
                     cached_sorter.deref_mut(),
-                    CboCachedSorter::insert_add_u32,
+                    BalancedCaches::insert_add_u32,
                     inner.docid(),
                     fid,
                     value,
@@ -126,8 +130,8 @@ impl FacetedDocidsExtractor {
     fn facet_fn_with_options<'extractor>(
         doc_alloc: &Bump,
-        cached_sorter: &mut CboCachedSorter<'extractor>,
-        cache_fn: impl Fn(&mut CboCachedSorter<'extractor>, &[u8], u32) -> Result<()>,
+        cached_sorter: &mut BalancedCaches<'extractor>,
+        cache_fn: impl Fn(&mut BalancedCaches<'extractor>, &[u8], u32) -> Result<()>,
         docid: DocumentId,
         fid: FieldId,
         value: &Value,
@@ -219,7 +223,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>> {
+    ) -> Result<Vec<BalancedCaches<'extractor>>> {
         let max_memory = grenad_parameters.max_memory_by_thread();
         let index = indexing_context.index;
@@ -238,7 +242,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
         let extractor = FacetedExtractorData {
             attributes_to_extract: &attributes_to_extract,
             grenad_parameters,
-            max_memory,
+            buckets: rayon::max_num_threads(),
         };
         for_each_document_change(
             document_changes,
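
Note: the extractor now carries a bucket count (taken from rayon::max_num_threads()) instead of a per-thread memory budget; every per-thread BalancedCaches shards its entries into that many buckets so the merge phase can later hand one bucket to each thread. A hypothetical sketch of such a hash-based bucket choice (the real cache uses its own DefaultHashBuilder; this helper is illustrative only):

    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    // Hypothetical helper: as long as every thread shares the same hash
    // builder, identical keys land in the same bucket index in every
    // per-thread cache, which is what makes the later transposition work.
    fn bucket_for_key(key: &[u8], buckets: usize, hasher: &RandomState) -> usize {
        let mut state = hasher.build_hasher();
        state.write(key);
        (state.finish() as usize) % buckets
    }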

View File

@@ -3,7 +3,7 @@ mod faceted;
 mod searchable;
 use bumpalo::Bump;
-pub use cache::{merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap};
+pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
 pub use faceted::*;
 pub use searchable::*;
@@ -17,7 +17,7 @@ pub trait DocidsExtractor {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>>;
+    ) -> Result<Vec<BalancedCaches<'extractor>>>;
 }
 /// TODO move in permissive json pointer

View File

@@ -1,15 +1,14 @@
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::fs::File;
-use std::io;
 use std::ops::DerefMut as _;
 use bumpalo::Bump;
-use grenad::{Merger, MergerBuilder};
+use grenad::MergerBuilder;
 use heed::RoTxn;
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
-use crate::update::new::extract::cache::CboCachedSorter;
+use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::extract::perm_json_p::contained_in;
 use crate::update::new::indexer::document_changes::{
     for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
@@ -21,30 +20,30 @@ use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_
 const MAX_COUNTED_WORDS: usize = 30;
-pub struct WordDocidsCachedSorters<'extractor> {
-    word_fid_docids: CboCachedSorter<'extractor>,
-    word_docids: CboCachedSorter<'extractor>,
-    exact_word_docids: CboCachedSorter<'extractor>,
-    word_position_docids: CboCachedSorter<'extractor>,
-    fid_word_count_docids: CboCachedSorter<'extractor>,
+pub struct WordDocidsBalancedCaches<'extractor> {
+    word_fid_docids: BalancedCaches<'extractor>,
+    word_docids: BalancedCaches<'extractor>,
+    exact_word_docids: BalancedCaches<'extractor>,
+    word_position_docids: BalancedCaches<'extractor>,
+    fid_word_count_docids: BalancedCaches<'extractor>,
     fid_word_count: HashMap<FieldId, (usize, usize)>,
     current_docid: Option<DocumentId>,
 }
-unsafe impl<'extractor> MostlySend for WordDocidsCachedSorters<'extractor> {}
+unsafe impl<'extractor> MostlySend for WordDocidsBalancedCaches<'extractor> {}
-impl<'extractor> WordDocidsCachedSorters<'extractor> {
+impl<'extractor> WordDocidsBalancedCaches<'extractor> {
     /// TODO Make sure to give the same max_memory to all of them, without splitting it
-    pub fn new_in(alloc: &'extractor Bump) -> io::Result<Self> {
-        Ok(Self {
-            word_fid_docids: CboCachedSorter::new_in(alloc)?,
-            word_docids: CboCachedSorter::new_in(alloc)?,
-            exact_word_docids: CboCachedSorter::new_in(alloc)?,
-            word_position_docids: CboCachedSorter::new_in(alloc)?,
-            fid_word_count_docids: CboCachedSorter::new_in(alloc)?,
+    pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
+        Self {
+            word_fid_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            word_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            exact_word_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            word_position_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
+            fid_word_count_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
             fid_word_count: HashMap::new(),
             current_docid: None,
-        })
+        }
     }
     fn insert_add_u32(
@@ -148,35 +147,27 @@ impl<'extractor> WordDocidsCachedSorters<'extractor> {
     }
 }
-struct WordDocidsMergerBuilders {
-    word_fid_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    exact_word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    word_position_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-    fid_word_count_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
-}
 pub struct WordDocidsMergers<'extractor> {
-    pub word_docids: Vec<CboCachedSorter<'extractor>>,
-    pub word_fid_docids: Vec<CboCachedSorter<'extractor>>,
-    pub exact_word_docids: Vec<CboCachedSorter<'extractor>>,
-    pub word_position_docids: Vec<CboCachedSorter<'extractor>>,
-    pub fid_word_count_docids: Vec<CboCachedSorter<'extractor>>,
+    pub word_docids: Vec<BalancedCaches<'extractor>>,
+    pub word_fid_docids: Vec<BalancedCaches<'extractor>>,
+    pub exact_word_docids: Vec<BalancedCaches<'extractor>>,
+    pub word_position_docids: Vec<BalancedCaches<'extractor>>,
+    pub fid_word_count_docids: Vec<BalancedCaches<'extractor>>,
 }
-impl<'extractor> WordDocidsMergerBuilders<'extractor> {
+impl<'extractor> WordDocidsMergers<'extractor> {
     fn new() -> Self {
         Self {
-            word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
-            fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
+            word_docids: Vec::new(),
+            word_fid_docids: Vec::new(),
+            exact_word_docids: Vec::new(),
+            word_position_docids: Vec::new(),
+            fid_word_count_docids: Vec::new(),
         }
     }
-    fn add_sorters(&mut self, other: WordDocidsCachedSorters) -> Result<()> {
-        let WordDocidsCachedSorters {
+    fn push(&mut self, other: WordDocidsBalancedCaches<'extractor>) -> Result<()> {
+        let WordDocidsBalancedCaches {
             word_docids,
             word_fid_docids,
             exact_word_docids,
@@ -186,43 +177,31 @@ impl<'extractor> WordDocidsMergerBuilders<'extractor> {
             current_docid: _,
         } = other;
-        let word_docids_entries = word_docids.into_unordered_entries()?;
-        let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?;
-        let exact_word_docids_entries = exact_word_docids.into_unordered_entries()?;
-        let word_position_docids_entries = word_position_docids.into_unordered_entries()?;
-        let fid_word_count_docids_entries = fid_word_count_docids.into_unordered_entries()?;
-        self.word_docids.push(word_docids_entries);
-        self.word_fid_docids.push(word_fid_docids_entries);
-        self.exact_word_docids.push(exact_word_docids_entries);
-        self.word_position_docids.push(word_position_docids_entries);
-        self.fid_word_count_docids.push(fid_word_count_docids_entries);
+        self.word_docids.push(word_docids);
+        self.word_fid_docids.push(word_fid_docids);
+        self.exact_word_docids.push(exact_word_docids);
+        self.word_position_docids.push(word_position_docids);
+        self.fid_word_count_docids.push(fid_word_count_docids);
         Ok(())
     }
-    fn build(self) -> WordDocidsMergers<'extractor> {
-        WordDocidsMergers {
-            word_docids: self.word_docids.build(),
-            word_fid_docids: self.word_fid_docids.build(),
-            exact_word_docids: self.exact_word_docids.build(),
-            word_position_docids: self.word_position_docids.build(),
-            fid_word_count_docids: self.fid_word_count_docids.build(),
-        }
-    }
 }
-pub struct WordDocidsExtractorData<'extractor> {
-    tokenizer: &'extractor DocumentTokenizer<'extractor>,
+pub struct WordDocidsExtractorData<'a> {
+    tokenizer: &'a DocumentTokenizer<'a>,
     grenad_parameters: GrenadParameters,
-    max_memory: Option<usize>,
+    buckets: usize,
 }
-impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
-    type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
+impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> {
+    type Data = RefCell<Option<WordDocidsBalancedCaches<'extractor>>>;
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?)))
+        Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in(
+            self.buckets,
+            self.grenad_parameters.max_memory,
+            extractor_alloc,
+        ))))
     }
     fn process(
@@ -243,10 +222,9 @@ impl WordDocidsExtractors {
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
     ) -> Result<WordDocidsMergers<'extractor>> {
-        let max_memory = grenad_parameters.max_memory_by_thread();
         let index = indexing_context.index;
         let rtxn = index.read_txn()?;
         let stop_words = index.stop_words(&rtxn)?;
         let allowed_separators = index.allowed_separators(&rtxn)?;
         let allowed_separators: Option<Vec<_>> =
@@ -284,7 +262,7 @@ impl WordDocidsExtractors {
         let extractor = WordDocidsExtractorData {
             tokenizer: &document_tokenizer,
             grenad_parameters,
-            max_memory,
+            buckets: rayon::max_num_threads(),
         };
         for_each_document_change(
@@ -296,21 +274,16 @@ impl WordDocidsExtractors {
         )?;
         }
-        {
-            let span =
-                tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
-            let _entered = span.enter();
-            let mut builder = WordDocidsMergerBuilders::new();
+        let mut merger = WordDocidsMergers::new();
         for cache in datastore.into_iter().flat_map(RefCell::into_inner) {
-                builder.add_sorters(cache)?;
+            merger.push(cache)?;
         }
-            Ok(builder.build())
-        }
+        Ok(merger)
     }
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<Option<WordDocidsCachedSorters>>>,
+        context: &DocumentChangeContext<RefCell<Option<WordDocidsBalancedCaches>>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {
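
Note: rather than eagerly feeding grenad MergerBuilders, WordDocidsMergers now just collects each thread's caches, and the exhaustive destructuring in push means a field added later becomes a compile error instead of a silently dropped cache. A minimal sketch of that pattern, with hypothetical two-field types:

    // Hypothetical, simplified types illustrating the destructure-and-push
    // pattern above. Because `push` destructures `other` exhaustively,
    // adding a field to `PerThread` without updating `push` fails to compile.
    struct PerThread {
        word_docids: Vec<u32>,
        word_fid_docids: Vec<u32>,
    }

    #[derive(Default)]
    struct Collected {
        word_docids: Vec<Vec<u32>>,
        word_fid_docids: Vec<Vec<u32>>,
    }

    impl Collected {
        fn push(&mut self, other: PerThread) {
            let PerThread { word_docids, word_fid_docids } = other;
            self.word_docids.push(word_docids);
            self.word_fid_docids.push(word_fid_docids);
        }
    }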

View File

@@ -8,7 +8,7 @@ use super::tokenize_document::DocumentTokenizer;
 use super::SearchableExtractor;
 use crate::proximity::{index_proximity, MAX_DISTANCE};
 use crate::update::new::document::Document;
-use crate::update::new::extract::cache::CboCachedSorter;
+use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend, RefCellExt};
 use crate::update::new::DocumentChange;
 use crate::update::MergeDeladdCboRoaringBitmaps;
@@ -32,7 +32,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
     // and to store the docids of the documents that have a number of words in a given field
     // equal to or under MAX_COUNTED_WORDS.
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
+        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {

View File

@@ -12,7 +12,7 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
 use heed::RoTxn;
 use tokenize_document::{tokenizer_builder, DocumentTokenizer};
-use super::cache::CboCachedSorter;
+use super::cache::BalancedCaches;
 use super::DocidsExtractor;
 use crate::update::new::indexer::document_changes::{
     for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
@@ -25,17 +25,21 @@ use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
 pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
     tokenizer: &'a DocumentTokenizer<'a>,
     grenad_parameters: GrenadParameters,
-    max_memory: Option<usize>,
+    buckets: usize,
     _ex: PhantomData<EX>,
 }
 impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
     for SearchableExtractorData<'a, EX>
 {
-    type Data = RefCell<CboCachedSorter<'extractor>>;
+    type Data = RefCell<BalancedCaches<'extractor>>;
     fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
+        Ok(RefCell::new(BalancedCaches::new_in(
+            self.buckets,
+            self.grenad_parameters.max_memory,
+            extractor_alloc,
+        )))
     }
     fn process(
@@ -53,7 +57,7 @@ pub trait SearchableExtractor: Sized + Sync {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>> {
+    ) -> Result<Vec<BalancedCaches<'extractor>>> {
         let max_memory = grenad_parameters.max_memory_by_thread();
         let rtxn = indexing_context.index.read_txn()?;
@@ -87,7 +91,7 @@ pub trait SearchableExtractor: Sized + Sync {
         let extractor_data: SearchableExtractorData<Self> = SearchableExtractorData {
             tokenizer: &document_tokenizer,
             grenad_parameters,
-            max_memory,
+            buckets: rayon::max_num_threads(),
             _ex: PhantomData,
         };
@@ -110,7 +114,7 @@ pub trait SearchableExtractor: Sized + Sync {
     }
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
+        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()>;
@@ -127,7 +131,7 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<Vec<CboCachedSorter<'extractor>>> {
+    ) -> Result<Vec<BalancedCaches<'extractor>>> {
         Self::run_extraction(
             grenad_parameters,
             document_changes,

View File

@@ -42,11 +42,11 @@ mod document_operation;
 mod partial_dump;
 mod update_by_function;
-struct DocumentExtractor<'a, 'extractor> {
-    document_sender: &'a DocumentSender<'a, 'extractor>,
+struct DocumentExtractor<'a> {
+    documents_sender: &'a DocumentsSender<'a>,
 }
-impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor> {
+impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
     type Data = FullySend<()>;
     fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
@@ -70,7 +70,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor>
         match change {
             DocumentChange::Deletion(deletion) => {
                 let docid = deletion.docid();
-                self.document_sender.delete(docid, external_docid).unwrap();
+                self.documents_sender.delete(docid, external_docid).unwrap();
             }
             /// TODO: change None to Some(vector) when implemented
             DocumentChange::Update(update) => {
@@ -79,14 +79,14 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a, 'extractor>
                     update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
                 let content =
                     write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                self.documents_sender.uncompressed(docid, external_docid, &content).unwrap();
             }
             DocumentChange::Insertion(insertion) => {
                 let docid = insertion.docid();
                 let content = insertion.new();
                 let content =
                     write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                self.documents_sender.uncompressed(docid, external_docid, &content).unwrap();
                 // extracted_dictionary_sender.send(self, dictionary: &[u8]);
             }
         }
@@ -137,7 +137,7 @@ where
         // document but we need to create a function that collects and compresses documents.
         let document_sender = extractor_sender.documents();
-        let document_extractor = DocumentExtractor { document_sender: &document_sender };
+        let document_extractor = DocumentExtractor { documents_sender: &document_sender };
         let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
         for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
@@ -180,6 +180,7 @@ where
         // TODO Word Docids Merger
         // extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
         {
+            let rtxn = index.read_txn()?;
             let words_fst = index.words_fst(&rtxn)?;
             let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
             let prefix_settings = index.prefix_settings(&rtxn)?;

View File

@@ -9,7 +9,7 @@ use roaring::RoaringBitmap;
 use super::channel::*;
 use super::extract::{
-    merge_caches, transpose_and_freeze_caches, CboCachedSorter, DelAddRoaringBitmap, FacetKind,
+    merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
 };
 use super::word_fst_builder::PrefixDelta;
 use super::DocumentChange;
@@ -73,8 +73,8 @@ impl GeoExtractor {
 }
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-fn merge_and_send_docids<'extractor>(
-    mut caches: Vec<CboCachedSorter<'extractor>>,
+pub fn merge_and_send_docids<'extractor>(
+    mut caches: Vec<BalancedCaches<'extractor>>,
     database: Database<Bytes, Bytes>,
     index: &Index,
     docids_sender: impl DocidsSender + Sync,
@@ -104,8 +104,8 @@ fn merge_and_send_docids<'extractor>(
 }
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-fn merge_and_send_facet_docids<'extractor>(
-    mut caches: Vec<CboCachedSorter<'extractor>>,
+pub fn merge_and_send_facet_docids<'extractor>(
+    mut caches: Vec<BalancedCaches<'extractor>>,
     database: FacetDatabases,
     index: &Index,
     docids_sender: impl DocidsSender + Sync,
@@ -134,12 +134,12 @@
     })
 }
-struct FacetDatabases<'a> {
+pub struct FacetDatabases<'a> {
     index: &'a Index,
 }
 impl<'a> FacetDatabases<'a> {
-    fn new(index: &'a Index) -> Self {
+    pub fn new(index: &'a Index) -> Self {
         Self { index }
     }
@@ -168,7 +168,7 @@ pub struct FacetFieldIdsDelta {
 }
 impl FacetFieldIdsDelta {
-    fn new() -> Self {
+    pub fn new() -> Self {
         Self {
             modified_facet_string_ids: HashSet::new(),
             modified_facet_number_ids: HashSet::new(),
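
Note: both merge functions now consume the per-thread Vec<BalancedCaches> directly; per transpose_and_freeze_caches above, the threads-by-buckets layout is flipped into buckets-by-threads so that bucket i from every thread can be merged independently. A generic sketch of that transposition, assuming every thread produced the same bucket count:

    // Generic sketch of the transposition performed by
    // `transpose_and_freeze_caches`: rows are per-thread bucket sets, and
    // the result groups bucket i from every thread so each bucket can be
    // merged on its own thread afterwards.
    fn transpose<T>(per_thread: Vec<Vec<T>>) -> Vec<Vec<T>> {
        let buckets = per_thread.first().map_or(0, Vec::len);
        let mut grouped: Vec<Vec<T>> = (0..buckets).map(|_| Vec::new()).collect();
        for thread_buckets in per_thread {
            for (i, bucket) in thread_buckets.into_iter().enumerate() {
                grouped[i].push(bucket);
            }
        }
        grouped
    }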