Fix merging of documents to support compressed documents

This commit is contained in:
Clément Renault 2024-07-02 15:50:22 +02:00
parent 767f20e30d
commit b15e8aacb6
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@@ -168,10 +168,12 @@ impl<'a, 'i> Transform<'a, 'i> {
let external_documents_ids = self.index.external_documents_ids(); let external_documents_ids = self.index.external_documents_ids();
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
let dictionary = self.index.document_compression_dictionary(wtxn)?;
let primary_key = cursor.primary_key().to_string(); let primary_key = cursor.primary_key().to_string();
let primary_key_id = let primary_key_id =
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
let mut decompression_buffer = Vec::new();
let mut obkv_buffer = Vec::new(); let mut obkv_buffer = Vec::new();
let mut document_sorter_value_buffer = Vec::new(); let mut document_sorter_value_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new(); let mut document_sorter_key_buffer = Vec::new();
@@ -247,18 +249,20 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut skip_insertion = false; let mut skip_insertion = false;
if let Some(original_docid) = original_docid { if let Some(original_docid) = original_docid {
let original_key = original_docid; let original_key = original_docid;
let base_obkv = self let base_compressed_obkv = self.index.documents.get(wtxn, &original_key)?.ok_or(
.index InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
.documents )?;
.remap_data_type::<heed::types::Bytes>()
.get(wtxn, &original_key)? let base_obkv = match dictionary {
.ok_or(InternalError::DatabaseMissingEntry { // TODO manage this unwrap correctly
db_name: db_name::DOCUMENTS, Some(dict) => base_compressed_obkv
key: None, .decompress_with(&mut decompression_buffer, dict)
})?; .unwrap(),
None => base_compressed_obkv.as_non_compressed(),
};
// we check if the two documents are exactly equal. If it's the case we can skip this document entirely // we check if the two documents are exactly equal. If it's the case we can skip this document entirely
if base_obkv == obkv_buffer { if base_obkv.as_bytes() == obkv_buffer {
// we're not replacing anything // we're not replacing anything
self.replaced_documents_ids.remove(original_docid); self.replaced_documents_ids.remove(original_docid);
// and we need to put back the original id as it was before // and we need to put back the original id as it was before
@@ -278,13 +282,12 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(base_obkv), base_obkv,
deladd_operation, deladd_operation,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)?; )?;
self.original_sorter self.original_sorter
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
let base_obkv = KvReader::new(base_obkv);
if let Some(flattened_obkv) = if let Some(flattened_obkv) =
Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)? Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)?
{ {