Simplify optional document decompression usage

This commit is contained in:
Clément Renault 2024-07-03 15:05:14 +02:00
parent e18b06ddda
commit e95e47d258
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 37 additions and 43 deletions
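
All seven files apply the same mechanical change: the repeated match on the optional compression dictionary (and the stray unwrap()s it hid) is replaced by a single call to the new decompress_with_optional_dictionary helper. A condensed before/after sketch of that pattern, using the names taken from the hunks below:

// Before: each call site dispatched on the dictionary itself, and several
// sites swallowed decompression errors with unwrap().
let doc = match dictionary.as_ref() {
    Some(dict) => compressed.decompress_with(&mut buffer, dict)?,
    None => compressed.as_non_compressed(),
};

// After: the dispatch lives on CompressedKvReaderU16 and errors propagate with `?`.
let doc = compressed
    .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;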


@@ -922,11 +922,10 @@ impl IndexScheduler {
}
let (id, compressed) = ret?;
let doc = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => compressed.decompress_with(&mut buffer, dict)?,
None => compressed.as_non_compressed(),
};
let doc = compressed.decompress_with_optional_dictionary(
&mut buffer,
dictionary.as_ref(),
)?;
let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;


@@ -612,11 +612,8 @@ fn some_documents<'a, 't: 'a>(
Ok(index.iter_compressed_documents(rtxn, doc_ids)?.map(move |ret| {
ret.map_err(ResponseError::from).and_then(
|(key, compressed_document)| -> Result<_, ResponseError> {
let document = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_document.as_non_compressed(),
};
let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?;
match retrieve_vectors {
RetrieveVectors::Ignore => {}


@@ -55,10 +55,23 @@ impl<'a> CompressedKvReaderU16<'a> {
Ok(KvReaderU16::new(&buffer[..size]))
}
/// Returns the KvReader like it is not compressed. Happends when there is no dictionnary yet.
/// Returns the KvReader as if it were not compressed.
/// Happens when there is no dictionary yet.
pub fn as_non_compressed(&self) -> KvReaderU16<'a> {
KvReaderU16::new(self.0)
}
/// Decompresses this KvReader if necessary.
pub fn decompress_with_optional_dictionary<'b>(
&'b self,
buffer: &'b mut Vec<u8>,
dictionary: Option<&DecoderDictionary>,
) -> io::Result<KvReaderU16<'b>> {
match dictionary {
Some(dict) => self.decompress_with(buffer, dict),
None => Ok(self.as_non_compressed()),
}
}
}
pub struct CompressedKvWriterU16(Vec<u8>);
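
The 'b lifetime on the new helper ties the returned KvReaderU16 to both the compressed bytes and the scratch buffer: the reader borrows the buffer in the dictionary case and the raw bytes in the non-compressed case. A rough caller-side sketch under that reading, reusing the accessors that appear in the other hunks (document_compression_dictionary, all_compressed_documents); the setup of index and rtxn is elided and error handling is simplified:

let dictionary = index.document_compression_dictionary(&rtxn)?;
let mut buffer = Vec::new();
for result in index.all_compressed_documents(&rtxn)? {
    let (_docid, compressed) = result?;
    // Borrows buffer when a dictionary is present and the raw compressed bytes
    // otherwise, so buffer must stay alive for as long as doc is in use.
    let doc = compressed.decompress_with_optional_dictionary(&mut buffer, dictionary)?;
    // doc is then read through the usual KvReaderU16 interface, e.g. converted
    // with obkv_to_json as in the hunks below.
}

Whether a call site passes dictionary or dictionary.as_ref() depends on how the dictionary was obtained; the hunks show both shapes.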


@@ -1346,10 +1346,8 @@ impl Index {
let mut buffer = Vec::new();
Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, compressed_obkv) = entry?;
let obkv = match dictionary.as_ref() {
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict)?,
None => compressed_obkv.as_non_compressed(),
};
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
match primary_key.document_id(&obkv, &fields)? {
Ok(document_id) => Ok(document_id),
Err(_) => Err(InternalError::DocumentsError(
@@ -2481,10 +2479,8 @@ pub(crate) mod tests {
let dictionary = index.document_compression_dictionary(&rtxn).unwrap();
let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap()[0];
let mut buffer = Vec::new();
let obkv = match dictionary {
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_obkv.as_non_compressed(),
};
let obkv =
compressed_obkv.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap();
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###"
{
@@ -2494,10 +2490,8 @@ pub(crate) mod tests {
// Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34
let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap()[0];
let obkv = match dictionary {
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_obkv.as_non_compressed(),
};
let obkv =
compressed_obkv.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap();
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###"
{


@@ -27,10 +27,8 @@ fn collect_field_values(
let mut buffer = Vec::new();
let dictionary = index.document_compression_dictionary(txn).unwrap();
for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() {
let doc = match dictionary {
Some(dict) => compressed_doc.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_doc.as_non_compressed(),
};
let doc =
compressed_doc.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap();
if let Some(v) = doc.get(fid) {
let v: serde_json::Value = serde_json::from_slice(v).unwrap();
let v = v.to_string();


@@ -412,10 +412,9 @@ pub fn snap_documents(index: &Index) -> String {
for result in index.all_compressed_documents(&rtxn).unwrap() {
let (_id, compressed_document) = result.unwrap();
let document = match dictionary {
Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_document.as_non_compressed(),
};
let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary)
.unwrap();
let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap();
snap.push_str(&serde_json::to_string(&doc).unwrap());
snap.push('\n');


@@ -255,13 +255,10 @@ impl<'a, 'i> Transform<'a, 'i> {
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
let base_obkv = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => {
base_compressed_obkv.decompress_with(&mut decompression_buffer, dict)?
}
None => base_compressed_obkv.as_non_compressed(),
};
let base_obkv = base_compressed_obkv.decompress_with_optional_dictionary(
&mut decompression_buffer,
dictionary.as_ref(),
)?;
// we check if the two documents are exactly equal. If it's the case we can skip this document entirely
if base_obkv.as_bytes() == obkv_buffer {
@@ -1053,11 +1050,8 @@ impl<'a, 'i> Transform<'a, 'i> {
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
let old_obkv = match dictionary.as_ref() {
// TODO manage this unwrap correctly
Some(dict) => old_compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => old_compressed_obkv.as_non_compressed(),
};
let old_obkv = old_compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
let injected_vectors: std::result::Result<
serde_json::Map<String, serde_json::Value>,