From e95e47d258ad39d61d6f8f8abc7268ba4fae9128 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 3 Jul 2024 15:05:14 +0200
Subject: [PATCH] Simplify optional document decompression usage

---
 index-scheduler/src/batch.rs                  |  9 ++++-----
 meilisearch/src/routes/indexes/documents.rs   |  7 ++-----
 milli/src/heed_codec/compressed_obkv_codec.rs | 15 ++++++++++++++-
 milli/src/index.rs                            | 18 ++++++------------
 milli/src/search/new/tests/mod.rs             |  6 ++----
 milli/src/snapshot_tests.rs                   |  7 +++----
 milli/src/update/index_documents/transform.rs | 18 ++++++------------
 7 files changed, 37 insertions(+), 43 deletions(-)

diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index a00ed5bc8..803101e04 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -922,11 +922,10 @@ impl IndexScheduler {
                         }
 
                         let (id, compressed) = ret?;
-                        let doc = match dictionary.as_ref() {
-                            // TODO manage this unwrap correctly
-                            Some(dict) => compressed.decompress_with(&mut buffer, dict)?,
-                            None => compressed.as_non_compressed(),
-                        };
+                        let doc = compressed.decompress_with_optional_dictionary(
+                            &mut buffer,
+                            dictionary.as_ref(),
+                        )?;
 
                         let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
 
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index 68b864ac2..a972b8f1f 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -612,11 +612,8 @@ fn some_documents<'a, 't: 'a>(
     Ok(index.iter_compressed_documents(rtxn, doc_ids)?.map(move |ret| {
         ret.map_err(ResponseError::from).and_then(
             |(key, compressed_document)| -> Result<_, ResponseError> {
-                let document = match dictionary.as_ref() {
-                    // TODO manage this unwrap correctly
-                    Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(),
-                    None => compressed_document.as_non_compressed(),
-                };
+                let document = compressed_document
+                    .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
                 let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?;
                 match retrieve_vectors {
                     RetrieveVectors::Ignore => {}
diff --git a/milli/src/heed_codec/compressed_obkv_codec.rs b/milli/src/heed_codec/compressed_obkv_codec.rs
index c00c5c927..1603380d0 100644
--- a/milli/src/heed_codec/compressed_obkv_codec.rs
+++ b/milli/src/heed_codec/compressed_obkv_codec.rs
@@ -55,10 +55,23 @@ impl<'a> CompressedKvReaderU16<'a> {
         Ok(KvReaderU16::new(&buffer[..size]))
     }
 
-    /// Returns the KvReader like it is not compressed. Happends when there is no dictionnary yet.
+    /// Returns the KvReader like it is not compressed.
+    /// Happens when there is no dictionary yet.
     pub fn as_non_compressed(&self) -> KvReaderU16<'a> {
         KvReaderU16::new(self.0)
     }
+
+    /// Decompresses this KvReader if necessary.
+    pub fn decompress_with_optional_dictionary<'b>(
+        &'b self,
+        buffer: &'b mut Vec<u8>,
+        dictionary: Option<&DecoderDictionary>,
+    ) -> io::Result<KvReaderU16<'b>> {
+        match dictionary {
+            Some(dict) => self.decompress_with(buffer, dict),
+            None => Ok(self.as_non_compressed()),
+        }
+    }
 }
 
 pub struct CompressedKvWriterU16(Vec<u8>);
diff --git a/milli/src/index.rs b/milli/src/index.rs
index 8f2fb64ae..b7461c8bb 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -1346,10 +1346,8 @@ impl Index {
         let mut buffer = Vec::new();
         Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
             let (_docid, compressed_obkv) = entry?;
-            let obkv = match dictionary.as_ref() {
-                Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict)?,
-                None => compressed_obkv.as_non_compressed(),
-            };
+            let obkv = compressed_obkv
+                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
             match primary_key.document_id(&obkv, &fields)? {
                 Ok(document_id) => Ok(document_id),
                 Err(_) => Err(InternalError::DocumentsError(
@@ -2481,10 +2479,8 @@
         let dictionary = index.document_compression_dictionary(&rtxn).unwrap();
         let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap()[0];
         let mut buffer = Vec::new();
-        let obkv = match dictionary {
-            Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
-            None => compressed_obkv.as_non_compressed(),
-        };
+        let obkv =
+            compressed_obkv.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap();
         let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
         insta::assert_debug_snapshot!(json, @r###"
         {
@@ -2494,10 +2490,8 @@
 
         // Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34
         let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap()[0];
-        let obkv = match dictionary {
-            Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
-            None => compressed_obkv.as_non_compressed(),
-        };
+        let obkv =
+            compressed_obkv.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap();
         let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
         insta::assert_debug_snapshot!(json, @r###"
         {
diff --git a/milli/src/search/new/tests/mod.rs b/milli/src/search/new/tests/mod.rs
index 2075004db..e5602d667 100644
--- a/milli/src/search/new/tests/mod.rs
+++ b/milli/src/search/new/tests/mod.rs
@@ -27,10 +27,8 @@ fn collect_field_values(
     let mut buffer = Vec::new();
     let dictionary = index.document_compression_dictionary(txn).unwrap();
     for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() {
-        let doc = match dictionary {
-            Some(dict) => compressed_doc.decompress_with(&mut buffer, dict).unwrap(),
-            None => compressed_doc.as_non_compressed(),
-        };
+        let doc =
+            compressed_doc.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap();
         if let Some(v) = doc.get(fid) {
             let v: serde_json::Value = serde_json::from_slice(v).unwrap();
             let v = v.to_string();
diff --git a/milli/src/snapshot_tests.rs b/milli/src/snapshot_tests.rs
index 5288a8c94..66f7f20f2 100644
--- a/milli/src/snapshot_tests.rs
+++ b/milli/src/snapshot_tests.rs
@@ -412,10 +412,9 @@ pub fn snap_documents(index: &Index) -> String {
 
     for result in index.all_compressed_documents(&rtxn).unwrap() {
         let (_id, compressed_document) = result.unwrap();
-        let document = match dictionary {
-            Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(),
-            None => compressed_document.as_non_compressed(),
-        };
+        let document = compressed_document
+            .decompress_with_optional_dictionary(&mut buffer, dictionary)
+            .unwrap();
         let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap();
         snap.push_str(&serde_json::to_string(&doc).unwrap());
         snap.push('\n');
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index 258b8025a..9c684cd8c 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -255,13 +255,10 @@ impl<'a, 'i> Transform<'a, 'i> {
                     InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
                 )?;
 
-                let base_obkv = match dictionary.as_ref() {
-                    // TODO manage this unwrap correctly
-                    Some(dict) => {
-                        base_compressed_obkv.decompress_with(&mut decompression_buffer, dict)?
-                    }
-                    None => base_compressed_obkv.as_non_compressed(),
-                };
+                let base_obkv = base_compressed_obkv.decompress_with_optional_dictionary(
+                    &mut decompression_buffer,
+                    dictionary.as_ref(),
+                )?;
 
                 // we check if the two documents are exactly equal. If it's the case we can skip this document entirely
                 if base_obkv.as_bytes() == obkv_buffer {
@@ -1053,11 +1050,8 @@ impl<'a, 'i> Transform<'a, 'i> {
                 InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
             )?;
 
-            let old_obkv = match dictionary.as_ref() {
-                // TODO manage this unwrap correctly
-                Some(dict) => old_compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
-                None => old_compressed_obkv.as_non_compressed(),
-            };
+            let old_obkv = old_compressed_obkv
+                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
             let injected_vectors: std::result::Result<
                 serde_json::Map<String, serde_json::Value>,
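
Note on the new helper: every hunk in this patch replaces an open-coded `match` on the
optional compression dictionary with a call to
CompressedKvReaderU16::decompress_with_optional_dictionary. A minimal sketch of the
call-site pattern the patch converges on, assuming an open `rtxn` on a milli `Index`
inside a function returning a milli `Result`, and assuming (as the batch.rs call site
above suggests) that `document_compression_dictionary` yields an `Option` over the
decoder dictionary; the variable names are illustrative, not taken verbatim from the
patch:

    // A freshly created index has no compression dictionary yet, so the dictionary
    // is optional and stored documents may still be uncompressed.
    let dictionary = index.document_compression_dictionary(&rtxn)?;
    let mut buffer = Vec::new();

    for result in index.all_compressed_documents(&rtxn)? {
        let (_docid, compressed) = result?;
        // Decompresses into `buffer` when a dictionary exists, otherwise returns
        // the stored obkv bytes as-is (the `as_non_compressed` path).
        let obkv = compressed
            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
        // ... e.g. milli::obkv_to_json(&all_fields, &fields_ids_map, obkv)? ...
    }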