diff --git a/crates/milli/src/heed_codec/compressed_obkv_codec.rs b/crates/milli/src/heed_codec/compressed_obkv_codec.rs
index 05b0d90ac..711a5c049 100644
--- a/crates/milli/src/heed_codec/compressed_obkv_codec.rs
+++ b/crates/milli/src/heed_codec/compressed_obkv_codec.rs
@@ -103,8 +103,10 @@ impl<'a> CompressedKvReaderU16<'a> {
 pub struct CompressedKvWriterU16(Vec<u8>);
 
 impl CompressedKvWriterU16 {
-    // TODO ask for a KvReaderU16 here
-    pub fn new_with_dictionary(input: &[u8], dictionary: &EncoderDictionary) -> io::Result<Self> {
+    pub fn new_with_dictionary(
+        input: &KvReaderU16,
+        dictionary: &EncoderDictionary,
+    ) -> io::Result<Self> {
         let mut compressor = Compressor::with_prepared_dictionary(dictionary)?;
         compressor.compress(input).map(CompressedKvWriterU16)
     }
diff --git a/crates/milli/src/update/new/extract/documents/compression.rs b/crates/milli/src/update/new/extract/documents/compression.rs
index 3b323ff1b..a068b7afd 100644
--- a/crates/milli/src/update/new/extract/documents/compression.rs
+++ b/crates/milli/src/update/new/extract/documents/compression.rs
@@ -3,8 +3,12 @@ use std::sync::atomic::{self, AtomicUsize};
 
 use bumpalo::Bump;
 use heed::RwTxn;
+use rayon::iter::{ParallelBridge, ParallelIterator as _};
+use roaring::RoaringBitmap;
+use zstd::bulk::Compressor;
 use zstd::dict::{from_continuous, EncoderDictionary};
 
+use crate::heed_codec::CompressedKvWriterU16;
 use crate::update::new::document::Document as _;
 use crate::update::new::indexer::document_changes::{
     DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
@@ -87,8 +91,13 @@ where
                 Step::PreparingCompressionDictionary,
             )?;
 
+            let mut all_documents_seen = RoaringBitmap::new();
             for data in datastore {
-                let CompressorExtractorData { fields, fields_count, .. } = data.into_inner();
+                let CompressorExtractorData { documents_seen, fields, fields_count, must_stop: _ } =
+                    data.into_inner();
+
+                all_documents_seen |= documents_seen;
+
                 let mut fields_iter = fields.into_iter();
                 for field_count in fields_count {
                     let mut document_fields_size = 0;
@@ -109,10 +118,31 @@ where
             let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?;
             index.put_document_compression_dictionary(wtxn, &dictionary)?;
+            let dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL);
 
-            todo!("compress (in parallel) all the database documents that are not impacted by the current update");
+            let all_documents = index.documents_ids(wtxn)?;
+            let documents_to_compress = all_documents - all_documents_seen;
+            let pi = documents_to_compress.into_iter().par_bridge().map(|docid| {
+                // TODO: put the rtxn and the encoder dict in a ThreadLocal datastore
+                let rtxn = index.read_txn()?;
+                let compressed_document = index.compressed_document(&rtxn, docid)?.unwrap();
+                // The documents are not compressed with any dictionary at this point.
+                let document = compressed_document.as_non_compressed();
+                let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?;
+                Ok((docid, compressed)) as crate::Result<_>
+            });
 
-            Ok(Some(EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL)))
+            // Compress in parallel and write the documents sequentially
+            // into the database, consuming the parallel iterator above.
+            rayon_par_bridge::par_bridge(100, pi, |seq_iter| {
+                for result in seq_iter {
+                    let (docid, compressed_document) = result?;
+                    index.documents.put(wtxn, &docid, &compressed_document)?;
+                }
+                Ok(()) as crate::Result<_>
+            })?;
+
+            Ok(Some(dictionary))
         }
     }
 }
@@ -126,6 +156,8 @@ struct CompressorExtractor {
 
 #[derive(Default)]
 struct CompressorExtractorData<'extractor> {
+    /// The set of documents impacted by this update: deleted, updated, or inserted.
+    documents_seen: RoaringBitmap,
     /// The field content in JSON but as bytes.
     fields: Vec<&'extractor [u8]>,
     /// The number of fields associated to single documents.
@@ -160,9 +192,9 @@ impl<'extractor> Extractor<'extractor> for CompressorExtractor {
             }
 
             let change = change?;
-            match change {
-                DocumentChange::Deletion(_) => (),
-                DocumentChange::Update(_) => (),
+            let docid = match change {
+                DocumentChange::Deletion(deletion) => deletion.docid(),
+                DocumentChange::Update(update) => update.docid(),
                 DocumentChange::Insertion(insertion) => {
                     let mut fields_count = 0;
                     for result in insertion.inserted().iter_top_level_fields() {
@@ -176,8 +208,13 @@ impl<'extractor> Extractor<'extractor> for CompressorExtractor {
                         self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
                     data.must_stop = previous_count >= self.total_documents_to_extract;
                     data.fields_count.push(fields_count);
+
+                    insertion.docid()
                 }
-            }
+            };
+
+            let is_new = data.documents_seen.insert(docid);
+            debug_assert!(is_new, "We must not see the same document multiple times");
         }
 
         Ok(())
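For reviewers unfamiliar with the zstd dictionary APIs this patch leans on, here is a minimal, self-contained sketch of the same flow outside of milli: train a raw dictionary from concatenated samples with `from_continuous`, prepare it once as an `EncoderDictionary`, and compress each payload with a `Compressor` built from the prepared dictionary. The dictionary size and compression level below are placeholders, not milli's `DICTIONARY_MAX_SIZE` and `COMPRESSION_LEVEL`.

```rust
use std::io;

use zstd::bulk::Compressor;
use zstd::dict::{from_continuous, EncoderDictionary};

/// Placeholder level for this sketch; milli defines its own COMPRESSION_LEVEL.
const LEVEL: i32 = 19;

fn compress_all(payloads: &[Vec<u8>]) -> io::Result<Vec<Vec<u8>>> {
    // `from_continuous` wants the samples concatenated plus their individual sizes.
    let sample_data: Vec<u8> = payloads.iter().flatten().copied().collect();
    let sample_sizes: Vec<usize> = payloads.iter().map(Vec::len).collect();

    // Train a raw dictionary (capped at 64 KiB here) and prepare it once for encoding.
    let dictionary = from_continuous(&sample_data, &sample_sizes, 64 * 1024)?;
    let dictionary = EncoderDictionary::copy(&dictionary, LEVEL);

    // Compress every payload with a compressor built from the prepared dictionary.
    let mut compressor = Compressor::with_prepared_dictionary(&dictionary)?;
    payloads.iter().map(|payload| compressor.compress(payload)).collect()
}
```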
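The write-back step relies on the `rayon_par_bridge::par_bridge(100, pi, ...)` helper to keep the LMDB write transaction, which cannot be shared across threads, on the calling thread while compression runs on the rayon pool. As a rough illustration of that "compress in parallel, write sequentially" shape, here is a sketch using only rayon and a bounded channel instead of that helper; `compress` and `write` are hypothetical stand-ins for the dictionary compression and the heed `put` call, not milli APIs.

```rust
use std::io;
use std::sync::mpsc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn compress_then_write(
    docids: Vec<u32>,
    compress: impl Fn(u32) -> io::Result<Vec<u8>> + Sync,
    mut write: impl FnMut(u32, &[u8]) -> io::Result<()>,
) -> io::Result<()> {
    // Bounded channel: at most 100 compressed documents are buffered at once.
    let (sender, receiver) = mpsc::sync_channel(100);
    let compress = &compress;

    rayon::in_place_scope(|s| {
        // Producer: compress the documents on the rayon thread pool.
        s.spawn(move |_| {
            docids.into_par_iter().for_each_with(sender, |sender, docid| {
                let result = compress(docid).map(|bytes| (docid, bytes));
                // A send error only means the consumer bailed out early.
                let _ = sender.send(result);
            });
        });

        // Consumer: the calling thread drains the channel and writes sequentially,
        // which is where the single `&mut RwTxn` would be used in milli.
        for result in receiver {
            let (docid, bytes) = result?;
            write(docid, &bytes)?;
        }
        Ok(())
    })
}
```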