diff --git a/crates/milli/src/heed_codec/compressed_obkv_codec.rs b/crates/milli/src/heed_codec/compressed_obkv_codec.rs index 711a5c049..c696de871 100644 --- a/crates/milli/src/heed_codec/compressed_obkv_codec.rs +++ b/crates/milli/src/heed_codec/compressed_obkv_codec.rs @@ -19,7 +19,7 @@ impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec { } impl heed::BytesEncode<'_> for CompressedObkvCodec { - type EItem = CompressedKvWriterU16; + type EItem = CompressedObkvU16; fn bytes_encode(item: &Self::EItem) -> Result, BoxedError> { Ok(Cow::Borrowed(&item.0)) @@ -60,7 +60,7 @@ impl<'a> CompressedKvReaderU16<'a> { bump: &'b Bump, dictionary: &DecoderDictionary, ) -> io::Result<&'b KvReaderU16> { - /// TODO use a better approch and stop cloning so much. + /// TODO use a better approach and stop cloning so much. let mut buffer = Vec::new(); self.decompress_with(&mut buffer, dictionary)?; Ok(KvReaderU16::from_slice(bump.alloc_slice_copy(&buffer))) @@ -100,15 +100,19 @@ impl<'a> CompressedKvReaderU16<'a> { } } -pub struct CompressedKvWriterU16(Vec); +pub struct CompressedObkvU16(Vec); -impl CompressedKvWriterU16 { - pub fn new_with_dictionary( +impl CompressedObkvU16 { + pub fn with_dictionary( input: &KvReaderU16, dictionary: &EncoderDictionary, ) -> io::Result { let mut compressor = Compressor::with_prepared_dictionary(dictionary)?; - compressor.compress(input).map(CompressedKvWriterU16) + Self::with_compressor(input, &mut compressor) + } + + pub fn with_compressor(input: &KvReaderU16, compressor: &mut Compressor) -> io::Result { + compressor.compress(input.as_bytes()).map(CompressedObkvU16) } pub fn as_bytes(&self) -> &[u8] { diff --git a/crates/milli/src/heed_codec/mod.rs b/crates/milli/src/heed_codec/mod.rs index 3ce9306dd..7cbce064d 100644 --- a/crates/milli/src/heed_codec/mod.rs +++ b/crates/milli/src/heed_codec/mod.rs @@ -20,7 +20,7 @@ use thiserror::Error; pub use self::beu16_str_codec::BEU16StrCodec; pub use self::beu32_str_codec::BEU32StrCodec; pub use self::compressed_obkv_codec::{ - CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec, + CompressedKvReaderU16, CompressedObkvCodec, CompressedObkvU16, }; pub use self::field_id_word_count_codec::FieldIdWordCountCodec; pub use self::fst_set_codec::FstSetCodec; diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 3410bca00..25fcfee17 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -28,7 +28,7 @@ pub use self::transform::{Transform, TransformOutput}; use super::new::StdResult; use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; -use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec}; +use crate::heed_codec::{CompressedObkvCodec, CompressedObkvU16}; use crate::index::{PrefixSearch, PrefixSettings}; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; @@ -771,8 +771,8 @@ where let mut iter = self.index.documents.iter_mut(self.wtxn)?; while let Some(result) = iter.next() { let (docid, document) = result?; - let document = document.as_non_compressed().as_bytes(); - let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?; + let document = document.as_non_compressed(); + let compressed = CompressedObkvU16::with_dictionary(document, &dictionary)?; // safety: the compressed document is entirely owned unsafe { iter.put_current_with_options::( diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs index 2656839f7..52d6df756 100644 --- a/crates/milli/src/update/index_documents/typed_chunk.rs +++ b/crates/milli/src/update/index_documents/typed_chunk.rs @@ -7,7 +7,7 @@ use bytemuck::allocation::pod_collect_to_vec; use grenad::{MergeFunction, Merger, MergerBuilder}; use heed::types::Bytes; use heed::{BytesDecode, RwTxn}; -use obkv::{KvReader, KvWriter}; +use obkv::{KvReader, KvReaderU16, KvWriter}; use roaring::RoaringBitmap; use super::helpers::{ @@ -17,7 +17,7 @@ use super::helpers::{ }; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; -use crate::heed_codec::CompressedKvWriterU16; +use crate::heed_codec::CompressedObkvU16; use crate::index::db_name::DOCUMENTS; use crate::index::IndexEmbeddingConfig; use crate::proximity::MAX_DISTANCE; @@ -213,10 +213,8 @@ pub(crate) fn write_typed_chunk_into_index( let uncompressed_document_bytes = writer.into_inner().unwrap(); match dictionary.as_ref() { Some(dictionary) => { - let compressed = CompressedKvWriterU16::new_with_dictionary( - &uncompressed_document_bytes, - dictionary, - )?; + let doc = KvReaderU16::from_slice(&uncompressed_document_bytes); + let compressed = CompressedObkvU16::with_dictionary(&doc, dictionary)?; db.put(wtxn, &docid, compressed.as_bytes())? } None => db.put(wtxn, &docid, &uncompressed_document_bytes)?, diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 7590c02ac..4cfb1adb5 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -21,6 +21,7 @@ use super::ref_cell_ext::RefCellExt; use super::thread_local::{FullySend, ThreadLocal}; use super::StdResult; use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec}; +use crate::heed_codec::CompressedObkvU16; use crate::index::db_name; use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY}; use crate::update::new::KvReaderFieldId; @@ -825,14 +826,31 @@ impl FieldIdDocidFacetSender<'_, '_> { pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); impl DocumentsSender<'_, '_> { - /// TODO do that efficiently - pub fn uncompressed( + pub fn write_uncompressed( &self, docid: DocumentId, external_id: String, document: &KvReaderFieldId, ) -> crate::Result<()> { - self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), document.as_bytes())?; + self.write_raw(docid, external_id, document.as_bytes()) + } + + pub fn write_compressed( + &self, + docid: DocumentId, + external_id: String, + document: &CompressedObkvU16, + ) -> crate::Result<()> { + self.write_raw(docid, external_id, document.as_bytes()) + } + + fn write_raw( + &self, + docid: DocumentId, + external_id: String, + raw_document_bytes: &[u8], + ) -> crate::Result<()> { + self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), raw_document_bytes)?; self.0.write_key_value( Database::ExternalDocumentsIds, external_id.as_bytes(), diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs index 1ba58ba63..5ac527231 100644 --- a/crates/milli/src/update/new/document.rs +++ b/crates/milli/src/update/new/document.rs @@ -134,6 +134,7 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> { ) -> Result> { match index.compressed_document(rtxn, docid)? { Some(compressed) => { + /// TODO maybe give the dictionary as a parameter let content = match index.document_decompression_dictionary(rtxn)? { Some(dictionary) => compressed.decompress_into_bump(doc_alloc, &dictionary)?, None => compressed.as_non_compressed(), diff --git a/crates/milli/src/update/new/extract/documents/compression.rs b/crates/milli/src/update/new/extract/documents/compression.rs index a068b7afd..8f84bf42d 100644 --- a/crates/milli/src/update/new/extract/documents/compression.rs +++ b/crates/milli/src/update/new/extract/documents/compression.rs @@ -5,10 +5,9 @@ use bumpalo::Bump; use heed::RwTxn; use rayon::iter::{ParallelBridge, ParallelIterator as _}; use roaring::RoaringBitmap; -use zstd::bulk::Compressor; use zstd::dict::{from_continuous, EncoderDictionary}; -use crate::heed_codec::CompressedKvWriterU16; +use crate::heed_codec::CompressedObkvU16; use crate::update::new::document::Document as _; use crate::update::new::indexer::document_changes::{ DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, @@ -128,7 +127,7 @@ where let compressed_document = index.compressed_document(&rtxn, docid)?.unwrap(); // The documents are not compressed with any dictionary at this point. let document = compressed_document.as_non_compressed(); - let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?; + let compressed = CompressedObkvU16::with_dictionary(document, &dictionary)?; Ok((docid, compressed)) as crate::Result<_> }); diff --git a/crates/milli/src/update/new/extract/documents/mod.rs b/crates/milli/src/update/new/extract/documents/mod.rs index 2d65d445e..61fe83662 100644 --- a/crates/milli/src/update/new/extract/documents/mod.rs +++ b/crates/milli/src/update/new/extract/documents/mod.rs @@ -3,8 +3,11 @@ use std::cell::RefCell; use bumpalo::Bump; pub use compression::retrieve_or_compute_document_compression_dictionary; use hashbrown::HashMap; +use zstd::bulk::Compressor; +use zstd::dict::EncoderDictionary; use super::DelAddRoaringBitmap; +use crate::heed_codec::CompressedObkvU16; use crate::update::new::channel::DocumentsSender; use crate::update::new::document::{write_to_obkv, Document as _}; use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor}; @@ -18,26 +21,40 @@ mod compression; pub struct DocumentsExtractor<'a, 'b> { document_sender: DocumentsSender<'a, 'b>, + documents_compression_dictionary: Option<&'a EncoderDictionary<'a>>, embedders: &'a EmbeddingConfigs, } impl<'a, 'b> DocumentsExtractor<'a, 'b> { - pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs) -> Self { - Self { document_sender, embedders } + pub fn new( + document_sender: DocumentsSender<'a, 'b>, + documents_compression_dictionary: Option<&'a EncoderDictionary<'a>>, + embedders: &'a EmbeddingConfigs, + ) -> Self { + Self { document_sender, documents_compression_dictionary, embedders } } } -#[derive(Default)] -pub struct DocumentExtractorData { +pub struct DocumentExtractorData<'a> { pub docids_delta: DelAddRoaringBitmap, pub field_distribution_delta: HashMap, + pub documents_compressor: Option>, } impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { - type Data = FullySend>; + type Data = FullySend>>; fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result { - Ok(FullySend(Default::default())) + let documents_compressor = match self.documents_compression_dictionary { + Some(dictionary) => Some(Compressor::with_prepared_dictionary(dictionary)?), + None => None, + }; + + Ok(FullySend(RefCell::new(DocumentExtractorData { + docids_delta: Default::default(), + field_distribution_delta: Default::default(), + documents_compressor, + }))) } fn process<'doc>( @@ -50,13 +67,13 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { for change in changes { let change = change?; - // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop - // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction) + // **WARNING**: The exclusive borrow on `new_fields_ids_map` needs to be taken + // **inside** of the `for change in changes` loop. Otherwise, + // `BorrowMutError` will occur for document changes that also need + // the new_fields_ids_map (e.g.: UpdateByFunction). let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let external_docid = change.external_docid().to_owned(); - todo!("manage documents compression"); - // document but we need to create a function that collects and compresses documents. match change { DocumentChange::Deletion(deletion) => { @@ -129,7 +146,19 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { &mut new_fields_ids_map, &mut document_buffer, )?; - self.document_sender.uncompressed(docid, external_docid, content).unwrap(); + + match document_extractor_data.documents_compressor.as_mut() { + Some(compressor) => { + let doc = CompressedObkvU16::with_compressor(content, compressor)?; + self.document_sender + .write_compressed(docid, external_docid, &doc) + .unwrap(); + } + None => self + .document_sender + .write_uncompressed(docid, external_docid, content) + .unwrap(), + } } DocumentChange::Insertion(insertion) => { let docid = insertion.docid(); @@ -153,7 +182,18 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { &mut document_buffer, )?; document_extractor_data.docids_delta.insert_add_u32(docid); - self.document_sender.uncompressed(docid, external_docid, content).unwrap(); + match document_extractor_data.documents_compressor.as_mut() { + Some(compressor) => { + let doc = CompressedObkvU16::with_compressor(content, compressor)?; + self.document_sender + .write_compressed(docid, external_docid, &doc) + .unwrap(); + } + None => self + .document_sender + .write_uncompressed(docid, external_docid, content) + .unwrap(), + } } } } diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index 4110a2d8f..53fc3a3d8 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -27,6 +27,8 @@ pub struct DocumentChangeContext< /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents /// inside of the DB. pub db_fields_ids_map: &'indexer FieldsIdsMap, + /// The dictionary used to decompress the documents in the database. + pub db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>, /// A transaction providing data from the DB before all indexing operations pub rtxn: RoTxn<'indexer>, @@ -62,6 +64,7 @@ impl< pub fn new( index: &'indexer Index, db_fields_ids_map: &'indexer FieldsIdsMap, + db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>, new_fields_ids_map: &'fid RwLock, extractor_allocs: &'extractor ThreadLocal>, doc_allocs: &'doc ThreadLocal>>, @@ -80,14 +83,13 @@ impl< let fields_ids_map = &fields_ids_map.0; let extractor_alloc = extractor_allocs.get_or_default(); - let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?; - let txn = index.read_txn()?; Ok(DocumentChangeContext { index, - rtxn: txn, + rtxn: index.read_txn()?, db_fields_ids_map, + db_document_decompression_dictionary, new_fields_ids_map: fields_ids_map, doc_alloc, extractor_alloc: &extractor_alloc.0, @@ -239,6 +241,7 @@ where DocumentChangeContext::new( index, db_fields_ids_map, + db_document_decompression_dictionary, new_fields_ids_map, extractor_allocs, doc_allocs, diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index b7626047b..1a9864ae8 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -65,7 +65,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { 'pl: 'doc, // the payload must survive the process calls { let compressed = context.index.compressed_document(&context.rtxn, *docid)?.unwrap(); - let current = match context.index.document_decompression_dictionary(&context.rtxn)? { + let current = match context.db_document_decompression_dictionary { Some(dict) => compressed.decompress_into_bump(&context.doc_alloc, &dict)?, None => compressed.as_non_compressed(), }; @@ -93,7 +93,6 @@ mod test { use std::sync::RwLock; use bumpalo::Bump; - use zstd::dict::DecoderDictionary; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::tests::TempIndex; diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 5e0231aba..cc58d8264 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -165,7 +165,7 @@ where // document but we need to create a function that collects and compresses documents. let document_sender = extractor_sender.documents(); - let document_extractor = DocumentsExtractor::new(document_sender, embedders); + let document_extractor = DocumentsExtractor::new(document_sender, document_compression_dictionary.as_ref(), embedders); let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs index d6e442665..4ddc87a8b 100644 --- a/crates/milli/src/update/new/indexer/update_by_function.rs +++ b/crates/milli/src/update/new/indexer/update_by_function.rs @@ -95,6 +95,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { let DocumentChangeContext { index, db_fields_ids_map, + db_document_decompression_dictionary, rtxn: txn, new_fields_ids_map, doc_alloc, @@ -106,7 +107,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { // safety: Both documents *must* exists in the database as // their IDs comes from the list of documents ids. let compressed_document = index.compressed_document(txn, docid)?.unwrap(); - let document = match index.document_decompression_dictionary(txn)? { + let document = match db_document_decompression_dictionary { Some(dictionary) => compressed_document.decompress_into_bump(doc_alloc, &dictionary)?, None => compressed_document.as_non_compressed(), };