Compress and send compressed documents to the writer

Clément Renault 2024-12-17 14:52:10 +01:00
parent a466cf4f2c
commit b7ae720a7e
12 changed files with 104 additions and 41 deletions
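In short: the extractor side now compresses each document before it crosses the channel to the writer. Every extractor thread prepares a zstd `Compressor` once from the shared `EncoderDictionary`, compresses each obkv with `CompressedObkvU16::with_compressor`, and ships the result through `DocumentsSender::write_compressed`; when no dictionary has been computed yet, documents still go through `write_uncompressed`. A minimal sketch of that decision, using only the types and methods introduced in the hunks below (the function itself and its exact signature are illustrative, not part of the change):

```rust
use zstd::bulk::Compressor;

use crate::heed_codec::CompressedObkvU16;
use crate::update::new::channel::DocumentsSender;
use crate::update::new::KvReaderFieldId;
use crate::DocumentId;

// Illustrative only: one compressor per extractor thread, reused for every document.
// KvReaderFieldId is the same obkv reader over u16 keys that the codec expects.
fn send_document(
    sender: &DocumentsSender<'_, '_>,
    compressor: Option<&mut Compressor<'_>>,
    docid: DocumentId,
    external_id: String,
    document: &KvReaderFieldId,
) -> crate::Result<()> {
    match compressor {
        // A dictionary exists: compress the obkv and ship the compressed bytes.
        Some(compressor) => {
            let compressed = CompressedObkvU16::with_compressor(document, compressor)?;
            sender.write_compressed(docid, external_id, &compressed)
        }
        // No dictionary yet: the document is written as-is.
        None => sender.write_uncompressed(docid, external_id, document),
    }
}
```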

@@ -19,7 +19,7 @@ impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec {
}
impl heed::BytesEncode<'_> for CompressedObkvCodec {
- type EItem = CompressedKvWriterU16;
+ type EItem = CompressedObkvU16;
fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, BoxedError> {
Ok(Cow::Borrowed(&item.0))
@@ -60,7 +60,7 @@ impl<'a> CompressedKvReaderU16<'a> {
bump: &'b Bump,
dictionary: &DecoderDictionary,
) -> io::Result<&'b KvReaderU16> {
- /// TODO use a better approch and stop cloning so much.
+ /// TODO use a better approach and stop cloning so much.
let mut buffer = Vec::new();
self.decompress_with(&mut buffer, dictionary)?;
Ok(KvReaderU16::from_slice(bump.alloc_slice_copy(&buffer)))
@@ -100,15 +100,19 @@ impl<'a> CompressedKvReaderU16<'a> {
}
}
- pub struct CompressedKvWriterU16(Vec<u8>);
+ pub struct CompressedObkvU16(Vec<u8>);
- impl CompressedKvWriterU16 {
- pub fn new_with_dictionary(
+ impl CompressedObkvU16 {
+ pub fn with_dictionary(
input: &KvReaderU16,
dictionary: &EncoderDictionary,
) -> io::Result<Self> {
let mut compressor = Compressor::with_prepared_dictionary(dictionary)?;
- compressor.compress(input).map(CompressedKvWriterU16)
+ Self::with_compressor(input, &mut compressor)
}
+ pub fn with_compressor(input: &KvReaderU16, compressor: &mut Compressor) -> io::Result<Self> {
+ compressor.compress(input.as_bytes()).map(CompressedObkvU16)
+ }
pub fn as_bytes(&self) -> &[u8] {
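The point of the new `with_compressor` constructor is reuse: `with_dictionary` prepares a fresh zstd compressor for every document, while a caller that holds on to one `Compressor` built from the prepared dictionary amortizes that setup over a whole batch. A sketch of the batch case, assuming only the API shown in this hunk (the `compress_all` helper is illustrative):

```rust
use std::io;

use obkv::KvReaderU16;
use zstd::bulk::Compressor;
use zstd::dict::EncoderDictionary;

use crate::heed_codec::CompressedObkvU16;

// Illustrative helper: compress many obkvs while reusing a single compressor.
fn compress_all<'a>(
    documents: impl Iterator<Item = &'a KvReaderU16>,
    dictionary: &EncoderDictionary<'_>,
) -> io::Result<Vec<CompressedObkvU16>> {
    // The dictionary is prepared once; every document below reuses that work.
    let mut compressor = Compressor::with_prepared_dictionary(dictionary)?;
    documents
        .map(|document| CompressedObkvU16::with_compressor(document, &mut compressor))
        .collect()
}
```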

@@ -20,7 +20,7 @@ use thiserror::Error;
pub use self::beu16_str_codec::BEU16StrCodec;
pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::compressed_obkv_codec::{
- CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec,
+ CompressedKvReaderU16, CompressedObkvCodec, CompressedObkvU16,
};
pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec;

@@ -28,7 +28,7 @@ pub use self::transform::{Transform, TransformOutput};
use super::new::StdResult;
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
- use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec};
+ use crate::heed_codec::{CompressedObkvCodec, CompressedObkvU16};
use crate::index::{PrefixSearch, PrefixSettings};
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
@@ -771,8 +771,8 @@ where
let mut iter = self.index.documents.iter_mut(self.wtxn)?;
while let Some(result) = iter.next() {
let (docid, document) = result?;
- let document = document.as_non_compressed().as_bytes();
- let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?;
+ let document = document.as_non_compressed();
+ let compressed = CompressedObkvU16::with_dictionary(document, &dictionary)?;
// safety: the compressed document is entirely owned
unsafe {
iter.put_current_with_options::<CompressedObkvCodec>(

@@ -7,7 +7,7 @@ use bytemuck::allocation::pod_collect_to_vec;
use grenad::{MergeFunction, Merger, MergerBuilder};
use heed::types::Bytes;
use heed::{BytesDecode, RwTxn};
- use obkv::{KvReader, KvWriter};
+ use obkv::{KvReader, KvReaderU16, KvWriter};
use roaring::RoaringBitmap;
use super::helpers::{
@@ -17,7 +17,7 @@
};
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType;
- use crate::heed_codec::CompressedKvWriterU16;
+ use crate::heed_codec::CompressedObkvU16;
use crate::index::db_name::DOCUMENTS;
use crate::index::IndexEmbeddingConfig;
use crate::proximity::MAX_DISTANCE;
@@ -213,10 +213,8 @@ pub(crate) fn write_typed_chunk_into_index(
let uncompressed_document_bytes = writer.into_inner().unwrap();
match dictionary.as_ref() {
Some(dictionary) => {
- let compressed = CompressedKvWriterU16::new_with_dictionary(
- &uncompressed_document_bytes,
- dictionary,
- )?;
+ let doc = KvReaderU16::from_slice(&uncompressed_document_bytes);
+ let compressed = CompressedObkvU16::with_dictionary(&doc, dictionary)?;
db.put(wtxn, &docid, compressed.as_bytes())?
}
None => db.put(wtxn, &docid, &uncompressed_document_bytes)?,

@@ -21,6 +21,7 @@ use super::ref_cell_ext::RefCellExt;
use super::thread_local::{FullySend, ThreadLocal};
use super::StdResult;
use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
+ use crate::heed_codec::CompressedObkvU16;
use crate::index::db_name;
use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
use crate::update::new::KvReaderFieldId;
@@ -825,14 +826,31 @@ impl FieldIdDocidFacetSender<'_, '_> {
pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl DocumentsSender<'_, '_> {
/// TODO do that efficiently
- pub fn uncompressed(
+ pub fn write_uncompressed(
&self,
docid: DocumentId,
external_id: String,
document: &KvReaderFieldId,
) -> crate::Result<()> {
- self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), document.as_bytes())?;
+ self.write_raw(docid, external_id, document.as_bytes())
}
+ pub fn write_compressed(
+ &self,
+ docid: DocumentId,
+ external_id: String,
+ document: &CompressedObkvU16,
+ ) -> crate::Result<()> {
+ self.write_raw(docid, external_id, document.as_bytes())
+ }
+ fn write_raw(
+ &self,
+ docid: DocumentId,
+ external_id: String,
+ raw_document_bytes: &[u8],
+ ) -> crate::Result<()> {
+ self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), raw_document_bytes)?;
self.0.write_key_value(
Database::ExternalDocumentsIds,
external_id.as_bytes(),

@@ -134,6 +134,7 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
) -> Result<Option<Self>> {
match index.compressed_document(rtxn, docid)? {
Some(compressed) => {
+ /// TODO maybe give the dictionary as a parameter
let content = match index.document_decompression_dictionary(rtxn)? {
Some(dictionary) => compressed.decompress_into_bump(doc_alloc, &dictionary)?,
None => compressed.as_non_compressed(),
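For completeness, the read side is the mirror image: a document fetched from the documents database is a `CompressedKvReaderU16`, and it is either decompressed into the per-document bump allocator with the `DecoderDictionary` or used directly through `as_non_compressed` when it predates any dictionary. A condensed sketch of that path, assuming only the accessors visible in these hunks (the standalone function is illustrative; the real logic lives in `DocumentFromDb` and the change structs):

```rust
use bumpalo::Bump;
use heed::RoTxn;
use obkv::KvReaderU16;
use zstd::dict::DecoderDictionary;

use crate::{DocumentId, Index, Result};

// Illustrative read path: fetch one document and decompress it only if a dictionary exists.
fn read_document<'t>(
    index: &Index,
    rtxn: &'t RoTxn,
    docid: DocumentId,
    doc_alloc: &'t Bump,
    dictionary: Option<&DecoderDictionary<'static>>,
) -> Result<Option<&'t KvReaderU16>> {
    let Some(compressed) = index.compressed_document(rtxn, docid)? else {
        return Ok(None);
    };
    let document = match dictionary {
        // Decompress into the bump allocator, as DocumentFromDb does.
        Some(dictionary) => compressed.decompress_into_bump(doc_alloc, dictionary)?,
        // Documents written before a dictionary was computed are stored uncompressed.
        None => compressed.as_non_compressed(),
    };
    Ok(Some(document))
}
```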

@@ -5,10 +5,9 @@ use bumpalo::Bump;
use heed::RwTxn;
use rayon::iter::{ParallelBridge, ParallelIterator as _};
use roaring::RoaringBitmap;
use zstd::bulk::Compressor;
use zstd::dict::{from_continuous, EncoderDictionary};
- use crate::heed_codec::CompressedKvWriterU16;
+ use crate::heed_codec::CompressedObkvU16;
use crate::update::new::document::Document as _;
use crate::update::new::indexer::document_changes::{
DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
@@ -128,7 +127,7 @@ where
let compressed_document = index.compressed_document(&rtxn, docid)?.unwrap();
// The documents are not compressed with any dictionary at this point.
let document = compressed_document.as_non_compressed();
- let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?;
+ let compressed = CompressedObkvU16::with_dictionary(document, &dictionary)?;
Ok((docid, compressed)) as crate::Result<_>
});
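The dictionary itself comes from `retrieve_or_compute_document_compression_dictionary`, whose re-export appears in the next file's imports: it is trained on a sample of raw documents and turned into a prepared `EncoderDictionary` that the hunk above reuses to re-compress every existing document. A rough sketch of the training step only, using the `zstd` helpers imported here; sample collection, size thresholds, and persisting the dictionary into the index are omitted and the constants are made up:

```rust
use std::io;

use zstd::dict::{from_continuous, EncoderDictionary};

// Illustrative only: train a dictionary from raw obkv samples and prepare it for reuse.
fn compute_dictionary(samples: &[Vec<u8>]) -> io::Result<EncoderDictionary<'static>> {
    // zstd wants one continuous buffer plus the length of every individual sample.
    let sizes: Vec<usize> = samples.iter().map(|sample| sample.len()).collect();
    let continuous: Vec<u8> = samples.concat();
    // 64 KiB is an arbitrary maximum dictionary size for this sketch.
    let raw_dictionary = from_continuous(&continuous, &sizes, 64 * 1024)?;
    // Level 0 means "zstd default"; the prepared dictionary is what compressors reuse.
    Ok(EncoderDictionary::copy(&raw_dictionary, 0))
}
```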

@@ -3,8 +3,11 @@ use std::cell::RefCell;
use bumpalo::Bump;
pub use compression::retrieve_or_compute_document_compression_dictionary;
use hashbrown::HashMap;
+ use zstd::bulk::Compressor;
+ use zstd::dict::EncoderDictionary;
use super::DelAddRoaringBitmap;
+ use crate::heed_codec::CompressedObkvU16;
use crate::update::new::channel::DocumentsSender;
use crate::update::new::document::{write_to_obkv, Document as _};
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
@@ -18,26 +21,40 @@ mod compression;
pub struct DocumentsExtractor<'a, 'b> {
document_sender: DocumentsSender<'a, 'b>,
+ documents_compression_dictionary: Option<&'a EncoderDictionary<'a>>,
embedders: &'a EmbeddingConfigs,
}
impl<'a, 'b> DocumentsExtractor<'a, 'b> {
- pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs) -> Self {
- Self { document_sender, embedders }
+ pub fn new(
+ document_sender: DocumentsSender<'a, 'b>,
+ documents_compression_dictionary: Option<&'a EncoderDictionary<'a>>,
+ embedders: &'a EmbeddingConfigs,
+ ) -> Self {
+ Self { document_sender, documents_compression_dictionary, embedders }
}
}
#[derive(Default)]
- pub struct DocumentExtractorData {
+ pub struct DocumentExtractorData<'a> {
pub docids_delta: DelAddRoaringBitmap,
pub field_distribution_delta: HashMap<String, i64>,
+ pub documents_compressor: Option<Compressor<'a>>,
}
impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
- type Data = FullySend<RefCell<DocumentExtractorData>>;
+ type Data = FullySend<RefCell<DocumentExtractorData<'a>>>;
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
- Ok(FullySend(Default::default()))
+ let documents_compressor = match self.documents_compression_dictionary {
+ Some(dictionary) => Some(Compressor::with_prepared_dictionary(dictionary)?),
+ None => None,
+ };
+ Ok(FullySend(RefCell::new(DocumentExtractorData {
+ docids_delta: Default::default(),
+ field_distribution_delta: Default::default(),
+ documents_compressor,
+ })))
}
fn process<'doc>(
@@ -50,13 +67,13 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
for change in changes {
let change = change?;
- // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
- // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
+ // **WARNING**: The exclusive borrow on `new_fields_ids_map` needs to be taken
+ // **inside** of the `for change in changes` loop. Otherwise,
+ // `BorrowMutError` will occur for document changes that also need
+ // the new_fields_ids_map (e.g.: UpdateByFunction).
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let external_docid = change.external_docid().to_owned();
- todo!("manage documents compression");
- // document but we need to create a function that collects and compresses documents.
match change {
DocumentChange::Deletion(deletion) => {
@@ -129,7 +146,19 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
&mut new_fields_ids_map,
&mut document_buffer,
)?;
- self.document_sender.uncompressed(docid, external_docid, content).unwrap();
+ match document_extractor_data.documents_compressor.as_mut() {
+ Some(compressor) => {
+ let doc = CompressedObkvU16::with_compressor(content, compressor)?;
+ self.document_sender
+ .write_compressed(docid, external_docid, &doc)
+ .unwrap();
+ }
+ None => self
+ .document_sender
+ .write_uncompressed(docid, external_docid, content)
+ .unwrap(),
+ }
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
@@ -153,7 +182,18 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
&mut document_buffer,
)?;
document_extractor_data.docids_delta.insert_add_u32(docid);
- self.document_sender.uncompressed(docid, external_docid, content).unwrap();
+ match document_extractor_data.documents_compressor.as_mut() {
+ Some(compressor) => {
+ let doc = CompressedObkvU16::with_compressor(content, compressor)?;
+ self.document_sender
+ .write_compressed(docid, external_docid, &doc)
+ .unwrap();
+ }
+ None => self
+ .document_sender
+ .write_uncompressed(docid, external_docid, content)
+ .unwrap(),
+ }
}
}
}

@@ -27,6 +27,8 @@ pub struct DocumentChangeContext<
/// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
/// inside of the DB.
pub db_fields_ids_map: &'indexer FieldsIdsMap,
+ /// The dictionary used to decompress the documents in the database.
+ pub db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>,
/// A transaction providing data from the DB before all indexing operations
pub rtxn: RoTxn<'indexer>,
@@ -62,6 +64,7 @@ impl<
pub fn new<F>(
index: &'indexer Index,
db_fields_ids_map: &'indexer FieldsIdsMap,
+ db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>,
new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
@@ -80,14 +83,13 @@
let fields_ids_map = &fields_ids_map.0;
let extractor_alloc = extractor_allocs.get_or_default();
let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
- let txn = index.read_txn()?;
Ok(DocumentChangeContext {
index,
- rtxn: txn,
+ rtxn: index.read_txn()?,
db_fields_ids_map,
+ db_document_decompression_dictionary,
new_fields_ids_map: fields_ids_map,
doc_alloc,
extractor_alloc: &extractor_alloc.0,
@@ -239,6 +241,7 @@ where
DocumentChangeContext::new(
index,
db_fields_ids_map,
+ db_document_decompression_dictionary,
new_fields_ids_map,
extractor_allocs,
doc_allocs,

@@ -65,7 +65,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
'pl: 'doc, // the payload must survive the process calls
{
let compressed = context.index.compressed_document(&context.rtxn, *docid)?.unwrap();
- let current = match context.index.document_decompression_dictionary(&context.rtxn)? {
+ let current = match context.db_document_decompression_dictionary {
Some(dict) => compressed.decompress_into_bump(&context.doc_alloc, &dict)?,
None => compressed.as_non_compressed(),
};
@@ -93,7 +93,6 @@ mod test {
use std::sync::RwLock;
use bumpalo::Bump;
- use zstd::dict::DecoderDictionary;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::index::tests::TempIndex;

@@ -165,7 +165,7 @@ where
// document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.documents();
- let document_extractor = DocumentsExtractor::new(document_sender, embedders);
+ let document_extractor = DocumentsExtractor::new(document_sender, document_compression_dictionary.as_ref(), embedders);
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");

@@ -95,6 +95,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
let DocumentChangeContext {
index,
db_fields_ids_map,
+ db_document_decompression_dictionary,
rtxn: txn,
new_fields_ids_map,
doc_alloc,
@@ -106,7 +107,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
// safety: Both documents *must* exists in the database as
// their IDs comes from the list of documents ids.
let compressed_document = index.compressed_document(txn, docid)?.unwrap();
- let document = match index.document_decompression_dictionary(txn)? {
+ let document = match db_document_decompression_dictionary {
Some(dictionary) => compressed_document.decompress_into_bump(doc_alloc, &dictionary)?,
None => compressed_document.as_non_compressed(),
};