Fix more errors around the cache

Clément Renault 2024-10-16 15:57:06 +02:00
parent 05a015b27c
commit 495742e113
4 changed files with 79 additions and 92 deletions

View File

@@ -9,6 +9,7 @@ use roaring::bitmap::Statistics;
 use roaring::RoaringBitmap;
 
 use crate::update::del_add::{DelAdd, KvWriterDelAdd};
+use crate::update::new::indexer::document_changes::MostlySend;
 use crate::CboRoaringBitmapCodec;
 
 const KEY_SIZE: usize = 12;
@@ -273,6 +274,8 @@ impl<MF> SpilledCache<MF> {
     }
 }
 
+unsafe impl<'extractor, MF: Send> MostlySend for CboCachedSorter<'extractor, MF> {}
+
 #[derive(Default, Debug)]
 struct Stats {
     pub len: usize,

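Note: the `MostlySend` impl added above is the heart of this change. A minimal, self-contained sketch of the pattern (assumed shape; the real trait lives in `update::new::indexer::document_changes`, and `CboCachedSorter` is far richer than this stand-in):

```rust
// A sketch of the MostlySend pattern. The trait is an unsafe promise:
// a value may be sent to another thread, but only at points where the
// indexer guarantees no borrows are live.
unsafe trait MostlySend {}

// Anything truly Send trivially upholds the contract.
struct FullySend<T>(pub T);
unsafe impl<T: Send> MostlySend for FullySend<T> {}

// A cache holding references into a shared bump arena is not Send, so
// the commit asserts the weaker guarantee manually, exactly as the new
// `unsafe impl ... MostlySend for CboCachedSorter` does.
struct BumpBackedCache<'a> {
    bytes: &'a [u8], // stand-in for bump-allocated data
}
unsafe impl<'a> MostlySend for BumpBackedCache<'a> {}

fn assert_mostly_send<T: MostlySend>(_: &T) {}

fn main() {
    let arena = [1u8, 2, 3];
    let cache = BumpBackedCache { bytes: &arena };
    assert_mostly_send(&cache);
    assert_mostly_send(&FullySend(vec![0u8; 8]));
    println!("cache sees {} bytes", cache.bytes.len());
}
```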
View File

@@ -7,6 +7,7 @@ use std::ops::DerefMut as _;
 use bumpalo::Bump;
 use grenad::{MergeFunction, Merger};
 use heed::RoTxn;
+use raw_collections::alloc::RefBump;
 use rayon::iter::{ParallelBridge as _, ParallelIterator as _};
 use serde_json::Value;
@@ -30,15 +31,10 @@ pub struct FacetedExtractorData<'extractor> {
 }
 
 impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
-    type Data = FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>;
+    type Data = RefCell<CboCachedSorter<'extractor, MergeDeladdCboRoaringBitmaps>>;
 
-    fn init_data(
-        &self,
-        _extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
-    ) -> Result<Self::Data> {
-        Ok(FullySend(RefCell::new(CboCachedSorter::new(
-            // TODO use a better value
-            1_000_000.try_into().unwrap(),
+    fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
+        Ok(RefCell::new(CboCachedSorter::new_in(
             create_sorter(
                 grenad::SortAlgorithm::Stable,
                 MergeDeladdCboRoaringBitmaps,
@@ -51,13 +47,14 @@ impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
                 // 2. it creates correctness issues if it causes to yield a borrow-mut wielding task
                 false,
             ),
-        ))))
+            extractor_alloc,
+        )))
     }
 
     fn process(
         &self,
         change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
         FacetedDocidsExtractor::extract_document_change(context, self.attributes_to_extract, change)
     }
@@ -67,16 +64,14 @@ pub struct FacetedDocidsExtractor;
 impl FacetedDocidsExtractor {
     fn extract_document_change(
-        context: &DocumentChangeContext<
-            FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
-        >,
+        context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
         attributes_to_extract: &[&str],
         document_change: DocumentChange,
     ) -> Result<()> {
         let index = &context.index;
         let rtxn = &context.txn;
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
-        let mut cached_sorter = context.data.0.borrow_mut_or_yield();
+        let mut cached_sorter = context.data.borrow_mut_or_yield();
 
         match document_change {
             DocumentChange::Deletion(inner) => extract_document_facets(
                 attributes_to_extract,
@@ -90,7 +85,8 @@ impl FacetedDocidsExtractor {
                         inner.docid(),
                         fid,
                         value,
-                    )
+                    );
+                    Ok(())
                 },
             ),
             DocumentChange::Update(inner) => {
@@ -106,7 +102,8 @@ impl FacetedDocidsExtractor {
                             inner.docid(),
                             fid,
                             value,
-                        )
+                        );
+                        Ok(())
                     },
                 )?;
 
@@ -122,7 +119,8 @@ impl FacetedDocidsExtractor {
                             inner.docid(),
                             fid,
                             value,
-                        )
+                        );
+                        Ok(())
                     },
                 )
             }
@@ -138,31 +136,27 @@ impl FacetedDocidsExtractor {
                         inner.docid(),
                         fid,
                         value,
-                    )
+                    );
+                    Ok(())
                 },
             ),
         }
     }
 
-    fn facet_fn_with_options<MF>(
+    fn facet_fn_with_options<'extractor, MF>(
         doc_alloc: &Bump,
-        cached_sorter: &mut CboCachedSorter<MF>,
-        cache_fn: impl Fn(&mut CboCachedSorter<MF>, &[u8], u32) -> grenad::Result<(), MF::Error>,
+        cached_sorter: &mut CboCachedSorter<'extractor, MF>,
+        cache_fn: impl Fn(&mut CboCachedSorter<'extractor, MF>, &[u8], u32),
         docid: DocumentId,
         fid: FieldId,
         value: &Value,
-    ) -> Result<()>
-    where
-        MF: MergeFunction,
-        MF::Error: Debug,
-        grenad::Error<MF::Error>: Into<crate::Error>,
-    {
+    ) {
         let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
         // Exists
         // key: fid
         buffer.push(FacetKind::Exists as u8);
         buffer.extend_from_slice(&fid.to_be_bytes());
-        cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)?;
+        cache_fn(cached_sorter, &buffer, docid);
 
         match value {
             // Number
@@ -177,10 +171,7 @@ impl FacetedDocidsExtractor {
                     buffer.push(0); // level 0
                     buffer.extend_from_slice(&ordered);
                     buffer.extend_from_slice(&n.to_be_bytes());
-
-                    cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
-                } else {
-                    Ok(())
+                    cache_fn(cached_sorter, &buffer, docid);
                 }
             }
             // String
@@ -193,7 +184,7 @@ impl FacetedDocidsExtractor {
                 buffer.extend_from_slice(&fid.to_be_bytes());
                 buffer.push(0); // level 0
                 buffer.extend_from_slice(truncated.as_bytes());
-                cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
+                cache_fn(cached_sorter, &buffer, docid);
             }
             // Null
             // key: fid
@@ -201,7 +192,7 @@ impl FacetedDocidsExtractor {
                 buffer.clear();
                 buffer.push(FacetKind::Null as u8);
                 buffer.extend_from_slice(&fid.to_be_bytes());
-                cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
+                cache_fn(cached_sorter, &buffer, docid);
             }
             // Empty
             // key: fid
@@ -209,17 +200,17 @@ impl FacetedDocidsExtractor {
                 buffer.clear();
                 buffer.push(FacetKind::Empty as u8);
                 buffer.extend_from_slice(&fid.to_be_bytes());
-                cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
+                cache_fn(cached_sorter, &buffer, docid);
             }
             Value::Object(o) if o.is_empty() => {
                 buffer.clear();
                 buffer.push(FacetKind::Empty as u8);
                 buffer.extend_from_slice(&fid.to_be_bytes());
-                cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
+                cache_fn(cached_sorter, &buffer, docid);
             }
             // Otherwise, do nothing
             /// TODO: What about Value::Bool?
-            _ => Ok(()),
+            _ => (),
         }
     }

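Note: the recurring edit in this file is that the cache callback became infallible, so `facet_fn_with_options` loses its `where` clause and every call site drops `.map_err(Into::into)?`. A simplified, hypothetical stand-in for the new shape (`Cache` is not the real `CboCachedSorter` API):

```rust
// Simplified stand-in for the new callback shape.
struct Cache {
    entries: Vec<(Vec<u8>, u32)>,
}

impl Cache {
    // After this commit the insert path is infallible from the caller's
    // point of view: spilling to grenad is handled inside the cache.
    fn insert_add_u32(&mut self, key: &[u8], docid: u32) {
        self.entries.push((key.to_vec(), docid));
    }
}

// The callback parameter is now a plain `impl Fn(...)` with no error
// type, mirroring `cache_fn: impl Fn(&mut CboCachedSorter<..>, &[u8], u32)`.
fn facet_fn_with_options(
    cache: &mut Cache,
    cache_fn: impl Fn(&mut Cache, &[u8], u32),
    docid: u32,
) {
    let buffer = [0u8; 3]; // stand-in for the facet key
    cache_fn(cache, &buffer, docid); // no `.map_err(Into::into)?` anymore
}

fn main() {
    let mut cache = Cache { entries: Vec::new() };
    facet_fn_with_options(&mut cache, Cache::insert_add_u32, 42);
    println!("{} cached entries", cache.entries.len());
}
```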
View File

@@ -14,7 +14,7 @@ use crate::update::new::extract::cache::CboCachedSorter;
 use crate::update::new::extract::perm_json_p::contained_in;
 use crate::update::new::indexer::document_changes::{
     for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
-    IndexingContext, RefCellExt, ThreadLocal,
+    IndexingContext, MostlySend, RefCellExt, ThreadLocal,
 };
 use crate::update::new::DocumentChange;
 use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
@@ -22,26 +22,27 @@ use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_
 const MAX_COUNTED_WORDS: usize = 30;
 
-pub struct WordDocidsCachedSorters {
-    word_fid_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
-    word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
-    exact_word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
-    word_position_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
-    fid_word_count_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
+pub struct WordDocidsCachedSorters<'indexer> {
+    word_fid_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
+    word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
+    exact_word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
+    word_position_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
+    fid_word_count_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
     fid_word_count: HashMap<FieldId, (usize, usize)>,
     current_docid: Option<DocumentId>,
 }
 
-impl WordDocidsCachedSorters {
-    pub fn new(
+unsafe impl<'indexer> MostlySend for WordDocidsCachedSorters<'indexer> {}
+
+impl<'indexer> WordDocidsCachedSorters<'indexer> {
+    pub fn new_in(
         indexer: GrenadParameters,
         max_memory: Option<usize>,
-        capacity: NonZero<usize>,
+        alloc: RefBump<'indexer>,
     ) -> Self {
         let max_memory = max_memory.map(|max_memory| max_memory / 4);
 
-        let word_fid_docids = CboCachedSorter::new(
-            capacity,
+        let word_fid_docids = CboCachedSorter::new_in(
             create_sorter(
                 grenad::SortAlgorithm::Stable,
                 MergeDeladdCboRoaringBitmaps,
@@ -51,9 +52,9 @@ impl WordDocidsCachedSorters {
                 max_memory,
                 false,
             ),
+            RefBump::clone(&alloc),
         );
-        let word_docids = CboCachedSorter::new(
-            capacity,
+        let word_docids = CboCachedSorter::new_in(
             create_sorter(
                 grenad::SortAlgorithm::Stable,
                 MergeDeladdCboRoaringBitmaps,
@@ -63,9 +64,9 @@ impl WordDocidsCachedSorters {
                 max_memory,
                 false,
             ),
+            RefBump::clone(&alloc),
         );
-        let exact_word_docids = CboCachedSorter::new(
-            capacity,
+        let exact_word_docids = CboCachedSorter::new_in(
             create_sorter(
                 grenad::SortAlgorithm::Stable,
                 MergeDeladdCboRoaringBitmaps,
@@ -75,9 +76,9 @@ impl WordDocidsCachedSorters {
                 max_memory,
                 false,
             ),
+            RefBump::clone(&alloc),
         );
-        let word_position_docids = CboCachedSorter::new(
-            capacity,
+        let word_position_docids = CboCachedSorter::new_in(
             create_sorter(
                 grenad::SortAlgorithm::Stable,
                 MergeDeladdCboRoaringBitmaps,
@@ -87,9 +88,9 @@ impl WordDocidsCachedSorters {
                 max_memory,
                 false,
             ),
+            RefBump::clone(&alloc),
         );
-        let fid_word_count_docids = CboCachedSorter::new(
-            capacity,
+        let fid_word_count_docids = CboCachedSorter::new_in(
             create_sorter(
                 grenad::SortAlgorithm::Stable,
                 MergeDeladdCboRoaringBitmaps,
@@ -99,6 +100,7 @@ impl WordDocidsCachedSorters {
                 max_memory,
                 false,
             ),
+            alloc,
         );
 
         Self {
@@ -120,29 +122,29 @@ impl WordDocidsCachedSorters {
         exact: bool,
         docid: u32,
         buffer: &mut Vec<u8>,
-    ) -> Result<()> {
+    ) {
         let key = word.as_bytes();
         if exact {
-            self.exact_word_docids.insert_add_u32(key, docid)?;
+            self.exact_word_docids.insert_add_u32(key, docid);
         } else {
-            self.word_docids.insert_add_u32(key, docid)?;
+            self.word_docids.insert_add_u32(key, docid);
         }
 
         buffer.clear();
         buffer.extend_from_slice(word.as_bytes());
         buffer.push(0);
         buffer.extend_from_slice(&field_id.to_be_bytes());
-        self.word_fid_docids.insert_add_u32(buffer, docid)?;
+        self.word_fid_docids.insert_add_u32(buffer, docid);
 
         let position = bucketed_position(position);
         buffer.clear();
         buffer.extend_from_slice(word.as_bytes());
         buffer.push(0);
         buffer.extend_from_slice(&position.to_be_bytes());
-        self.word_position_docids.insert_add_u32(buffer, docid)?;
+        self.word_position_docids.insert_add_u32(buffer, docid);
 
         if self.current_docid.map_or(false, |id| docid != id) {
-            self.flush_fid_word_count(buffer)?;
+            self.flush_fid_word_count(buffer);
         }
 
         self.fid_word_count
@@ -150,8 +152,6 @@ impl WordDocidsCachedSorters {
             .and_modify(|(_current_count, new_count)| *new_count += 1)
             .or_insert((0, 1));
         self.current_docid = Some(docid);
-
-        Ok(())
     }
 
     fn insert_del_u32(
@@ -162,61 +162,56 @@ impl WordDocidsCachedSorters {
         exact: bool,
         docid: u32,
         buffer: &mut Vec<u8>,
-    ) -> Result<()> {
+    ) {
         let key = word.as_bytes();
         if exact {
-            self.exact_word_docids.insert_del_u32(key, docid)?;
+            self.exact_word_docids.insert_del_u32(key, docid);
         } else {
-            self.word_docids.insert_del_u32(key, docid)?;
+            self.word_docids.insert_del_u32(key, docid);
         }
 
         buffer.clear();
         buffer.extend_from_slice(word.as_bytes());
         buffer.push(0);
         buffer.extend_from_slice(&field_id.to_be_bytes());
-        self.word_fid_docids.insert_del_u32(buffer, docid)?;
+        self.word_fid_docids.insert_del_u32(buffer, docid);
 
         let position = bucketed_position(position);
         buffer.clear();
         buffer.extend_from_slice(word.as_bytes());
         buffer.push(0);
         buffer.extend_from_slice(&position.to_be_bytes());
-        self.word_position_docids.insert_del_u32(buffer, docid)?;
+        self.word_position_docids.insert_del_u32(buffer, docid);
 
         if self.current_docid.map_or(false, |id| docid != id) {
-            self.flush_fid_word_count(buffer)?;
+            self.flush_fid_word_count(buffer);
         }
 
         self.fid_word_count
             .entry(field_id)
             .and_modify(|(current_count, _new_count)| *current_count += 1)
             .or_insert((1, 0));
-        self.current_docid = Some(docid);
 
-        Ok(())
+        self.current_docid = Some(docid);
     }
 
-    fn flush_fid_word_count(&mut self, buffer: &mut Vec<u8>) -> Result<()> {
+    fn flush_fid_word_count(&mut self, buffer: &mut Vec<u8>) {
         for (fid, (current_count, new_count)) in self.fid_word_count.drain() {
             if current_count != new_count {
                 if current_count <= MAX_COUNTED_WORDS {
                     buffer.clear();
                     buffer.extend_from_slice(&fid.to_be_bytes());
                     buffer.push(current_count as u8);
-                    self.fid_word_count_docids
-                        .insert_del_u32(buffer, self.current_docid.unwrap())?;
+                    self.fid_word_count_docids.insert_del_u32(buffer, self.current_docid.unwrap());
                 }
                 if new_count <= MAX_COUNTED_WORDS {
                     buffer.clear();
                     buffer.extend_from_slice(&fid.to_be_bytes());
                     buffer.push(new_count as u8);
-                    self.fid_word_count_docids
-                        .insert_add_u32(buffer, self.current_docid.unwrap())?;
+                    self.fid_word_count_docids.insert_add_u32(buffer, self.current_docid.unwrap());
                 }
             }
         }
-
-        Ok(())
     }
 }
@@ -312,24 +307,20 @@ pub struct WordDocidsExtractorData<'extractor> {
 }
 
 impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
-    type Data = FullySend<RefCell<WordDocidsCachedSorters>>;
+    type Data = RefCell<WordDocidsCachedSorters<'extractor>>;
 
-    fn init_data(
-        &self,
-        _extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
-    ) -> Result<Self::Data> {
-        Ok(FullySend(RefCell::new(WordDocidsCachedSorters::new(
+    fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
+        Ok(RefCell::new(WordDocidsCachedSorters::new_in(
             self.grenad_parameters,
             self.max_memory,
-            // TODO use a better value
-            200_000.try_into().unwrap(),
-        ))))
+            extractor_alloc,
+        )))
     }
 
     fn process(
         &self,
         change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
         WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)
     }
@@ -343,7 +334,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
             return Ok(());
         }
 
-        let mut data = data.0.borrow_mut();
+        let mut data = data.borrow_mut();
         let WordDocidsCachedSorters {
             word_fid_docids,
             word_docids,
@@ -454,7 +445,7 @@ impl WordDocidsExtractors {
     }
 
     fn extract_document_change(
-        context: &DocumentChangeContext<FullySend<RefCell<WordDocidsCachedSorters>>>,
+        context: &DocumentChangeContext<RefCell<WordDocidsCachedSorters>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {

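Note: every sorter in this file now receives the same extractor arena, cloned via `RefBump::clone(&alloc)` for all but the last constructor, which moves `alloc`. A std-only sketch of that sharing pattern, with `Rc<Arena>` standing in for `raw_collections::alloc::RefBump`:

```rust
use std::rc::Rc;

// `Arena` stands in for the bump allocator; the real code shares a
// RefBump, a ref-counted handle to the extractor's bumpalo arena.
struct Arena;

struct CachedSorter {
    alloc: Rc<Arena>, // every sorter keeps the same arena alive
}

impl CachedSorter {
    // Mirrors the new `new_in` constructors: the allocator comes last.
    fn new_in(alloc: Rc<Arena>) -> Self {
        CachedSorter { alloc }
    }
}

fn main() {
    let alloc = Rc::new(Arena);
    // Clone the handle for all but the last sorter, as in
    // WordDocidsCachedSorters::new_in, which moves `alloc` at the end.
    let word_fid = CachedSorter::new_in(Rc::clone(&alloc));
    let _word = CachedSorter::new_in(Rc::clone(&alloc));
    let _exact = CachedSorter::new_in(Rc::clone(&alloc));
    let _position = CachedSorter::new_in(Rc::clone(&alloc));
    let _fid_count = CachedSorter::new_in(alloc);
    println!("{} handles to one arena", Rc::strong_count(&word_fid.alloc));
}
```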
View File

@@ -104,6 +104,8 @@ pub struct FullySend<T>(pub T);
 // SAFETY: a type **fully** send is always mostly send as well.
 unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
 
+unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
+
 impl<T> FullySend<T> {
     pub fn into(self) -> T {
         self.0
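Note: this one-line blanket impl is what lets the extractors above declare `type Data = RefCell<...>` without the `FullySend` wrapper. A self-contained sketch (same assumed trait shape as the earlier note):

```rust
use std::cell::RefCell;

unsafe trait MostlySend {}

struct FullySend<T>(pub T);
unsafe impl<T: Send> MostlySend for FullySend<T> {}

// The impl added by this commit: RefCell introduces no extra thread
// affinity, so wrapping a MostlySend value preserves the guarantee.
unsafe impl<T: MostlySend> MostlySend for RefCell<T> {}

fn assert_mostly_send<T: MostlySend>(_: &T) {}

fn main() {
    // Before: FullySend<RefCell<Vec<u8>>>. After: the RefCell itself
    // is MostlySend, so the wrapper can be dropped at the type level.
    let data = RefCell::new(FullySend(vec![1u8, 2, 3]));
    assert_mostly_send(&data);
    println!("{} bytes behind the RefCell", data.borrow().0.len());
}
```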