First version compressing the documents

This commit is contained in:
Clément Renault 2024-07-02 12:11:26 +02:00
parent e9d6b4222b
commit bf5d9f68fa
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
6 changed files with 81 additions and 14 deletions

View File

@ -3,9 +3,9 @@ use std::borrow::Cow;
use heed::BoxedError; use heed::BoxedError;
use obkv::KvReaderU16; use obkv::KvReaderU16;
pub struct ObkvCompressedCodec; pub struct CompressedObkvCodec;
impl<'a> heed::BytesDecode<'a> for ObkvCompressedCodec { impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec {
type DItem = CompressedKvReaderU16<'a>; type DItem = CompressedKvReaderU16<'a>;
fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> { fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
@ -13,7 +13,7 @@ impl<'a> heed::BytesDecode<'a> for ObkvCompressedCodec {
} }
} }
impl heed::BytesEncode<'_> for ObkvCompressedCodec { impl heed::BytesEncode<'_> for CompressedObkvCodec {
type EItem = CompressedKvWriterU16; type EItem = CompressedKvWriterU16;
fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, BoxedError> { fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, BoxedError> {
@ -30,8 +30,10 @@ impl<'a> CompressedKvReaderU16<'a> {
buffer: &'b mut Vec<u8>, buffer: &'b mut Vec<u8>,
dictionnary: &[u8], dictionnary: &[u8],
) -> Result<KvReaderU16<'b>, lz4_flex::block::DecompressError> { ) -> Result<KvReaderU16<'b>, lz4_flex::block::DecompressError> {
let max_size = lz4_flex::block::get_maximum_output_size(self.0.len()); // TODO WHAT THE HECK!!! WHY DO I NEED TO INCREASE THE SIZE PROVIDED
let max_size = lz4_flex::block::get_maximum_output_size(self.0.len()) * 2;
buffer.resize(max_size, 0); buffer.resize(max_size, 0);
// TODO loop to increase the buffer size of need be
let size = lz4_flex::block::decompress_into_with_dict( let size = lz4_flex::block::decompress_into_with_dict(
self.0, self.0,
&mut buffer[..max_size], &mut buffer[..max_size],
@ -50,7 +52,11 @@ pub struct CompressedKvWriterU16(Vec<u8>);
impl CompressedKvWriterU16 { impl CompressedKvWriterU16 {
// TODO ask for a KvReaderU16 here // TODO ask for a KvReaderU16 here
pub fn new_with_dictionnary(writer: &[u8], dictionnary: &[u8]) -> Self { pub fn new_with_dictionary(writer: &[u8], dictionary: &[u8]) -> Self {
CompressedKvWriterU16(lz4_flex::block::compress_with_dict(writer, dictionnary)) CompressedKvWriterU16(lz4_flex::block::compress_with_dict(writer, dictionary))
}
pub fn as_bytes(&self) -> &[u8] {
&self.0
} }
} }

View File

@ -21,7 +21,7 @@ use thiserror::Error;
pub use self::beu16_str_codec::BEU16StrCodec; pub use self::beu16_str_codec::BEU16StrCodec;
pub use self::beu32_str_codec::BEU32StrCodec; pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::compressed_obkv_codec::{ pub use self::compressed_obkv_codec::{
CompressedKvReaderU16, CompressedKvWriterU16, ObkvCompressedCodec, CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec,
}; };
pub use self::field_id_word_count_codec::FieldIdWordCountCodec; pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec; pub use self::fst_set_codec::FstSetCodec;

View File

@ -20,7 +20,7 @@ use crate::heed_codec::facet::{
FieldIdCodec, OrderedF64Codec, FieldIdCodec, OrderedF64Codec,
}; };
use crate::heed_codec::{ use crate::heed_codec::{
BEU16StrCodec, CompressedKvReaderU16, FstSetCodec, ObkvCompressedCodec, ScriptLanguageCodec, BEU16StrCodec, CompressedKvReaderU16, CompressedObkvCodec, FstSetCodec, ScriptLanguageCodec,
StrBEU16Codec, StrRefCodec, StrBEU16Codec, StrRefCodec,
}; };
use crate::order_by_map::OrderByMap; use crate::order_by_map::OrderByMap;
@ -174,7 +174,7 @@ pub struct Index {
pub vector_arroy: arroy::Database<arroy::distances::Angular>, pub vector_arroy: arroy::Database<arroy::distances::Angular>,
/// Maps the document id to the document as an obkv store. /// Maps the document id to the document as an obkv store.
pub(crate) documents: Database<BEU32, ObkvCompressedCodec>, pub(crate) documents: Database<BEU32, CompressedObkvCodec>,
} }
impl Index { impl Index {
@ -356,6 +356,11 @@ impl Index {
) )
} }
/// Deletes the document compression dictionary.
pub fn delete_document_compression_dictionary(&self, wtxn: &mut RwTxn) -> heed::Result<bool> {
self.main.remap_key_type::<Str>().delete(wtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
}
/// Returns the optional dictionnary to be used when reading the OBKV documents. /// Returns the optional dictionnary to be used when reading the OBKV documents.
pub fn document_compression_dictionary<'t>( pub fn document_compression_dictionary<'t>(
&self, &self,

View File

@ -63,6 +63,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?; self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?;
self.index.delete_geo_rtree(self.wtxn)?; self.index.delete_geo_rtree(self.wtxn)?;
self.index.delete_geo_faceted_documents_ids(self.wtxn)?; self.index.delete_geo_faceted_documents_ids(self.wtxn)?;
self.index.delete_document_compression_dictionary(self.wtxn)?;
// Remove all user-provided bits from the configs // Remove all user-provided bits from the configs
let mut configs = self.index.embedding_configs(self.wtxn)?; let mut configs = self.index.embedding_configs(self.wtxn)?;

View File

@ -13,8 +13,8 @@ use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use grenad::{Merger, MergerBuilder}; use grenad::{Merger, MergerBuilder};
use heed::types::Str; use heed::types::{Bytes, Str};
use heed::Database; use heed::{Database, PutFlags};
use rand::SeedableRng; use rand::SeedableRng;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -34,6 +34,7 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec};
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap; pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{ use crate::update::{
@ -266,7 +267,7 @@ where
target = "indexing::details", target = "indexing::details",
name = "index_documents_raw" name = "index_documents_raw"
)] )]
pub fn execute_raw(self, output: TransformOutput) -> Result<u64> pub fn execute_raw(mut self, output: TransformOutput) -> Result<u64>
where where
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
@ -565,6 +566,14 @@ where
word_fid_docids.map(MergerBuilder::build), word_fid_docids.map(MergerBuilder::build),
)?; )?;
// TODO increase this number to 10k and put it in a const somewhere
// I don't like that this dangerous condition is here...
if number_of_documents > 1_000
&& self.index.document_compression_dictionary(self.wtxn)?.is_none()
{
self.manage_compression_dictionary()?;
}
Ok(number_of_documents) Ok(number_of_documents)
} }
@ -575,7 +584,7 @@ where
name = "index_documents_prefix_databases" name = "index_documents_prefix_databases"
)] )]
pub fn execute_prefix_databases( pub fn execute_prefix_databases(
self, &mut self,
word_docids: Option<Merger<CursorClonableMmap, MergeFn>>, word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>, exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>, word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
@ -747,6 +756,40 @@ where
Ok(()) Ok(())
} }
/// Computes a new dictionay and compress the documents with it in the database.
///
/// Documents still need to be directly compressed when being written in the database and a dictionary exists.
#[tracing::instrument(
level = "trace",
skip_all,
target = "indexing::compression",
name = "compress_documents_database"
)]
pub fn manage_compression_dictionary(&mut self) -> Result<()> {
// TODO This is a dumb dictionary, just so you get the idea.
// We need to compute a better one by using zstd or something else.
let dictionary = b"movietraileradventurehorror";
self.index.put_document_compression_dictionary(self.wtxn, dictionary)?;
// TODO do not remap types here but rather expose the &[u8] for the KvReaderU16
let mut iter = self.index.documents.remap_data_type::<Bytes>().iter_mut(self.wtxn)?;
while let Some(result) = iter.next() {
let (docid, document) = result?;
// TODO manage this unwrap correctly
let compressed = CompressedKvWriterU16::new_with_dictionary(document, dictionary);
// safety the compressed document is entirely owned
unsafe {
iter.put_current_with_options::<CompressedObkvCodec>(
PutFlags::empty(),
&docid,
&compressed,
)?;
}
}
Ok(())
}
} }
/// Run the word prefix docids update operation. /// Run the word prefix docids update operation.

