From 83865d2ebd4793f8ccf7aafc90f0063a1ce238fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 14 Nov 2024 16:00:11 +0100 Subject: [PATCH] Expose intermediate errors when processing batches --- crates/index-scheduler/src/batch.rs | 75 ++- .../update/new/indexer/document_operation.rs | 548 ++++++++++++------ crates/milli/src/update/new/indexer/mod.rs | 2 +- 3 files changed, 409 insertions(+), 216 deletions(-) diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index fb47c705a..fb9cfbe6c 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -39,7 +39,7 @@ use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSe use meilisearch_types::milli::vector::parsed_vectors::{ ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, }; -use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; +use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; @@ -1331,55 +1331,19 @@ impl IndexScheduler { let mut indexer = indexer::DocumentOperation::new(method); let embedders = index.embedding_configs(index_wtxn)?; let embedders = self.embedders(embedders)?; - for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { + for operation in operations { match operation { DocumentOperation::Add(_content_uuid) => { let mmap = content_files_iter.next().unwrap(); - let stats = indexer.add_documents(mmap)?; + indexer.add_documents(mmap)?; // builder = builder.with_embedders(embedders.clone()); - - let received_documents = - if let Some(Details::DocumentAdditionOrUpdate { - received_documents, - .. - }) = task.details - { - received_documents - } else { - // In the case of a `documentAdditionOrUpdate` the details MUST be set - unreachable!(); - }; - - task.status = Status::Succeeded; - task.details = Some(Details::DocumentAdditionOrUpdate { - received_documents, - indexed_documents: Some(stats.document_count as u64), - }) } DocumentOperation::Delete(document_ids) => { - let count = document_ids.len(); let document_ids: bumpalo::collections::vec::Vec<_> = document_ids .iter() .map(|s| &*indexer_alloc.alloc_str(s)) .collect_in(&indexer_alloc); indexer.delete_documents(document_ids.into_bump_slice()); - // Uses Invariant: remove documents actually always returns Ok for the inner result - // let count = user_result.unwrap(); - let provided_ids = - if let Some(Details::DocumentDeletion { provided_ids, .. 
}) = - task.details - { - provided_ids - } else { - // In the case of a `documentAdditionOrUpdate` the details MUST be set - unreachable!(); - }; - - task.status = Status::Succeeded; - task.details = Some(Details::DocumentDeletion { - provided_ids, - deleted_documents: Some(count as u64), - }); } } } @@ -1394,8 +1358,7 @@ impl IndexScheduler { } }; - // TODO we want to multithread this - let document_changes = indexer.into_changes( + let (document_changes, operation_stats) = indexer.into_changes( &indexer_alloc, index, &rtxn, @@ -1403,6 +1366,36 @@ impl IndexScheduler { &mut new_fields_ids_map, )?; + for (stats, task) in operation_stats.into_iter().zip(&mut tasks) { + match stats.error { + Some(error) => { + task.status = Status::Failed; + task.error = Some(milli::Error::UserError(error).into()); + } + None => task.status = Status::Succeeded, + } + + task.details = match task.details { + Some(Details::DocumentAdditionOrUpdate { + received_documents, .. + }) => Some(Details::DocumentAdditionOrUpdate { + received_documents, + indexed_documents: Some(stats.document_count), + }), + Some(Details::DocumentDeletion { provided_ids, .. }) => { + Some(Details::DocumentDeletion { + provided_ids, + deleted_documents: Some(stats.document_count), + }) + } + _ => { + // In the case of a `documentAdditionOrUpdate` or `DocumentDeletion` + // the details MUST be set to either addition or deletion + unreachable!(); + } + } + } + pool.install(|| { indexer::index( index_wtxn, diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index a164d099e..0b586a795 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -1,10 +1,11 @@ use bumpalo::collections::CollectIn; use bumpalo::Bump; +use hashbrown::hash_map::Entry; use heed::RoTxn; use memmap2::Mmap; use rayon::slice::ParallelSlice; use serde_json::value::RawValue; -use IndexDocumentsMethod as Idm; +use serde_json::Deserializer; use super::super::document_change::DocumentChange; use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend}; @@ -12,55 +13,24 @@ use crate::documents::PrimaryKey; use crate::update::new::document::Versions; use crate::update::new::{Deletion, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; -use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; +use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; pub struct DocumentOperation<'pl> { operations: Vec>, - index_documents_method: IndexDocumentsMethod, -} - -pub struct DocumentOperationChanges<'pl> { - docids_version_offsets: &'pl [(&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]))], - index_documents_method: IndexDocumentsMethod, -} - -pub enum Payload<'pl> { - Addition(&'pl [u8]), - Deletion(&'pl [&'pl str]), -} - -pub struct PayloadStats { - pub document_count: usize, - pub bytes: u64, -} - -#[derive(Clone)] -pub enum InnerDocOp<'pl> { - Addition(DocumentOffset<'pl>), - Deletion, -} - -/// Represents an offset where a document lives -/// in an mmapped grenad reader file. -#[derive(Clone)] -pub struct DocumentOffset<'pl> { - /// The mmapped payload files. 
- pub content: &'pl [u8], + method: MergeMethod, } impl<'pl> DocumentOperation<'pl> { pub fn new(method: IndexDocumentsMethod) -> Self { - Self { operations: Default::default(), index_documents_method: method } + Self { operations: Default::default(), method: MergeMethod::from(method) } } /// TODO please give me a type /// The payload is expected to be in the grenad format - pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result { + pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<()> { payload.advise(memmap2::Advice::Sequential)?; - let document_count = - memchr::memmem::find_iter(&payload[..], "}{").count().saturating_add(1); self.operations.push(Payload::Addition(&payload[..])); - Ok(PayloadStats { bytes: payload.len() as u64, document_count }) + Ok(()) } pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) { @@ -74,141 +44,239 @@ impl<'pl> DocumentOperation<'pl> { rtxn: &RoTxn, primary_key: &PrimaryKey, new_fields_ids_map: &mut FieldsIdsMap, - ) -> Result> { - // will contain nodes from the intermediate hashmap - let document_changes_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1 MiB + ) -> Result<(DocumentOperationChanges<'pl>, Vec)> { + let Self { operations, method } = self; let documents_ids = index.documents_ids(rtxn)?; + let mut operations_stats = Vec::new(); let mut available_docids = AvailableIds::new(&documents_ids); - let mut docids_version_offsets = - hashbrown::HashMap::<&'pl str, _, _, _>::new_in(&document_changes_alloc); + let mut docids_version_offsets = hashbrown::HashMap::new(); - for operation in self.operations { - match operation { - Payload::Addition(payload) => { - let mut iter = - serde_json::Deserializer::from_slice(payload).into_iter::<&RawValue>(); + for operation in operations { + let (bytes, document_count, result) = match operation { + Payload::Addition(payload) => extract_addition_payload_changes( + indexer, + index, + rtxn, + primary_key, + new_fields_ids_map, + &mut available_docids, + &docids_version_offsets, + method, + payload, + ), + Payload::Deletion(to_delete) => extract_deletion_payload_changes( + index, + rtxn, + &mut available_docids, + &docids_version_offsets, + method, + to_delete, + ), + }; - /// TODO manage the error - let mut previous_offset = 0; - while let Some(document) = - iter.next().transpose().map_err(UserError::SerdeJson)? - { - let external_document_id = primary_key.extract_fields_and_docid( - document, - new_fields_ids_map, - indexer, - )?; - - let external_document_id = external_document_id.to_de(); - - let current_offset = iter.byte_offset(); - let document_operation = InnerDocOp::Addition(DocumentOffset { - content: &payload[previous_offset..current_offset], - }); - - match docids_version_offsets.get_mut(external_document_id) { - None => { - let (docid, is_new) = match index - .external_documents_ids() - .get(rtxn, external_document_id)? 
- { - Some(docid) => (docid, false), - None => ( - available_docids.next().ok_or(Error::UserError( - UserError::DocumentLimitReached, - ))?, - true, - ), - }; - - docids_version_offsets.insert( - external_document_id, - ( - (docid, is_new), - bumpalo::vec![in indexer; document_operation], - ), - ); - } - Some((_, offsets)) => { - let useless_previous_addition = match self.index_documents_method { - IndexDocumentsMethod::ReplaceDocuments => { - MergeDocumentForReplacement::USELESS_PREVIOUS_CHANGES - } - IndexDocumentsMethod::UpdateDocuments => { - MergeDocumentForUpdates::USELESS_PREVIOUS_CHANGES - } - }; - - if useless_previous_addition { - offsets.clear(); - } - - offsets.push(document_operation); - } - } - - previous_offset = iter.byte_offset(); - } + let error = match result { + Ok(new_docids_version_offsets) => { + // If we don't have any error then we can merge the content of this payload + // into to main payload. Else we just drop this payload extraction. + merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets); + None } - Payload::Deletion(to_delete) => { - for external_document_id in to_delete { - match docids_version_offsets.get_mut(external_document_id) { - None => { - let (docid, is_new) = match index - .external_documents_ids() - .get(rtxn, external_document_id)? - { - Some(docid) => (docid, false), - None => ( - available_docids.next().ok_or(Error::UserError( - UserError::DocumentLimitReached, - ))?, - true, - ), - }; + Err(Error::UserError(user_error)) => Some(user_error), + Err(e) => return Err(e), + }; - docids_version_offsets.insert( - external_document_id, - ( - (docid, is_new), - bumpalo::vec![in indexer; InnerDocOp::Deletion], - ), - ); - } - Some((_, offsets)) => { - offsets.clear(); - offsets.push(InnerDocOp::Deletion); - } - } - } - } - } + operations_stats.push(PayloadStats { document_count, bytes, error }); } // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone - let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = docids_version_offsets - .drain() - .map(|(item, (docid, v))| (item, (docid, v.into_bump_slice()))) - .collect_in(indexer); + let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = + docids_version_offsets.drain().collect_in(indexer); + // Reorder the offsets to make sure we iterate on the file sequentially - let sort_function_key = match self.index_documents_method { - Idm::ReplaceDocuments => MergeDocumentForReplacement::sort_key, - Idm::UpdateDocuments => MergeDocumentForUpdates::sort_key, + // And finally sort them + docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations)); + + let docids_version_offsets = docids_version_offsets.into_bump_slice(); + Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats)) + } +} + +fn extract_addition_payload_changes<'s, 'pl: 's>( + indexer: &'pl Bump, + index: &Index, + rtxn: &RoTxn, + primary_key: &PrimaryKey, + fields_ids_map: &mut FieldsIdsMap, + available_docids: &mut AvailableIds, + main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, + method: MergeMethod, + payload: &'pl [u8], +) -> (u64, u64, Result>>) { + let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); + + /// TODO manage the error + let mut previous_offset = 0; + let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>(); + loop { + let doc = match iter.next().transpose() { + Ok(Some(doc)) => doc, + Ok(None) => break, + Err(e) => { 
+ return ( + payload.len() as u64, + new_docids_version_offsets.len() as u64, + Err(InternalError::SerdeJson(e).into()), + ) + } }; - // And finally sort them - docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); - let docids_version_offsets = docids_version_offsets.into_bump_slice(); - Ok(DocumentOperationChanges { - docids_version_offsets, - index_documents_method: self.index_documents_method, - }) + let external_id = match primary_key.extract_fields_and_docid(doc, fields_ids_map, indexer) { + Ok(edi) => edi, + Err(e) => { + return (payload.len() as u64, new_docids_version_offsets.len() as u64, Err(e)) + } + }; + + let external_id = external_id.to_de(); + let current_offset = iter.byte_offset(); + let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] }; + + match main_docids_version_offsets.get(external_id) { + None => { + let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) { + Ok(Some(docid)) => (docid, false), + Ok(None) => ( + match available_docids.next() { + Some(docid) => docid, + None => { + return ( + payload.len() as u64, + new_docids_version_offsets.len() as u64, + Err(UserError::DocumentLimitReached.into()), + ) + } + }, + true, + ), + Err(e) => { + return ( + payload.len() as u64, + new_docids_version_offsets.len() as u64, + Err(e.into()), + ) + } + }; + + match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset), + Entry::Vacant(entry) => { + entry.insert(PayloadOperations::new_addition( + method, + docid, + is_new, + document_offset, + )); + } + } + } + Some(payload_operations) => match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset), + Entry::Vacant(entry) => { + entry.insert(PayloadOperations::new_addition( + method, + payload_operations.docid, + payload_operations.is_new, + document_offset, + )); + } + }, + } + + previous_offset = iter.byte_offset(); + } + + (payload.len() as u64, new_docids_version_offsets.len() as u64, Ok(new_docids_version_offsets)) +} + +fn extract_deletion_payload_changes<'s, 'pl: 's>( + index: &Index, + rtxn: &RoTxn, + available_docids: &mut AvailableIds, + main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, + method: MergeMethod, + to_delete: &'pl [&'pl str], +) -> (u64, u64, Result>>) { + let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); + let mut document_count = 0; + + for external_id in to_delete { + match main_docids_version_offsets.get(external_id) { + None => { + let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) { + Ok(Some(docid)) => (docid, false), + Ok(None) => ( + match available_docids.next() { + Some(docid) => docid, + None => { + return ( + 0, + new_docids_version_offsets.len() as u64, + Err(UserError::DocumentLimitReached.into()), + ) + } + }, + true, + ), + Err(e) => return (0, new_docids_version_offsets.len() as u64, Err(e.into())), + }; + + match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), + Entry::Vacant(entry) => { + entry.insert(PayloadOperations::new_deletion(method, docid, is_new)); + } + } + } + Some(payload_operations) => match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), + Entry::Vacant(entry) => { + 
entry.insert(PayloadOperations::new_deletion( + method, + payload_operations.docid, + payload_operations.is_new, + )); + } + }, + } + document_count += 1; + } + + (0, document_count, Ok(new_docids_version_offsets)) +} + +fn merge_version_offsets<'s, 'pl>( + main: &mut hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, + new: hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, +) { + // We cannot swap like nothing because documents + // operations must be in the right order. + if main.is_empty() { + return *main = new; + } + + for (key, new_payload) in new { + match main.entry(key) { + Entry::Occupied(mut entry) => entry.get_mut().append_operations(new_payload.operations), + Entry::Vacant(entry) => { + entry.insert(new_payload); + } + } } } impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> { - type Item = (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>])); + type Item = (&'pl str, PayloadOperations<'pl>); fn iter( &self, @@ -225,21 +293,14 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> { where 'pl: 'doc, { - let document_merge_function = match self.index_documents_method { - Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, - Idm::UpdateDocuments => MergeDocumentForUpdates::merge, - }; - - let (external_doc, ((internal_docid, is_new), operations)) = *item; - - let change = document_merge_function( - internal_docid, + let (external_doc, payload_operations) = item; + payload_operations.merge_method.merge( + payload_operations.docid, external_doc, - is_new, + payload_operations.is_new, &context.doc_alloc, - operations, - )?; - Ok(change) + &payload_operations.operations[..], + ) } fn len(&self) -> usize { @@ -247,14 +308,92 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> { } } +pub struct DocumentOperationChanges<'pl> { + docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)], +} + +pub enum Payload<'pl> { + Addition(&'pl [u8]), + Deletion(&'pl [&'pl str]), +} + +pub struct PayloadStats { + pub bytes: u64, + pub document_count: u64, + pub error: Option, +} + +pub struct PayloadOperations<'pl> { + /// The internal document id of the document. + pub docid: DocumentId, + /// Wether this document is not in the current database (visible by the rtxn). + pub is_new: bool, + /// The operations to perform, in order, on this document. + pub operations: Vec>, + /// The merge method we are using to merge payloads and documents. 
+ merge_method: MergeMethod, +} + +impl<'pl> PayloadOperations<'pl> { + fn new_deletion(merge_method: MergeMethod, docid: DocumentId, is_new: bool) -> Self { + Self { docid, is_new, operations: vec![InnerDocOp::Deletion], merge_method } + } + + fn new_addition( + merge_method: MergeMethod, + docid: DocumentId, + is_new: bool, + offset: DocumentOffset<'pl>, + ) -> Self { + Self { docid, is_new, operations: vec![InnerDocOp::Addition(offset)], merge_method } + } +} + +impl<'pl> PayloadOperations<'pl> { + fn push_addition(&mut self, offset: DocumentOffset<'pl>) { + if self.merge_method.useless_previous_changes() { + self.operations.clear(); + } + self.operations.push(InnerDocOp::Addition(offset)) + } + + fn push_deletion(&mut self) { + self.operations.clear(); + self.operations.push(InnerDocOp::Deletion); + } + + fn append_operations(&mut self, mut operations: Vec>) { + debug_assert!(!operations.is_empty()); + if self.merge_method.useless_previous_changes() { + self.operations.clear(); + } + self.operations.append(&mut operations); + } +} + +#[derive(Clone)] +pub enum InnerDocOp<'pl> { + Addition(DocumentOffset<'pl>), + Deletion, +} + +/// Represents an offset where a document lives +/// in an mmapped grenad reader file. +#[derive(Clone)] +pub struct DocumentOffset<'pl> { + /// The mmapped payload files. + pub content: &'pl [u8], +} + trait MergeChanges { /// Whether the payloads in the list of operations are useless or not. - const USELESS_PREVIOUS_CHANGES: bool; + fn useless_previous_changes(&self) -> bool; /// Returns a key that is used to order the payloads the right way. - fn sort_key(docops: &[InnerDocOp]) -> usize; + fn sort_key(&self, docops: &[InnerDocOp]) -> usize; fn merge<'doc>( + &self, docid: DocumentId, external_docid: &'doc str, is_new: bool, @@ -263,13 +402,69 @@ trait MergeChanges { ) -> Result>>; } +#[derive(Debug, Clone, Copy)] +enum MergeMethod { + ForReplacement(MergeDocumentForReplacement), + ForUpdates(MergeDocumentForUpdates), +} + +impl MergeChanges for MergeMethod { + fn useless_previous_changes(&self) -> bool { + match self { + MergeMethod::ForReplacement(merge) => merge.useless_previous_changes(), + MergeMethod::ForUpdates(merge) => merge.useless_previous_changes(), + } + } + + fn sort_key(&self, docops: &[InnerDocOp]) -> usize { + match self { + MergeMethod::ForReplacement(merge) => merge.sort_key(docops), + MergeMethod::ForUpdates(merge) => merge.sort_key(docops), + } + } + + fn merge<'doc>( + &self, + docid: DocumentId, + external_docid: &'doc str, + is_new: bool, + doc_alloc: &'doc Bump, + operations: &'doc [InnerDocOp], + ) -> Result>> { + match self { + MergeMethod::ForReplacement(merge) => { + merge.merge(docid, external_docid, is_new, doc_alloc, operations) + } + MergeMethod::ForUpdates(merge) => { + merge.merge(docid, external_docid, is_new, doc_alloc, operations) + } + } + } +} + +impl From for MergeMethod { + fn from(method: IndexDocumentsMethod) -> Self { + match method { + IndexDocumentsMethod::ReplaceDocuments => { + MergeMethod::ForReplacement(MergeDocumentForReplacement) + } + IndexDocumentsMethod::UpdateDocuments => { + MergeMethod::ForUpdates(MergeDocumentForUpdates) + } + } + } +} + +#[derive(Debug, Clone, Copy)] struct MergeDocumentForReplacement; impl MergeChanges for MergeDocumentForReplacement { - const USELESS_PREVIOUS_CHANGES: bool = true; + fn useless_previous_changes(&self) -> bool { + true + } /// Reorders to read only the last change. 
- fn sort_key(docops: &[InnerDocOp]) -> usize { + fn sort_key(&self, docops: &[InnerDocOp]) -> usize { let f = |ido: &_| match ido { InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize), InnerDocOp::Deletion => None, @@ -281,6 +476,7 @@ impl MergeChanges for MergeDocumentForReplacement { /// /// This function is only meant to be used when doing a replacement and not an update. fn merge<'doc>( + &self, docid: DocumentId, external_doc: &'doc str, is_new: bool, @@ -321,13 +517,16 @@ impl MergeChanges for MergeDocumentForReplacement { } } +#[derive(Debug, Clone, Copy)] struct MergeDocumentForUpdates; impl MergeChanges for MergeDocumentForUpdates { - const USELESS_PREVIOUS_CHANGES: bool = false; + fn useless_previous_changes(&self) -> bool { + false + } /// Reorders to read the first changes first so that it's faster to read the first one and then the rest. - fn sort_key(docops: &[InnerDocOp]) -> usize { + fn sort_key(&self, docops: &[InnerDocOp]) -> usize { let f = |ido: &_| match ido { InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize), InnerDocOp::Deletion => None, @@ -340,6 +539,7 @@ impl MergeChanges for MergeDocumentForUpdates { /// /// This function is only meant to be used when doing an update and not a replacement. fn merge<'doc>( + &self, docid: DocumentId, external_docid: &'doc str, is_new: bool, diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 1a5e4fc23..0906c5b89 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -5,7 +5,7 @@ use std::thread::{self, Builder}; use big_s::S; use document_changes::{extract, DocumentChanges, IndexingContext, Progress, ThreadLocal}; pub use document_deletion::DocumentDeletion; -pub use document_operation::DocumentOperation; +pub use document_operation::{DocumentOperation, PayloadStats}; use hashbrown::HashMap; use heed::types::{Bytes, DecodeIgnore, Str}; use heed::{RoTxn, RwTxn};
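
Usage sketch (not part of the patch): how the reworked `DocumentOperation` API is meant to be driven after this change. The names `method` (an `IndexDocumentsMethod`), `payload_mmaps`, `ids_to_delete`, `tasks`, `index`, `rtxn`, `primary_key`, and `new_fields_ids_map` are assumed to be set up by the caller, as in `IndexScheduler::apply_index_operation`; the point illustrated is that `into_changes` now returns one `PayloadStats` per pushed payload, in push order, and its `error` field carries the first user error hit while extracting that payload instead of failing the whole batch.

    // Assumed caller state (hypothetical names): `payload_mmaps: Vec<memmap2::Mmap>`,
    // `ids_to_delete: &[&str]` allocated in `indexer_alloc`, and `tasks: Vec<Task>`
    // matching the payloads in order.
    let indexer_alloc = bumpalo::Bump::new();
    let mut indexer = indexer::DocumentOperation::new(method);

    // Payloads are only registered here; parsing and validation now happen inside
    // `into_changes`, so a broken payload no longer aborts the whole batch.
    for mmap in &payload_mmaps {
        indexer.add_documents(mmap)?;
    }
    indexer.delete_documents(ids_to_delete);

    // One `PayloadStats` is returned per pushed payload, so the stats can be
    // zipped back onto the tasks that produced them.
    let (document_changes, operation_stats) = indexer.into_changes(
        &indexer_alloc,
        index,
        &rtxn,
        primary_key,
        &mut new_fields_ids_map,
    )?;

    for (stats, task) in operation_stats.into_iter().zip(&mut tasks) {
        match stats.error {
            Some(error) => {
                task.status = Status::Failed;
                task.error = Some(milli::Error::UserError(error).into());
            }
            None => task.status = Status::Succeeded,
        }
    }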