diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index 3cbaf836d..799079b0a 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -6,6 +6,7 @@ use heed::types::Bytes;
 use heed::RoTxn;
 use memmap2::Mmap;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use IndexDocumentsMethod as Idm;
 
 use super::super::document_change::DocumentChange;
 use super::super::items_pool::ItemsPool;
@@ -148,6 +149,8 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
                                 (docid, vec![document_operation]),
                             );
                         }
+                        // TODO clean the code to make sure we clean the useless operations
+                        //      add a method to the MergeChanges trait
                         Some((_, offsets)) => offsets.push(document_operation),
                     }
 
@@ -185,16 +188,10 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
         // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
         let mut docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect();
         // Reorder the offsets to make sure we iterate on the file sequentially
-        docids_version_offsets.sort_unstable_by_key(|(_, (_, offsets))| {
-            offsets
-                .iter()
-                .rev()
-                .find_map(|ido| match ido {
-                    InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
-                    InnerDocOp::Deletion => None,
-                })
-                .unwrap_or(0)
-        });
+        match self.index_documents_method {
+            Idm::ReplaceDocuments => MergeDocumentForReplacement::sort(&mut docids_version_offsets),
+            Idm::UpdateDocuments => MergeDocumentForUpdates::sort(&mut docids_version_offsets),
+        }
 
         Ok(docids_version_offsets
             .into_par_iter()
@@ -202,11 +199,9 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
             Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))),
             move |context_pool, (external_docid, (internal_docid, operations))| {
                 context_pool.with(|rtxn| {
-                    use IndexDocumentsMethod as Idm;
-
                     let document_merge_function = match self.index_documents_method {
-                        Idm::ReplaceDocuments => merge_document_for_replacements,
-                        Idm::UpdateDocuments => merge_document_for_updates,
+                        Idm::ReplaceDocuments => MergeDocumentForReplacement::merge,
+                        Idm::UpdateDocuments => MergeDocumentForUpdates::merge,
                     };
 
                     document_merge_function(
@@ -224,129 +219,178 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
     }
 }
 
-/// Returns only the most recent version of a document based on the updates from the payloads.
-///
-/// This function is only meant to be used when doing a replacement and not an update.
-fn merge_document_for_replacements(
-    rtxn: &RoTxn,
-    index: &Index,
-    fields_ids_map: &FieldsIdsMap,
-    docid: DocumentId,
-    external_docid: String,
-    operations: &[InnerDocOp],
-) -> Result<Option<DocumentChange>> {
-    let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
-    let current: Option<&KvReaderFieldId> = current.map(Into::into);
-
-    match operations.last() {
-        Some(InnerDocOp::Addition(DocumentOffset { content })) => {
-            let map: TopLevelMap = serde_json::from_slice(content).unwrap();
-            let mut document_entries = Vec::new();
-            for (key, v) in map.0 {
-                let id = fields_ids_map.id(key.as_ref()).unwrap();
-                document_entries.push((id, v));
-            }
-
-            document_entries.sort_unstable_by_key(|(id, _)| *id);
-
-            let mut writer = KvWriterFieldId::memory();
-            document_entries
-                .into_iter()
-                .for_each(|(id, value)| writer.insert(id, value.get()).unwrap());
-            let new = writer.into_boxed();
-
-            match current {
-                Some(current) => {
-                    let update = Update::create(docid, external_docid, current.boxed(), new);
-                    Ok(Some(DocumentChange::Update(update)))
-                }
-                None => {
-                    let insertion = Insertion::create(docid, external_docid, new);
-                    Ok(Some(DocumentChange::Insertion(insertion)))
-                }
-            }
-        }
-        Some(InnerDocOp::Deletion) => match current {
-            Some(current) => {
-                let deletion = Deletion::create(docid, external_docid, current.boxed());
-                Ok(Some(DocumentChange::Deletion(deletion)))
-            }
-            None => Ok(None),
-        },
-        None => Ok(None), // but it's strange
-    }
-}
-
-/// Reads the previous version of a document from the database, the new versions
-/// in the grenad update files and merges them to generate a new boxed obkv.
-///
-/// This function is only meant to be used when doing an update and not a replacement.
-fn merge_document_for_updates(
-    rtxn: &RoTxn,
-    index: &Index,
-    fields_ids_map: &FieldsIdsMap,
-    docid: DocumentId,
-    external_docid: String,
-    operations: &[InnerDocOp],
-) -> Result<Option<DocumentChange>> {
-    let mut document = BTreeMap::<_, Cow<_>>::new();
-    let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
-    let current: Option<&KvReaderFieldId> = current.map(Into::into);
-
-    if operations.is_empty() {
-        return Ok(None); // but it's strange
-    }
-
-    let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion));
-    let operations = &operations[last_deletion.map_or(0, |i| i + 1)..];
-
-    // If there was a deletion we must not start
-    // from the original document but from scratch.
-    if last_deletion.is_none() {
-        if let Some(current) = current {
-            current.into_iter().for_each(|(k, v)| {
-                document.insert(k, v.into());
-            });
-        }
-    }
-
-    if operations.is_empty() {
-        match current {
-            Some(current) => {
-                let deletion = Deletion::create(docid, external_docid, current.boxed());
-                return Ok(Some(DocumentChange::Deletion(deletion)));
-            }
-            None => return Ok(None),
-        }
-    }
-
-    for operation in operations {
-        let DocumentOffset { content } = match operation {
-            InnerDocOp::Addition(offset) => offset,
-            InnerDocOp::Deletion => {
-                unreachable!("Deletion in document operations")
-            }
-        };
-
-        let map: TopLevelMap = serde_json::from_slice(content).unwrap();
-        for (key, v) in map.0 {
-            let id = fields_ids_map.id(key.as_ref()).unwrap();
-            document.insert(id, v.get().as_bytes().to_vec().into());
-        }
-    }
-
-    let mut writer = KvWriterFieldId::memory();
-    document.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap());
-    let new = writer.into_boxed();
-
-    match current {
-        Some(current) => {
-            let update = Update::create(docid, external_docid, current.boxed(), new);
-            Ok(Some(DocumentChange::Update(update)))
-        }
-        None => {
-            let insertion = Insertion::create(docid, external_docid, new);
-            Ok(Some(DocumentChange::Insertion(insertion)))
-        }
-    }
-}
+trait MergeChanges {
+    /// Reorders the offsets to make sure we iterate on the file sequentially.
+    fn sort(changes_offsets: &mut [(CowStr, (DocumentId, Vec<InnerDocOp>))]);
+
+    fn merge(
+        rtxn: &RoTxn,
+        index: &Index,
+        fields_ids_map: &FieldsIdsMap,
+        docid: DocumentId,
+        external_docid: String,
+        operations: &[InnerDocOp],
+    ) -> Result<Option<DocumentChange>>;
+}
+
+struct MergeDocumentForReplacement;
+
+impl MergeChanges for MergeDocumentForReplacement {
+    /// Reorders to read only the last change.
+    fn sort(changes_offsets: &mut [(CowStr, (DocumentId, Vec<InnerDocOp>))]) {
+        changes_offsets.sort_unstable_by_key(|(_, (_, offsets))| {
+            offsets
+                .iter()
+                .rev()
+                .find_map(|ido| match ido {
+                    InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
+                    InnerDocOp::Deletion => None,
+                })
+                .unwrap_or(0)
+        });
+    }
+
+    /// Returns only the most recent version of a document based on the updates from the payloads.
+    ///
+    /// This function is only meant to be used when doing a replacement and not an update.
+    fn merge(
+        rtxn: &RoTxn,
+        index: &Index,
+        fields_ids_map: &FieldsIdsMap,
+        docid: DocumentId,
+        external_docid: String,
+        operations: &[InnerDocOp],
+    ) -> Result<Option<DocumentChange>> {
+        let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
+        let current: Option<&KvReaderFieldId> = current.map(Into::into);
+
+        match operations.last() {
+            Some(InnerDocOp::Addition(DocumentOffset { content })) => {
+                let map: TopLevelMap = serde_json::from_slice(content).unwrap();
+                let mut document_entries = Vec::new();
+                for (key, v) in map.0 {
+                    let id = fields_ids_map.id(key.as_ref()).unwrap();
+                    document_entries.push((id, v));
+                }
+
+                document_entries.sort_unstable_by_key(|(id, _)| *id);
+
+                let mut writer = KvWriterFieldId::memory();
+                document_entries
+                    .into_iter()
+                    .for_each(|(id, value)| writer.insert(id, value.get()).unwrap());
+                let new = writer.into_boxed();
+
+                match current {
+                    Some(current) => {
+                        let update = Update::create(docid, external_docid, current.boxed(), new);
+                        Ok(Some(DocumentChange::Update(update)))
+                    }
+                    None => {
+                        let insertion = Insertion::create(docid, external_docid, new);
+                        Ok(Some(DocumentChange::Insertion(insertion)))
+                    }
+                }
+            }
+            Some(InnerDocOp::Deletion) => match current {
+                Some(current) => {
+                    let deletion = Deletion::create(docid, external_docid, current.boxed());
+                    Ok(Some(DocumentChange::Deletion(deletion)))
+                }
+                None => Ok(None),
+            },
+            None => Ok(None), // but it's strange
+        }
+    }
+}
+
+struct MergeDocumentForUpdates;
+
+impl MergeChanges for MergeDocumentForUpdates {
+    /// Reorders to read the first changes first so that it's faster to read the first one and then the rest.
+    fn sort(changes_offsets: &mut [(CowStr, (DocumentId, Vec<InnerDocOp>))]) {
+        changes_offsets.sort_unstable_by_key(|(_, (_, offsets))| {
+            offsets
+                .iter()
+                .find_map(|ido| match ido {
+                    InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
+                    InnerDocOp::Deletion => None,
+                })
+                .unwrap_or(0)
+        });
+    }
+
+    /// Reads the previous version of a document from the database, the new versions
+    /// in the grenad update files and merges them to generate a new boxed obkv.
+    ///
+    /// This function is only meant to be used when doing an update and not a replacement.
+    fn merge(
+        rtxn: &RoTxn,
+        index: &Index,
+        fields_ids_map: &FieldsIdsMap,
+        docid: DocumentId,
+        external_docid: String,
+        operations: &[InnerDocOp],
+    ) -> Result<Option<DocumentChange>> {
+        let mut document = BTreeMap::<_, Cow<_>>::new();
+        let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
+        let current: Option<&KvReaderFieldId> = current.map(Into::into);
+
+        if operations.is_empty() {
+            return Ok(None); // but it's strange
+        }
+
+        let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion));
+        let operations = &operations[last_deletion.map_or(0, |i| i + 1)..];
+
+        // If there was a deletion we must not start
+        // from the original document but from scratch.
+        if last_deletion.is_none() {
+            if let Some(current) = current {
+                current.into_iter().for_each(|(k, v)| {
+                    document.insert(k, v.into());
+                });
+            }
+        }
+
+        if operations.is_empty() {
+            match current {
+                Some(current) => {
+                    let deletion = Deletion::create(docid, external_docid, current.boxed());
+                    return Ok(Some(DocumentChange::Deletion(deletion)));
+                }
+                None => return Ok(None),
+            }
+        }
+
+        for operation in operations {
+            let DocumentOffset { content } = match operation {
+                InnerDocOp::Addition(offset) => offset,
+                InnerDocOp::Deletion => {
+                    unreachable!("Deletion in document operations")
+                }
+            };
+
+            let map: TopLevelMap = serde_json::from_slice(content).unwrap();
+            for (key, v) in map.0 {
+                let id = fields_ids_map.id(key.as_ref()).unwrap();
+                document.insert(id, v.get().as_bytes().to_vec().into());
+            }
+        }
+
+        let mut writer = KvWriterFieldId::memory();
+        document.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap());
+        let new = writer.into_boxed();
+
+        match current {
+            Some(current) => {
+                let update = Update::create(docid, external_docid, current.boxed(), new);
+                Ok(Some(DocumentChange::Update(update)))
+            }
+            None => {
+                let insertion = Insertion::create(docid, external_docid, new);
+                Ok(Some(DocumentChange::Insertion(insertion)))
+            }
+        }
+    }
+}
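
Below the patch, a minimal self-contained sketch (not part of the diff) of the pattern it introduces: the two per-method free functions become associated functions on unit structs implementing a shared MergeChanges trait, and the caller matches on IndexDocumentsMethod once to pick the strategy. All types here (String keys, u64 offsets, &str payloads) are hypothetical stand-ins for the real milli types; only the dispatch shape mirrors the patch.

// merge_changes_sketch.rs -- hypothetical stand-ins, not the real milli types.

#[derive(Clone, Copy)]
enum IndexDocumentsMethod {
    ReplaceDocuments,
    UpdateDocuments,
}

/// One strategy per indexing method: how to order the payload reads
/// and how to merge the successive versions of a document.
trait MergeChanges {
    fn sort(changes: &mut [(String, Vec<u64>)]);
    fn merge(versions: &[&str]) -> Option<String>;
}

struct MergeDocumentForReplacement;
struct MergeDocumentForUpdates;

impl MergeChanges for MergeDocumentForReplacement {
    /// A replacement only needs the most recent version, so order by the last offset.
    fn sort(changes: &mut [(String, Vec<u64>)]) {
        changes.sort_unstable_by_key(|(_, offsets)| offsets.last().copied().unwrap_or(0));
    }

    /// Keep only the most recent version.
    fn merge(versions: &[&str]) -> Option<String> {
        versions.last().map(|v| (*v).to_owned())
    }
}

impl MergeChanges for MergeDocumentForUpdates {
    /// An update replays every version, so order by the first offset.
    fn sort(changes: &mut [(String, Vec<u64>)]) {
        changes.sort_unstable_by_key(|(_, offsets)| offsets.first().copied().unwrap_or(0));
    }

    /// Fold all versions together, oldest first.
    fn merge(versions: &[&str]) -> Option<String> {
        if versions.is_empty() {
            None
        } else {
            Some(versions.join("+"))
        }
    }
}

fn main() {
    // The patch shortens this name with `use IndexDocumentsMethod as Idm;`.
    let method = if std::env::args().count() > 1 {
        IndexDocumentsMethod::ReplaceDocuments
    } else {
        IndexDocumentsMethod::UpdateDocuments
    };

    // Same shape as the patch: match on the method once to pick the strategy,
    // then call its associated functions where the old free functions were called.
    let mut changes = vec![("b".to_owned(), vec![40, 10]), ("a".to_owned(), vec![20, 30])];
    match method {
        IndexDocumentsMethod::ReplaceDocuments => MergeDocumentForReplacement::sort(&mut changes),
        IndexDocumentsMethod::UpdateDocuments => MergeDocumentForUpdates::sort(&mut changes),
    }

    // Both impls share the trait signature, so the match yields one plain
    // function value, like `document_merge_function` in the patch.
    let merge = match method {
        IndexDocumentsMethod::ReplaceDocuments => MergeDocumentForReplacement::merge,
        IndexDocumentsMethod::UpdateDocuments => MergeDocumentForUpdates::merge,
    };
    println!("{changes:?} -> {:?}", merge(&["v1", "v2"]));
}

As in the patch, keeping the two merge implementations behind one trait signature lets the per-document closure call a single chosen function instead of re-matching on the method for every document.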