diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index ed8f1c93f..1670c8145 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -137,9 +137,22 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
                             (docid, vec![document_operation]),
                         );
                     }
-                    // TODO clean the code to make sure we clean the useless operations
-                    // add a method to the MergeChanges trait
-                    Some((_, offsets)) => offsets.push(document_operation),
+                    Some((_, offsets)) => {
+                        let useless_previous_addition = match self.index_documents_method {
+                            IndexDocumentsMethod::ReplaceDocuments => {
+                                MergeDocumentForReplacement::USELESS_PREVIOUS_CHANGES
+                            }
+                            IndexDocumentsMethod::UpdateDocuments => {
+                                MergeDocumentForUpdates::USELESS_PREVIOUS_CHANGES
+                            }
+                        };
+
+                        if useless_previous_addition {
+                            offsets.clear();
+                        }
+
+                        offsets.push(document_operation);
+                    }
                 }

                 previous_offset = iter.byte_offset();
@@ -164,7 +177,10 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
                                 (docid, vec![InnerDocOp::Deletion]),
                             );
                         }
-                        Some((_, offsets)) => offsets.push(InnerDocOp::Deletion),
+                        Some((_, offsets)) => {
+                            offsets.clear();
+                            offsets.push(InnerDocOp::Deletion);
+                        }
                     }
                 }
             }
@@ -176,10 +192,13 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
         // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
         let mut docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect();
         // Reorder the offsets to make sure we iterate on the file sequentially
-        match self.index_documents_method {
-            Idm::ReplaceDocuments => MergeDocumentForReplacement::sort(&mut docids_version_offsets),
-            Idm::UpdateDocuments => MergeDocumentForUpdates::sort(&mut docids_version_offsets),
-        }
+        let sort_function_key = match self.index_documents_method {
+            Idm::ReplaceDocuments => MergeDocumentForReplacement::sort_key,
+            Idm::UpdateDocuments => MergeDocumentForUpdates::sort_key,
+        };
+
+        // And finally sort them
+        docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops));

         Ok(docids_version_offsets
             .into_par_iter()
@@ -208,8 +227,11 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
 }

 trait MergeChanges {
-    /// Reorders the offsets to make sure we iterate on the file sequentially.
-    fn sort(changes_offsets: &mut [(CowStr, (DocumentId, Vec<InnerDocOp>))]);
+    /// Whether a new addition makes the previous changes in the list of operations useless.
+    const USELESS_PREVIOUS_CHANGES: bool;
+
+    /// Returns the key used to order the payloads so that the file is read sequentially.
+    fn sort_key(docops: &[InnerDocOp]) -> usize;

     fn merge(
         rtxn: &RoTxn,
@@ -224,18 +246,15 @@
 struct MergeDocumentForReplacement;

 impl MergeChanges for MergeDocumentForReplacement {
+    const USELESS_PREVIOUS_CHANGES: bool = true;
+
     /// Reorders to read only the last change.
-    fn sort(changes_offsets: &mut [(CowStr, (DocumentId, Vec<InnerDocOp>))]) {
-        changes_offsets.sort_unstable_by_key(|(_, (_, offsets))| {
-            offsets
-                .iter()
-                .rev()
-                .find_map(|ido| match ido {
-                    InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
-                    InnerDocOp::Deletion => None,
-                })
-                .unwrap_or(0)
-        });
+    fn sort_key(docops: &[InnerDocOp]) -> usize {
+        let f = |ido: &_| match ido {
+            InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
+            InnerDocOp::Deletion => None,
+        };
+        docops.iter().rev().find_map(f).unwrap_or(0)
     }

     /// Returns only the most recent version of a document based on the updates from the payloads.
@@ -295,17 +314,15 @@ impl MergeChanges for MergeDocumentForReplacement {
 struct MergeDocumentForUpdates;

 impl MergeChanges for MergeDocumentForUpdates {
+    const USELESS_PREVIOUS_CHANGES: bool = false;
+
     /// Reorders to read the first changes first so that it's faster to read the first one and then the rest.
-    fn sort(changes_offsets: &mut [(CowStr, (DocumentId, Vec<InnerDocOp>))]) {
-        changes_offsets.sort_unstable_by_key(|(_, (_, offsets))| {
-            offsets
-                .iter()
-                .find_map(|ido| match ido {
-                    InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
-                    InnerDocOp::Deletion => None,
-                })
-                .unwrap_or(0)
-        });
+    fn sort_key(docops: &[InnerDocOp]) -> usize {
+        let f = |ido: &_| match ido {
+            InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
+            InnerDocOp::Deletion => None,
+        };
+        docops.iter().find_map(f).unwrap_or(0)
     }

     /// Reads the previous version of a document from the database, the new versions
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index 0273d4fe2..c1bcd20cf 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -1,4 +1,3 @@
-use std::fs::File;
 use std::sync::RwLock;
 use std::thread::{self, Builder};

@@ -17,12 +16,10 @@ use super::document_change::DocumentChange;
 use super::extract::*;
 use super::merger::merge_grenad_entries;
 use super::StdResult;
-use crate::documents::{
-    obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY,
-};
+use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
 use crate::update::new::channel::{DatabaseType, ExtractorSender};
 use crate::update::GrenadParameters;
-use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
+use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};

 mod document_deletion;
 mod document_operation;
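
For reviewers who want the pattern at a glance: the patch replaces a per-strategy `sort` method with a per-strategy `sort_key` function fed into a single `sort_unstable_by_key` call, and adds a `USELESS_PREVIOUS_CHANGES` const so the insertion code can drop queued operations that a replacement supersedes anyway. Below is a minimal, self-contained sketch of that shape; `Op`, `MergeStrategy`, `Replace`, `Update`, and `push_op` are hypothetical stand-ins for illustration, not the real milli types (which use `InnerDocOp`, `MergeChanges`, and pointers into the payload content).

```rust
// Minimal sketch of the refactor above; all names here are simplified
// stand-ins, NOT the real milli types.

enum Op {
    /// The usize stands in for the operation's byte offset in the payload file.
    Addition(usize),
    Deletion,
}

trait MergeStrategy {
    /// Whether a new addition makes the previously queued operations useless.
    const USELESS_PREVIOUS_CHANGES: bool;

    /// Key used to order per-document operation lists so the payload file
    /// is read sequentially.
    fn sort_key(ops: &[Op]) -> usize;
}

/// Replacement only ever reads the most recent addition.
struct Replace;

impl MergeStrategy for Replace {
    const USELESS_PREVIOUS_CHANGES: bool = true;

    fn sort_key(ops: &[Op]) -> usize {
        // Order by the last addition: it is the only one that will be read.
        ops.iter()
            .rev()
            .find_map(|op| match op {
                Op::Addition(offset) => Some(*offset),
                Op::Deletion => None,
            })
            .unwrap_or(0)
    }
}

/// Update merges every queued addition, so none of them is useless.
struct Update;

impl MergeStrategy for Update {
    const USELESS_PREVIOUS_CHANGES: bool = false;

    fn sort_key(ops: &[Op]) -> usize {
        // Order by the first addition: reading starts there.
        ops.iter()
            .find_map(|op| match op {
                Op::Addition(offset) => Some(*offset),
                Op::Deletion => None,
            })
            .unwrap_or(0)
    }
}

/// Mirrors the new `Some((_, offsets)) => { ... }` arm: drop the queued
/// operations first when the strategy says they became useless.
fn push_op<S: MergeStrategy>(queued: &mut Vec<Op>, new: Op) {
    if S::USELESS_PREVIOUS_CHANGES {
        queued.clear();
    }
    queued.push(new);
}

fn main() {
    let mut queued = vec![Op::Addition(10), Op::Deletion];
    push_op::<Replace>(&mut queued, Op::Addition(30));
    assert_eq!(queued.len(), 1); // the superseded operations were dropped

    // One generic sort call replaces the two duplicated `sort` methods:
    // only the key function differs per strategy.
    let mut per_doc = vec![("b", vec![Op::Addition(40)]), ("a", queued)];
    per_doc.sort_unstable_by_key(|(_, ops)| Replace::sort_key(ops));
    assert_eq!(per_doc[0].0, "a"); // offset 30 sorts before 40
}
```

In the real code the key is `add.content.as_ptr() as usize` rather than a stored offset; as the surrounding comments indicate, the payload contents live in one contiguous buffer, so ordering by pointer presumably orders by position in the file, which is what makes the "iterate on the file sequentially" claim hold.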