From 12323d610e33b1f0dcdaa97ddc90c5b59b599417 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Tue, 31 Oct 2023 16:46:16 +0100 Subject: [PATCH] Change the original document sorter key from the internal docid to a concatenation of the internal and the external docid --- milli/src/update/index_documents/mod.rs | 2 + milli/src/update/index_documents/transform.rs | 116 ++++++++++-------- 2 files changed, 69 insertions(+), 49 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index c32f907b2..129b67cf0 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -1387,6 +1387,8 @@ mod tests { index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap(); let rtxn = index.read_txn().unwrap(); + let all_documents_count = index.all_documents(&rtxn).unwrap().count(); + assert_eq!(all_documents_count, 1); let external_documents_ids = index.external_documents_ids(); assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some()); } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 23b5c78c1..3863d5a54 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -174,7 +174,8 @@ impl<'a, 'i> Transform<'a, 'i> { self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; let mut obkv_buffer = Vec::new(); - let mut document_sorter_buffer = Vec::new(); + let mut document_sorter_value_buffer = Vec::new(); + let mut document_sorter_key_buffer = Vec::new(); let mut documents_count = 0; let mut docid_buffer: Vec = Vec::new(); let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); @@ -268,57 +269,64 @@ impl<'a, 'i> Transform<'a, 'i> { // we associate the base document with the new key, everything will get merged later. let keep_original_version = self.index_documents_method == IndexDocumentsMethod::UpdateDocuments; - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(base_obkv), true, keep_original_version, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; - self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_buffer)?; let base_obkv = KvReader::new(base_obkv); if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? { // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(&flattened_obkv), true, keep_original_version, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; } - self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; } } if !skip_insertion { self.new_documents_ids.insert(docid); - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(&obkv_buffer), false, true, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; // We use the extracted/generated user id as the key for this document. - self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; let flattened_obkv = KvReader::new(&obkv_buffer); if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(&obkv), false, true, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )? } - self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; } documents_count += 1; @@ -372,37 +380,42 @@ impl<'a, 'i> Transform<'a, 'i> { let external_documents_ids = self.index.external_documents_ids(); let mut documents_deleted = 0; - let mut document_sorter_buffer = Vec::new(); + let mut document_sorter_value_buffer = Vec::new(); + let mut document_sorter_key_buffer = Vec::new(); for to_remove in to_remove { if should_abort() { return Err(Error::InternalError(InternalError::AbortedIndexation)); } // Check if the document has been added in the current indexing process. - let deleted_from_current = match self - .new_external_documents_ids_builder - .entry((*to_remove).into()) - { - // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. - HEntry::Occupied(entry) => { - let doc_id = *entry.get() as u32; - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); - obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap(); - self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; - self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; + let deleted_from_current = + match self.new_external_documents_ids_builder.entry((*to_remove).into()) { + // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. + HEntry::Occupied(entry) => { + let docid = *entry.get() as u32; + // Key is the concatenation of the internal docid and the external one. + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); + obkv::KvWriterU16::new(&mut document_sorter_value_buffer).finish().unwrap(); + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; - // we must NOT update the list of replaced_documents_ids - // Either: - // 1. It's already in it and there is nothing to do - // 2. It wasn't in it because the document was created by a previous batch and since - // we're removing it there is nothing to do. - self.new_documents_ids.remove(doc_id); - entry.remove_entry(); - true - } - HEntry::Vacant(_) => false, - }; + // we must NOT update the list of replaced_documents_ids + // Either: + // 1. It's already in it and there is nothing to do + // 2. It wasn't in it because the document was created by a previous batch and since + // we're removing it there is nothing to do. + self.new_documents_ids.remove(docid); + entry.remove_entry(); + true + } + HEntry::Vacant(_) => false, + }; // If the document was already in the db we mark it as a `to_delete` document. // Then we push the document in sorters in deletion mode. @@ -422,31 +435,36 @@ impl<'a, 'i> Transform<'a, 'i> { key: None, })?; + // Key is the concatenation of the internal docid and the external one. + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes()); // push it as to delete in the original_sorter - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); into_del_add_obkv( KvReaderU16::new(base_obkv), true, false, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; - self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; // flatten it and push it as to delete in the flattened_sorter let flattened_obkv = KvReader::new(base_obkv); if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); into_del_add_obkv( KvReaderU16::new(&obkv), true, false, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; } - self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; true }