Make progress on the new cache system

This commit is contained in:
Clément Renault 2024-10-17 18:01:25 +02:00
parent f18fed9e32
commit 3a76ccb6e1
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
5 changed files with 173 additions and 306 deletions

View File

@ -1,55 +1,68 @@
use std::cell::RefCell;
use std::fmt::Write as _;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read as _, Write as _}; use std::io::{self, BufReader, BufWriter, Read as _, Seek, Write as _};
use std::vec; use std::vec;
use bumpalo::Bump;
use grenad::{MergeFunction, Sorter};
use hashbrown::hash_map::RawEntryMut; use hashbrown::hash_map::RawEntryMut;
use raw_collections::alloc::{RefBump, RefBytes}; use raw_collections::alloc::{RefBump, RefBytes};
use roaring::bitmap::Statistics;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use tempfile::tempfile;
use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::new::indexer::document_changes::MostlySend; use crate::update::new::indexer::document_changes::MostlySend;
use crate::CboRoaringBitmapCodec; use crate::CboRoaringBitmapCodec;
const KEY_SIZE: usize = 12; const KEY_SIZE: usize = 12;
// #[derive(Debug)] // #[derive(Debug)]
pub struct CboCachedSorter<'extractor, MF> { pub struct CboCachedSorter<'extractor> {
cache: hashbrown::HashMap< cache: hashbrown::HashMap<
// TODO check the size of it
RefBytes<'extractor>, RefBytes<'extractor>,
DelAddRoaringBitmap, DelAddRoaringBitmap,
hashbrown::DefaultHashBuilder, hashbrown::DefaultHashBuilder,
RefBump<'extractor>, RefBump<'extractor>,
>, >,
alloc: RefBump<'extractor>, alloc: RefBump<'extractor>,
sorter: Sorter<MF>, spilled_entries: UnorderedEntries,
deladd_buffer: Vec<u8>, deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>, cbo_buffer: Vec<u8>,
total_insertions: usize,
fitted_in_key: usize,
} }
impl<'extractor, MF> CboCachedSorter<'extractor, MF> { // # How the Merge Algorithm works
//
// - Collect all hashmaps to the main thread
// - Iterator over all the hashmaps in the different threads
// - Each thread must take care of its own keys (regarding a hash number)
// - Also read the spilled content which are inside
// - Each thread must populate a local hashmap with the entries
// - Every thread send the merged content to the main writing thread
//
// ## Next Step
//
// - Define the size of the buckets in advance to make sure everything fits in memory.
// ```
// let total_buckets = 32;
// (0..total_buckets).par_iter().for_each(|n| {
// let hash = todo!();
// if hash % total_bucket == n {
// // take care of this key
// }
// });
// ```
impl<'extractor> CboCachedSorter<'extractor> {
/// TODO may add the capacity /// TODO may add the capacity
pub fn new_in(sorter: Sorter<MF>, alloc: RefBump<'extractor>) -> Self { pub fn new_in(alloc: RefBump<'extractor>) -> io::Result<Self> {
CboCachedSorter { Ok(CboCachedSorter {
cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)), cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
alloc, alloc,
sorter, spilled_entries: tempfile().map(UnorderedEntries::new)?,
deladd_buffer: Vec::new(), deladd_buffer: Vec::new(),
cbo_buffer: Vec::new(), cbo_buffer: Vec::new(),
total_insertions: 0, })
fitted_in_key: 0,
}
} }
} }
impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> { impl<'extractor> CboCachedSorter<'extractor> {
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) { pub fn insert_del_u32(&mut self, key: &[u8], n: u32) {
match self.cache.raw_entry_mut().from_key(key) { match self.cache.raw_entry_mut().from_key(key) {
RawEntryMut::Occupied(mut entry) => { RawEntryMut::Occupied(mut entry) => {
@ -57,8 +70,6 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
del.get_or_insert_with(RoaringBitmap::default).insert(n); del.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
RawEntryMut::Vacant(entry) => { RawEntryMut::Vacant(entry) => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let alloc = RefBump::clone(&self.alloc); let alloc = RefBump::clone(&self.alloc);
let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_u32(n)); entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_u32(n));
@ -73,8 +84,6 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
add.get_or_insert_with(RoaringBitmap::default).insert(n); add.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
RawEntryMut::Vacant(entry) => { RawEntryMut::Vacant(entry) => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let alloc = RefBump::clone(&self.alloc); let alloc = RefBump::clone(&self.alloc);
let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key)); let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add_u32(n)); entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add_u32(n));
@ -82,164 +91,104 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
} }
} }
fn write_entry<A: AsRef<[u8]>>( pub fn spill_to_disk(self) -> io::Result<SpilledCache> {
sorter: &mut Sorter<MF>, let Self { cache, alloc: _, mut spilled_entries, mut deladd_buffer, mut cbo_buffer } = self;
deladd_buffer: &mut Vec<u8>,
cbo_buffer: &mut Vec<u8>,
key: A,
deladd: DelAddRoaringBitmap,
) -> grenad::Result<(), MF::Error> {
deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(deladd_buffer);
match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => {
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: Some(add) } => {
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
}
let bytes = value_writer.into_inner().unwrap();
sorter.insert(key, bytes)
}
pub fn spill_to_disk(self) -> grenad::Result<SpilledCache<MF>, MF::Error> {
let Self {
cache,
alloc: _,
mut sorter,
mut deladd_buffer,
mut cbo_buffer,
total_insertions,
fitted_in_key,
} = self;
for (key, deladd) in cache { for (key, deladd) in cache {
Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?; spill_entry_to_disk(
&mut spilled_entries,
&mut deladd_buffer,
&mut cbo_buffer,
&key,
deladd,
)?;
} }
Ok(SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key }) Ok(SpilledCache { spilled_entries, deladd_buffer, cbo_buffer })
} }
pub fn into_sorter(self) -> grenad::Result<Sorter<MF>, MF::Error> { // TODO Do not spill to disk if not necessary
let Self { pub fn into_unordered_entries(self) -> io::Result<UnorderedEntriesIntoIter> {
cache, let Self { cache, alloc: _, mut spilled_entries, mut cbo_buffer, mut deladd_buffer } = self;
alloc: _,
mut sorter,
mut deladd_buffer,
mut cbo_buffer,
total_insertions,
fitted_in_key,
} = self;
let mut all_n_containers = Vec::new();
let mut all_n_array_containers = Vec::new();
let mut all_n_bitset_containers = Vec::new();
let mut all_n_values_array_containers = Vec::new();
let mut all_n_values_bitset_containers = Vec::new();
let mut all_cardinality = Vec::new();
for (_key, deladd) in &cache {
for bitmap in [&deladd.del, &deladd.add].into_iter().flatten() {
let Statistics {
n_containers,
n_array_containers,
n_bitset_containers,
n_values_array_containers,
n_values_bitset_containers,
cardinality,
..
} = bitmap.statistics();
all_n_containers.push(n_containers);
all_n_array_containers.push(n_array_containers);
all_n_bitset_containers.push(n_bitset_containers);
all_n_values_array_containers.push(n_values_array_containers);
all_n_values_bitset_containers.push(n_values_bitset_containers);
all_cardinality.push(cardinality as u32);
}
}
for (key, deladd) in cache { for (key, deladd) in cache {
Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?; spill_entry_to_disk(
&mut spilled_entries,
&mut deladd_buffer,
&mut cbo_buffer,
&key,
deladd,
)?;
} }
let mut output = String::new(); spilled_entries.into_iter_bitmap()
for (name, mut slice) in [
("n_containers", all_n_containers),
("n_array_containers", all_n_array_containers),
("n_bitset_containers", all_n_bitset_containers),
("n_values_array_containers", all_n_values_array_containers),
("n_values_bitset_containers", all_n_values_bitset_containers),
("cardinality", all_cardinality),
] {
let _ = writeln!(&mut output, "{name} (p100) {:?}", Stats::from_slice(&mut slice));
// let _ = writeln!(&mut output, "{name} (p99) {:?}", Stats::from_slice_p99(&mut slice));
}
let _ = writeln!(
&mut output,
"LruCache stats: {} <= {KEY_SIZE} bytes ({}%) on a total of {} insertions",
fitted_in_key,
(fitted_in_key as f32 / total_insertions as f32) * 100.0,
total_insertions,
);
eprintln!("{output}");
Ok(sorter)
} }
} }
pub struct SpilledCache<MF> { fn spill_entry_to_disk(
sorter: Sorter<MF>, spilled_entries: &mut UnorderedEntries,
deladd_buffer: &mut Vec<u8>,
cbo_buffer: &mut Vec<u8>,
key: &[u8],
deladd: DelAddRoaringBitmap,
) -> io::Result<()> {
deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(deladd_buffer);
match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => {
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: Some(add) } => {
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
}
let bytes = value_writer.into_inner().unwrap();
spilled_entries.push(key, bytes)
}
pub struct SpilledCache {
spilled_entries: UnorderedEntries,
deladd_buffer: Vec<u8>, deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>, cbo_buffer: Vec<u8>,
total_insertions: usize,
fitted_in_key: usize,
} }
impl<MF> SpilledCache<MF> { impl SpilledCache {
pub fn reconstruct(self, alloc: RefBump<'_>) -> CboCachedSorter<'_, MF> { pub fn reconstruct(self, alloc: RefBump<'_>) -> CboCachedSorter<'_> {
let SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key } = let SpilledCache { spilled_entries, deladd_buffer, cbo_buffer } = self;
self;
CboCachedSorter { CboCachedSorter {
cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)), cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
alloc, alloc,
sorter, spilled_entries,
deladd_buffer, deladd_buffer,
cbo_buffer, cbo_buffer,
total_insertions,
fitted_in_key,
} }
} }
} }
unsafe impl<'extractor, MF: Send> MostlySend for CboCachedSorter<'extractor, MF> {} unsafe impl<'extractor> MostlySend for CboCachedSorter<'extractor> {}
pub struct UnorderedEntries { pub struct UnorderedEntries {
entry_offsets: Vec<(u32, u32)>, entry_sizes: Vec<(u32, u32)>,
file: BufWriter<File>, file: BufWriter<File>,
} }
impl UnorderedEntries { impl UnorderedEntries {
pub fn new(file: File) -> Self { fn new(file: File) -> Self {
UnorderedEntries { entry_offsets: Vec::new(), file: BufWriter::new(file) } UnorderedEntries { entry_sizes: Vec::new(), file: BufWriter::new(file) }
} }
/// Pushes a new tuple of key/value into a file. /// Pushes a new tuple of key/value into a file.
@ -249,33 +198,41 @@ impl UnorderedEntries {
/// # Panics /// # Panics
/// ///
/// - Panics if the key or value length is larger than 2^32 bytes. /// - Panics if the key or value length is larger than 2^32 bytes.
pub fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> { fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
let key_len = key.len().try_into().unwrap(); let key_len = key.len().try_into().unwrap();
let value_len = key.len().try_into().unwrap(); let value_len = value.len().try_into().unwrap();
self.file.write_all(key)?; self.file.write_all(key)?;
self.file.write_all(value)?; self.file.write_all(value)?;
self.entry_offsets.push((key_len, value_len)); self.entry_sizes.push((key_len, value_len));
Ok(()) Ok(())
} }
pub fn into_iter_ref(self) -> IntoIterRef { fn into_iter_bitmap(self) -> io::Result<UnorderedEntriesIntoIter> {
let Self { entry_offsets, file } = self; let Self { entry_sizes, file } = self;
IntoIterRef { entry_offsets: entry_offsets.into_iter(), file, buffer: Vec::new() }
let mut file = file.into_inner().map_err(|e| e.into_error())?;
file.rewind()?;
Ok(UnorderedEntriesIntoIter {
entry_sizes: entry_sizes.into_iter(),
file: BufReader::new(file),
buffer: Vec::new(),
})
} }
} }
pub struct IntoIterRef { pub struct UnorderedEntriesIntoIter {
entry_offsets: vec::IntoIter<(u32, u32)>, entry_sizes: vec::IntoIter<(u32, u32)>,
file: BufReader<File>, file: BufReader<File>,
buffer: Vec<u8>, buffer: Vec<u8>,
} }
impl IntoIterRef { impl UnorderedEntriesIntoIter {
pub fn next(&mut self) -> io::Result<Option<(&[u8], &[u8])>> { fn next_ref(&mut self) -> io::Result<Option<(&[u8], &[u8])>> {
match self.entry_offsets.next() { match self.entry_sizes.next() {
Some((key_len, value_len)) => { Some((key_len, value_len)) => {
let key_len = key_len as usize; let key_len = key_len as usize;
let value_len = value_len as usize; let value_len = value_len as usize;
@ -287,10 +244,25 @@ impl IntoIterRef {
self.file.read_exact(buffer)?; self.file.read_exact(buffer)?;
let buffer = &self.buffer[..total_len]; let buffer = &self.buffer[..total_len];
let (key, value) = buffer.split_at(key_len); Ok(Some(buffer.split_at(key_len)))
debug_assert_eq!(value.len(), value_len); }
None => Ok(None),
}
}
Ok(Some((key, value))) pub fn next_deladd_bitmap(&mut self) -> io::Result<Option<(&[u8], DelAddRoaringBitmap)>> {
match self.next_ref()? {
Some((key, value_bytes)) => {
let reader = KvReaderDelAdd::from_slice(value_bytes);
let del = match reader.get(DelAdd::Deletion) {
Some(del_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(del_bytes)?),
None => None,
};
let add = match reader.get(DelAdd::Addition) {
Some(add_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(add_bytes)?),
None => None,
};
Ok(Some((key, DelAddRoaringBitmap { del, add })))
} }
None => Ok(None), None => Ok(None),
} }

View File

@ -1,14 +1,12 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug;
use std::fs::File; use std::fs::File;
use std::ops::DerefMut as _; use std::ops::DerefMut as _;
use bumpalo::Bump; use bumpalo::Bump;
use grenad::{MergeFunction, Merger}; use grenad::Merger;
use heed::RoTxn; use heed::RoTxn;
use raw_collections::alloc::RefBump; use raw_collections::alloc::RefBump;
use rayon::iter::{ParallelBridge as _, ParallelIterator as _};
use serde_json::Value; use serde_json::Value;
use super::super::cache::CboCachedSorter; use super::super::cache::CboCachedSorter;
@ -21,7 +19,7 @@ use crate::update::new::indexer::document_changes::{
IndexingContext, RefCellExt, ThreadLocal, IndexingContext, RefCellExt, ThreadLocal,
}; };
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedExtractorData<'extractor> { pub struct FacetedExtractorData<'extractor> {
@ -31,24 +29,10 @@ pub struct FacetedExtractorData<'extractor> {
} }
impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
type Data = RefCell<CboCachedSorter<'extractor, MergeDeladdCboRoaringBitmaps>>; type Data = RefCell<CboCachedSorter<'extractor>>;
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> { fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
Ok(RefCell::new(CboCachedSorter::new_in( Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
self.grenad_parameters.chunk_compression_type,
self.grenad_parameters.chunk_compression_level,
self.grenad_parameters.max_nb_chunks,
self.max_memory,
// *NOTE*: this must not be set to true:
// 1. we're already using max parallelism in the pool, so it wouldn't help
// 2. it creates correctness issues if it causes to yield a borrow-mut wielding task
false,
),
extractor_alloc,
)))
} }
fn process( fn process(
@ -64,7 +48,7 @@ pub struct FacetedDocidsExtractor;
impl FacetedDocidsExtractor { impl FacetedDocidsExtractor {
fn extract_document_change( fn extract_document_change(
context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>, context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
attributes_to_extract: &[&str], attributes_to_extract: &[&str],
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()> { ) -> Result<()> {
@ -143,10 +127,10 @@ impl FacetedDocidsExtractor {
} }
} }
fn facet_fn_with_options<'extractor, MF>( fn facet_fn_with_options<'extractor>(
doc_alloc: &Bump, doc_alloc: &Bump,
cached_sorter: &mut CboCachedSorter<'extractor, MF>, cached_sorter: &mut CboCachedSorter<'extractor>,
cache_fn: impl Fn(&mut CboCachedSorter<'extractor, MF>, &[u8], u32), cache_fn: impl Fn(&mut CboCachedSorter<'extractor>, &[u8], u32),
docid: DocumentId, docid: DocumentId,
fid: FieldId, fid: FieldId,
value: &Value, value: &Value,

View File

@ -1,7 +1,7 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::num::NonZero; use std::io;
use std::ops::DerefMut as _; use std::ops::DerefMut as _;
use bumpalo::Bump; use bumpalo::Bump;
@ -17,17 +17,16 @@ use crate::update::new::indexer::document_changes::{
IndexingContext, MostlySend, RefCellExt, ThreadLocal, IndexingContext, MostlySend, RefCellExt, ThreadLocal,
}; };
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
const MAX_COUNTED_WORDS: usize = 30; const MAX_COUNTED_WORDS: usize = 30;
pub struct WordDocidsCachedSorters<'indexer> { pub struct WordDocidsCachedSorters<'indexer> {
word_fid_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, word_fid_docids: CboCachedSorter<'indexer>,
word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, word_docids: CboCachedSorter<'indexer>,
exact_word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, exact_word_docids: CboCachedSorter<'indexer>,
word_position_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, word_position_docids: CboCachedSorter<'indexer>,
fid_word_count_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>, fid_word_count_docids: CboCachedSorter<'indexer>,
fid_word_count: HashMap<FieldId, (usize, usize)>, fid_word_count: HashMap<FieldId, (usize, usize)>,
current_docid: Option<DocumentId>, current_docid: Option<DocumentId>,
} }
@ -35,83 +34,16 @@ pub struct WordDocidsCachedSorters<'indexer> {
unsafe impl<'indexer> MostlySend for WordDocidsCachedSorters<'indexer> {} unsafe impl<'indexer> MostlySend for WordDocidsCachedSorters<'indexer> {}
impl<'indexer> WordDocidsCachedSorters<'indexer> { impl<'indexer> WordDocidsCachedSorters<'indexer> {
pub fn new_in( pub fn new_in(alloc: RefBump<'indexer>) -> io::Result<Self> {
indexer: GrenadParameters, Ok(Self {
max_memory: Option<usize>, word_fid_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
alloc: RefBump<'indexer>, word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
) -> Self { exact_word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
let max_memory = max_memory.map(|max_memory| max_memory / 4); word_position_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
fid_word_count_docids: CboCachedSorter::new_in(alloc)?,
let word_fid_docids = CboCachedSorter::new_in(
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
RefBump::clone(&alloc),
);
let word_docids = CboCachedSorter::new_in(
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
RefBump::clone(&alloc),
);
let exact_word_docids = CboCachedSorter::new_in(
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
RefBump::clone(&alloc),
);
let word_position_docids = CboCachedSorter::new_in(
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
RefBump::clone(&alloc),
);
let fid_word_count_docids = CboCachedSorter::new_in(
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
alloc,
);
Self {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
fid_word_count: HashMap::new(), fid_word_count: HashMap::new(),
current_docid: None, current_docid: None,
} })
} }
fn insert_add_u32( fn insert_add_u32(
@ -253,21 +185,17 @@ impl WordDocidsMergerBuilders {
current_docid: _, current_docid: _,
} = other; } = other;
let word_fid_docids_readers = let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?;
word_fid_docids.into_sorter().and_then(|s| s.into_reader_cursors()); let word_docids_entries = word_docids.into_unordered_entries()?;
let word_docids_readers = word_docids.into_sorter().and_then(|s| s.into_reader_cursors()); let exact_word_docids_entries = exact_word_docids.into_unordered_entries()?;
let exact_word_docids_readers = let word_position_docids_entries = word_position_docids.into_unordered_entries()?;
exact_word_docids.into_sorter().and_then(|s| s.into_reader_cursors()); let fid_word_count_docids_entries = fid_word_count_docids.into_unordered_entries()?;
let word_position_docids_readers =
word_position_docids.into_sorter().and_then(|s| s.into_reader_cursors());
let fid_word_count_docids_readers =
fid_word_count_docids.into_sorter().and_then(|s| s.into_reader_cursors());
self.word_fid_docids.extend(word_fid_docids_readers?); self.word_fid_docids.push(word_fid_docids_entries);
self.word_docids.extend(word_docids_readers?); self.word_docids.push(word_docids_entries);
self.exact_word_docids.extend(exact_word_docids_readers?); self.exact_word_docids.push(exact_word_docids_entries);
self.word_position_docids.extend(word_position_docids_readers?); self.word_position_docids.push(word_position_docids_entries);
self.fid_word_count_docids.extend(fid_word_count_docids_readers?); self.fid_word_count_docids.push(fid_word_count_docids_entries);
Ok(()) Ok(())
} }
@ -293,11 +221,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>; type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> { fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in( Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc))))
self.grenad_parameters,
self.max_memory,
extractor_alloc,
))))
} }
fn process( fn process(
@ -357,7 +281,6 @@ pub struct WordDocidsExtractors;
impl WordDocidsExtractors { impl WordDocidsExtractors {
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>, indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>, extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,

View File

@ -31,7 +31,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
// and to store the docids of the documents that have a number of words in a given field // and to store the docids of the documents that have a number of words in a given field
// equal to or under than MAX_COUNTED_WORDS. // equal to or under than MAX_COUNTED_WORDS.
fn extract_document_change( fn extract_document_change(
context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>, context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
document_tokenizer: &DocumentTokenizer, document_tokenizer: &DocumentTokenizer,
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()> { ) -> Result<()> {

View File

@ -12,7 +12,6 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
use grenad::Merger; use grenad::Merger;
use heed::RoTxn; use heed::RoTxn;
use raw_collections::alloc::RefBump; use raw_collections::alloc::RefBump;
use rayon::iter::{ParallelBridge, ParallelIterator};
use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::cache::CboCachedSorter; use super::cache::CboCachedSorter;
@ -22,7 +21,7 @@ use crate::update::new::indexer::document_changes::{
IndexingContext, ThreadLocal, IndexingContext, ThreadLocal,
}; };
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE}; use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> { pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> {
@ -35,21 +34,10 @@ pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> {
impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
for SearchableExtractorData<'extractor, EX> for SearchableExtractorData<'extractor, EX>
{ {
type Data = RefCell<CboCachedSorter<'extractor, MergeDeladdCboRoaringBitmaps>>; type Data = RefCell<CboCachedSorter<'extractor>>;
fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> { fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
Ok(RefCell::new(CboCachedSorter::new_in( Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
self.grenad_parameters.chunk_compression_type,
self.grenad_parameters.chunk_compression_level,
self.grenad_parameters.max_nb_chunks,
self.max_memory,
false,
),
extractor_alloc,
)))
} }
fn process( fn process(