View File

@ -19,6 +19,7 @@ use super::helpers::{
use super::MergeFn; use super::MergeFn;
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::heed_codec::CompressedKvWriterU16;
use crate::index::db_name::DOCUMENTS; use crate::index::db_name::DOCUMENTS;
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::proximity::MAX_DISTANCE; use crate::proximity::MAX_DISTANCE;
@ -162,6 +163,7 @@ pub(crate) fn write_typed_chunk_into_index(
.into_iter() .into_iter()
.map(|IndexEmbeddingConfig { name, .. }| name) .map(|IndexEmbeddingConfig { name, .. }| name)
.collect(); .collect();
let dictionary = index.document_compression_dictionary(wtxn)?.map(Vec::from);
let mut vectors_buffer = Vec::new(); let mut vectors_buffer = Vec::new();
while let Some((key, reader)) = iter.next()? { while let Some((key, reader)) = iter.next()? {
let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
@ -211,7 +213,17 @@ pub(crate) fn write_typed_chunk_into_index(
let db = index.documents.remap_data_type::<Bytes>(); let db = index.documents.remap_data_type::<Bytes>();
if !writer.is_empty() { if !writer.is_empty() {
db.put(wtxn, &docid, &writer.into_inner().unwrap())?; let uncompressed_document_bytes = writer.into_inner().unwrap();
match dictionary.as_ref() {
Some(dictionary) => {
let compressed = CompressedKvWriterU16::new_with_dictionary(
&uncompressed_document_bytes,
dictionary,
);
db.put(wtxn, &docid, compressed.as_bytes())?
}
None => db.put(wtxn, &docid, &uncompressed_document_bytes)?,
}
operations.push(DocumentOperation { operations.push(DocumentOperation {
external_id: external_id.to_string(), external_id: external_id.to_string(),
internal_id: docid, internal_id: docid,