Allocate the decompressed documents in the extractor allocator

Kerollmops 2024-12-12 14:32:31 +01:00 committed by Clément Renault
parent 409f17b1a0
commit b7a9dbfdf8
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
18 changed files with 295 additions and 58 deletions

View File

@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::io;
use std::io::ErrorKind;
use bumpalo::Bump;
use heed::BoxedError;
use obkv::KvReaderU16;
use zstd::bulk::{Compressor, Decompressor};
@ -54,6 +55,17 @@ impl<'a> CompressedKvReaderU16<'a> {
Ok(KvReaderU16::from_slice(&buffer[..size]))
}
pub fn decompress_into_bump<'b>(
&self,
bump: &'b Bump,
dictionary: &DecoderDictionary,
) -> io::Result<&'b KvReaderU16> {
// TODO: Use a better approach and stop cloning so much.
let mut buffer = Vec::new();
self.decompress_with(&mut buffer, dictionary)?;
Ok(KvReaderU16::from_slice(bump.alloc_slice_copy(&buffer)))
}
/// Returns the KvReader as if it were not compressed.
/// This happens when there is no dictionary yet.
pub fn as_non_compressed(&self) -> &'a KvReaderU16 {
@ -75,11 +87,16 @@ impl<'a> CompressedKvReaderU16<'a> {
}
}
pub fn decompress_as_owned_with_optinal_dictionary(
pub fn into_owned_with_dictionary(
&self,
dictionary: Option<&DecoderDictionary>,
) -> io::Result<Cow<'a, KvReaderU16>> {
todo!("Impl owned version of KvReader")
dictionary: &DecoderDictionary<'_>,
) -> io::Result<Box<KvReaderU16>> {
let mut buffer = Vec::new();
let reader = self.decompress_with(&mut buffer, dictionary)?;
// Make sure the Vec is exactly the size of the reader
let size = reader.as_bytes().len();
buffer.resize(size, 0);
Ok(buffer.into_boxed_slice().into())
}
}
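The TODO above acknowledges that `decompress_into_bump` still round-trips through a temporary `Vec` before copying into the bump. As a rough, hypothetical sketch of the direction it hints at (not this crate's actual API, and assuming the caller can supply an upper bound on the decompressed size), decompression could target a bump-allocated buffer directly:

```rust
use std::io;

use bumpalo::Bump;
use zstd::bulk::Decompressor;
use zstd::dict::DecoderDictionary;

/// Hypothetical helper: decompress a zstd frame straight into a bump-allocated
/// buffer, skipping the intermediate `Vec`. `max_decompressed_size` is an
/// assumed, caller-provided upper bound on the decompressed size.
fn decompress_into_bump_direct<'b>(
    compressed: &[u8],
    dictionary: &DecoderDictionary<'_>,
    bump: &'b Bump,
    max_decompressed_size: usize,
) -> io::Result<&'b [u8]> {
    // Allocate the destination buffer directly in the bump allocator.
    let buffer = bump.alloc_slice_fill_copy(max_decompressed_size, 0u8);
    let mut decompressor = Decompressor::with_prepared_dictionary(dictionary)?;
    // Decompress into the bump-allocated slice and keep only the written prefix.
    let written = decompressor.decompress_to_buffer(compressed, &mut buffer[..])?;
    Ok(&buffer[..written])
}
```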

View File

@ -1311,11 +1311,8 @@ impl Index {
&self,
rtxn: &'t RoTxn,
id: DocumentId,
) -> Result<CompressedKvReaderU16<'t>> {
self.documents
.get(rtxn, &id)?
.ok_or(UserError::UnknownInternalDocumentId { document_id: id })
.map_err(Into::into)
) -> Result<Option<CompressedKvReaderU16<'t>>> {
self.documents.get(rtxn, &id).map_err(Into::into)
}
/// Returns an iterator over the requested compressed documents. The next item will be an error if a document is missing.
@ -1324,9 +1321,11 @@ impl Index {
rtxn: &'t RoTxn<'t>,
ids: impl IntoIterator<Item = DocumentId> + 'a,
) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
Ok(ids
.into_iter()
.map(move |id| self.compressed_document(rtxn, id).map(|compressed| (id, compressed))))
Ok(ids.into_iter().flat_map(move |id| {
self.compressed_document(rtxn, id)
.map(|opt| opt.map(|compressed| (id, compressed)))
.transpose()
}))
}
/// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
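Aside on the `flat_map`/`transpose` shape used in the iterator above: a lookup returning `Result<Option<T>>` is turned into `Option<Result<T>>`, so missing entries are skipped while errors are still yielded. A standalone illustration with toy types, not the crate's:

```rust
// Toy lookup standing in for `compressed_document`: it may fail, and it may
// legitimately find nothing.
fn lookup(id: u32) -> Result<Option<&'static str>, String> {
    match id {
        1 => Ok(Some("first")),
        2 => Ok(None), // missing: will be skipped by `flat_map`
        _ => Err(format!("lookup failed for {id}")),
    }
}

fn main() {
    let results: Vec<_> = [1, 2, 3]
        .into_iter()
        .flat_map(|id| lookup(id).map(|opt| opt.map(|doc| (id, doc))).transpose())
        .collect();
    // results == [Ok((1, "first")), Err("lookup failed for 3")]
    println!("{results:?}");
}
```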

View File

@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use bumpalo::Bump;
use bumparaw_collections::RawMap;
use heed::RoTxn;
use rustc_hash::FxBuildHasher;
@ -48,15 +48,24 @@ pub trait Document<'doc> {
fn geo_field(&self) -> Result<Option<&'doc RawValue>>;
}
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct DocumentFromDb<'t, Mapper: FieldIdMapper>
where
Mapper: FieldIdMapper,
{
fields_ids_map: &'t Mapper,
content: Cow<'t, KvReaderFieldId>,
content: &'t KvReaderFieldId,
}
impl<'t, Mapper: FieldIdMapper> Clone for DocumentFromDb<'t, Mapper> {
#[inline]
fn clone(&self) -> Self {
*self
}
}
impl<'t, Mapper: FieldIdMapper> Copy for DocumentFromDb<'t, Mapper> {}
impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
let mut it = self.content.iter();
@ -121,10 +130,18 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
doc_alloc: &'t Bump,
) -> Result<Option<Self>> {
index.documents.get(rtxn, &docid).map_err(crate::Error::from).map(|reader| {
reader.map(|reader| Self { fields_ids_map: db_fields_ids_map, content: reader })
})
match index.compressed_document(rtxn, docid)? {
Some(compressed) => {
let content = match index.document_decompression_dictionary(rtxn)? {
Some(dictionary) => compressed.decompress_into_bump(doc_alloc, &dictionary)?,
None => compressed.as_non_compressed(),
};
Ok(Some(Self { fields_ids_map: db_fields_ids_map, content }))
}
None => Ok(None),
}
}
pub fn field(&self, name: &str) -> Result<Option<&'t RawValue>> {
@ -188,9 +205,10 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
doc_alloc: &'t Bump,
new_doc: DocumentFromVersions<'a, 'doc>,
) -> Result<Self> {
let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map, doc_alloc)?;
Ok(Self { new_doc, db })
}
@ -233,9 +251,10 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
return Ok(Some(vectors));
}
let Some(db) = self.db else { return Ok(None) };
db.vectors_field()
match &self.db {
Some(db) => db.vectors_field(),
None => Ok(None),
}
}
fn geo_field(&self) -> Result<Option<&'d RawValue>> {
@ -243,9 +262,10 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
return Ok(Some(geo));
}
let Some(db) = self.db else { return Ok(None) };
db.geo_field()
match &self.db {
Some(db) => db.geo_field(),
None => Ok(None),
}
}
fn top_level_fields_count(&self) -> usize {
@ -256,7 +276,7 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
if let Some(f) = self.new_doc.top_level_field(k)? {
return Ok(Some(f));
}
if let Some(db) = self.db {
if let Some(db) = &self.db {
return db.field(k);
}
Ok(None)
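For context on the derive change above, here is a standalone sketch (made-up names) of why `#[derive(Clone)]` is replaced by manual impls: the derive would add `Mapper: Clone`/`Copy` bounds, even though the struct only stores shared references, which are always `Copy`.

```rust
// `#[derive(Clone, Copy)]` on this struct would require `M: Clone` / `M: Copy`,
// so the impls are written by hand and bound only on what is actually stored.
struct FromDb<'t, M> {
    mapper: &'t M,
    content: &'t [u8],
}

impl<'t, M> Clone for FromDb<'t, M> {
    #[inline]
    fn clone(&self) -> Self {
        *self
    }
}

impl<'t, M> Copy for FromDb<'t, M> {}

fn assert_copy<T: Copy>(_: T) {}

fn main() {
    struct NotClone; // the mapper type itself never has to be Clone or Copy
    let mapper = NotClone;
    let doc = FromDb { mapper: &mapper, content: b"raw" };
    assert_copy(doc);
}
```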

View File

@ -72,8 +72,9 @@ impl<'doc> Deletion<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
doc_alloc: &'a Bump,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
@ -91,6 +92,7 @@ impl<'doc> Insertion<'doc> {
pub fn external_document_id(&self) -> &'doc str {
self.external_document_id
}
pub fn inserted(&self) -> DocumentFromVersions<'_, 'doc> {
DocumentFromVersions::new(&self.new)
}
@ -126,8 +128,9 @@ impl<'doc> Update<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
doc_alloc: &'a Bump,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
@ -153,6 +156,7 @@ impl<'doc> Update<'doc> {
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
doc_alloc: &'t Bump,
) -> Result<MergedDocument<'_, 'doc, 't, Mapper>> {
if self.has_deletion {
Ok(MergedDocument::without_db(DocumentFromVersions::new(&self.new)))
@ -162,6 +166,7 @@ impl<'doc> Update<'doc> {
rtxn,
index,
mapper,
doc_alloc,
DocumentFromVersions::new(&self.new),
)
}
@ -177,6 +182,7 @@ impl<'doc> Update<'doc> {
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
doc_alloc: &'t Bump,
) -> Result<bool> {
let mut changed = false;
let mut cached_current = None;
@ -192,7 +198,7 @@ impl<'doc> Update<'doc> {
updated_selected_field_count += 1;
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper)?,
None => self.current(rtxn, index, mapper, doc_alloc)?,
};
let current_value = current.top_level_field(key)?;
let Some(current_value) = current_value else {
@ -222,7 +228,7 @@ impl<'doc> Update<'doc> {
let has_deleted_fields = {
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper)?,
None => self.current(rtxn, index, mapper, doc_alloc)?,
};
let mut current_selected_field_count = 0;

View File

@ -63,6 +63,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
&context.rtxn,
context.index,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let geo_iter =
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
@ -79,8 +80,12 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
}
DocumentChange::Update(update) => {
let docid = update.docid();
let content =
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
let content = update.current(
&context.rtxn,
context.index,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let geo_iter =
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
for res in content.iter_top_level_fields().chain(geo_iter) {
@ -103,8 +108,12 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
*entry += 1;
}
let content =
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
let content = update.merged(
&context.rtxn,
context.index,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let vector_content = update.merged_vectors(
&context.rtxn,
context.index,

View File

@ -79,7 +79,7 @@ impl FacetedDocidsExtractor {
let res = match document_change {
DocumentChange::Deletion(inner) => extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
@ -102,13 +102,14 @@ impl FacetedDocidsExtractor {
rtxn,
index,
context.db_fields_ids_map,
&context.doc_alloc,
)? {
return Ok(());
}
extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
@ -128,7 +129,7 @@ impl FacetedDocidsExtractor {
extract_document_facets(
attributes_to_extract,
inner.merged(rtxn, index, context.db_fields_ids_map)?,
inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {

View File

@ -158,6 +158,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
let index = context.index;
let max_memory = self.grenad_parameters.max_memory_by_thread();
let db_fields_ids_map = context.db_fields_ids_map;
let doc_alloc = &context.doc_alloc;
let mut data_ref = context.data.borrow_mut_or_yield();
for change in changes {
@ -173,7 +174,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
let external_id = deletion.external_document_id();
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
let current = deletion.current(rtxn, index, db_fields_ids_map, doc_alloc)?;
let current_geo = current
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
@ -188,7 +189,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
}
}
DocumentChange::Update(update) => {
let current = update.current(rtxn, index, db_fields_ids_map)?;
let current = update.current(rtxn, index, db_fields_ids_map, doc_alloc)?;
let external_id = update.external_document_id();
let docid = update.docid();

View File

@ -339,7 +339,7 @@ impl WordDocidsExtractors {
)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
new_fields_ids_map,
&mut token_fn,
)?;
@ -350,6 +350,7 @@ impl WordDocidsExtractors {
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
)? {
return Ok(());
}
@ -365,7 +366,7 @@ impl WordDocidsExtractors {
)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
new_fields_ids_map,
&mut token_fn,
)?;
@ -381,7 +382,7 @@ impl WordDocidsExtractors {
)
};
document_tokenizer.tokenize_document(
inner.merged(rtxn, index, context.db_fields_ids_map)?,
inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
new_fields_ids_map,
&mut token_fn,
)?;

View File

@ -58,7 +58,8 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
let docid = document_change.docid();
match document_change {
DocumentChange::Deletion(inner) => {
let document = inner.current(rtxn, index, context.db_fields_ids_map)?;
let document =
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?;
process_document_tokens(
document,
document_tokenizer,
@ -75,11 +76,13 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
rtxn,
index,
context.db_fields_ids_map,
&context.doc_alloc,
)? {
return Ok(());
}
let document = inner.current(rtxn, index, context.db_fields_ids_map)?;
let document =
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?;
process_document_tokens(
document,
document_tokenizer,
@ -89,7 +92,8 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
del_word_pair_proximity.push(((w1, w2), prox));
},
)?;
let document = inner.merged(rtxn, index, context.db_fields_ids_map)?;
let document =
inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?;
process_document_tokens(
document,
document_tokenizer,

View File

@ -135,6 +135,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
@ -145,6 +146,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
@ -165,6 +167,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
@ -175,6 +178,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
)?,
context.new_fields_ids_map,
&context.doc_alloc,

View File

@ -5,6 +5,7 @@ use std::sync::{Arc, RwLock};
use bumpalo::Bump;
use heed::RoTxn;
use rayon::iter::IndexedParallelIterator;
use zstd::dict::DecoderDictionary;
use super::super::document_change::DocumentChange;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
@ -105,7 +106,7 @@ pub trait Extractor<'extractor>: Sync {
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
context: &'doc DocumentChangeContext<Self::Data>,
context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
) -> Result<()>;
}
@ -121,8 +122,10 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
self.len() == 0
}
fn item_to_document_change<'doc, // lifetime of a single `process` call
T: MostlySend>(
fn item_to_document_change<
'doc, // lifetime of a single `process` call
T: MostlySend,
>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
item: &'doc Self::Item,
@ -140,6 +143,7 @@ pub struct IndexingContext<
{
pub index: &'index Index,
pub db_fields_ids_map: &'indexer FieldsIdsMap,
pub db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>,
pub new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
pub doc_allocs: &'indexer ThreadLocal<FullySend<Cell<Bump>>>,
pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
@ -202,6 +206,7 @@ pub fn extract<
IndexingContext {
index,
db_fields_ids_map,
db_document_decompression_dictionary,
new_fields_ids_map,
doc_allocs,
fields_ids_map_store,

View File

@ -64,7 +64,11 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
where
'pl: 'doc, // the payload must survive the process calls
{
let current = context.index.document(&context.rtxn, *docid)?;
let compressed = context.index.compressed_document(&context.rtxn, *docid)?.unwrap();
let current = match context.index.document_decompression_dictionary(&context.rtxn)? {
Some(dict) => compressed.decompress_into_bump(&context.doc_alloc, &dict)?,
None => compressed.as_non_compressed(),
};
let external_document_id = self.primary_key.extract_docid_from_db(
current,

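The `Some(dictionary) => decompress into the bump / None => use the raw bytes` match above reappears at several call sites in this commit. A hypothetical helper factoring that pattern could look like the sketch below; every type here is a made-up stand-in, not the crate's API.

```rust
use std::io;

use bumpalo::Bump;
use zstd::dict::DecoderDictionary;

/// Stand-in for the real `KvReaderU16`.
struct Doc<'a>(&'a [u8]);

/// Stand-in for the real `CompressedKvReaderU16`.
struct Compressed<'a>(&'a [u8]);

impl<'a> Compressed<'a> {
    fn decompress_into_bump<'b>(
        &self,
        _bump: &'b Bump,
        _dict: &DecoderDictionary<'_>,
    ) -> io::Result<Doc<'b>> {
        unimplemented!("stand-in for the real dictionary-based decompression")
    }

    fn as_non_compressed(&self) -> Doc<'a> {
        Doc(self.0)
    }
}

/// Hypothetical helper factoring the repeated `match` over an optional dictionary.
fn decompress_or_raw<'b>(
    compressed: &Compressed<'b>,
    dictionary: Option<&DecoderDictionary<'_>>,
    bump: &'b Bump,
) -> io::Result<Doc<'b>> {
    match dictionary {
        Some(dict) => compressed.decompress_into_bump(bump, dict),
        None => Ok(compressed.as_non_compressed()),
    }
}
```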
View File

@ -1,11 +1,15 @@
use std::cell::RefCell;
use std::cmp::Ordering;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{self, AtomicBool, AtomicUsize};
use std::sync::{OnceLock, RwLock};
use std::thread::{self, Builder};
use big_s::S;
use bumpalo::Bump;
use bumparaw_collections::RawMap;
use document_changes::{extract, DocumentChanges, IndexingContext};
use document_changes::{
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
};
pub use document_deletion::DocumentDeletion;
pub use document_operation::{DocumentOperation, PayloadStats};
use hashbrown::HashMap;
@ -17,18 +21,20 @@ use rand::SeedableRng as _;
use rustc_hash::FxBuildHasher;
use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction;
use zstd::dict::{DecoderDictionary, EncoderDictionary};
use super::channel::*;
use super::document::Document as _;
use super::extract::*;
use super::facet_search_builder::FacetSearchBuilder;
use super::merger::FacetFieldIdsDelta;
use super::steps::IndexingStep;
use super::thread_local::ThreadLocal;
use super::thread_local::{FullySend, MostlySend, ThreadLocal};
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::StdResult;
use super::{DocumentChange, StdResult};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
@ -38,6 +44,7 @@ use crate::proximity::ProximityPrecision;
use crate::update::del_add::DelAdd;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::InnerIndexSettings;
@ -111,6 +118,9 @@ where
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
let db_document_decompression_dictionary = index
.document_compression_raw_dictionary(wtxn)
.map(|opt| opt.map(DecoderDictionary::copy))?;
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
@ -121,6 +131,7 @@ where
let indexing_context = IndexingContext {
index,
db_fields_ids_map,
db_document_decompression_dictionary: db_document_decompression_dictionary.as_ref(),
new_fields_ids_map: &new_fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
@ -128,6 +139,19 @@ where
progress,
};
let document_compression_dictionary = pool
.install(|| {
let rtxn = index.read_txn()?;
compute_document_compression_dictionary(
index,
&rtxn,
document_changes,
indexing_context,
&mut extractor_allocs,
)
})
.unwrap()?;
let mut index_embeddings = index.embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?;
let mut document_ids = index.documents_ids(wtxn)?;
@ -429,7 +453,7 @@ where
while let Some(action) = writer_receiver.recv_action() {
if _entered_post_merge.is_none()
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
&& finished_extraction.load(atomic::Ordering::Relaxed)
{
_entered_post_merge = Some(span.enter());
}
@ -550,6 +574,141 @@ where
Ok(())
}
/// The compression level to use when compressing documents.
const DOCUMENT_COMPRESSION_LEVEL: i32 = 19;
/// The sample size used to generate the document compression dictionary.
const DOCUMENT_COMPRESSION_SAMPLE_SIZE: usize = 10_000;
/// The maximum size the document compression dictionary can be.
const DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE: usize = 64_000;
/// The maximum number of documents we are willing to compress if they
/// weren't already compressed in the database. If this threshold
/// is reached, we do not generate a dictionary and continue as is.
const DOCUMENT_COMPRESSION_COMPRESS_LIMIT: u64 = 5_000_000;
/// A function dedicated to using the existing document compression
/// dictionary or generating an appropriate one based on the documents
/// available in the database and the ones in the payload.
///
/// If there are too many documents already in the database and no
/// compression dictionary, we prefer not to generate one to avoid
/// compressing all of the documents and potentially blowing up disk space.
fn compute_document_compression_dictionary<'pl, 'extractor, DC, MSP, SP>(
index: &Index,
rtxn: &RoTxn<'_>,
document_changes: &DC,
indexing_context: IndexingContext<MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Option<EncoderDictionary<'static>>>
where
DC: DocumentChanges<'pl>,
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
match index.document_compression_raw_dictionary(rtxn)? {
Some(dict) => Ok(Some(EncoderDictionary::copy(dict, DOCUMENT_COMPRESSION_LEVEL))),
None if index.number_of_documents(rtxn)? >= DOCUMENT_COMPRESSION_COMPRESS_LIMIT => Ok(None),
None => {
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let extractor = CompressorExtractor {
total_documents_to_extract: DOCUMENT_COMPRESSION_SAMPLE_SIZE,
extracted_documents_count: AtomicUsize::new(0),
};
todo!("collect the documents samples from the database first (or after)");
// This extraction only takes care of document replacements
// and not updates (merges). The merged documents are ignored as
// we will only use the previous version of them in the database.
extract(
document_changes,
&extractor,
indexing_context,
extractor_allocs,
&datastore,
Step::PreparingCompressionDictionary,
)?;
let mut sample_data = Vec::new();
let mut sample_sizes = Vec::new();
for data in datastore {
let CompressorExtractorData { buffer, must_stop: _ } = data.into_inner();
let mut subsample_size = 0;
for subsample in buffer {
sample_data.extend_from_slice(subsample);
subsample_size += subsample.len();
}
sample_sizes.push(subsample_size);
}
let dictionary = zstd::dict::from_continuous(
&sample_data,
&sample_sizes,
DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE,
)?;
Ok(Some(EncoderDictionary::copy(&dictionary, DOCUMENT_COMPRESSION_LEVEL)))
}
}
}
struct CompressorExtractor {
total_documents_to_extract: usize,
extracted_documents_count: AtomicUsize,
}
#[derive(Default)]
struct CompressorExtractorData<'extractor> {
buffer: Vec<&'extractor [u8]>,
/// Once we have extracted the expected number of documents, we can skip everything.
must_stop: bool,
}
unsafe impl<'extractor> MostlySend for RefCell<CompressorExtractorData<'extractor>> {}
impl<'extractor> Extractor<'extractor> for CompressorExtractor {
type Data = RefCell<CompressorExtractorData<'extractor>>;
fn init_data<'doc>(
&'doc self,
_extractor_alloc: &'extractor bumpalo::Bump,
) -> crate::Result<Self::Data> {
Ok(RefCell::new(CompressorExtractorData::default()))
}
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
) -> crate::Result<()> {
let mut data = context.data.borrow_mut_or_yield();
for change in changes {
if data.must_stop {
return Ok(());
}
let change = change?;
match change {
DocumentChange::Deletion(_) => (),
DocumentChange::Update(_) => (),
DocumentChange::Insertion(insertion) => {
for result in insertion.inserted().iter_top_level_fields() {
let (_field_name, raw_value) = result?;
let bytes = raw_value.get().as_bytes();
data.buffer.push(context.extractor_alloc.alloc_slice_copy(bytes));
}
let previous_count =
self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
data.must_stop = previous_count >= self.total_documents_to_extract;
}
}
}
Ok(())
}
}
/// A function dedicated to managing all the available BBQueue frames.
///
/// It reads all the available frames, does the corresponding database operations

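To make the dictionary-generation flow above concrete, here is a small self-contained sketch using the same zstd entry points (`zstd::dict::from_continuous`, `EncoderDictionary::copy`, `DecoderDictionary::copy`). The synthetic sample documents, the 10 KiB dictionary cap, and the round-trip at the end are made up for illustration; training can still fail if the sample set is too small or unrepresentative.

```rust
use std::io;

use zstd::bulk::{Compressor, Decompressor};
use zstd::dict::{from_continuous, DecoderDictionary, EncoderDictionary};

const LEVEL: i32 = 19; // same spirit as DOCUMENT_COMPRESSION_LEVEL above
const MAX_DICT_SIZE: usize = 10_240; // arbitrary cap for this example

fn main() -> io::Result<()> {
    // Build one flat sample buffer plus the size of every sample, which is the
    // layout `from_continuous` expects (and what the extractor above collects).
    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for i in 0..10_000u32 {
        let doc = format!(
            r#"{{"id":{i},"title":"document number {i}","tag":"tag-{}","group":{}}}"#,
            i % 7,
            i % 13,
        );
        sample_data.extend_from_slice(doc.as_bytes());
        sample_sizes.push(doc.len());
    }

    // Train the dictionary, then prepare it once for both directions.
    let dict = from_continuous(&sample_data, &sample_sizes, MAX_DICT_SIZE)?;
    let encoder_dict = EncoderDictionary::copy(&dict, LEVEL);
    let decoder_dict = DecoderDictionary::copy(&dict);

    // Round-trip one document with the prepared dictionaries.
    let doc = br#"{"id":424242,"title":"document number 424242","tag":"tag-3","group":7}"#;
    let compressed = Compressor::with_prepared_dictionary(&encoder_dict)?.compress(doc)?;
    let decompressed =
        Decompressor::with_prepared_dictionary(&decoder_dict)?.decompress(&compressed, doc.len())?;
    assert_eq!(doc.as_slice(), decompressed.as_slice());
    println!("dictionary: {} bytes, document: {} -> {} bytes", dict.len(), doc.len(), compressed.len());
    Ok(())
}
```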
View File

@ -105,7 +105,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
// safety: Both documents *must* exist in the database as
// their IDs come from the list of document ids.
let document = index.document(txn, docid)?;
let compressed_document = index.compressed_document(txn, docid)?.unwrap();
let document = match index.document_decompression_dictionary(txn)? {
Some(dictionary) => compressed_document.decompress_into_bump(doc_alloc, &dictionary)?,
None => compressed_document.as_non_compressed(),
};
let rhai_document = obkv_to_rhaimap(document, db_fields_ids_map)?;
let json_document = all_obkv_to_json(document, db_fields_ids_map)?;

View File

@ -3,9 +3,9 @@ use std::sync::Arc;
use rayon::iter::ParallelIterator;
pub trait ParallelIteratorExt: ParallelIterator {
/// A method to run a closure of all the items and return an owned error.
/// A method to run a closure on all the items and return an owned error.
///
/// The init function is ran only as necessary which is basically once by thread.
/// The init function is run only as necessary, which is basically once per thread.
fn try_arc_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E>
where
E: Send + Sync,

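The doc comment above describes running a closure over all items with an init value created lazily, roughly once per thread. The trait's full signature is not shown here, so as a hedged illustration of the underlying pattern only, here is rayon's built-in `try_for_each_init`, which creates the init value as needed (typically a handful of times per worker) and aborts on the first error:

```rust
use rayon::prelude::*;

fn main() -> Result<(), String> {
    (0..1_000u32).into_par_iter().try_for_each_init(
        // Per-worker init: run only as necessary, not once per item.
        || Vec::<u8>::with_capacity(1024),
        // The scratch buffer is reused for every item the worker processes.
        |scratch, n| {
            scratch.clear();
            scratch.extend_from_slice(&n.to_le_bytes());
            if scratch.is_empty() {
                Err(format!("failed to process item {n}"))
            } else {
                Ok(())
            }
        },
    )
}
```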
View File

@ -8,6 +8,7 @@ use crate::progress::Step;
#[repr(u8)]
pub enum IndexingStep {
PreparingPayloads,
PreparingCompressionDictionary,
ExtractingDocuments,
ExtractingFacets,
ExtractingWords,
@ -26,6 +27,7 @@ impl Step for IndexingStep {
fn name(&self) -> Cow<'static, str> {
match self {
IndexingStep::PreparingPayloads => "preparing update file",
IndexingStep::PreparingCompressionDictionary => "preparing documents compression dictionary",
IndexingStep::ExtractingDocuments => "extracting documents",
IndexingStep::ExtractingFacets => "extracting facets",
IndexingStep::ExtractingWords => "extracting words",

View File

@ -98,7 +98,8 @@ impl<'t> VectorDocumentFromDb<'t> {
db_fields_ids_map: &'t Mapper,
doc_alloc: &'t Bump,
) -> Result<Option<Self>> {
let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else {
let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map, doc_alloc)?
else {
return Ok(None);
};
let vectors = document.vectors_field()?;