From b7a9dbfdf87cf602bc3f464f71a718089ec473bc Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 12 Dec 2024 14:32:31 +0100 Subject: [PATCH] Allocate the decompressed documents in the extractor allocator --- .../src/heed_codec/compressed_obkv_codec.rs | 25 ++- crates/milli/src/index.rs | 15 +- crates/milli/src/update/new/document.rs | 48 +++-- .../milli/src/update/new/document_change.rs | 14 +- .../milli/src/update/new/extract/documents.rs | 17 +- .../new/extract/faceted/extract_facets.rs | 7 +- .../milli/src/update/new/extract/geo/mod.rs | 5 +- .../extract/searchable/extract_word_docids.rs | 7 +- .../extract_word_pair_proximity_docids.rs | 10 +- .../src/update/new/extract/vectors/mod.rs | 4 + .../update/new/indexer/document_changes.rs | 11 +- .../update/new/indexer/document_deletion.rs | 6 +- crates/milli/src/update/new/indexer/mod.rs | 169 +++++++++++++++++- .../update/new/indexer/update_by_function.rs | 6 +- .../src/update/new/parallel_iterator_ext.rs | 4 +- crates/milli/src/update/new/steps.rs | 2 + .../milli/src/update/new/vector_document.rs | 3 +- milli/examples/search.rs | 0 18 files changed, 295 insertions(+), 58 deletions(-) delete mode 100644 milli/examples/search.rs diff --git a/crates/milli/src/heed_codec/compressed_obkv_codec.rs b/crates/milli/src/heed_codec/compressed_obkv_codec.rs index 8ddd94d92..05b0d90ac 100644 --- a/crates/milli/src/heed_codec/compressed_obkv_codec.rs +++ b/crates/milli/src/heed_codec/compressed_obkv_codec.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use std::io; use std::io::ErrorKind; +use bumpalo::Bump; use heed::BoxedError; use obkv::KvReaderU16; use zstd::bulk::{Compressor, Decompressor}; @@ -54,6 +55,17 @@ impl<'a> CompressedKvReaderU16<'a> { Ok(KvReaderU16::from_slice(&buffer[..size])) } + pub fn decompress_into_bump<'b>( + &self, + bump: &'b Bump, + dictionary: &DecoderDictionary, + ) -> io::Result<&'b KvReaderU16> { + // TODO use a better approach and stop cloning so much. + let mut buffer = Vec::new(); + self.decompress_with(&mut buffer, dictionary)?; + Ok(KvReaderU16::from_slice(bump.alloc_slice_copy(&buffer))) + } + /// Returns the KvReader as if it were not compressed. /// Happens when there is no dictionary yet. pub fn as_non_compressed(&self) -> &'a KvReaderU16 { @@ -75,11 +87,16 @@ impl<'a> CompressedKvReaderU16<'a> { } } - pub fn decompress_as_owned_with_optinal_dictionary( + pub fn into_owned_with_dictionary( &self, - dictionary: Option<&DecoderDictionary>, - ) -> io::Result> { - todo!("Impl owned version of KvReader") + dictionary: &DecoderDictionary<'_>, + ) -> io::Result> { + let mut buffer = Vec::new(); + let reader = self.decompress_with(&mut buffer, dictionary)?; + // Make sure the Vec is exactly the size of the reader + let size = reader.as_bytes().len(); + buffer.resize(size, 0); + Ok(buffer.into_boxed_slice().into()) } } diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index d1c2fe8f8..a6b4558db 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1311,11 +1311,8 @@ impl Index { &self, rtxn: &'t RoTxn, id: DocumentId, - ) -> Result> { - self.documents - .get(rtxn, &id)? - .ok_or(UserError::UnknownInternalDocumentId { document_id: id }) - .map_err(Into::into) + ) -> Result>> { + self.documents.get(rtxn, &id).map_err(Into::into) } /// Returns an iterator over the requested compressed documents. Missing documents are skipped.
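The codec now exposes two read paths: `decompress_into_bump`, which copies the decompressed bytes into a caller-provided `bumpalo` arena so the returned reader outlives the scratch buffer, and `as_non_compressed`, for indexes that have no dictionary yet and therefore store raw obkv bytes. A minimal sketch of the intended call pattern, mirroring the `DocumentFromDb::new` and document-deletion hunks later in this patch (`current_document` is a hypothetical helper; `Index::compressed_document` and `Index::document_decompression_dictionary` are the accessors this patch relies on):

```rust
use bumpalo::Bump;

// Sketch only: lifetimes follow the patch's own pattern, where the bump
// and the read transaction both live for `'t`.
fn current_document<'t>(
    index: &'t Index,
    rtxn: &'t RoTxn,
    docid: DocumentId,
    doc_alloc: &'t Bump,
) -> Result<Option<&'t KvReaderU16>> {
    match index.compressed_document(rtxn, docid)? {
        Some(compressed) => match index.document_decompression_dictionary(rtxn)? {
            // The decompressed bytes are copied into the per-document bump,
            // so the reader stays valid for as long as the allocator does.
            Some(dict) => Ok(Some(compressed.decompress_into_bump(doc_alloc, &dict)?)),
            // No dictionary yet: the stored bytes are plain obkv and can be
            // borrowed straight from the transaction.
            None => Ok(Some(compressed.as_non_compressed())),
        },
        None => Ok(None),
    }
}
```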
@@ -1324,9 +1321,11 @@ impl Index { rtxn: &'t RoTxn<'t>, ids: impl IntoIterator + 'a, ) -> Result)>> + 'a> { - Ok(ids - .into_iter() - .map(move |id| self.compressed_document(rtxn, id).map(|compressed| (id, compressed)))) + Ok(ids.into_iter().flat_map(move |id| { + self.compressed_document(rtxn, id) + .map(|opt| opt.map(|compressed| (id, compressed))) + .transpose() + })) } /// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing. diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs index 2c75c516a..1ba58ba63 100644 --- a/crates/milli/src/update/new/document.rs +++ b/crates/milli/src/update/new/document.rs @@ -1,6 +1,6 @@ -use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet}; +use bumpalo::Bump; use bumparaw_collections::RawMap; use heed::RoTxn; use rustc_hash::FxBuildHasher; @@ -48,15 +48,24 @@ pub trait Document<'doc> { fn geo_field(&self) -> Result>; } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct DocumentFromDb<'t, Mapper: FieldIdMapper> where Mapper: FieldIdMapper, { fields_ids_map: &'t Mapper, - content: Cow<'t, KvReaderFieldId>, + content: &'t KvReaderFieldId, } +impl<'t, Mapper: FieldIdMapper> Clone for DocumentFromDb<'t, Mapper> { + #[inline] + fn clone(&self) -> Self { + *self + } +} + +impl<'t, Mapper: FieldIdMapper> Copy for DocumentFromDb<'t, Mapper> {} + impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> { fn iter_top_level_fields(&self) -> impl Iterator> { let mut it = self.content.iter(); @@ -121,10 +130,18 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> { rtxn: &'t RoTxn, index: &'t Index, db_fields_ids_map: &'t Mapper, + doc_alloc: &'t Bump, ) -> Result> { - index.documents.get(rtxn, &docid).map_err(crate::Error::from).map(|reader| { - reader.map(|reader| Self { fields_ids_map: db_fields_ids_map, content: reader }) - }) + match index.compressed_document(rtxn, docid)? { + Some(compressed) => { + let content = match index.document_decompression_dictionary(rtxn)? { + Some(dictionary) => compressed.decompress_into_bump(doc_alloc, &dictionary)?, + None => compressed.as_non_compressed(), + }; + Ok(Some(Self { fields_ids_map: db_fields_ids_map, content })) + } + None => Ok(None), + } } pub fn field(&self, name: &str) -> Result> { @@ -188,9 +205,10 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> { rtxn: &'t RoTxn, index: &'t Index, db_fields_ids_map: &'t Mapper, + doc_alloc: &'t Bump, new_doc: DocumentFromVersions<'a, 'doc>, ) -> Result { - let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?; + let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map, doc_alloc)?; Ok(Self { new_doc, db }) } @@ -233,9 +251,10 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d> return Ok(Some(vectors)); } - let Some(db) = self.db else { return Ok(None) }; - - db.vectors_field() + match &self.db { + Some(db) => db.vectors_field(), + None => Ok(None), + } } fn geo_field(&self) -> Result> { @@ -243,9 +262,10 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d> return Ok(Some(geo)); } - let Some(db) = self.db else { return Ok(None) }; - - db.geo_field() + match &self.db { + Some(db) => db.geo_field(), + None => Ok(None), + } } fn top_level_fields_count(&self) -> usize { @@ -256,7 +276,7 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d> if let Some(f) = self.new_doc.top_level_field(k)? 
{ return Ok(Some(f)); } - if let Some(db) = self.db { + if let Some(db) = &self.db { return db.field(k); } Ok(None) diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs index 1644b2254..c209ae67e 100644 --- a/crates/milli/src/update/new/document_change.rs +++ b/crates/milli/src/update/new/document_change.rs @@ -72,8 +72,9 @@ impl<'doc> Deletion<'doc> { rtxn: &'a RoTxn, index: &'a Index, mapper: &'a Mapper, + doc_alloc: &'a Bump, ) -> Result> { - Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( + Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, doc_alloc)?.ok_or( crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, )?) } @@ -91,6 +92,7 @@ impl<'doc> Insertion<'doc> { pub fn external_document_id(&self) -> &'doc str { self.external_document_id } + pub fn inserted(&self) -> DocumentFromVersions<'_, 'doc> { DocumentFromVersions::new(&self.new) } @@ -126,8 +128,9 @@ impl<'doc> Update<'doc> { rtxn: &'a RoTxn, index: &'a Index, mapper: &'a Mapper, + doc_alloc: &'a Bump, ) -> Result> { - Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( + Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, doc_alloc)?.ok_or( crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, )?) } @@ -153,6 +156,7 @@ impl<'doc> Update<'doc> { rtxn: &'t RoTxn, index: &'t Index, mapper: &'t Mapper, + doc_alloc: &'t Bump, ) -> Result> { if self.has_deletion { Ok(MergedDocument::without_db(DocumentFromVersions::new(&self.new))) @@ -162,6 +166,7 @@ impl<'doc> Update<'doc> { rtxn, index, mapper, + doc_alloc, DocumentFromVersions::new(&self.new), ) } @@ -177,6 +182,7 @@ impl<'doc> Update<'doc> { rtxn: &'t RoTxn, index: &'t Index, mapper: &'t Mapper, + doc_alloc: &'t Bump, ) -> Result { let mut changed = false; let mut cached_current = None; @@ -192,7 +198,7 @@ impl<'doc> Update<'doc> { updated_selected_field_count += 1; let current = match cached_current { Some(current) => current, - None => self.current(rtxn, index, mapper)?, + None => self.current(rtxn, index, mapper, doc_alloc)?, }; let current_value = current.top_level_field(key)?; let Some(current_value) = current_value else { @@ -222,7 +228,7 @@ impl<'doc> Update<'doc> { let has_deleted_fields = { let current = match cached_current { Some(current) => current, - None => self.current(rtxn, index, mapper)?, + None => self.current(rtxn, index, mapper, doc_alloc)?, }; let mut current_selected_field_count = 0; diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs index b38e2d79a..02f0481bb 100644 --- a/crates/milli/src/update/new/extract/documents.rs +++ b/crates/milli/src/update/new/extract/documents.rs @@ -63,6 +63,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { &context.rtxn, context.index, &context.db_fields_ids_map, + &context.doc_alloc, )?; let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); @@ -79,8 +80,12 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { } DocumentChange::Update(update) => { let docid = update.docid(); - let content = - update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?; + let content = update.current( + &context.rtxn, + context.index, + &context.db_fields_ids_map, + &context.doc_alloc, + )?; let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); for res in 
content.iter_top_level_fields().chain(geo_iter) { @@ -103,8 +108,12 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> { *entry += 1; } - let content = - update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?; + let content = update.merged( + &context.rtxn, + context.index, + &context.db_fields_ids_map, + &context.doc_alloc, + )?; let vector_content = update.merged_vectors( &context.rtxn, context.index, diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index 66ed6cbfb..bbb53955f 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -79,7 +79,7 @@ impl FacetedDocidsExtractor { let res = match document_change { DocumentChange::Deletion(inner) => extract_document_facets( attributes_to_extract, - inner.current(rtxn, index, context.db_fields_ids_map)?, + inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?, inner.external_document_id(), new_fields_ids_map.deref_mut(), &mut |fid, depth, value| { @@ -102,13 +102,14 @@ impl FacetedDocidsExtractor { rtxn, index, context.db_fields_ids_map, + &context.doc_alloc, )? { return Ok(()); } extract_document_facets( attributes_to_extract, - inner.current(rtxn, index, context.db_fields_ids_map)?, + inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?, inner.external_document_id(), new_fields_ids_map.deref_mut(), &mut |fid, depth, value| { @@ -128,7 +129,7 @@ impl FacetedDocidsExtractor { extract_document_facets( attributes_to_extract, - inner.merged(rtxn, index, context.db_fields_ids_map)?, + inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?, inner.external_document_id(), new_fields_ids_map.deref_mut(), &mut |fid, depth, value| { diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index a3820609d..4ecb78ba0 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -158,6 +158,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { let index = context.index; let max_memory = self.grenad_parameters.max_memory_by_thread(); let db_fields_ids_map = context.db_fields_ids_map; + let doc_alloc = &context.doc_alloc; let mut data_ref = context.data.borrow_mut_or_yield(); for change in changes { @@ -173,7 +174,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); let external_id = deletion.external_document_id(); - let current = deletion.current(rtxn, index, db_fields_ids_map)?; + let current = deletion.current(rtxn, index, db_fields_ids_map, doc_alloc)?; let current_geo = current .geo_field()? 
.map(|geo| extract_geo_coordinates(external_id, geo)) @@ -188,7 +189,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { } } DocumentChange::Update(update) => { - let current = update.current(rtxn, index, db_fields_ids_map)?; + let current = update.current(rtxn, index, db_fields_ids_map, doc_alloc)?; let external_id = update.external_document_id(); let docid = update.docid(); diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index 952ee91e4..c8a11923b 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -339,7 +339,7 @@ impl WordDocidsExtractors { ) }; document_tokenizer.tokenize_document( - inner.current(rtxn, index, context.db_fields_ids_map)?, + inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?, new_fields_ids_map, &mut token_fn, )?; @@ -350,6 +350,7 @@ impl WordDocidsExtractors { &context.rtxn, context.index, context.db_fields_ids_map, + &context.doc_alloc, )? { return Ok(()); } @@ -365,7 +366,7 @@ impl WordDocidsExtractors { ) }; document_tokenizer.tokenize_document( - inner.current(rtxn, index, context.db_fields_ids_map)?, + inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?, new_fields_ids_map, &mut token_fn, )?; @@ -381,7 +382,7 @@ impl WordDocidsExtractors { ) }; document_tokenizer.tokenize_document( - inner.merged(rtxn, index, context.db_fields_ids_map)?, + inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?, new_fields_ids_map, &mut token_fn, )?; diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index e58c0efd2..422f11037 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -58,7 +58,8 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { let docid = document_change.docid(); match document_change { DocumentChange::Deletion(inner) => { - let document = inner.current(rtxn, index, context.db_fields_ids_map)?; + let document = + inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?; process_document_tokens( document, document_tokenizer, @@ -75,11 +76,13 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { rtxn, index, context.db_fields_ids_map, + &context.doc_alloc, )? 
{ return Ok(()); } - let document = inner.current(rtxn, index, context.db_fields_ids_map)?; + let document = + inner.current(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?; process_document_tokens( document, document_tokenizer, @@ -89,7 +92,8 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { del_word_pair_proximity.push(((w1, w2), prox)); }, )?; - let document = inner.merged(rtxn, index, context.db_fields_ids_map)?; + let document = + inner.merged(rtxn, index, context.db_fields_ids_map, &context.doc_alloc)?; process_document_tokens( document, document_tokenizer, diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 2a72a1650..bebe23c90 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -135,6 +135,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> { &context.rtxn, context.index, context.db_fields_ids_map, + &context.doc_alloc, )?, context.new_fields_ids_map, &context.doc_alloc, @@ -145,6 +146,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> { &context.rtxn, context.index, context.db_fields_ids_map, + &context.doc_alloc, )?, context.new_fields_ids_map, &context.doc_alloc, @@ -165,6 +167,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> { &context.rtxn, context.index, context.db_fields_ids_map, + &context.doc_alloc, )?, context.new_fields_ids_map, &context.doc_alloc, @@ -175,6 +178,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> { &context.rtxn, context.index, context.db_fields_ids_map, + &context.doc_alloc, )?, context.new_fields_ids_map, &context.doc_alloc, diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index a45fcee85..4110a2d8f 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, RwLock}; use bumpalo::Bump; use heed::RoTxn; use rayon::iter::IndexedParallelIterator; +use zstd::dict::DecoderDictionary; use super::super::document_change::DocumentChange; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; @@ -105,7 +106,7 @@ pub trait Extractor<'extractor>: Sync { fn process<'doc>( &'doc self, changes: impl Iterator>>, - context: &'doc DocumentChangeContext, + context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>, ) -> Result<()>; } @@ -121,8 +122,10 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload self.len() == 0 } - fn item_to_document_change<'doc, // lifetime of a single `process` call - T: MostlySend>( + fn item_to_document_change< + 'doc, // lifetime of a single `process` call + T: MostlySend, + >( &'doc self, context: &'doc DocumentChangeContext, item: &'doc Self::Item, @@ -140,6 +143,7 @@ pub struct IndexingContext< { pub index: &'index Index, pub db_fields_ids_map: &'indexer FieldsIdsMap, + pub db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>, pub new_fields_ids_map: &'fid RwLock, pub doc_allocs: &'indexer ThreadLocal>>, pub fields_ids_map_store: &'indexer ThreadLocal>>>, @@ -202,6 +206,7 @@ pub fn extract< IndexingContext { index, db_fields_ids_map, + db_document_decompression_dictionary, new_fields_ids_map, doc_allocs, fields_ids_map_store, diff --git 
a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index b42a6c859..0094cd2e8 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -64,7 +64,11 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { where 'pl: 'doc, // the payload must survive the process calls { - let current = context.index.document(&context.rtxn, *docid)?; + let compressed = context.index.compressed_document(&context.rtxn, *docid)?.unwrap(); + let current = match context.index.document_decompression_dictionary(&context.rtxn)? { + Some(dict) => compressed.decompress_into_bump(&context.doc_alloc, &dict)?, + None => compressed.as_non_compressed(), + }; let external_document_id = self.primary_key.extract_docid_from_db( current, diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index a850c0d03..8fdef4643 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -1,11 +1,15 @@ +use std::cell::RefCell; use std::cmp::Ordering; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{self, AtomicBool, AtomicUsize}; use std::sync::{OnceLock, RwLock}; use std::thread::{self, Builder}; use big_s::S; +use bumpalo::Bump; use bumparaw_collections::RawMap; -use document_changes::{extract, DocumentChanges, IndexingContext}; +use document_changes::{ + extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, +}; pub use document_deletion::DocumentDeletion; pub use document_operation::{DocumentOperation, PayloadStats}; use hashbrown::HashMap; @@ -17,18 +21,20 @@ use rand::SeedableRng as _; use rustc_hash::FxBuildHasher; use time::OffsetDateTime; pub use update_by_function::UpdateByFunction; +use zstd::dict::{DecoderDictionary, EncoderDictionary}; use super::channel::*; +use super::document::Document as _; use super::extract::*; use super::facet_search_builder::FacetSearchBuilder; use super::merger::FacetFieldIdsDelta; use super::steps::IndexingStep; -use super::thread_local::ThreadLocal; +use super::thread_local::{FullySend, MostlySend, ThreadLocal}; use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use super::words_prefix_docids::{ compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, }; -use super::StdResult; +use super::{DocumentChange, StdResult}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; @@ -38,6 +44,7 @@ use crate::proximity::ProximityPrecision; use crate::update::del_add::DelAdd; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::merger::merge_and_send_rtree; +use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::InnerIndexSettings; @@ -111,6 +118,9 @@ where .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) .unwrap(); + let db_document_decompression_dictionary = index + .document_compression_raw_dictionary(wtxn) + .map(|opt| opt.map(DecoderDictionary::copy))?; let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; let new_fields_ids_map = 
FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder); let new_fields_ids_map = RwLock::new(new_fields_ids_map); @@ -121,6 +131,7 @@ where let indexing_context = IndexingContext { index, db_fields_ids_map, + db_document_decompression_dictionary: db_document_decompression_dictionary.as_ref(), new_fields_ids_map: &new_fields_ids_map, doc_allocs: &doc_allocs, fields_ids_map_store: &fields_ids_map_store, @@ -128,6 +139,19 @@ where progress, }; + let document_compression_dictionary = pool + .install(|| { + let rtxn = index.read_txn()?; + compute_document_compression_dictionary( + index, + &rtxn, + document_changes, + indexing_context, + &mut extractor_allocs, + ) + }) + .unwrap()?; + let mut index_embeddings = index.embedding_configs(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?; @@ -429,7 +453,7 @@ where while let Some(action) = writer_receiver.recv_action() { if _entered_post_merge.is_none() - && finished_extraction.load(std::sync::atomic::Ordering::Relaxed) + && finished_extraction.load(atomic::Ordering::Relaxed) { _entered_post_merge = Some(span.enter()); } @@ -550,6 +574,141 @@ where Ok(()) } +/// The compression level to use when compressing documents. +const DOCUMENT_COMPRESSION_LEVEL: i32 = 19; +/// The sample size used to generate the document compression dictionary. +const DOCUMENT_COMPRESSION_SAMPLE_SIZE: usize = 10_000; +/// The maximum size the document compression dictionary can be. +const DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE: usize = 64_000; +/// The maximum number of documents we are willing to compress if they +/// weren't already compressed in the database. If this threshold +/// is reached we do not generate a dictionary and continue as is. +const DOCUMENT_COMPRESSION_COMPRESS_LIMIT: u64 = 5_000_000; + +/// A function dedicated to using the existing document compression +/// dictionary or generating an appropriate one based on the documents +/// available in the database and the ones in the payload. +/// +/// If there are too many documents already in the database and no +/// compression dictionary, we prefer not to generate one, to avoid +/// compressing all of the documents and potentially blowing up disk space. +fn compute_document_compression_dictionary<'pl, 'extractor, DC, MSP, SP>( + index: &Index, + rtxn: &RoTxn<'_>, + document_changes: &DC, + indexing_context: IndexingContext, + extractor_allocs: &'extractor mut ThreadLocal>, +) -> Result>> +where + DC: DocumentChanges<'pl>, + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, +{ + match index.document_compression_raw_dictionary(rtxn)? { + Some(dict) => Ok(Some(EncoderDictionary::copy(dict, DOCUMENT_COMPRESSION_LEVEL))), + None if index.number_of_documents(rtxn)? >= DOCUMENT_COMPRESSION_COMPRESS_LIMIT => Ok(None), + None => { + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + let extractor = CompressorExtractor { + total_documents_to_extract: DOCUMENT_COMPRESSION_SAMPLE_SIZE, + extracted_documents_count: AtomicUsize::new(0), + }; + + todo!("collect the documents samples from the database first (or after)"); + + // This extraction only takes care of document replacements + // and not updates (merges). The merged documents are ignored as + // we will only use the previous version of them in the database. + extract( + document_changes, + &extractor, + indexing_context, + extractor_allocs, + &datastore, + IndexingStep::PreparingCompressionDictionary, + )?; + + let mut sample_data = Vec::new(); + let mut sample_sizes = Vec::new(); + for data in datastore { + let CompressorExtractorData { buffer, must_stop: _ } = data.into_inner(); + let mut subsample_size = 0; + for subsample in buffer { + sample_data.extend_from_slice(subsample); + subsample_size += subsample.len(); + } + sample_sizes.push(subsample_size); + } + + let dictionary = zstd::dict::from_continuous( + &sample_data, + &sample_sizes, + DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE, + )?; + + Ok(Some(EncoderDictionary::copy(&dictionary, DOCUMENT_COMPRESSION_LEVEL))) + } + } +} +
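The sampling above exists to feed `zstd::dict::from_continuous`, which expects all samples concatenated in one buffer plus the length of each sample: exactly the `sample_data`/`sample_sizes` pair built from the thread-local buffers. A self-contained sketch of that training call (toy corpus with made-up field names; only the API shape and the roles of the constants are taken from the patch):

```rust
use zstd::bulk::Compressor;
use zstd::dict::{from_continuous, EncoderDictionary};

fn train_and_compress() -> std::io::Result<Vec<u8>> {
    // zstd needs many samples to train, hence a generated corpus; the real
    // code samples up to DOCUMENT_COMPRESSION_SAMPLE_SIZE inserted documents.
    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for i in 0..10_000 {
        let doc = format!(r#"{{"id":{i},"name":"document {i}"}}"#);
        sample_data.extend_from_slice(doc.as_bytes());
        sample_sizes.push(doc.len());
    }

    // Cap the dictionary size, mirroring DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE.
    let dict = from_continuous(&sample_data, &sample_sizes, 64_000)?;

    // Prepare the dictionary once at the chosen level (19 in this patch)
    // and reuse it for every document.
    let prepared = EncoderDictionary::copy(&dict, 19);
    let mut compressor = Compressor::with_prepared_dictionary(&prepared)?;
    compressor.compress(br#"{"id":10001,"name":"one more document"}"#)
}
```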
+struct CompressorExtractor { + total_documents_to_extract: usize, + extracted_documents_count: AtomicUsize, +} + +#[derive(Default)] +struct CompressorExtractorData<'extractor> { + buffer: Vec<&'extractor [u8]>, + /// Once we have extracted the expected count of documents, we can skip everything else. + must_stop: bool, +} + +unsafe impl<'extractor> MostlySend for RefCell> {} + +impl<'extractor> Extractor<'extractor> for CompressorExtractor { + type Data = RefCell>; + + fn init_data<'doc>( + &'doc self, + _extractor_alloc: &'extractor bumpalo::Bump, + ) -> crate::Result { + Ok(RefCell::new(CompressorExtractorData::default())) + } + + fn process<'doc>( + &'doc self, + changes: impl Iterator>>, + context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>, + ) -> crate::Result<()> { + let mut data = context.data.borrow_mut_or_yield(); + + for change in changes { + if data.must_stop { + return Ok(()); + } + + let change = change?; + match change { + DocumentChange::Deletion(_) => (), + DocumentChange::Update(_) => (), + DocumentChange::Insertion(insertion) => { + for result in insertion.inserted().iter_top_level_fields() { + let (_field_name, raw_value) = result?; + let bytes = raw_value.get().as_bytes(); + data.buffer.push(context.extractor_alloc.alloc_slice_copy(bytes)); + } + + let previous_count = + self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst); + data.must_stop = previous_count >= self.total_documents_to_extract; + } + } + } + + Ok(()) + } +} + /// A function dedicated to managing all the available BBQueue frames. /// /// It reads all the available frames and does the corresponding database operations
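`CompressorExtractor` is a small lock-free sampling pattern worth noting: every rayon worker appends to its own `CompressorExtractorData` buffer, while one shared `AtomicUsize` caps the total number of sampled documents across all threads. Deletions and updates are skipped on purpose: for those, only the previous version already in the database matters, which the `todo!` above plans to sample separately. A stripped-down version of the same idea, with hypothetical names and plain slices instead of document changes:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Each worker calls this with its own `out` buffer; `counter` is shared.
fn sample<'a>(items: &[&'a [u8]], cap: usize, counter: &AtomicUsize, out: &mut Vec<&'a [u8]>) {
    for item in items {
        // fetch_add hands every caller a unique previous value, so at most
        // `cap` items are kept across all threads without any locking.
        if counter.fetch_add(1, Ordering::SeqCst) >= cap {
            return;
        }
        out.push(item);
    }
}
```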
diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs index 3001648e6..d6e442665 100644 --- a/crates/milli/src/update/new/indexer/update_by_function.rs +++ b/crates/milli/src/update/new/indexer/update_by_function.rs @@ -105,7 +105,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { // safety: Both documents *must* exist in the database as // their IDs come from the list of document ids. - let document = index.document(txn, docid)?; + let compressed_document = index.compressed_document(txn, docid)?.unwrap(); + let document = match index.document_decompression_dictionary(txn)? { + Some(dictionary) => compressed_document.decompress_into_bump(doc_alloc, &dictionary)?, + None => compressed_document.as_non_compressed(), + }; let rhai_document = obkv_to_rhaimap(document, db_fields_ids_map)?; let json_document = all_obkv_to_json(document, db_fields_ids_map)?; diff --git a/crates/milli/src/update/new/parallel_iterator_ext.rs b/crates/milli/src/update/new/parallel_iterator_ext.rs index ff69d7acf..80b9ace68 100644 --- a/crates/milli/src/update/new/parallel_iterator_ext.rs +++ b/crates/milli/src/update/new/parallel_iterator_ext.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use rayon::iter::ParallelIterator; pub trait ParallelIteratorExt: ParallelIterator { - /// A method to run a closure of all the items and return an owned error. + /// A method to run a closure on all the items and return an owned error. /// - /// The init function is ran only as necessary which is basically once by thread. + /// The init function is run only as necessary, which is basically once per thread. fn try_arc_for_each_try_init(self, init: INIT, op: F) -> Result<(), E> where E: Send + Sync, diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index 9eb7d376d..16912b3d5 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -8,6 +8,7 @@ use crate::progress::Step; #[repr(u8)] pub enum IndexingStep { PreparingPayloads, + PreparingCompressionDictionary, ExtractingDocuments, ExtractingFacets, ExtractingWords, @@ -26,6 +27,7 @@ impl Step for IndexingStep { fn name(&self) -> Cow<'static, str> { match self { IndexingStep::PreparingPayloads => "preparing update file", + IndexingStep::PreparingCompressionDictionary => "preparing document compression dictionary", IndexingStep::ExtractingDocuments => "extracting documents", IndexingStep::ExtractingFacets => "extracting facets", IndexingStep::ExtractingWords => "extracting words", diff --git a/crates/milli/src/update/new/vector_document.rs b/crates/milli/src/update/new/vector_document.rs index 8d14a749d..498a3f2a2 100644 --- a/crates/milli/src/update/new/vector_document.rs +++ b/crates/milli/src/update/new/vector_document.rs @@ -98,7 +98,8 @@ impl<'t> VectorDocumentFromDb<'t> { db_fields_ids_map: &'t Mapper, doc_alloc: &'t Bump, ) -> Result> { - let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else { + let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map, doc_alloc)? + else { return Ok(None); }; let vectors = document.vectors_field()?; diff --git a/milli/examples/search.rs b/milli/examples/search.rs deleted file mode 100644 index e69de29bb..000000000
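One closing note: the bump-allocated readers above serve the indexing hot path, where everything dies with the per-document arena. Readers that must outlive the read transaction (for example, to hand a document back to a caller) are what `into_owned_with_dictionary` from the first hunk is for. A sketch under the same assumptions as before (`own_document` is a hypothetical helper; `as_bytes` and the `Box<[u8]>` conversion are the ones the codec itself uses):

```rust
// Sketch: materialize an owned document so it can outlive `rtxn`.
fn own_document(
    index: &Index,
    rtxn: &RoTxn,
    compressed: CompressedKvReaderU16<'_>,
) -> Result<Box<KvReaderU16>> {
    match index.document_decompression_dictionary(rtxn)? {
        Some(dict) => Ok(compressed.into_owned_with_dictionary(&dict)?),
        // Stored raw: copy the bytes out of the transaction.
        None => {
            let bytes = compressed.as_non_compressed().as_bytes().to_vec();
            Ok(bytes.into_boxed_slice().into())
        }
    }
}
```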