diff --git a/fuzzers/src/bin/fuzz-indexing.rs b/fuzzers/src/bin/fuzz-indexing.rs index baf705709..9e344ac8d 100644 --- a/fuzzers/src/bin/fuzz-indexing.rs +++ b/fuzzers/src/bin/fuzz-indexing.rs @@ -110,7 +110,7 @@ fn main() { // after executing a batch we check if the database is corrupted let res = index.search(&wtxn).execute().unwrap(); - index.documents(&wtxn, res.documents_ids).unwrap(); + index.compressed_documents(&wtxn, res.documents_ids).unwrap(); progression.fetch_add(1, Ordering::Relaxed); } wtxn.abort(); diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index cd5525eea..934200980 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -908,16 +908,23 @@ impl IndexScheduler { let mut index_dumper = dump.create_index(uid, &metadata)?; let fields_ids_map = index.fields_ids_map(&rtxn)?; + let dictionary = index.document_compression_dictionary(&rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let embedding_configs = index.embedding_configs(&rtxn)?; + let mut buffer = Vec::new(); // 3.1. Dump the documents - for ret in index.all_documents(&rtxn)? { + for ret in index.all_compressed_documents(&rtxn)? { if self.must_stop_processing.get() { return Err(Error::AbortedTask); } - let (id, doc) = ret?; + let (id, compressed) = ret?; + let doc = match dictionary { + // TODO manage this unwrap correctly + Some(dict) => compressed.decompress_with(&mut buffer, dict).unwrap(), + None => compressed.as_non_compressed(), + }; let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 0b98cc22a..d841b838c 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -2468,7 +2468,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -2528,7 +2528,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -2907,7 +2907,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -2958,7 +2958,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -3014,7 +3014,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -3132,7 +3132,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -3187,7 +3187,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -3901,7 +3901,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -3972,7 +3972,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -4040,7 +4040,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -4101,7 +4101,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -4166,7 +4166,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -4227,7 +4227,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -4310,7 +4310,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -4396,7 +4396,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -4475,7 +4475,7 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .collect::>(); @@ -5139,7 +5139,7 @@ mod tests { assert_json_snapshot!(embeddings[&simple_hf_name][0] == lab_embed, @"true"); assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); - let doc = index.documents(&rtxn, std::iter::once(0)).unwrap()[0].1; + let doc = index.compressed_documents(&rtxn, std::iter::once(0)).unwrap()[0].1; let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let doc = obkv_to_json( &[ @@ -5216,7 +5216,7 @@ mod tests { // remained beagle assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); - let doc = index.documents(&rtxn, std::iter::once(0)).unwrap()[0].1; + let doc = index.compressed_documents(&rtxn, std::iter::once(0)).unwrap()[0].1; let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let doc = obkv_to_json( &[ diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 7c6cbc85d..3d6a73a46 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -603,44 +603,54 @@ fn some_documents<'a, 't: 'a>( retrieve_vectors: RetrieveVectors, ) -> Result> + 'a, ResponseError> { let fields_ids_map = index.fields_ids_map(rtxn)?; + let dictionary = index.document_compression_dictionary(rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let embedding_configs = index.embedding_configs(rtxn)?; + let mut buffer = Vec::new(); - Ok(index.iter_documents(rtxn, doc_ids)?.map(move |ret| { - ret.map_err(ResponseError::from).and_then(|(key, document)| -> Result<_, ResponseError> { - let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?; - match retrieve_vectors { - RetrieveVectors::Ignore => {} - RetrieveVectors::Hide => { - document.remove("_vectors"); - } - RetrieveVectors::Retrieve => { - // Clippy is simply wrong - #[allow(clippy::manual_unwrap_or_default)] - let mut vectors = match document.remove("_vectors") { - Some(Value::Object(map)) => map, - _ => Default::default(), - }; - for (name, vector) in index.embeddings(rtxn, key)? { - let user_provided = embedding_configs - .iter() - .find(|conf| conf.name == name) - .is_some_and(|conf| conf.user_provided.contains(key)); - let embeddings = ExplicitVectors { - embeddings: Some(vector.into()), - regenerate: !user_provided, - }; - vectors.insert( - name, - serde_json::to_value(embeddings).map_err(MeilisearchHttpError::from)?, - ); + Ok(index.iter_compressed_documents(rtxn, doc_ids)?.map(move |ret| { + ret.map_err(ResponseError::from).and_then( + |(key, compressed_document)| -> Result<_, ResponseError> { + let document = match dictionary { + // TODO manage this unwrap correctly + Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(), + None => compressed_document.as_non_compressed(), + }; + let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?; + match retrieve_vectors { + RetrieveVectors::Ignore => {} + RetrieveVectors::Hide => { + document.remove("_vectors"); + } + RetrieveVectors::Retrieve => { + // Clippy is simply wrong + #[allow(clippy::manual_unwrap_or_default)] + let mut vectors = match document.remove("_vectors") { + Some(Value::Object(map)) => map, + _ => Default::default(), + }; + for (name, vector) in index.embeddings(rtxn, key)? { + let user_provided = embedding_configs + .iter() + .find(|conf| conf.name == name) + .is_some_and(|conf| conf.user_provided.contains(key)); + let embeddings = ExplicitVectors { + embeddings: Some(vector.into()), + regenerate: !user_provided, + }; + vectors.insert( + name, + serde_json::to_value(embeddings) + .map_err(MeilisearchHttpError::from)?, + ); + } + document.insert("_vectors".into(), vectors.into()); } - document.insert("_vectors".into(), vectors.into()); } - } - Ok(document) - }) + Ok(document) + }, + ) })) } diff --git a/meilisearch/src/search.rs b/meilisearch/src/search.rs index 2bc87d2ba..9f684fac2 100644 --- a/meilisearch/src/search.rs +++ b/meilisearch/src/search.rs @@ -1123,10 +1123,17 @@ fn make_hits( formatter_builder.crop_marker(format.crop_marker); formatter_builder.highlight_prefix(format.highlight_pre_tag); formatter_builder.highlight_suffix(format.highlight_post_tag); + let compression_dictionary = index.document_compression_dictionary(rtxn)?; + let mut buffer = Vec::new(); let mut documents = Vec::new(); let embedding_configs = index.embedding_configs(rtxn)?; - let documents_iter = index.documents(rtxn, documents_ids)?; - for ((id, obkv), score) in documents_iter.into_iter().zip(document_scores.into_iter()) { + let documents_iter = index.compressed_documents(rtxn, documents_ids)?; + for ((id, compressed), score) in documents_iter.into_iter().zip(document_scores.into_iter()) { + let obkv = match compression_dictionary { + // TODO manage this unwrap correctly + Some(dict) => compressed.decompress_with(&mut buffer, dict).unwrap(), + None => compressed.as_non_compressed(), + }; // First generate a document with all the displayed fields let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?; diff --git a/meilitool/src/main.rs b/meilitool/src/main.rs index 06c4890a5..9325d901c 100644 --- a/meilitool/src/main.rs +++ b/meilitool/src/main.rs @@ -280,7 +280,7 @@ fn export_a_dump( let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); // 4.1. Dump the documents - for ret in index.all_documents(&rtxn)? { + for ret in index.all_compressed_documents(&rtxn)? { let (_id, doc) = ret?; let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?; index_dumper.push_document(&document)?; diff --git a/milli/examples/search.rs b/milli/examples/search.rs index 87020994a..f570525dd 100644 --- a/milli/examples/search.rs +++ b/milli/examples/search.rs @@ -76,7 +76,7 @@ fn main() -> Result<(), Box> { println!("new: {}us, docids: {:?}", elapsed.as_micros(), docs.documents_ids); if print_documents { let documents = index - .documents(&txn, docs.documents_ids.iter().copied()) + .compressed_documents(&txn, docs.documents_ids.iter().copied()) .unwrap() .into_iter() .map(|(id, obkv)| { @@ -96,7 +96,7 @@ fn main() -> Result<(), Box> { } let documents = index - .documents(&txn, docs.documents_ids.iter().copied()) + .compressed_documents(&txn, docs.documents_ids.iter().copied()) .unwrap() .into_iter() .map(|(id, obkv)| { diff --git a/milli/src/heed_codec/compressed_obkv_codec.rs b/milli/src/heed_codec/compressed_obkv_codec.rs index 0fddc40d3..2c0b6d197 100644 --- a/milli/src/heed_codec/compressed_obkv_codec.rs +++ b/milli/src/heed_codec/compressed_obkv_codec.rs @@ -24,6 +24,7 @@ impl heed::BytesEncode<'_> for ObkvCompressedCodec { pub struct CompressedKvReaderU16<'a>(&'a [u8]); impl<'a> CompressedKvReaderU16<'a> { + /// Decompresses the KvReader into the buffer using the provided dictionnary. pub fn decompress_with<'b>( &self, buffer: &'b mut Vec, @@ -38,6 +39,11 @@ impl<'a> CompressedKvReaderU16<'a> { )?; Ok(KvReaderU16::new(&buffer[..size])) } + + /// Returns the KvReader like it is not compressed. Happends when there is no dictionnary yet. + pub fn as_non_compressed(&self) -> KvReaderU16<'a> { + KvReaderU16::new(self.0) + } } pub struct CompressedKvWriterU16(Vec); diff --git a/milli/src/heed_codec/mod.rs b/milli/src/heed_codec/mod.rs index 908a86a29..7956cf9c2 100644 --- a/milli/src/heed_codec/mod.rs +++ b/milli/src/heed_codec/mod.rs @@ -20,6 +20,9 @@ use thiserror::Error; pub use self::beu16_str_codec::BEU16StrCodec; pub use self::beu32_str_codec::BEU32StrCodec; +pub use self::compressed_obkv_codec::{ + CompressedKvReaderU16, CompressedKvWriterU16, ObkvCompressedCodec, +}; pub use self::field_id_word_count_codec::FieldIdWordCountCodec; pub use self::fst_set_codec::FstSetCodec; pub use self::obkv_codec::ObkvCodec; diff --git a/milli/src/index.rs b/milli/src/index.rs index 3886f1857..919fdf852 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -20,7 +20,8 @@ use crate::heed_codec::facet::{ FieldIdCodec, OrderedF64Codec, }; use crate::heed_codec::{ - BEU16StrCodec, FstSetCodec, ScriptLanguageCodec, StrBEU16Codec, StrRefCodec, + BEU16StrCodec, CompressedKvReaderU16, FstSetCodec, ObkvCompressedCodec, ScriptLanguageCodec, + StrBEU16Codec, StrRefCodec, }; use crate::order_by_map::OrderByMap; use crate::proximity::ProximityPrecision; @@ -29,8 +30,8 @@ use crate::vector::{Embedding, EmbeddingConfig}; use crate::{ default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds, FacetDistribution, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldIdWordCountCodec, - FieldidsWeightsMap, GeoPoint, ObkvCodec, Result, RoaringBitmapCodec, RoaringBitmapLenCodec, - Search, U8StrStrCodec, Weight, BEU16, BEU32, BEU64, + FieldidsWeightsMap, GeoPoint, Result, RoaringBitmapCodec, RoaringBitmapLenCodec, Search, + U8StrStrCodec, Weight, BEU16, BEU32, BEU64, }; pub const DEFAULT_MIN_WORD_LEN_ONE_TYPO: u8 = 5; @@ -73,6 +74,7 @@ pub mod main_key { pub const PROXIMITY_PRECISION: &str = "proximity-precision"; pub const EMBEDDING_CONFIGS: &str = "embedding_configs"; pub const SEARCH_CUTOFF: &str = "search_cutoff"; + pub const DOCUMENT_COMPRESSION_DICTIONARY: &str = "document-compression-dictionary"; } pub mod db_name { @@ -172,7 +174,7 @@ pub struct Index { pub vector_arroy: arroy::Database, /// Maps the document id to the document as an obkv store. - pub(crate) documents: Database, + pub(crate) documents: Database, } impl Index { @@ -339,6 +341,29 @@ impl Index { self.env.prepare_for_closing() } + /* document compression dictionary */ + + /// Writes the dictionnary that will further be used to compress the documents. + pub fn put_document_compression_dictionary( + &self, + wtxn: &mut RwTxn, + dictionary: &[u8], + ) -> heed::Result<()> { + self.main.remap_types::().put( + wtxn, + main_key::DOCUMENT_COMPRESSION_DICTIONARY, + dictionary, + ) + } + + /// Returns the optional dictionnary to be used when reading the OBKV documents. + pub fn document_compression_dictionary<'t>( + &self, + rtxn: &'t RoTxn, + ) -> heed::Result> { + self.main.remap_types::().get(rtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY) + } + /* documents ids */ /// Writes the documents ids that corresponds to the user-ids-documents-ids FST. @@ -1261,36 +1286,36 @@ impl Index { /* documents */ - /// Returns an iterator over the requested documents. The next item will be an error if a document is missing. - pub fn iter_documents<'a, 't: 'a>( + /// Returns an iterator over the requested compressed documents. The next item will be an error if a document is missing. + pub fn iter_compressed_documents<'a, 't: 'a>( &'a self, rtxn: &'t RoTxn<'t>, ids: impl IntoIterator + 'a, - ) -> Result)>> + 'a> { + ) -> Result)>> + 'a> { Ok(ids.into_iter().map(move |id| { - let kv = self + let compressed = self .documents .get(rtxn, &id)? .ok_or(UserError::UnknownInternalDocumentId { document_id: id })?; - Ok((id, kv)) + Ok((id, compressed)) })) } /// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing. - pub fn documents<'t>( + pub fn compressed_documents<'t>( &self, rtxn: &'t RoTxn<'t>, ids: impl IntoIterator, - ) -> Result)>> { - self.iter_documents(rtxn, ids)?.collect() + ) -> Result)>> { + self.iter_compressed_documents(rtxn, ids)?.collect() } /// Returns an iterator over all the documents in the index. - pub fn all_documents<'a, 't: 'a>( + pub fn all_compressed_documents<'a, 't: 'a>( &'a self, rtxn: &'t RoTxn<'t>, - ) -> Result)>> + 'a> { - self.iter_documents(rtxn, self.documents_ids(rtxn)?) + ) -> Result)>> + 'a> { + self.iter_compressed_documents(rtxn, self.documents_ids(rtxn)?) } pub fn external_id_of<'a, 't: 'a>( @@ -1311,8 +1336,15 @@ impl Index { process: "external_id_of", }) })?; - Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> { - let (_docid, obkv) = entry?; + let dictionary = self.document_compression_dictionary(rtxn)?; + let mut buffer = Vec::new(); + Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> { + let (_docid, compressed_obkv) = entry?; + let obkv = match dictionary { + // TODO manage this unwrap correctly + Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(), + None => compressed_obkv.as_non_compressed(), + }; match primary_key.document_id(&obkv, &fields)? { Ok(document_id) => Ok(document_id), Err(_) => Err(InternalError::DocumentsError( @@ -2441,7 +2473,13 @@ pub(crate) mod tests { "###); let rtxn = index.read_txn().unwrap(); - let (_docid, obkv) = index.documents(&rtxn, [0]).unwrap()[0]; + let dictionary = index.document_compression_dictionary(&rtxn).unwrap(); + let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap()[0]; + let mut buffer = Vec::new(); + let obkv = match dictionary { + Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(), + None => compressed_obkv.as_non_compressed(), + }; let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap(); insta::assert_debug_snapshot!(json, @r###" { @@ -2450,7 +2488,11 @@ pub(crate) mod tests { "###); // Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34 - let (_docid, obkv) = index.documents(&rtxn, [2]).unwrap()[0]; + let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap()[0]; + let obkv = match dictionary { + Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(), + None => compressed_obkv.as_non_compressed(), + }; let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap(); insta::assert_debug_snapshot!(json, @r###" { @@ -2657,7 +2699,7 @@ pub(crate) mod tests { } = search.execute().unwrap(); let primary_key_id = index.fields_ids_map(&rtxn).unwrap().id("primary_key").unwrap(); documents_ids.sort_unstable(); - let docs = index.documents(&rtxn, documents_ids).unwrap(); + let docs = index.compressed_documents(&rtxn, documents_ids).unwrap(); let mut all_ids = HashSet::new(); for (_docid, obkv) in docs { let id = obkv.get(primary_key_id).unwrap(); diff --git a/milli/src/search/new/tests/mod.rs b/milli/src/search/new/tests/mod.rs index 0faff9425..2075004db 100644 --- a/milli/src/search/new/tests/mod.rs +++ b/milli/src/search/new/tests/mod.rs @@ -24,8 +24,14 @@ fn collect_field_values( ) -> Vec { let mut values = vec![]; let fid = index.fields_ids_map(txn).unwrap().id(fid).unwrap(); - for doc in index.documents(txn, docids.iter().copied()).unwrap() { - if let Some(v) = doc.1.get(fid) { + let mut buffer = Vec::new(); + let dictionary = index.document_compression_dictionary(txn).unwrap(); + for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() { + let doc = match dictionary { + Some(dict) => compressed_doc.decompress_with(&mut buffer, dict).unwrap(), + None => compressed_doc.as_non_compressed(), + }; + if let Some(v) = doc.get(fid) { let v: serde_json::Value = serde_json::from_slice(v).unwrap(); let v = v.to_string(); values.push(v); diff --git a/milli/src/snapshot_tests.rs b/milli/src/snapshot_tests.rs index 6635ab2f4..5288a8c94 100644 --- a/milli/src/snapshot_tests.rs +++ b/milli/src/snapshot_tests.rs @@ -407,9 +407,16 @@ pub fn snap_documents(index: &Index) -> String { let rtxn = index.read_txn().unwrap(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let display = fields_ids_map.ids().collect::>(); + let dictionary = index.document_compression_dictionary(&rtxn).unwrap(); + let mut buffer = Vec::new(); - for document in index.all_documents(&rtxn).unwrap() { - let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap(); + for result in index.all_compressed_documents(&rtxn).unwrap() { + let (_id, compressed_document) = result.unwrap(); + let document = match dictionary { + Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(), + None => compressed_document.as_non_compressed(), + }; + let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap(); snap.push_str(&serde_json::to_string(&doc).unwrap()); snap.push('\n'); } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 727c763aa..1eb8f121a 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -834,7 +834,7 @@ mod tests { let rtxn = index.read_txn().unwrap(); let count = index.number_of_documents(&rtxn).unwrap(); assert_eq!(count, 3); - let count = index.all_documents(&rtxn).unwrap().count(); + let count = index.all_compressed_documents(&rtxn).unwrap().count(); assert_eq!(count, 3); drop(rtxn); @@ -861,7 +861,7 @@ mod tests { assert_eq!(count, 1); // Check that we get only one document from the database. - let docs = index.documents(&rtxn, Some(0)).unwrap(); + let docs = index.compressed_documents(&rtxn, Some(0)).unwrap(); assert_eq!(docs.len(), 1); let (id, doc) = docs[0]; assert_eq!(id, 0); @@ -882,7 +882,7 @@ mod tests { assert_eq!(count, 1); // Check that we get only one document from the database. - let docs = index.documents(&rtxn, Some(0)).unwrap(); + let docs = index.compressed_documents(&rtxn, Some(0)).unwrap(); assert_eq!(docs.len(), 1); let (id, doc) = docs[0]; assert_eq!(id, 0); @@ -932,7 +932,7 @@ mod tests { let count = index.number_of_documents(&rtxn).unwrap(); assert_eq!(count, 3); - let docs = index.documents(&rtxn, vec![0, 1, 2]).unwrap(); + let docs = index.compressed_documents(&rtxn, vec![0, 1, 2]).unwrap(); let (_id, obkv) = docs.iter().find(|(_id, kv)| kv.get(0) == Some(br#""kevin""#)).unwrap(); let kevin_uuid: String = serde_json::from_slice(obkv.get(1).unwrap()).unwrap(); drop(rtxn); @@ -946,7 +946,7 @@ mod tests { assert_eq!(count, 3); // the document 0 has been deleted and reinserted with the id 3 - let docs = index.documents(&rtxn, vec![1, 2, 0]).unwrap(); + let docs = index.compressed_documents(&rtxn, vec![1, 2, 0]).unwrap(); let kevin_position = docs.iter().position(|(_, d)| d.get(0).unwrap() == br#""updated kevin""#).unwrap(); assert_eq!(kevin_position, 2); @@ -1088,7 +1088,7 @@ mod tests { let rtxn = index.read_txn().unwrap(); let count = index.number_of_documents(&rtxn).unwrap(); assert_eq!(count, 6); - let count = index.all_documents(&rtxn).unwrap().count(); + let count = index.all_compressed_documents(&rtxn).unwrap().count(); assert_eq!(count, 6); db_snap!(index, word_docids, "updated"); @@ -1506,7 +1506,7 @@ mod tests { index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap(); let rtxn = index.read_txn().unwrap(); - let all_documents_count = index.all_documents(&rtxn).unwrap().count(); + let all_documents_count = index.all_compressed_documents(&rtxn).unwrap().count(); assert_eq!(all_documents_count, 1); let external_documents_ids = index.external_documents_ids(); assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some()); @@ -2796,7 +2796,7 @@ mod tests { // Ensuring all the returned IDs actually exists let rtxn = index.read_txn().unwrap(); let res = index.search(&rtxn).execute().unwrap(); - index.documents(&rtxn, res.documents_ids).unwrap(); + index.compressed_documents(&rtxn, res.documents_ids).unwrap(); } fn delete_documents<'t>( @@ -3163,7 +3163,7 @@ mod tests { let deleted_internal_ids = delete_documents(&mut wtxn, &index, &deleted_external_ids); // list all documents - let results = index.all_documents(&wtxn).unwrap(); + let results = index.all_compressed_documents(&wtxn).unwrap(); for result in results { let (id, _) = result.unwrap(); assert!( diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 73fa3ca7b..cbd2a9006 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -1035,15 +1035,24 @@ impl<'a, 'i> Transform<'a, 'i> { if original_sorter.is_some() || flattened_sorter.is_some() { let modified_faceted_fields = settings_diff.modified_faceted_fields(); + let dictionary = self.index.document_compression_dictionary(wtxn)?; + let mut original_obkv_buffer = Vec::new(); let mut flattened_obkv_buffer = Vec::new(); let mut document_sorter_key_buffer = Vec::new(); + let mut buffer = Vec::new(); for result in self.index.external_documents_ids().iter(wtxn)? { let (external_id, docid) = result?; - let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or( + let old_compressed_obkv = self.index.documents.get(wtxn, &docid)?.ok_or( InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, )?; + let old_obkv = match dictionary { + // TODO manage this unwrap correctly + Some(dict) => old_compressed_obkv.decompress_with(&mut buffer, dict).unwrap(), + None => old_compressed_obkv.as_non_compressed(), + }; + let injected_vectors: std::result::Result< serde_json::Map, arroy::Error, diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs index 3ad6e658c..f45aacd4b 100644 --- a/milli/src/update/settings.rs +++ b/milli/src/update/settings.rs @@ -1777,7 +1777,7 @@ mod tests { // When we search for something that is in the searchable fields // we must find the appropriate document. let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap(); - let documents = index.documents(&rtxn, result.documents_ids).unwrap(); + let documents = index.compressed_documents(&rtxn, result.documents_ids).unwrap(); let fid_map = index.fields_ids_map(&rtxn).unwrap(); assert_eq!(documents.len(), 1); assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); @@ -1813,7 +1813,7 @@ mod tests { snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###); let result = index.search(&rtxn).query("23").execute().unwrap(); assert_eq!(result.documents_ids.len(), 1); - let documents = index.documents(&rtxn, result.documents_ids).unwrap(); + let documents = index.compressed_documents(&rtxn, result.documents_ids).unwrap(); assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); } @@ -1954,7 +1954,7 @@ mod tests { // Only count the field_id 0 and level 0 facet values. // TODO we must support typed CSVs for numbers to be understood. let fidmap = index.fields_ids_map(&rtxn).unwrap(); - for document in index.all_documents(&rtxn).unwrap() { + for document in index.all_compressed_documents(&rtxn).unwrap() { let document = document.unwrap(); let json = crate::obkv_to_json(&fidmap.ids().collect::>(), &fidmap, document.1) .unwrap(); @@ -2079,7 +2079,7 @@ mod tests { // Run an empty query just to ensure that the search results are ordered. let rtxn = index.read_txn().unwrap(); let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap(); - let documents = index.documents(&rtxn, documents_ids).unwrap(); + let documents = index.compressed_documents(&rtxn, documents_ids).unwrap(); // Fetch the documents "age" field in the ordre in which the documents appear. let age_field_id = index.fields_ids_map(&rtxn).unwrap().id("age").unwrap(); @@ -2512,7 +2512,7 @@ mod tests { let rtxn = index.read_txn().unwrap(); let SearchResult { documents_ids, .. } = index.search(&rtxn).query("S").execute().unwrap(); let first_id = documents_ids[0]; - let documents = index.documents(&rtxn, documents_ids).unwrap(); + let documents = index.compressed_documents(&rtxn, documents_ids).unwrap(); let (_, content) = documents.iter().find(|(id, _)| *id == first_id).unwrap(); let fid = index.fields_ids_map(&rtxn).unwrap().id("title").unwrap(); @@ -2681,7 +2681,7 @@ mod tests { wtxn.commit().unwrap(); let rtxn = index.write_txn().unwrap(); - let docs: StdResult, _> = index.all_documents(&rtxn).unwrap().collect(); + let docs: StdResult, _> = index.all_compressed_documents(&rtxn).unwrap().collect(); let docs = docs.unwrap(); assert_eq!(docs.len(), 5); } diff --git a/milli/tests/search/query_criteria.rs b/milli/tests/search/query_criteria.rs index 65d403097..59c5e0523 100644 --- a/milli/tests/search/query_criteria.rs +++ b/milli/tests/search/query_criteria.rs @@ -317,7 +317,8 @@ fn criteria_ascdesc() { wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); - let documents = index.all_documents(&rtxn).unwrap().map(|doc| doc.unwrap()).collect::>(); + let documents = + index.all_compressed_documents(&rtxn).unwrap().map(|doc| doc.unwrap()).collect::>(); for criterion in [Asc(S("name")), Desc(S("name")), Asc(S("age")), Desc(S("age"))] { eprintln!("Testing with criterion: {:?}", &criterion);