diff --git a/crates/milli/src/update/new/extract/documents/compression.rs b/crates/milli/src/update/new/extract/documents/compression.rs new file mode 100644 index 000000000..3b323ff1b --- /dev/null +++ b/crates/milli/src/update/new/extract/documents/compression.rs @@ -0,0 +1,185 @@ +use std::cell::RefCell; +use std::sync::atomic::{self, AtomicUsize}; + +use bumpalo::Bump; +use heed::RwTxn; +use zstd::dict::{from_continuous, EncoderDictionary}; + +use crate::update::new::document::Document as _; +use crate::update::new::indexer::document_changes::{ + DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, +}; +use crate::update::new::indexer::extract; +use crate::update::new::ref_cell_ext::RefCellExt as _; +use crate::update::new::steps::Step; +use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; +use crate::update::new::DocumentChange; +use crate::{Index, Result}; + +/// The compression level to use when compressing documents. +const COMPRESSION_LEVEL: i32 = 19; +/// The number of documents required as a sample for generating +/// the compression dictionary. +const SAMPLE_SIZE: usize = 10_000; +/// The maximum size the document compression dictionary can be. +const DICTIONARY_MAX_SIZE: usize = 64_000; +/// The maximum number of documents we accept to compress if they +/// have not already been compressed in the database. If this threshold +/// is reached, we do not generate a dictionary and continue as is. +const COMPRESS_LIMIT: usize = 5_000_000; + +/// A function dedicated to use the existing or generate an appropriate +/// document compression dictionay based on the documents available in +/// the database and the ones in the payload. +/// +/// If it has to compute a new compression dictionary it immediately +/// writes the dictionary in the database and compresses the documents +/// that are not part of the current update with it. +/// +/// If there are too many documents already in the database and no +/// compression dictionary we prefer not to generate a dictionary to avoid +/// compressing all of the documents and potentially blow up disk space. +pub fn retrieve_or_compute_document_compression_dictionary<'pl, 'extractor, DC, MSP, SP>( + index: &Index, + wtxn: &mut RwTxn<'_>, + document_changes: &DC, + indexing_context: IndexingContext, + extractor_allocs: &'extractor mut ThreadLocal>, +) -> Result>> +where + DC: DocumentChanges<'pl>, + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, +{ + let number_of_documents = index.number_of_documents(wtxn)? as usize; + match index.document_compression_raw_dictionary(wtxn)? { + Some(dict) => Ok(Some(EncoderDictionary::copy(dict, COMPRESSION_LEVEL))), + None if number_of_documents >= COMPRESS_LIMIT => Ok(None), + None if number_of_documents + document_changes.len() < SAMPLE_SIZE => Ok(None), + None => { + let mut sample_data = Vec::new(); + let mut sample_sizes = Vec::new(); + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + let extractor = CompressorExtractor { + total_documents_to_extract: SAMPLE_SIZE, + extracted_documents_count: AtomicUsize::new(0), + }; + + // We first collect all the documents for the database into a buffer. + for result in index.all_compressed_documents(wtxn)? { + let (_docid, compressed_document) = result?; + // The documents are not compressed with any dictionary at this point. + let document = compressed_document.as_non_compressed(); + sample_data.extend_from_slice(document.as_bytes()); + sample_sizes.push(document.as_bytes().len()); + } + + // This extraction only takes care about documents replacements + // and not updates (merges). The merged documents are ignored as + // we will only use the previous version of them in the database, + // just above. + extract( + document_changes, + &extractor, + indexing_context, + extractor_allocs, + &datastore, + Step::PreparingCompressionDictionary, + )?; + + for data in datastore { + let CompressorExtractorData { fields, fields_count, .. } = data.into_inner(); + let mut fields_iter = fields.into_iter(); + for field_count in fields_count { + let mut document_fields_size = 0; + for field in fields_iter.by_ref().take(field_count) { + sample_data.extend_from_slice(field); + document_fields_size += field.len(); + } + sample_sizes.push(document_fields_size); + } + + debug_assert_eq!( + fields_iter.count(), + 0, + "We must have consumed all the documents' \ + fields but there were some remaining ones" + ); + } + + let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?; + index.put_document_compression_dictionary(wtxn, &dictionary)?; + + todo!("compress (in parallel) all the database documents that are not impacted by the current update"); + + Ok(Some(EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL))) + } + } +} + +struct CompressorExtractor { + /// The total number of documents we must extract from all threads. + total_documents_to_extract: usize, + /// The combined, shared, number of extracted documents. + extracted_documents_count: AtomicUsize, +} + +#[derive(Default)] +struct CompressorExtractorData<'extractor> { + /// The field content in JSON but as bytes. + fields: Vec<&'extractor [u8]>, + /// The number of fields associated to single documents. + /// It is used to provide good sample to the dictionary generator. + fields_count: Vec, + /// We extracted the expected count of documents, we can skip everything now. + must_stop: bool, +} + +unsafe impl<'extractor> MostlySend for RefCell> {} + +impl<'extractor> Extractor<'extractor> for CompressorExtractor { + type Data = RefCell>; + + fn init_data<'doc>( + &'doc self, + _extractor_alloc: &'extractor bumpalo::Bump, + ) -> crate::Result { + Ok(RefCell::new(CompressorExtractorData::default())) + } + + fn process<'doc>( + &'doc self, + changes: impl Iterator>>, + context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>, + ) -> crate::Result<()> { + let mut data = context.data.borrow_mut_or_yield(); + + for change in changes { + if data.must_stop { + return Ok(()); + } + + let change = change?; + match change { + DocumentChange::Deletion(_) => (), + DocumentChange::Update(_) => (), + DocumentChange::Insertion(insertion) => { + let mut fields_count = 0; + for result in insertion.inserted().iter_top_level_fields() { + let (_field_name, raw_value) = result?; + let bytes = raw_value.get().as_bytes(); + data.fields.push(context.extractor_alloc.alloc_slice_copy(bytes)); + fields_count += 1; + } + + let previous_count = + self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst); + data.must_stop = previous_count >= self.total_documents_to_extract; + data.fields_count.push(fields_count); + } + } + } + + Ok(()) + } +} diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents/mod.rs similarity index 98% rename from crates/milli/src/update/new/extract/documents.rs rename to crates/milli/src/update/new/extract/documents/mod.rs index 02f0481bb..2d65d445e 100644 --- a/crates/milli/src/update/new/extract/documents.rs +++ b/crates/milli/src/update/new/extract/documents/mod.rs @@ -1,6 +1,7 @@ use std::cell::RefCell; use bumpalo::Bump; +pub use compression::retrieve_or_compute_document_compression_dictionary; use hashbrown::HashMap; use super::DelAddRoaringBitmap; @@ -13,6 +14,8 @@ use crate::update::new::DocumentChange; use crate::vector::EmbeddingConfigs; use crate::Result; +mod compression; + pub struct DocumentsExtractor<'a, 'b> { document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs, @@ -50,7 +53,6 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction) let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); - let external_docid = change.external_docid().to_owned(); todo!("manage documents compression"); diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index 4bcb918e4..c6f673e47 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -6,9 +6,7 @@ mod searchable; mod vectors; use bumpalo::Bump; -pub use cache::{ - merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, -}; +pub use cache::*; pub use documents::*; pub use faceted::*; pub use geo::*; diff --git a/crates/milli/src/update/new/extract/searchable/mod.rs b/crates/milli/src/update/new/extract/searchable/mod.rs index c4240196a..355a575c6 100644 --- a/crates/milli/src/update/new/extract/searchable/mod.rs +++ b/crates/milli/src/update/new/extract/searchable/mod.rs @@ -1,7 +1,3 @@ -mod extract_word_docids; -mod extract_word_pair_proximity_docids; -mod tokenize_document; - use std::cell::RefCell; use std::marker::PhantomData; @@ -22,6 +18,10 @@ use crate::update::new::DocumentChange; use crate::update::GrenadParameters; use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE}; +mod extract_word_docids; +mod extract_word_pair_proximity_docids; +mod tokenize_document; + pub struct SearchableExtractorData<'a, EX: SearchableExtractor> { tokenizer: &'a DocumentTokenizer<'a>, grenad_parameters: GrenadParameters, diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index 0094cd2e8..b7626047b 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -93,6 +93,7 @@ mod test { use std::sync::RwLock; use bumpalo::Bump; + use zstd::dict::DecoderDictionary; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::tests::TempIndex; @@ -144,7 +145,6 @@ mod test { let indexer = Bump::new(); let index = TempIndex::new(); - let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); @@ -152,8 +152,13 @@ mod test { let fields_ids_map = RwLock::new(FieldIdMapWithMetadata::new(db_fields_ids_map.clone(), metadata_builder)); - let fields_ids_map_store = ThreadLocal::new(); + let db_document_decompression_dictionary = + match index.document_compression_raw_dictionary(&rtxn).unwrap() { + Some(dictionary) => Some(zstd::dict::DecoderDictionary::copy(dictionary)), + None => None, + }; + let fields_ids_map_store = ThreadLocal::new(); let mut extractor_allocs = ThreadLocal::new(); let doc_allocs = ThreadLocal::new(); @@ -165,6 +170,7 @@ mod test { let context = IndexingContext { index: &index, db_fields_ids_map: &db_fields_ids_map, + db_document_decompression_dictionary: db_document_decompression_dictionary.as_ref(), new_fields_ids_map: &fields_ids_map, doc_allocs: &doc_allocs, fields_ids_map_store: &fields_ids_map_store, diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 8fdef4643..5e0231aba 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -1,15 +1,11 @@ -use std::cell::RefCell; use std::cmp::Ordering; -use std::sync::atomic::{self, AtomicBool, AtomicUsize}; +use std::sync::atomic::{self, AtomicBool}; use std::sync::{OnceLock, RwLock}; use std::thread::{self, Builder}; use big_s::S; -use bumpalo::Bump; use bumparaw_collections::RawMap; -use document_changes::{ - extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, -}; +pub use document_changes::{extract, DocumentChanges, IndexingContext, Progress}; pub use document_deletion::DocumentDeletion; pub use document_operation::{DocumentOperation, PayloadStats}; use hashbrown::HashMap; @@ -21,20 +17,19 @@ use rand::SeedableRng as _; use rustc_hash::FxBuildHasher; use time::OffsetDateTime; pub use update_by_function::UpdateByFunction; -use zstd::dict::{DecoderDictionary, EncoderDictionary}; +use zstd::dict::DecoderDictionary; use super::channel::*; -use super::document::Document as _; use super::extract::*; use super::facet_search_builder::FacetSearchBuilder; use super::merger::FacetFieldIdsDelta; use super::steps::IndexingStep; -use super::thread_local::{FullySend, MostlySend, ThreadLocal}; +use super::thread_local::ThreadLocal; use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use super::words_prefix_docids::{ compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, }; -use super::{DocumentChange, StdResult}; +use super::StdResult; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; @@ -44,7 +39,6 @@ use crate::proximity::ProximityPrecision; use crate::update::del_add::DelAdd; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::merger::merge_and_send_rtree; -use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::InnerIndexSettings; @@ -141,10 +135,9 @@ where let document_compression_dictionary = pool .install(|| { - let rtxn = index.read_txn()?; - compute_document_compression_dictionary( + retrieve_or_compute_document_compression_dictionary( index, - &rtxn, + wtxn, document_changes, indexing_context, &mut extractor_allocs, @@ -574,141 +567,6 @@ where Ok(()) } -/// The compression level to use when compressing documents. -const DOCUMENT_COMPRESSION_LEVEL: i32 = 19; -/// The sample size used to generate the document compression dictionary. -const DOCUMENT_COMPRESSION_SAMPLE_SIZE: usize = 10_000; -/// The maximum size the document compression dictionary can be. -const DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE: usize = 64_000; -/// The maximum number of documents we accept to compress if they -/// weren't already compressed in the database. If this threshold -/// is reached we do not generate a dictionary and continue as is. -const DOCUMENT_COMPRESSION_COMPRESS_LIMIT: u64 = 5_000_000; - -/// A function dedicated to use the existing or generate an appropriate -/// document compression dictionay based on the documents available in -/// the database and the ones in the payload. -/// -/// If there are too many documents already in the database and no -/// compression dictionary we prefer not to generate a dictionary to avoid -/// compressing all of the documents and potentially blow up disk space. -fn compute_document_compression_dictionary<'pl, 'extractor, DC, MSP, SP>( - index: &Index, - rtxn: &RoTxn<'_>, - document_changes: &DC, - indexing_context: IndexingContext, - extractor_allocs: &'extractor mut ThreadLocal>, -) -> Result>> -where - DC: DocumentChanges<'pl>, - MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, -{ - match index.document_compression_raw_dictionary(rtxn)? { - Some(dict) => Ok(Some(EncoderDictionary::copy(dict, DOCUMENT_COMPRESSION_LEVEL))), - None if index.number_of_documents(rtxn)? >= DOCUMENT_COMPRESSION_COMPRESS_LIMIT => Ok(None), - None => { - let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - let extractor = CompressorExtractor { - total_documents_to_extract: DOCUMENT_COMPRESSION_SAMPLE_SIZE, - extracted_documents_count: AtomicUsize::new(0), - }; - - todo!("collect the documents samples from the database first (or after)"); - - // This extraction only takes care about documents replacement - // and not update (merges). The merged documents are ignore as - // we will only use the previous version of them in the database. - extract( - document_changes, - &extractor, - indexing_context, - extractor_allocs, - &datastore, - Step::PreparingCompressionDictionary, - )?; - - let mut sample_data = Vec::new(); - let mut sample_sizes = Vec::new(); - for data in datastore { - let CompressorExtractorData { buffer, must_stop: _ } = data.into_inner(); - let mut subsample_size = 0; - for subsample in buffer { - sample_data.extend_from_slice(subsample); - subsample_size += subsample.len(); - } - sample_sizes.push(subsample_size); - } - - let dictionary = zstd::dict::from_continuous( - &sample_data, - &sample_sizes, - DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE, - )?; - - Ok(Some(EncoderDictionary::copy(&dictionary, DOCUMENT_COMPRESSION_LEVEL))) - } - } -} - -struct CompressorExtractor { - total_documents_to_extract: usize, - extracted_documents_count: AtomicUsize, -} - -#[derive(Default)] -struct CompressorExtractorData<'extractor> { - buffer: Vec<&'extractor [u8]>, - /// We extracted the expected count of documents, we can skip everything now. - must_stop: bool, -} - -unsafe impl<'extractor> MostlySend for RefCell> {} - -impl<'extractor> Extractor<'extractor> for CompressorExtractor { - type Data = RefCell>; - - fn init_data<'doc>( - &'doc self, - _extractor_alloc: &'extractor bumpalo::Bump, - ) -> crate::Result { - Ok(RefCell::new(CompressorExtractorData::default())) - } - - fn process<'doc>( - &'doc self, - changes: impl Iterator>>, - context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>, - ) -> crate::Result<()> { - let mut data = context.data.borrow_mut_or_yield(); - - for change in changes { - if data.must_stop { - return Ok(()); - } - - let change = change?; - match change { - DocumentChange::Deletion(_) => (), - DocumentChange::Update(_) => (), - DocumentChange::Insertion(insertion) => { - for result in insertion.inserted().iter_top_level_fields() { - let (_field_name, raw_value) = result?; - let bytes = raw_value.get().as_bytes(); - data.buffer.push(context.extractor_alloc.alloc_slice_copy(bytes)); - } - - let previous_count = - self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst); - data.must_stop = previous_count >= self.total_documents_to_extract; - } - } - } - - Ok(()) - } -} - /// A function dedicated to manage all the available BBQueue frames. /// /// It reads all the available frames, do the corresponding database operations