From bf5d9f68fa0781d146fc7834f6cfc47faa73b64b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Tue, 2 Jul 2024 12:11:26 +0200
Subject: [PATCH] First version compressing the documents

---
 milli/src/heed_codec/compressed_obkv_codec.rs | 18 ++++---
 milli/src/heed_codec/mod.rs                   |  2 +-
 milli/src/index.rs                            |  9 +++-
 milli/src/update/clear_documents.rs           |  1 +
 milli/src/update/index_documents/mod.rs       | 51 +++++++++++++++++--
 .../src/update/index_documents/typed_chunk.rs | 14 ++++-
 6 files changed, 81 insertions(+), 14 deletions(-)

diff --git a/milli/src/heed_codec/compressed_obkv_codec.rs b/milli/src/heed_codec/compressed_obkv_codec.rs
index 2c0b6d197..f9a1d0966 100644
--- a/milli/src/heed_codec/compressed_obkv_codec.rs
+++ b/milli/src/heed_codec/compressed_obkv_codec.rs
@@ -3,9 +3,9 @@ use std::borrow::Cow;
 use heed::BoxedError;
 use obkv::KvReaderU16;
 
-pub struct ObkvCompressedCodec;
+pub struct CompressedObkvCodec;
 
-impl<'a> heed::BytesDecode<'a> for ObkvCompressedCodec {
+impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec {
     type DItem = CompressedKvReaderU16<'a>;
 
     fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
@@ -13,7 +13,7 @@ impl<'a> heed::BytesDecode<'a> for ObkvCompressedCodec {
     }
 }
 
-impl heed::BytesEncode<'_> for ObkvCompressedCodec {
+impl heed::BytesEncode<'_> for CompressedObkvCodec {
     type EItem = CompressedKvWriterU16;
 
     fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, BoxedError> {
@@ -30,8 +30,10 @@ impl<'a> CompressedKvReaderU16<'a> {
         buffer: &'b mut Vec<u8>,
         dictionnary: &[u8],
     ) -> Result<KvReaderU16<'b>, lz4_flex::block::DecompressError> {
-        let max_size = lz4_flex::block::get_maximum_output_size(self.0.len());
+        // TODO WHAT THE HECK!!! WHY DO I NEED TO INCREASE THE SIZE PROVIDED
+        let max_size = lz4_flex::block::get_maximum_output_size(self.0.len()) * 2;
         buffer.resize(max_size, 0);
+        // TODO loop to increase the buffer size if need be
         let size = lz4_flex::block::decompress_into_with_dict(
             self.0,
             &mut buffer[..max_size],
@@ -50,7 +52,11 @@ pub struct CompressedKvWriterU16(Vec<u8>);
 
 impl CompressedKvWriterU16 {
     // TODO ask for a KvReaderU16 here
-    pub fn new_with_dictionnary(writer: &[u8], dictionnary: &[u8]) -> Self {
-        CompressedKvWriterU16(lz4_flex::block::compress_with_dict(writer, dictionnary))
+    pub fn new_with_dictionary(writer: &[u8], dictionary: &[u8]) -> Self {
+        CompressedKvWriterU16(lz4_flex::block::compress_with_dict(writer, dictionary))
+    }
+
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.0
     }
 }
diff --git a/milli/src/heed_codec/mod.rs b/milli/src/heed_codec/mod.rs
index 7956cf9c2..a221a0340 100644
--- a/milli/src/heed_codec/mod.rs
+++ b/milli/src/heed_codec/mod.rs
@@ -21,7 +21,7 @@ use thiserror::Error;
 pub use self::beu16_str_codec::BEU16StrCodec;
 pub use self::beu32_str_codec::BEU32StrCodec;
 pub use self::compressed_obkv_codec::{
-    CompressedKvReaderU16, CompressedKvWriterU16, ObkvCompressedCodec,
+    CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec,
 };
 pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
 pub use self::fst_set_codec::FstSetCodec;
diff --git a/milli/src/index.rs b/milli/src/index.rs
index 919fdf852..8600940af 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -20,7 +20,7 @@ use crate::heed_codec::facet::{
     FieldIdCodec, OrderedF64Codec,
 };
 use crate::heed_codec::{
-    BEU16StrCodec, CompressedKvReaderU16, FstSetCodec, ObkvCompressedCodec, ScriptLanguageCodec,
+    BEU16StrCodec, CompressedKvReaderU16, CompressedObkvCodec, FstSetCodec, ScriptLanguageCodec,
     StrBEU16Codec, StrRefCodec,
 };
 use crate::order_by_map::OrderByMap;
@@ -174,7 +174,7 @@ pub struct Index {
     pub vector_arroy: arroy::Database<Unspecified>,
 
     /// Maps the document id to the document as an obkv store.
-    pub(crate) documents: Database<BEU32, ObkvCompressedCodec>,
+    pub(crate) documents: Database<BEU32, CompressedObkvCodec>,
 }
 
 impl Index {
@@ -356,6 +356,11 @@ impl Index {
         )
     }
 
+    /// Deletes the document compression dictionary.
+    pub fn delete_document_compression_dictionary(&self, wtxn: &mut RwTxn) -> heed::Result<bool> {
+        self.main.remap_key_type::<Str>().delete(wtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
+    }
+
     /// Returns the optional dictionnary to be used when reading the OBKV documents.
     pub fn document_compression_dictionary<'t>(
         &self,
diff --git a/milli/src/update/clear_documents.rs b/milli/src/update/clear_documents.rs
index 9eca378a5..1f777f65e 100644
--- a/milli/src/update/clear_documents.rs
+++ b/milli/src/update/clear_documents.rs
@@ -63,6 +63,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
         self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?;
         self.index.delete_geo_rtree(self.wtxn)?;
         self.index.delete_geo_faceted_documents_ids(self.wtxn)?;
+        self.index.delete_document_compression_dictionary(self.wtxn)?;
 
         // Remove all user-provided bits from the configs
         let mut configs = self.index.embedding_configs(self.wtxn)?;
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index 1eb8f121a..0fe333e8b 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -13,8 +13,8 @@ use std::sync::Arc;
 
 use crossbeam_channel::{Receiver, Sender};
 use grenad::{Merger, MergerBuilder};
-use heed::types::Str;
-use heed::Database;
+use heed::types::{Bytes, Str};
+use heed::{Database, PutFlags};
 use rand::SeedableRng;
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize};
@@ -34,6 +34,7 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
 pub use self::transform::{Transform, TransformOutput};
 use crate::documents::{obkv_to_object, DocumentsBatchReader};
 use crate::error::{Error, InternalError, UserError};
+use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec};
 use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
 use crate::update::{
@@ -266,7 +267,7 @@
         target = "indexing::details",
         name = "index_documents_raw"
     )]
-    pub fn execute_raw(self, output: TransformOutput) -> Result<u64>
+    pub fn execute_raw(mut self, output: TransformOutput) -> Result<u64>
     where
         FP: Fn(UpdateIndexingStep) + Sync,
         FA: Fn() -> bool + Sync,
@@ -565,6 +566,14 @@
             word_fid_docids.map(MergerBuilder::build),
         )?;
 
+        // TODO increase this number to 10k and put it in a const somewhere
+        // I don't like that this dangerous condition is here...
+        if number_of_documents > 1_000
+            && self.index.document_compression_dictionary(self.wtxn)?.is_none()
+        {
+            self.manage_compression_dictionary()?;
+        }
+
         Ok(number_of_documents)
     }
 
@@ -575,7 +584,7 @@
         name = "index_documents_prefix_databases"
     )]
     pub fn execute_prefix_databases(
-        self,
+        &mut self,
         word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
         exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
         word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
@@ -747,6 +756,40 @@
 
         Ok(())
     }
+
+    /// Computes a new dictionary and compresses the documents with it in the database.
+    ///
+    /// Documents still need to be compressed on the fly when they are written to the database once a dictionary exists.
+    #[tracing::instrument(
+        level = "trace",
+        skip_all,
+        target = "indexing::compression",
+        name = "compress_documents_database"
+    )]
+    pub fn manage_compression_dictionary(&mut self) -> Result<()> {
+        // TODO This is a dumb dictionary, just so you get the idea.
+        //      We need to compute a better one by using zstd or something else.
+        let dictionary = b"movietraileradventurehorror";
+        self.index.put_document_compression_dictionary(self.wtxn, dictionary)?;
+
+        // TODO do not remap types here but rather expose the &[u8] for the KvReaderU16
+        let mut iter = self.index.documents.remap_data_type::<Bytes>().iter_mut(self.wtxn)?;
+        while let Some(result) = iter.next() {
+            let (docid, document) = result?;
+            // TODO manage this unwrap correctly
+            let compressed = CompressedKvWriterU16::new_with_dictionary(document, dictionary);
+            // SAFETY: the compressed document is entirely owned
+            unsafe {
+                iter.put_current_with_options::<CompressedObkvCodec>(
+                    PutFlags::empty(),
+                    &docid,
+                    &compressed,
+                )?;
+            }
+        }
+
+        Ok(())
+    }
 }
 
 /// Run the word prefix docids update operation.
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index c5cf35ca8..5dcd70226 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -19,6 +19,7 @@ use super::helpers::{
 use super::MergeFn;
 use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
 use crate::facet::FacetType;
+use crate::heed_codec::CompressedKvWriterU16;
 use crate::index::db_name::DOCUMENTS;
 use crate::index::IndexEmbeddingConfig;
 use crate::proximity::MAX_DISTANCE;
@@ -162,6 +163,7 @@ pub(crate) fn write_typed_chunk_into_index(
                 .into_iter()
                 .map(|IndexEmbeddingConfig { name, .. }| name)
                 .collect();
+            let dictionary = index.document_compression_dictionary(wtxn)?.map(Vec::from);
             let mut vectors_buffer = Vec::new();
             while let Some((key, reader)) = iter.next()? {
                 let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
@@ -211,7 +213,17 @@
                 let db = index.documents.remap_data_type::<Bytes>();
 
                 if !writer.is_empty() {
-                    db.put(wtxn, &docid, &writer.into_inner().unwrap())?;
+                    let uncompressed_document_bytes = writer.into_inner().unwrap();
+                    match dictionary.as_ref() {
+                        Some(dictionary) => {
+                            let compressed = CompressedKvWriterU16::new_with_dictionary(
+                                &uncompressed_document_bytes,
+                                dictionary,
+                            );
+                            db.put(wtxn, &docid, compressed.as_bytes())?
+                        }
+                        None => db.put(wtxn, &docid, &uncompressed_document_bytes)?,
+                    }
                     operations.push(DocumentOperation {
                         external_id: external_id.to_string(),
                         internal_id: docid,
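
--
Notes on the TODOs above:

The "WHAT THE HECK" in `decompress_with` has a likely explanation:
`lz4_flex::block::get_maximum_output_size` bounds the worst-case *compressed*
size of an input of the given length, not the decompressed size of a block,
so sizing a decompression buffer with it undershoots as soon as the block
actually achieved compression (the `* 2` only covers ratios up to roughly
2:1). The growth loop the second TODO asks for could look like the sketch
below. This is a minimal sketch, not part of the patch; it assumes the
`DecompressError::OutputTooSmall { expected, .. }` variant of lz4_flex
reports the capacity the decoder wanted.

    use lz4_flex::block::DecompressError;

    /// Decompresses `compressed` into `buffer`, growing the buffer until the
    /// output fits, and returns the decompressed slice.
    fn decompress_growing<'b>(
        compressed: &[u8],
        buffer: &'b mut Vec<u8>,
        dictionary: &[u8],
    ) -> Result<&'b [u8], DecompressError> {
        // First guess only: this bounds the compressed size of an input,
        // not the decompressed size of a block.
        let mut capacity = lz4_flex::block::get_maximum_output_size(compressed.len());
        loop {
            buffer.resize(capacity, 0);
            match lz4_flex::block::decompress_into_with_dict(
                compressed,
                &mut buffer[..],
                dictionary,
            ) {
                Ok(size) => return Ok(&buffer[..size]),
                // The output did not fit: grow the buffer and retry.
                Err(DecompressError::OutputTooSmall { expected, .. }) => {
                    capacity = expected.max(capacity * 2);
                }
                Err(other) => return Err(other),
            }
        }
    }

For the "dumb dictionary" in `manage_compression_dictionary`, the zstd crate
exposes a dictionary trainer that works on arbitrary samples. The sketch
below is one assumption about how that TODO could be resolved, not something
this patch does: the sample-collection step, the 64 KiB budget, and feeding a
zstd-trained dictionary to LZ4 as a plain byte prefix are all hypothetical
choices.

    /// Trains a compression dictionary from raw OBKV documents sampled from
    /// the documents database (hypothetical helper).
    fn train_document_dictionary(samples: &[Vec<u8>]) -> std::io::Result<Vec<u8>> {
        const MAX_DICTIONARY_SIZE: usize = 64 * 1024; // arbitrary budget
        zstd::dict::from_samples(samples, MAX_DICTIONARY_SIZE)
    }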