From acc400facebfa8811be425104e482385ed815c79 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 28 Jan 2025 11:31:53 +0100
Subject: [PATCH] Support merging update and replacement operations

---
 .../milli/src/update/new/document_change.rs   |  12 +-
 .../update/new/indexer/document_operation.rs  | 535 ++++++++----------
 2 files changed, 238 insertions(+), 309 deletions(-)

diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs
index 1644b2254..7d5b03fa1 100644
--- a/crates/milli/src/update/new/document_change.rs
+++ b/crates/milli/src/update/new/document_change.rs
@@ -27,7 +27,7 @@ pub struct Update<'doc> {
     docid: DocumentId,
     external_document_id: &'doc str,
     new: Versions<'doc>,
-    has_deletion: bool,
+    from_scratch: bool,
 }
 
 pub struct Insertion<'doc> {
@@ -109,9 +109,9 @@ impl<'doc> Update<'doc> {
         docid: DocumentId,
         external_document_id: &'doc str,
         new: Versions<'doc>,
-        has_deletion: bool,
+        from_scratch: bool,
     ) -> Self {
-        Update { docid, new, external_document_id, has_deletion }
+        Update { docid, new, external_document_id, from_scratch }
     }
 
     pub fn docid(&self) -> DocumentId {
@@ -154,7 +154,7 @@ impl<'doc> Update<'doc> {
         index: &'t Index,
         mapper: &'t Mapper,
     ) -> Result<MergedDocument<'_, 'doc, 't, Mapper>> {
-        if self.has_deletion {
+        if self.from_scratch {
             Ok(MergedDocument::without_db(DocumentFromVersions::new(&self.new)))
         } else {
             MergedDocument::with_db(
@@ -207,7 +207,7 @@ impl<'doc> Update<'doc> {
             cached_current = Some(current);
         }
 
-        if !self.has_deletion {
+        if !self.from_scratch {
             // no field deletion, so fields that don't appear in `updated` cannot have changed
             return Ok(changed);
         }
@@ -257,7 +257,7 @@ impl<'doc> Update<'doc> {
         doc_alloc: &'doc Bump,
         embedders: &'doc EmbeddingConfigs,
     ) -> Result<Option<MergedVectorDocument<'doc>>> {
-        if self.has_deletion {
+        if self.from_scratch {
             MergedVectorDocument::without_db(
                 self.external_document_id,
                 &self.new,
diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs
index 6bf0432c4..96a64cabe 100644
--- a/crates/milli/src/update/new/indexer/document_operation.rs
+++ b/crates/milli/src/update/new/indexer/document_operation.rs
@@ -26,23 +26,36 @@ use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
 
 #[derive(Default)]
 pub struct DocumentOperation<'pl> {
     operations: Vec<Payload<'pl>>,
-    method: MergeMethod,
 }
 
 impl<'pl> DocumentOperation<'pl> {
-    pub fn new(method: IndexDocumentsMethod) -> Self {
-        Self { operations: Default::default(), method: MergeMethod::from(method) }
+    pub fn new() -> Self {
+        Self { operations: Default::default() }
     }
 
-    /// TODO please give me a type
+    /// Appends a replacement of documents.
+    ///
     /// The payload is expected to be in the NDJSON format
-    pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
+    pub fn replace_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
         #[cfg(unix)]
         payload.advise(memmap2::Advice::Sequential)?;
-        self.operations.push(Payload::Addition(&payload[..]));
+        self.operations.push(Payload::Replace(&payload[..]));
         Ok(())
     }
 
+    /// Appends an update of documents.
+    ///
+    /// The payload is expected to be in the NDJSON format
+    pub fn update_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
+        #[cfg(unix)]
+        payload.advise(memmap2::Advice::Sequential)?;
+        self.operations.push(Payload::Update(&payload[..]));
+        Ok(())
+    }
+
+    /// Appends a deletion of document IDs.
+    ///
+    /// The list is a set of external document IDs.
     pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) {
         self.operations.push(Payload::Deletion(to_delete))
     }
@@ -63,7 +76,7 @@ impl<'pl> DocumentOperation<'pl> {
         MSP: Fn() -> bool,
     {
         progress.update_progress(IndexingStep::PreparingPayloads);
-        let Self { operations, method } = self;
+        let Self { operations } = self;
 
         let documents_ids = index.documents_ids(rtxn)?;
         let mut operations_stats = Vec::new();
@@ -83,7 +96,7 @@ impl<'pl> DocumentOperation<'pl> {
 
             let mut bytes = 0;
             let result = match operation {
-                Payload::Addition(payload) => extract_addition_payload_changes(
+                Payload::Replace(payload) => extract_addition_payload_changes(
                     indexer,
                     index,
                     rtxn,
@@ -93,7 +106,20 @@ impl<'pl> DocumentOperation<'pl> {
                     &mut available_docids,
                     &mut bytes,
                     &docids_version_offsets,
-                    method,
+                    IndexDocumentsMethod::ReplaceDocuments,
                     payload,
                 ),
+                Payload::Update(payload) => extract_addition_payload_changes(
+                    indexer,
+                    index,
+                    rtxn,
+                    primary_key_from_op,
+                    &mut primary_key,
+                    new_fields_ids_map,
+                    &mut available_docids,
+                    &mut bytes,
+                    &docids_version_offsets,
+                    IndexDocumentsMethod::UpdateDocuments,
+                    payload,
+                ),
                 Payload::Deletion(to_delete) => extract_deletion_payload_changes(
                     index,
@@ -101,7 +127,6 @@ impl<'pl> DocumentOperation<'pl> {
                     rtxn,
                     &mut available_docids,
                     &docids_version_offsets,
-                    method,
                     to_delete,
                 ),
             };
@@ -127,20 +152,15 @@ impl<'pl> DocumentOperation<'pl> {
         docids_version_offsets.drain().collect_in(indexer);
 
         // Reorder the offsets to make sure we iterate on the file sequentially
-        // And finally sort them
-        docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations));
+        // And finally sort them. This significantly speeds up reading the update files.
+        docids_version_offsets
+            .sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
 
         let docids_version_offsets = docids_version_offsets.into_bump_slice();
         Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
     }
 }
 
-impl Default for DocumentOperation<'_> {
-    fn default() -> Self {
-        DocumentOperation::new(IndexDocumentsMethod::default())
-    }
-}
-
 #[allow(clippy::too_many_arguments)]
 fn extract_addition_payload_changes<'r, 'pl: 'r>(
     indexer: &'pl Bump,
@@ -152,9 +172,11 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
     available_docids: &mut AvailableIds,
     bytes: &mut u64,
     main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
-    method: MergeMethod,
+    method: IndexDocumentsMethod,
     payload: &'pl [u8],
 ) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
+    use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
+
     let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
 
     let mut previous_offset = 0;
@@ -205,48 +227,82 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
             None => {
                 match index.external_documents_ids().get(rtxn, external_id) {
                     Ok(Some(docid)) => match new_docids_version_offsets.entry(external_id) {
-                        Entry::Occupied(mut entry) => {
-                            entry.get_mut().push_addition(document_offset)
-                        }
+                        Entry::Occupied(mut entry) => match method {
+                            ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
+                            UpdateDocuments => entry.get_mut().push_update(document_offset),
+                        },
                         Entry::Vacant(entry) => {
-                            entry.insert(PayloadOperations::new_addition(
-                                method,
-                                docid,
-                                false, // is new
-                                document_offset,
-                            ));
+                            match method {
+                                ReplaceDocuments => {
+                                    entry.insert(PayloadOperations::new_replacement(
+                                        docid,
+                                        false, // is new
+                                        document_offset,
+                                    ));
+                                }
+                                UpdateDocuments => {
+                                    entry.insert(PayloadOperations::new_update(
+                                        docid,
+                                        false, // is new
+                                        document_offset,
+                                    ));
+                                }
+                            }
                         }
                     },
                     Ok(None) => match new_docids_version_offsets.entry(external_id) {
-                        Entry::Occupied(mut entry) => {
-                            entry.get_mut().push_addition(document_offset)
-                        }
+                        Entry::Occupied(mut entry) => match method {
+                            ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
+                            UpdateDocuments => entry.get_mut().push_update(document_offset),
+                        },
                         Entry::Vacant(entry) => {
                             let docid = match available_docids.next() {
                                 Some(docid) => docid,
                                 None => return Err(UserError::DocumentLimitReached.into()),
                             };
-                            entry.insert(PayloadOperations::new_addition(
-                                method,
-                                docid,
-                                true, // is new
-                                document_offset,
-                            ));
+
+                            match method {
+                                ReplaceDocuments => {
+                                    entry.insert(PayloadOperations::new_replacement(
+                                        docid,
+                                        true, // is new
+                                        document_offset,
+                                    ));
+                                }
+                                UpdateDocuments => {
+                                    entry.insert(PayloadOperations::new_update(
+                                        docid,
+                                        true, // is new
+                                        document_offset,
+                                    ));
+                                }
+                            }
                         }
                     },
                     Err(e) => return Err(e.into()),
                 }
             }
             Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
-                Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset),
-                Entry::Vacant(entry) => {
-                    entry.insert(PayloadOperations::new_addition(
-                        method,
-                        payload_operations.docid,
-                        payload_operations.is_new,
-                        document_offset,
-                    ));
-                }
+                Entry::Occupied(mut entry) => match method {
+                    ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
+                    UpdateDocuments => entry.get_mut().push_update(document_offset),
+                },
+                Entry::Vacant(entry) => match method {
+                    ReplaceDocuments => {
+                        entry.insert(PayloadOperations::new_replacement(
+                            payload_operations.docid,
+                            payload_operations.is_new,
+                            document_offset,
+                        ));
+                    }
+                    UpdateDocuments => {
+                        entry.insert(PayloadOperations::new_update(
+                            payload_operations.docid,
+                            payload_operations.is_new,
+                            document_offset,
+                        ));
+                    }
+                },
             },
         }
@@ -279,7 +335,6 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
     rtxn: &RoTxn,
     available_docids: &mut AvailableIds,
     main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
-    method: MergeMethod,
     to_delete: &'pl [&'pl str],
 ) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
     let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
@@ -293,7 +348,7 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
                     Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
                     Entry::Vacant(entry) => {
                         entry.insert(PayloadOperations::new_deletion(
-                            method, docid, false, // is new
+                            docid, false, // is new
                         ));
                     }
                 }
@@ -307,7 +362,7 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
                     Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
                     Entry::Vacant(entry) => {
                         entry.insert(PayloadOperations::new_deletion(
-                            method, docid, true, // is new
+                            docid, true, // is new
                         ));
                     }
                 }
@@ -319,7 +374,6 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
                 Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
                 Entry::Vacant(entry) => {
                     entry.insert(PayloadOperations::new_deletion(
-                        method,
                         payload_operations.docid,
                         payload_operations.is_new,
                     ));
@@ -370,13 +424,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
         'pl: 'doc,
     {
         let (external_doc, payload_operations) = item;
-        payload_operations.merge_method.merge(
-            payload_operations.docid,
-            external_doc,
-            payload_operations.is_new,
-            &context.doc_alloc,
-            &payload_operations.operations[..],
-        )
+        payload_operations.merge(external_doc, &context.doc_alloc)
     }
 
     fn len(&self) -> usize {
@@ -389,7 +437,8 @@ pub struct DocumentOperationChanges<'pl> {
 }
 
 pub enum Payload<'pl> {
-    Addition(&'pl [u8]),
+    Replace(&'pl [u8]),
+    Update(&'pl [u8]),
     Deletion(&'pl [&'pl str]),
 }
 
@@ -406,31 +455,30 @@ pub struct PayloadOperations<'pl> {
     pub is_new: bool,
     /// The operations to perform, in order, on this document.
     pub operations: Vec<InnerDocOp<'pl>>,
-    /// The merge method we are using to merge payloads and documents.
-    merge_method: MergeMethod,
 }
 
 impl<'pl> PayloadOperations<'pl> {
-    fn new_deletion(merge_method: MergeMethod, docid: DocumentId, is_new: bool) -> Self {
-        Self { docid, is_new, operations: vec![InnerDocOp::Deletion], merge_method }
+    fn new_replacement(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self {
+        Self { docid, is_new, operations: vec![InnerDocOp::Replace(offset)] }
     }
 
-    fn new_addition(
-        merge_method: MergeMethod,
-        docid: DocumentId,
-        is_new: bool,
-        offset: DocumentOffset<'pl>,
-    ) -> Self {
-        Self { docid, is_new, operations: vec![InnerDocOp::Addition(offset)], merge_method }
+    fn new_update(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self {
+        Self { docid, is_new, operations: vec![InnerDocOp::Update(offset)] }
+    }
+
+    fn new_deletion(docid: DocumentId, is_new: bool) -> Self {
+        Self { docid, is_new, operations: vec![InnerDocOp::Deletion] }
     }
 }
 
 impl<'pl> PayloadOperations<'pl> {
-    fn push_addition(&mut self, offset: DocumentOffset<'pl>) {
-        if self.merge_method.useless_previous_changes() {
-            self.operations.clear();
-        }
-        self.operations.push(InnerDocOp::Addition(offset))
+    fn push_replacement(&mut self, offset: DocumentOffset<'pl>) {
+        self.operations.clear();
+        self.operations.push(InnerDocOp::Replace(offset))
+    }
+
+    fn push_update(&mut self, offset: DocumentOffset<'pl>) {
+        self.operations.push(InnerDocOp::Update(offset))
     }
 
     fn push_deletion(&mut self) {
@@ -440,16 +488,114 @@ impl<'pl> PayloadOperations<'pl> {
 
     fn append_operations(&mut self, mut operations: Vec<InnerDocOp<'pl>>) {
         debug_assert!(!operations.is_empty());
-        if self.merge_method.useless_previous_changes() {
+        if matches!(operations.first(), Some(InnerDocOp::Deletion | InnerDocOp::Replace(_))) {
             self.operations.clear();
         }
        self.operations.append(&mut operations);
     }
+
+    /// Merges all the operations gathered on this document into a single change.
+    ///
+    /// Replacements and deletions drop every previous version; updates merge with them.
+    fn merge<'doc>(
+        &self,
+        external_doc: &'doc str,
+        doc_alloc: &'doc Bump,
+    ) -> Result<Option<DocumentChange<'doc>>>
+    where
+        'pl: 'doc,
+    {
+        match self.operations.last() {
+            Some(InnerDocOp::Replace(DocumentOffset { content })) => {
+                let document = serde_json::from_slice(content).unwrap();
+                let document =
+                    RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
+                        .map_err(UserError::SerdeJson)?;
+
+                if self.is_new {
+                    Ok(Some(DocumentChange::Insertion(Insertion::create(
+                        self.docid,
+                        external_doc,
+                        Versions::single(document),
+                    ))))
+                } else {
+                    Ok(Some(DocumentChange::Update(Update::create(
+                        self.docid,
+                        external_doc,
+                        Versions::single(document),
+                        true,
+                    ))))
+                }
+            }
+            Some(InnerDocOp::Update(_)) => {
+                // Find the last tombstone operation (a deletion or replacement) that resets the document.
+                let last_tombstone = self
+                    .operations
+                    .iter()
+                    .rposition(|op| matches!(op, InnerDocOp::Deletion | InnerDocOp::Replace(_)));
+
+                // Track whether we must ignore the previous document versions from the rtxn.
+                let from_scratch = last_tombstone.is_some();
+
+                // We skip the deletion but keep the replacement, as it provides the base version.
+                let operations = match last_tombstone {
+                    Some(i) => match self.operations[i] {
+                        InnerDocOp::Deletion => &self.operations[i + 1..],
+                        InnerDocOp::Replace(_) => &self.operations[i..],
+                        InnerDocOp::Update(_) => unreachable!("Found a non-tombstone operation"),
+                    },
+                    None => &self.operations[..],
+                };
+
+                // We collect the versions to generate the appropriate document.
+                let versions = operations.iter().map(|operation| {
+                    let DocumentOffset { content } = match operation {
+                        InnerDocOp::Replace(offset) | InnerDocOp::Update(offset) => offset,
+                        InnerDocOp::Deletion => unreachable!("Deletion in document operations"),
+                    };
+
+                    let document = serde_json::from_slice(content).unwrap();
+                    let document =
+                        RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
+                            .map_err(UserError::SerdeJson)?;
+
+                    Ok(document)
+                });
+
+                let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
+
+                if self.is_new {
+                    Ok(Some(DocumentChange::Insertion(Insertion::create(
+                        self.docid,
+                        external_doc,
+                        versions,
+                    ))))
+                } else {
+                    Ok(Some(DocumentChange::Update(Update::create(
+                        self.docid,
+                        external_doc,
+                        versions,
+                        from_scratch,
+                    ))))
+                }
+            }
+            Some(InnerDocOp::Deletion) => {
+                return if self.is_new {
+                    Ok(None)
+                } else {
+                    let deletion = Deletion::create(self.docid, external_doc);
+                    Ok(Some(DocumentChange::Deletion(deletion)))
+                };
+            }
+            None => unreachable!("We must not have an empty set of operations on a document"),
+        }
+    }
 }
 
 #[derive(Clone)]
 pub enum InnerDocOp<'pl> {
-    Addition(DocumentOffset<'pl>),
+    Replace(DocumentOffset<'pl>),
+    Update(DocumentOffset<'pl>),
     Deletion,
 }
@@ -461,231 +607,14 @@ pub struct DocumentOffset<'pl> {
     pub content: &'pl [u8],
 }
 
-trait MergeChanges {
-    /// Whether the payloads in the list of operations are useless or not.
-    fn useless_previous_changes(&self) -> bool;
-
-    /// Returns a key that is used to order the payloads the right way.
-    fn sort_key(&self, docops: &[InnerDocOp]) -> usize;
-
-    fn merge<'doc>(
-        &self,
-        docid: DocumentId,
-        external_docid: &'doc str,
-        is_new: bool,
-        doc_alloc: &'doc Bump,
-        operations: &'doc [InnerDocOp],
-    ) -> Result<Option<DocumentChange<'doc>>>;
-}
-
-#[derive(Debug, Clone, Copy)]
-enum MergeMethod {
-    ForReplacement(MergeDocumentForReplacement),
-    ForUpdates(MergeDocumentForUpdates),
-}
-
-impl MergeChanges for MergeMethod {
-    fn useless_previous_changes(&self) -> bool {
-        match self {
-            MergeMethod::ForReplacement(merge) => merge.useless_previous_changes(),
-            MergeMethod::ForUpdates(merge) => merge.useless_previous_changes(),
-        }
-    }
-
-    fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
-        match self {
-            MergeMethod::ForReplacement(merge) => merge.sort_key(docops),
-            MergeMethod::ForUpdates(merge) => merge.sort_key(docops),
-        }
-    }
-
-    fn merge<'doc>(
-        &self,
-        docid: DocumentId,
-        external_docid: &'doc str,
-        is_new: bool,
-        doc_alloc: &'doc Bump,
-        operations: &'doc [InnerDocOp],
-    ) -> Result<Option<DocumentChange<'doc>>> {
-        match self {
-            MergeMethod::ForReplacement(merge) => {
-                merge.merge(docid, external_docid, is_new, doc_alloc, operations)
-            }
-            MergeMethod::ForUpdates(merge) => {
-                merge.merge(docid, external_docid, is_new, doc_alloc, operations)
-            }
-        }
-    }
-}
-
-impl From<IndexDocumentsMethod> for MergeMethod {
-    fn from(method: IndexDocumentsMethod) -> Self {
-        match method {
-            IndexDocumentsMethod::ReplaceDocuments => {
-                MergeMethod::ForReplacement(MergeDocumentForReplacement)
-            }
-            IndexDocumentsMethod::UpdateDocuments => {
-                MergeMethod::ForUpdates(MergeDocumentForUpdates)
-            }
-        }
-    }
-}
-
-#[derive(Debug, Clone, Copy)]
-struct MergeDocumentForReplacement;
-
-impl MergeChanges for MergeDocumentForReplacement {
-    fn useless_previous_changes(&self) -> bool {
-        true
-    }
-
-    /// Reorders to read only the last change.
-    fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
-        let f = |ido: &_| match ido {
-            InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
-            InnerDocOp::Deletion => None,
-        };
-        docops.iter().rev().find_map(f).unwrap_or(0)
-    }
-
-    /// Returns only the most recent version of a document based on the updates from the payloads.
-    ///
-    /// This function is only meant to be used when doing a replacement and not an update.
-    fn merge<'doc>(
-        &self,
-        docid: DocumentId,
-        external_doc: &'doc str,
-        is_new: bool,
-        doc_alloc: &'doc Bump,
-        operations: &'doc [InnerDocOp],
-    ) -> Result<Option<DocumentChange<'doc>>> {
-        match operations.last() {
-            Some(InnerDocOp::Addition(DocumentOffset { content })) => {
-                let document = serde_json::from_slice(content).unwrap();
-                let document =
-                    RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
-                        .map_err(UserError::SerdeJson)?;
-
-                if is_new {
-                    Ok(Some(DocumentChange::Insertion(Insertion::create(
-                        docid,
-                        external_doc,
-                        Versions::single(document),
-                    ))))
-                } else {
-                    Ok(Some(DocumentChange::Update(Update::create(
-                        docid,
-                        external_doc,
-                        Versions::single(document),
-                        true,
-                    ))))
-                }
-            }
-            Some(InnerDocOp::Deletion) => {
-                return if is_new {
-                    Ok(None)
-                } else {
-                    let deletion = Deletion::create(docid, external_doc);
-                    Ok(Some(DocumentChange::Deletion(deletion)))
-                };
-            }
-            None => unreachable!("We must not have empty set of operations on a document"),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Copy)]
-struct MergeDocumentForUpdates;
-
-impl MergeChanges for MergeDocumentForUpdates {
-    fn useless_previous_changes(&self) -> bool {
-        false
-    }
-
-    /// Reorders to read the first changes first so that it's faster to read the first one and then the rest.
-    fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
-        let f = |ido: &_| match ido {
-            InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
-            InnerDocOp::Deletion => None,
-        };
-        docops.iter().find_map(f).unwrap_or(0)
-    }
-
-    /// Reads the previous version of a document from the database, the new versions
-    /// in the grenad update files and merges them to generate a new boxed obkv.
-    ///
-    /// This function is only meant to be used when doing an update and not a replacement.
-    fn merge<'doc>(
-        &self,
-        docid: DocumentId,
-        external_docid: &'doc str,
-        is_new: bool,
-        doc_alloc: &'doc Bump,
-        operations: &'doc [InnerDocOp],
-    ) -> Result<Option<DocumentChange<'doc>>> {
-        if operations.is_empty() {
-            unreachable!("We must not have empty set of operations on a document");
-        }
-
-        let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion));
-        let operations = &operations[last_deletion.map_or(0, |i| i + 1)..];
-
-        let has_deletion = last_deletion.is_some();
-
-        if operations.is_empty() {
-            return if is_new {
-                Ok(None)
-            } else {
-                let deletion = Deletion::create(docid, external_docid);
-                Ok(Some(DocumentChange::Deletion(deletion)))
-            };
-        }
-
-        let versions = match operations {
-            [single] => {
-                let DocumentOffset { content } = match single {
-                    InnerDocOp::Addition(offset) => offset,
-                    InnerDocOp::Deletion => {
-                        unreachable!("Deletion in document operations")
-                    }
-                };
-                let document = serde_json::from_slice(content).unwrap();
-                let document =
-                    RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
-                        .map_err(UserError::SerdeJson)?;
-
-                Some(Versions::single(document))
-            }
-            operations => {
-                let versions = operations.iter().map(|operation| {
-                    let DocumentOffset { content } = match operation {
-                        InnerDocOp::Addition(offset) => offset,
-                        InnerDocOp::Deletion => {
-                            unreachable!("Deletion in document operations")
-                        }
-                    };
-
-                    let document = serde_json::from_slice(content).unwrap();
-                    let document =
-                        RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
-                            .map_err(UserError::SerdeJson)?;
-                    Ok(document)
-                });
-                Versions::multiple(versions)?
-            }
-        };
-
-        let Some(versions) = versions else { return Ok(None) };
-
-        if is_new {
-            Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, versions))))
-        } else {
-            Ok(Some(DocumentChange::Update(Update::create(
-                docid,
-                external_docid,
-                versions,
-                has_deletion,
-            ))))
-        }
-    }
+/// Returns the pointer of the first change affecting a document.
+///
+/// This is used to sort the documents in update-file order so that the
+/// update file is read sequentially, which greatly speeds up indexing.
+pub fn first_update_pointer(docops: &[InnerDocOp]) -> Option<usize> {
+    docops.iter().find_map(|ido: &_| match ido {
+        InnerDocOp::Replace(replace) => Some(replace.content.as_ptr() as usize),
+        InnerDocOp::Update(update) => Some(update.content.as_ptr() as usize),
+        InnerDocOp::Deletion => None,
+    })
 }
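
A standalone sketch of the merge semantics introduced by this patch (hypothetical `Op` and `merge_ops` names over simplified `BTreeMap` documents, not milli code): a deletion or a replacement acts as a tombstone that resets the document, the updates that follow are merged field by field on top of it, and when no tombstone exists the updates apply over the version already in the index.

use std::collections::BTreeMap;

type Doc = BTreeMap<String, String>;

enum Op {
    Replace(Doc), // drops every previous version of the document
    Update(Doc),  // merges its fields over the previous version
    Delete,       // removes the document entirely
}

/// Folds a sequence of operations into the final document, starting
/// from the version currently in the index (`current`), if any.
fn merge_ops(current: Option<Doc>, ops: &[Op]) -> Option<Doc> {
    // Find the last tombstone: everything before it can be ignored.
    let last_tombstone = ops.iter().rposition(|op| matches!(op, Op::Replace(_) | Op::Delete));

    // A tombstone means we start from scratch instead of from the indexed version.
    let (mut acc, rest) = match last_tombstone {
        Some(i) => match &ops[i] {
            Op::Delete => (None, &ops[i + 1..]),
            Op::Replace(doc) => (Some(doc.clone()), &ops[i + 1..]),
            Op::Update(_) => unreachable!("updates are not tombstones"),
        },
        None => (current, ops),
    };

    // Apply the remaining updates field by field.
    for op in rest {
        if let Op::Update(fields) = op {
            acc.get_or_insert_with(Doc::new).extend(fields.clone());
        }
    }
    acc
}

fn main() {
    let doc = |pairs: &[(&str, &str)]| -> Doc {
        pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
    };

    // An update stacked on a replacement: the replacement is the base version,
    // so the stale indexed version is ignored.
    let ops = [
        Op::Update(doc(&[("title", "old")])),
        Op::Replace(doc(&[("id", "1"), ("title", "new")])),
        Op::Update(doc(&[("price", "10")])),
    ];
    let merged = merge_ops(Some(doc(&[("stale", "yes")])), &ops);
    assert_eq!(merged, Some(doc(&[("id", "1"), ("title", "new"), ("price", "10")])));

    // A deletion followed by an update rebuilds the document from scratch.
    let ops = [Op::Delete, Op::Update(doc(&[("id", "2")]))];
    assert_eq!(merge_ops(Some(doc(&[("id", "1")])), &ops), Some(doc(&[("id", "2")])));
}

The `from_scratch` flag passed to `Update::create` in the patch corresponds to `last_tombstone.is_some()` here.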