Use the zstd library directly to be able to define the compression level

This commit is contained in:
Clément Renault 2024-07-02 17:34:02 +02:00
parent b15e8aacb6
commit e18b06ddda
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
13 changed files with 503 additions and 430 deletions

825
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -37,6 +37,7 @@ use meilisearch_types::milli::vector::parsed_vectors::{
use meilisearch_types::milli::{self, Filter};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::zstd::dict::DecoderDictionary;
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
use roaring::RoaringBitmap;
use time::macros::format_description;
@ -908,7 +909,8 @@ impl IndexScheduler {
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let dictionary = index.document_compression_dictionary(&rtxn)?;
let dictionary =
index.document_compression_dictionary(&rtxn)?.map(DecoderDictionary::copy);
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let embedding_configs = index.embedding_configs(&rtxn)?;
let mut buffer = Vec::new();
@ -920,9 +922,9 @@ impl IndexScheduler {
}
let (id, compressed) = ret?;
let doc = match dictionary {
let doc = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => compressed.decompress_with(&mut buffer, dict).unwrap(),
Some(dict) => compressed.decompress_with(&mut buffer, dict)?,
None => compressed.as_non_compressed(),
};

View File

@ -12,7 +12,7 @@ pub mod star_or;
pub mod task_view;
pub mod tasks;
pub mod versioning;
pub use milli::{heed, Index};
pub use milli::{heed, zstd, Index};
use uuid::Uuid;
pub use versioning::VERSION_FILE_NAME;
pub use {milli, serde_cs};

View File

@ -20,6 +20,7 @@ use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors;
use meilisearch_types::milli::DocumentId;
use meilisearch_types::star_or::OptionStarOrList;
use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::zstd::dict::DecoderDictionary;
use meilisearch_types::{milli, Document, Index};
use mime::Mime;
use once_cell::sync::Lazy;
@ -603,7 +604,7 @@ fn some_documents<'a, 't: 'a>(
retrieve_vectors: RetrieveVectors,
) -> Result<impl Iterator<Item = Result<Document, ResponseError>> + 'a, ResponseError> {
let fields_ids_map = index.fields_ids_map(rtxn)?;
let dictionary = index.document_compression_dictionary(rtxn)?;
let dictionary = index.document_compression_dictionary(rtxn)?.map(DecoderDictionary::copy);
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let embedding_configs = index.embedding_configs(rtxn)?;
let mut buffer = Vec::new();
@ -611,7 +612,7 @@ fn some_documents<'a, 't: 'a>(
Ok(index.iter_compressed_documents(rtxn, doc_ids)?.map(move |ret| {
ret.map_err(ResponseError::from).and_then(
|(key, compressed_document)| -> Result<_, ResponseError> {
let document = match dictionary {
let document = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_document.as_non_compressed(),

View File

@ -19,6 +19,7 @@ use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors;
use meilisearch_types::milli::vector::Embedder;
use meilisearch_types::milli::{FacetValueHit, OrderBy, SearchForFacetValues, TimeBudget};
use meilisearch_types::settings::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
use meilisearch_types::zstd::dict::DecoderDictionary;
use meilisearch_types::{milli, Document};
use milli::tokenizer::TokenizerBuilder;
use milli::{
@ -1123,13 +1124,14 @@ fn make_hits(
formatter_builder.crop_marker(format.crop_marker);
formatter_builder.highlight_prefix(format.highlight_pre_tag);
formatter_builder.highlight_suffix(format.highlight_post_tag);
let compression_dictionary = index.document_compression_dictionary(rtxn)?;
let compression_dictionary =
index.document_compression_dictionary(rtxn)?.map(DecoderDictionary::copy);
let mut buffer = Vec::new();
let mut documents = Vec::new();
let embedding_configs = index.embedding_configs(rtxn)?;
let documents_iter = index.compressed_documents(rtxn, documents_ids)?;
for ((id, compressed), score) in documents_iter.into_iter().zip(document_scores.into_iter()) {
let obkv = match compression_dictionary {
let obkv = match compression_dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => compressed.decompress_with(&mut buffer, dict).unwrap(),
None => compressed.as_non_compressed(),

View File

@ -38,7 +38,6 @@ heed = { version = "0.20.3", default-features = false, features = [
indexmap = { version = "2.2.6", features = ["serde"] }
json-depth-checker = { path = "../json-depth-checker" }
levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] }
lz4_flex = "0.11.3"
zstd = { version = "0.11.2", features = ["zdict_builder"] }
memmap2 = "0.9.4"
obkv = "0.2.2"

View File

@ -1,7 +1,14 @@
use std::borrow::Cow;
use std::io;
use std::io::ErrorKind;
use heed::BoxedError;
use obkv::KvReaderU16;
use zstd::bulk::{Compressor, Decompressor};
use zstd::dict::{DecoderDictionary, EncoderDictionary};
// TODO move that elsewhere
pub const COMPRESSION_LEVEL: i32 = 12;
pub struct CompressedObkvCodec;
@ -28,13 +35,23 @@ impl<'a> CompressedKvReaderU16<'a> {
pub fn decompress_with<'b>(
&self,
buffer: &'b mut Vec<u8>,
dictionary: &[u8],
) -> Result<KvReaderU16<'b>, lz4_flex::block::DecompressError> {
let (size, input) = lz4_flex::block::uncompressed_size(self.0)?;
buffer.resize(size, 0);
// TODO loop to increase the buffer size of need be
let size =
lz4_flex::block::decompress_into_with_dict(input, &mut buffer[..size], dictionary)?;
dictionary: &DecoderDictionary,
) -> io::Result<KvReaderU16<'b>> {
const TWO_GIGABYTES: usize = 2 * 1024 * 1024 * 1024;
let mut decompressor = Decompressor::with_prepared_dictionary(dictionary)?;
let mut max_size = self.0.len() * 4;
let size = loop {
buffer.resize(max_size, 0);
match decompressor.decompress_to_buffer(self.0, &mut buffer[..max_size]) {
Ok(size) => break size,
// TODO don't do that !!! But what should I do?
Err(e) if e.kind() == ErrorKind::Other && max_size <= TWO_GIGABYTES => {
max_size *= 2
}
Err(e) => return Err(e),
}
};
Ok(KvReaderU16::new(&buffer[..size]))
}
@ -48,8 +65,9 @@ pub struct CompressedKvWriterU16(Vec<u8>);
impl CompressedKvWriterU16 {
// TODO ask for a KvReaderU16 here
pub fn new_with_dictionary(writer: &[u8], dictionary: &[u8]) -> Self {
CompressedKvWriterU16(lz4_flex::block::compress_prepend_size_with_dict(writer, dictionary))
pub fn new_with_dictionary(input: &[u8], dictionary: &EncoderDictionary) -> io::Result<Self> {
let mut compressor = Compressor::with_prepared_dictionary(dictionary)?;
compressor.compress(input).map(CompressedKvWriterU16)
}
pub fn as_bytes(&self) -> &[u8] {

View File

@ -21,7 +21,7 @@ use thiserror::Error;
pub use self::beu16_str_codec::BEU16StrCodec;
pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::compressed_obkv_codec::{
CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec,
CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec, COMPRESSION_LEVEL,
};
pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec;

View File

@ -11,6 +11,7 @@ use roaring::RoaringBitmap;
use rstar::RTree;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use zstd::dict::DecoderDictionary;
use crate::documents::PrimaryKey;
use crate::error::{InternalError, UserError};
@ -1341,13 +1342,12 @@ impl Index {
process: "external_id_of",
})
})?;
let dictionary = self.document_compression_dictionary(rtxn)?;
let dictionary = self.document_compression_dictionary(rtxn)?.map(DecoderDictionary::copy);
let mut buffer = Vec::new();
Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, compressed_obkv) = entry?;
let obkv = match dictionary {
// TODO manage this unwrap correctly
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
let obkv = match dictionary.as_ref() {
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict)?,
None => compressed_obkv.as_non_compressed(),
};
match primary_key.document_id(&obkv, &fields)? {

View File

@ -45,7 +45,7 @@ pub use search::new::{
};
use serde_json::Value;
pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
pub use {charabia as tokenizer, heed};
pub use {charabia as tokenizer, heed, zstd};
pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
pub use self::criterion::{default_criteria, Criterion, CriterionError};

View File

@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize};
use slice_group_by::GroupBy;
use tracing::debug;
use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk};
use zstd::dict::EncoderDictionary;
use self::enrich::enrich_documents_batch;
pub use self::enrich::{extract_finite_float_from_value, DocumentId};
@ -34,7 +35,7 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec};
use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec, COMPRESSION_LEVEL};
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
@ -783,13 +784,15 @@ where
// TODO make this 64_000 const
let dictionary = zstd::dict::from_continuous(&sample_data, &sample_sizes, 64_000)?;
self.index.put_document_compression_dictionary(self.wtxn, &dictionary)?;
// TODO use declare the level 3 as a const
let dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL);
// TODO do not remap types here but rather expose the &[u8] for the KvReaderU16
let mut iter = self.index.documents.remap_data_type::<Bytes>().iter_mut(self.wtxn)?;
let mut iter = self.index.documents.iter_mut(self.wtxn)?;
while let Some(result) = iter.next() {
let (docid, document) = result?;
// TODO manage this unwrap correctly
let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary);
let document = document.as_non_compressed().as_bytes();
let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?;
// safety the compressed document is entirely owned
unsafe {
iter.put_current_with_options::<CompressedObkvCodec>(

View File

@ -11,6 +11,7 @@ use obkv::{KvReader, KvReaderU16, KvWriter};
use roaring::RoaringBitmap;
use serde_json::Value;
use smartstring::SmartString;
use zstd::dict::DecoderDictionary;
use super::helpers::{
create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions,
@ -168,7 +169,8 @@ impl<'a, 'i> Transform<'a, 'i> {
let external_documents_ids = self.index.external_documents_ids();
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
let dictionary = self.index.document_compression_dictionary(wtxn)?;
let dictionary =
self.index.document_compression_dictionary(wtxn)?.map(DecoderDictionary::copy);
let primary_key = cursor.primary_key().to_string();
let primary_key_id =
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
@ -253,11 +255,11 @@ impl<'a, 'i> Transform<'a, 'i> {
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
let base_obkv = match dictionary {
let base_obkv = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => base_compressed_obkv
.decompress_with(&mut decompression_buffer, dict)
.unwrap(),
Some(dict) => {
base_compressed_obkv.decompress_with(&mut decompression_buffer, dict)?
}
None => base_compressed_obkv.as_non_compressed(),
};
@ -1038,7 +1040,8 @@ impl<'a, 'i> Transform<'a, 'i> {
if original_sorter.is_some() || flattened_sorter.is_some() {
let modified_faceted_fields = settings_diff.modified_faceted_fields();
let dictionary = self.index.document_compression_dictionary(wtxn)?;
let dictionary =
self.index.document_compression_dictionary(wtxn)?.map(DecoderDictionary::copy);
let mut original_obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new();
@ -1050,7 +1053,7 @@ impl<'a, 'i> Transform<'a, 'i> {
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
let old_obkv = match dictionary {
let old_obkv = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => old_compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => old_compressed_obkv.as_non_compressed(),

View File

@ -10,6 +10,7 @@ use heed::types::Bytes;
use heed::{BytesDecode, RwTxn};
use obkv::{KvReader, KvWriter};
use roaring::RoaringBitmap;
use zstd::dict::EncoderDictionary;
use super::helpers::{
self, keep_first, merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps,
@ -19,7 +20,7 @@ use super::helpers::{
use super::MergeFn;
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType;
use crate::heed_codec::CompressedKvWriterU16;
use crate::heed_codec::{CompressedKvWriterU16, COMPRESSION_LEVEL};
use crate::index::db_name::DOCUMENTS;
use crate::index::IndexEmbeddingConfig;
use crate::proximity::MAX_DISTANCE;
@ -163,7 +164,10 @@ pub(crate) fn write_typed_chunk_into_index(
.into_iter()
.map(|IndexEmbeddingConfig { name, .. }| name)
.collect();
let dictionary = index.document_compression_dictionary(wtxn)?.map(Vec::from);
// TODO declare the compression ratio as a const
let dictionary = index
.document_compression_dictionary(wtxn)?
.map(|dict| EncoderDictionary::copy(dict, COMPRESSION_LEVEL));
let mut vectors_buffer = Vec::new();
while let Some((key, reader)) = iter.next()? {
let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
@ -219,7 +223,7 @@ pub(crate) fn write_typed_chunk_into_index(
let compressed = CompressedKvWriterU16::new_with_dictionary(
&uncompressed_document_bytes,
dictionary,
);
)?;
db.put(wtxn, &docid, compressed.as_bytes())?
}
None => db.put(wtxn, &docid, &uncompressed_document_bytes)?,