Compress the right documents when a new dictionary is computed

Clément Renault 2024-12-17 12:19:45 +01:00
parent 742709450d
commit a466cf4f2c
2 changed files with 48 additions and 9 deletions


@@ -103,8 +103,10 @@ impl<'a> CompressedKvReaderU16<'a> {
pub struct CompressedKvWriterU16(Vec<u8>);
impl CompressedKvWriterU16 {
// TODO ask for a KvReaderU16 here
pub fn new_with_dictionary(input: &[u8], dictionary: &EncoderDictionary) -> io::Result<Self> {
pub fn new_with_dictionary(
input: &KvReaderU16,
dictionary: &EncoderDictionary,
) -> io::Result<Self> {
let mut compressor = Compressor::with_prepared_dictionary(dictionary)?;
compressor.compress(input).map(CompressedKvWriterU16)
}
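For context, here is a minimal, self-contained sketch of the zstd bulk API that new_with_dictionary builds on: train a dictionary from continuous samples, prepare it once as an EncoderDictionary, then compress and decompress one serialized document with it. The roundtrip helper and the two constants are illustrative assumptions, not the crate's own values.

use std::io;

use zstd::bulk::{Compressor, Decompressor};
use zstd::dict::{from_continuous, DecoderDictionary, EncoderDictionary};

// Assumed values for this sketch; the real constants live in the crate.
const COMPRESSION_LEVEL: i32 = 19;
const DICTIONARY_MAX_SIZE: usize = 64 * 1024;

fn roundtrip(sample_data: &[u8], sample_sizes: &[usize], document: &[u8]) -> io::Result<Vec<u8>> {
    // Train a dictionary on the concatenated sample documents
    // (the trainer needs a reasonable number of samples to succeed).
    let dictionary = from_continuous(sample_data, sample_sizes, DICTIONARY_MAX_SIZE)?;

    // Prepare the dictionary once so every compression call reuses it cheaply.
    let encoder_dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL);
    let mut compressor = Compressor::with_prepared_dictionary(&encoder_dictionary)?;
    let compressed = compressor.compress(document)?;

    // Reading the document back requires the exact same dictionary bytes.
    let decoder_dictionary = DecoderDictionary::copy(&dictionary);
    let mut decompressor = Decompressor::with_prepared_dictionary(&decoder_dictionary)?;
    decompressor.decompress(&compressed, document.len())
}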


@@ -3,8 +3,12 @@ use std::sync::atomic::{self, AtomicUsize};
use bumpalo::Bump;
use heed::RwTxn;
use rayon::iter::{ParallelBridge, ParallelIterator as _};
use roaring::RoaringBitmap;
use zstd::bulk::Compressor;
use zstd::dict::{from_continuous, EncoderDictionary};
use crate::heed_codec::CompressedKvWriterU16;
use crate::update::new::document::Document as _;
use crate::update::new::indexer::document_changes::{
DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
@@ -87,8 +91,13 @@ where
Step::PreparingCompressionDictionary,
)?;
let mut all_documents_seen = RoaringBitmap::new();
for data in datastore {
let CompressorExtractorData { fields, fields_count, .. } = data.into_inner();
let CompressorExtractorData { documents_seen, fields, fields_count, must_stop: _ } =
data.into_inner();
all_documents_seen |= documents_seen;
let mut fields_iter = fields.into_iter();
for field_count in fields_count {
let mut document_fields_size = 0;
@@ -109,10 +118,31 @@ where
let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?;
index.put_document_compression_dictionary(wtxn, &dictionary)?;
let dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL);
todo!("compress (in parallel) all the database documents that are not impacted by the current update");
let all_documents = index.documents_ids(wtxn)?;
let documents_to_compress = all_documents - all_documents_seen;
let pi = documents_to_compress.into_iter().par_bridge().map(|docid| {
// TODO put the rtxn and the encoder dict in a ThreadLocal datastore
let rtxn = index.read_txn()?;
let compressed_document = index.compressed_document(&rtxn, docid)?.unwrap();
// The documents are not compressed with any dictionary at this point.
let document = compressed_document.as_non_compressed();
let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?;
Ok((docid, compressed)) as crate::Result<_>
});
Ok(Some(EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL)))
// We compress the documents in parallel and write them sequentially
// into the database, consuming the parallel iterator above.
rayon_par_bridge::par_bridge(100, pi, |seq_iter| {
for result in seq_iter {
let (docid, compressed_document) = result?;
index.documents.put(wtxn, &docid, &compressed_document)?;
}
Ok(()) as crate::Result<_>
})?;
Ok(Some(dictionary))
}
}
}
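The hunk above compresses documents on the rayon pool and funnels the results back to the single thread that holds the write transaction, through rayon_par_bridge with a bound of 100. Below is a standalone sketch of that fan-out/fan-in shape, using a bounded std channel in place of the rayon_par_bridge crate; compress_and_store, compress, and write are hypothetical stand-ins for the per-document compression and the index.documents.put call.

use std::sync::mpsc::sync_channel;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

// Hypothetical signatures: `compress` stands in for the dictionary compression,
// `write` for the sequential index.documents.put(wtxn, ...) call.
fn compress_and_store(
    docids: Vec<u32>,
    compress: impl Fn(u32) -> Vec<u8> + Send + Sync,
    mut write: impl FnMut(u32, Vec<u8>),
) {
    // A bounded channel provides the same backpressure as the bound of 100 above.
    let (sender, receiver) = sync_channel(100);

    std::thread::scope(|s| {
        // Drive the rayon pool from a scoped thread so the caller's thread
        // stays free to own the (single-threaded) write transaction.
        s.spawn(move || {
            docids.into_par_iter().for_each_with(sender, |sender, docid| {
                let compressed = compress(docid);
                // If the writer side hangs up early, simply stop sending.
                let _ = sender.send((docid, compressed));
            });
        });

        // Sequential side: results arrive in completion order and exactly
        // one thread writes them to the database.
        for (docid, compressed) in receiver {
            write(docid, compressed);
        }
    });
}

The bounded channel keeps compressed documents from piling up faster than the single writer can persist them, which is the role the bound of 100 plays in the diff; only the compression itself runs in parallel.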
@@ -126,6 +156,8 @@ struct CompressorExtractor {
#[derive(Default)]
struct CompressorExtractorData<'extractor> {
/// The set of documents impacted by this update: deleted, updated, or inserted.
documents_seen: RoaringBitmap,
/// The field contents, in JSON, as raw bytes.
fields: Vec<&'extractor [u8]>,
/// The number of fields associated with each single document.
@@ -160,9 +192,9 @@ impl<'extractor> Extractor<'extractor> for CompressorExtractor {
}
let change = change?;
match change {
DocumentChange::Deletion(_) => (),
DocumentChange::Update(_) => (),
let docid = match change {
DocumentChange::Deletion(deletion) => deletion.docid(),
DocumentChange::Update(update) => update.docid(),
DocumentChange::Insertion(insertion) => {
let mut fields_count = 0;
for result in insertion.inserted().iter_top_level_fields() {
@@ -176,8 +208,13 @@ impl<'extractor> Extractor<'extractor> for CompressorExtractor {
self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
data.must_stop = previous_count >= self.total_documents_to_extract;
data.fields_count.push(fields_count);
insertion.docid()
}
}
};
let is_new = data.documents_seen.insert(docid);
debug_assert!(is_new, "We must not see the same document multiple times");
}
Ok(())
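Finally, the docid bookkeeping: the extractor records every document the update touches in documents_seen, and the indexer then recompresses all_documents minus that set. A small self-contained illustration of that roaring arithmetic, with made-up docids:

use roaring::RoaringBitmap;

fn main() {
    // All documents currently stored in the index.
    let all_documents = (0u32..10).collect::<RoaringBitmap>();

    // Documents touched by the update, as recorded by the extractor
    // with documents_seen.insert(docid).
    let mut documents_seen = RoaringBitmap::new();
    assert!(documents_seen.insert(3));
    assert!(documents_seen.insert(7));

    // Everything else is still stored without the new dictionary
    // and must be recompressed.
    let documents_to_compress = all_documents - documents_seen;
    assert_eq!(documents_to_compress.len(), 8);
    assert!(!documents_to_compress.contains(3));
}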