Fetch the compression dictionary only once to decompress documents

This commit is contained in:
Clément Renault 2024-12-17 16:56:54 +01:00
parent c1dd489adc
commit 7d75988a53
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
9 changed files with 138 additions and 26 deletions

View File

@ -5,6 +5,7 @@ use bumparaw_collections::RawMap;
use heed::RoTxn;
use rustc_hash::FxBuildHasher;
use serde_json::value::RawValue;
use zstd::dict::DecoderDictionary;
use super::vector_document::VectorDocument;
use super::{KvReaderFieldId, KvWriterFieldId};
@ -130,12 +131,12 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
db_document_decompression_dictionary: Option<&DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
) -> Result<Option<Self>> {
match index.compressed_document(rtxn, docid)? {
Some(compressed) => {
/// TODO maybe give the dictionary as a parameter
let content = match index.document_decompression_dictionary(rtxn)? {
let content = match db_document_decompression_dictionary {
Some(dictionary) => compressed.decompress_into_bump(doc_alloc, &dictionary)?,
None => compressed.as_non_compressed(),
};
@ -206,10 +207,18 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
db_document_decompression_dictionary: Option<&'t DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
new_doc: DocumentFromVersions<'a, 'doc>,
) -> Result<Self> {
let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map, doc_alloc)?;
let db = DocumentFromDb::new(
docid,
rtxn,
index,
db_fields_ids_map,
db_document_decompression_dictionary,
doc_alloc,
)?;
Ok(Self { new_doc, db })
}

View File

@ -1,5 +1,6 @@
use bumpalo::Bump;
use heed::RoTxn;
use zstd::dict::DecoderDictionary;
use super::document::{
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
@ -72,9 +73,10 @@ impl<'doc> Deletion<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
dictionary: Option<&'a DecoderDictionary<'static>>,
doc_alloc: &'a Bump,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, doc_alloc)?.ok_or(
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, dictionary, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
@ -128,9 +130,10 @@ impl<'doc> Update<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
dictionary: Option<&'a DecoderDictionary<'static>>,
doc_alloc: &'a Bump,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, doc_alloc)?.ok_or(
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, dictionary, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
@ -140,11 +143,13 @@ impl<'doc> Update<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
dictionary: Option<&'a DecoderDictionary<'static>>,
doc_alloc: &'a Bump,
) -> Result<VectorDocumentFromDb<'a>> {
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, dictionary, doc_alloc)?
.ok_or(crate::error::UserError::UnknownInternalDocumentId {
document_id: self.docid,
})?)
}
pub fn updated(&self) -> DocumentFromVersions<'_, 'doc> {
@ -156,6 +161,7 @@ impl<'doc> Update<'doc> {
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
dictionary: Option<&'t DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
) -> Result<MergedDocument<'_, 'doc, 't, Mapper>> {
if self.has_deletion {
@ -166,6 +172,7 @@ impl<'doc> Update<'doc> {
rtxn,
index,
mapper,
dictionary,
doc_alloc,
DocumentFromVersions::new(&self.new),
)
@ -182,6 +189,7 @@ impl<'doc> Update<'doc> {
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
dictionary: Option<&'t DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
) -> Result<bool> {
let mut changed = false;
@ -198,7 +206,7 @@ impl<'doc> Update<'doc> {
updated_selected_field_count += 1;
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper, doc_alloc)?,
None => self.current(rtxn, index, mapper, dictionary, doc_alloc)?,
};
let current_value = current.top_level_field(key)?;
let Some(current_value) = current_value else {
@ -228,7 +236,7 @@ impl<'doc> Update<'doc> {
let has_deleted_fields = {
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper, doc_alloc)?,
None => self.current(rtxn, index, mapper, dictionary, doc_alloc)?,
};
let mut current_selected_field_count = 0;
@ -260,6 +268,7 @@ impl<'doc> Update<'doc> {
rtxn: &'doc RoTxn,
index: &'doc Index,
mapper: &'doc Mapper,
dictionary: Option<&'doc DecoderDictionary<'static>>,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<MergedVectorDocument<'doc>>> {
@ -277,6 +286,7 @@ impl<'doc> Update<'doc> {
index,
rtxn,
mapper,
dictionary,
&self.new,
doc_alloc,
embedders,

View File

@ -82,6 +82,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
&context.rtxn,
context.index,
&context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?;
let geo_iter =
@ -103,6 +104,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
&context.rtxn,
context.index,
&context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?;
let geo_iter =
@ -131,12 +133,14 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
&context.rtxn,
context.index,
&context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?;
let vector_content = update.merged_vectors(
&context.rtxn,
context.index,
&context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
self.embedders,
)?;

View File

@ -79,7 +79,13 @@ impl FacetedDocidsExtractor {
let res = match document_change {
DocumentChange::Deletion(inner) => extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.current(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
@ -102,6 +108,7 @@ impl FacetedDocidsExtractor {
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)? {
return Ok(());
@ -109,7 +116,13 @@ impl FacetedDocidsExtractor {
extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.current(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
@ -129,7 +142,13 @@ impl FacetedDocidsExtractor {
extract_document_facets(
attributes_to_extract,
inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.merged(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {

View File

@ -158,6 +158,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
let index = context.index;
let max_memory = self.grenad_parameters.max_memory_by_thread();
let db_fields_ids_map = context.db_fields_ids_map;
let db_document_decompression_dictionary = context.db_document_decompression_dictionary;
let doc_alloc = &context.doc_alloc;
let mut data_ref = context.data.borrow_mut_or_yield();
@ -174,7 +175,13 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
let external_id = deletion.external_document_id();
let current = deletion.current(rtxn, index, db_fields_ids_map, doc_alloc)?;
let current = deletion.current(
rtxn,
index,
db_fields_ids_map,
db_document_decompression_dictionary,
doc_alloc,
)?;
let current_geo = current
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
@ -189,7 +196,13 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
}
}
DocumentChange::Update(update) => {
let current = update.current(rtxn, index, db_fields_ids_map, doc_alloc)?;
let current = update.current(
rtxn,
index,
db_fields_ids_map,
db_document_decompression_dictionary,
doc_alloc,
)?;
let external_id = update.external_document_id();
let docid = update.docid();

View File

@ -339,7 +339,13 @@ impl WordDocidsExtractors {
)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.current(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
new_fields_ids_map,
&mut token_fn,
)?;
@ -350,6 +356,7 @@ impl WordDocidsExtractors {
&context.rtxn,
context.index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)? {
return Ok(());
@ -366,7 +373,13 @@ impl WordDocidsExtractors {
)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.current(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
new_fields_ids_map,
&mut token_fn,
)?;
@ -382,7 +395,13 @@ impl WordDocidsExtractors {
)
};
document_tokenizer.tokenize_document(
inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?,
inner.merged(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
new_fields_ids_map,
&mut token_fn,
)?;

View File

@ -58,8 +58,13 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
let docid = document_change.docid();
match document_change {
DocumentChange::Deletion(inner) => {
let document =
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?;
let document = inner.current(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?;
process_document_tokens(
document,
document_tokenizer,
@ -76,13 +81,19 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)? {
return Ok(());
}
let document =
inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?;
let document = inner.current(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?;
process_document_tokens(
document,
document_tokenizer,
@ -92,8 +103,13 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
del_word_pair_proximity.push(((w1, w2), prox));
},
)?;
let document =
inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?;
let document = inner.merged(
rtxn,
index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?;
process_document_tokens(
document,
document_tokenizer,

View File

@ -97,6 +97,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?;
let new_vectors = update.updated_vectors(&context.doc_alloc, self.embedders)?;
@ -135,6 +136,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
context.new_fields_ids_map,
@ -146,6 +148,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
context.new_fields_ids_map,
@ -167,6 +170,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
context.new_fields_ids_map,
@ -178,6 +182,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
&context.rtxn,
context.index,
context.db_fields_ids_map,
context.db_document_decompression_dictionary,
&context.doc_alloc,
)?,
context.new_fields_ids_map,

View File

@ -7,6 +7,7 @@ use heed::RoTxn;
use rustc_hash::FxBuildHasher;
use serde::Serialize;
use serde_json::value::RawValue;
use zstd::dict::DecoderDictionary;
use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions};
use super::indexer::de::DeserrRawValue;
@ -96,9 +97,17 @@ impl<'t> VectorDocumentFromDb<'t> {
index: &'t Index,
rtxn: &'t RoTxn,
db_fields_ids_map: &'t Mapper,
db_document_decompression_dictionary: Option<&'t DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
) -> Result<Option<Self>> {
let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map, doc_alloc)?
let Some(document) = DocumentFromDb::new(
docid,
rtxn,
index,
db_fields_ids_map,
db_document_decompression_dictionary,
doc_alloc,
)?
else {
return Ok(None);
};
@ -283,11 +292,19 @@ impl<'doc> MergedVectorDocument<'doc> {
index: &'doc Index,
rtxn: &'doc RoTxn,
db_fields_ids_map: &'doc Mapper,
db_document_decompression_dictionary: Option<&'doc DecoderDictionary<'static>>,
versions: &Versions<'doc>,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<Self>> {
let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
let db = VectorDocumentFromDb::new(
docid,
index,
rtxn,
db_fields_ids_map,
db_document_decompression_dictionary,
doc_alloc,
)?;
let new_doc =
VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })