From e6ffa4d45447145dac78b560ca7fbef1562c951a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 29 Aug 2024 14:08:31 +0200 Subject: [PATCH] Implement the document merge function for the replace method --- milli/src/update/new/mod.rs | 128 ++++++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 33 deletions(-) diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index e5d376534..20266267d 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -19,7 +19,7 @@ mod indexer { use roaring::RoaringBitmap; use serde_json::Value; - use super::document_change::{self, DocumentChange, Insertion, Update}; + use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::items_pool::ItemsPool; use crate::documents::{ obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, @@ -88,7 +88,7 @@ mod indexer { rtxn: &'a RoTxn, mut fields_ids_map: FieldsIdsMap, primary_key: &'a PrimaryKey<'a>, - ) -> Result + 'a> { + ) -> Result + 'a> { let documents_ids = index.documents_ids(rtxn)?; let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids); let mut docids_version_offsets = HashMap::::new(); @@ -185,9 +185,16 @@ mod indexer { items, |context_pool, (external_docid, (internal_docid, operations))| { context_pool.with(|rtxn| match self.method { - IndexDocumentsMethod::ReplaceDocuments => todo!(), + IndexDocumentsMethod::ReplaceDocuments => merge_document_for_replacements( + rtxn, + index, + &fields_ids_map, + internal_docid, + external_docid, + &operations, + ), // TODO Remap the documents to match the db fields_ids_map - IndexDocumentsMethod::UpdateDocuments => merge_document_obkv_for_updates( + IndexDocumentsMethod::UpdateDocuments => merge_document_for_updates( rtxn, index, &fields_ids_map, @@ -282,13 +289,12 @@ mod indexer { index: &'a Index, fields: &'a FieldsIdsMap, primary_key: &'a PrimaryKey<'a>, - ) -> Result> + 'a> - { + ) -> 
Result> + 'a> { let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { items.with(|rtxn| { - let document = index.document(rtxn, docid)?; - let external_docid = match primary_key.document_id(&document, fields)? { + let current = index.document(rtxn, docid)?; + let external_docid = match primary_key.document_id(&current, fields)? { Ok(document_id) => Ok(document_id) as Result<_>, Err(_) => Err(InternalError::DocumentsError( crate::documents::Error::InvalidDocumentFormat, @@ -297,12 +303,8 @@ mod indexer { }?; /// TODO create a function for this - let document = document.as_bytes().to_vec().into_boxed_slice().into(); - Ok(DocumentChange::Deletion(document_change::Deletion::create( - docid, - external_docid, - document, - ))) + let current = current.as_bytes().to_vec().into_boxed_slice().into(); + Ok(DocumentChange::Deletion(Deletion::create(docid, external_docid, current))) }) })) } @@ -319,7 +321,7 @@ mod indexer { self, iter: I, index: &Index, - ) -> impl ParallelIterator + ) -> impl ParallelIterator where I: IntoIterator, { @@ -349,15 +351,13 @@ mod indexer { } pub struct UpdateByFunctionIndexer; - // DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))? /// Reads the previous version of a document from the database, the new versions /// in the grenad update files and merges them to generate a new boxed obkv. /// /// This function is only meant to be used when doing an update and not a replacement. 
- pub fn merge_document_obkv_for_updates( + pub fn merge_document_for_updates( rtxn: &RoTxn, - // Let's construct the new obkv in memory index: &Index, fields_ids_map: &FieldsIdsMap, docid: DocumentId, @@ -365,11 +365,11 @@ mod indexer { operations: &[DocumentOperation], ) -> Result> { let mut document = BTreeMap::<_, Cow<_>>::new(); - let original = index.documents.remap_data_type::().get(rtxn, &docid)?; - let original: Option<&KvReaderFieldId> = original.map(Into::into); + let current = index.documents.remap_data_type::().get(rtxn, &docid)?; + let current: Option<&KvReaderFieldId> = current.map(Into::into); - if let Some(original) = original { - original.into_iter().for_each(|(k, v)| { + if let Some(current) = current { + current.into_iter().for_each(|(k, v)| { document.insert(k, v.into()); }); } @@ -381,14 +381,12 @@ mod indexer { let operations = &operations[last_deletion.map_or(0, |i| i + 1)..]; if operations.is_empty() { - match original { - Some(original_obkv) => { - let current = original_obkv.as_bytes().to_vec().into_boxed_slice().into(); - return Ok(Some(DocumentChange::Deletion(document_change::Deletion::create( - docid, - external_docid, - current, - )))); + match current { + Some(current) => { + /// TODO create a function for this + let current = current.as_bytes().to_vec().into_boxed_slice().into(); + let deletion = Deletion::create(docid, external_docid, current); + return Ok(Some(DocumentChange::Deletion(deletion))); } None => return Ok(None), } @@ -416,10 +414,10 @@ mod indexer { /// TODO create a function for this conversion let new = writer.into_inner().unwrap().into_boxed_slice().into(); - match original { - Some(original) => { + match current { + Some(current) => { /// TODO create a function for this conversion - let current = original.as_bytes().to_vec().into_boxed_slice().into(); + let current = current.as_bytes().to_vec().into_boxed_slice().into(); let update = Update::create(docid, external_docid, current, new); 
Ok(Some(DocumentChange::Update(update))) } @@ -429,4 +427,68 @@ mod indexer { } } } + + /// Returns only the most recent version of a document based on the updates from the payloads. + /// + /// This function is only meant to be used when doing a replacement and not an update. + pub fn merge_document_for_replacements( + rtxn: &RoTxn, + index: &Index, + fields_ids_map: &FieldsIdsMap, + docid: DocumentId, + external_docid: String, + operations: &[DocumentOperation], + ) -> Result> { + let current = index.documents.remap_data_type::().get(rtxn, &docid)?; + let current: Option<&KvReaderFieldId> = current.map(Into::into); + + match operations.last() { + Some(DocumentOperation::Addition(DocumentOffset { content, offset })) => { + let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?; + let (mut cursor, batch_index) = reader.into_cursor_and_fields_index(); + let update = cursor.get(*offset)?.expect("must exists"); + + let mut document_entries = Vec::new(); + update.into_iter().for_each(|(k, v)| { + let field_name = batch_index.name(k).unwrap(); + let id = fields_ids_map.id(field_name).unwrap(); + document_entries.push((id, v)); + }); + + document_entries.sort_unstable_by_key(|(id, _)| *id); + + let mut writer = KvWriterFieldId::memory(); + document_entries + .into_iter() + .for_each(|(id, value)| writer.insert(id, value).unwrap()); + /// TODO create a function for this conversion + let new = writer.into_inner().unwrap().into_boxed_slice().into(); + + match current { + Some(current) => { + /// TODO create a function for this conversion + let current = current.as_bytes().to_vec().into_boxed_slice().into(); + let update = Update::create(docid, external_docid, current, new); + Ok(Some(DocumentChange::Update(update))) + } + None => { + let insertion = Insertion::create(docid, external_docid, new); + Ok(Some(DocumentChange::Insertion(insertion))) + } + } + } + Some(DocumentOperation::Deletion) => { + match current { + Some(current) => { + /// TODO 
create a function for this conversion + let current = current.as_bytes().to_vec().into_boxed_slice().into(); + let deletion = Deletion::create(docid, external_docid, current); + Ok(Some(DocumentChange::Deletion(deletion))) + } + None => Ok(None), + } + } + None => Ok(None), + } + } }