From 421a9cf05e5d70c41edeb64222ab0cfc62a25692 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 8 Feb 2023 16:06:09 +0100 Subject: [PATCH] provide a new method on the transform to remove documents --- milli/src/snapshot_tests.rs | 20 +- milli/src/update/index_documents/mod.rs | 225 ++++++++++++++++++ milli/src/update/index_documents/transform.rs | 60 ++++- 3 files changed, 302 insertions(+), 3 deletions(-) diff --git a/milli/src/snapshot_tests.rs b/milli/src/snapshot_tests.rs index 49f9fbe92..f7f1a97e6 100644 --- a/milli/src/snapshot_tests.rs +++ b/milli/src/snapshot_tests.rs @@ -6,7 +6,7 @@ use roaring::RoaringBitmap; use crate::facet::FacetType; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupValue}; -use crate::{make_db_snap_from_iter, ExternalDocumentsIds, Index}; +use crate::{make_db_snap_from_iter, obkv_to_json, ExternalDocumentsIds, Index}; #[track_caller] pub fn default_db_snapshot_settings_for_test(name: Option<&str>) -> (insta::Settings, String) { @@ -427,8 +427,26 @@ pub fn snap_settings(index: &Index) -> String { snap } +pub fn snap_documents(index: &Index) -> String { + let mut snap = String::new(); + let rtxn = index.read_txn().unwrap(); + let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let display = fields_ids_map.ids().collect::>(); + + for document in index.all_documents(&rtxn).unwrap() { + let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap(); + snap.push_str(&serde_json::to_string(&doc).unwrap()); + snap.push('\n'); + } + + snap +} + #[macro_export] macro_rules! full_snap_of_db { + ($index:ident, documents) => {{ + $crate::snapshot_tests::snap_documents(&$index) + }}; ($index:ident, settings) => {{ $crate::snapshot_tests::snap_settings(&$index) }}; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 3e9edf3a2..87c96818e 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -79,6 +79,7 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a, FP, FA> { progress: FP, should_abort: FA, added_documents: u64, + deleted_documents: u64, } #[derive(Default, Debug, Clone)] @@ -122,6 +123,7 @@ where wtxn, index, added_documents: 0, + deleted_documents: 0, }) } @@ -166,6 +168,30 @@ where Ok((self, Ok(indexed_documents))) } + /// Remove a batch of documents from the current builder. + /// + /// Returns the number of documents deleted from the builder. + pub fn remove_documents( + mut self, + to_delete: Vec, + ) -> Result<(Self, StdResult)> { + // Early return when there is no document to add + if to_delete.is_empty() { + return Ok((self, Ok(0))); + } + + let deleted_documents = self + .transform + .as_mut() + .expect("Invalid document deletion state") + .remove_documents(to_delete, self.wtxn, &self.should_abort)? + as u64; + + self.deleted_documents += deleted_documents; + + Ok((self, Ok(deleted_documents))) + } + #[logging_timer::time("IndexDocuments::{}")] pub fn execute(mut self) -> Result { if self.added_documents == 0 { @@ -1879,4 +1905,203 @@ mod tests { index.add_documents(doc1).unwrap(); } + + #[test] + fn add_and_delete_documents_in_single_transform() { + let index = TempIndex::new(); + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + { "id": 2, "doggo": { "name": "bob", "age": 20 } }, + { "id": 3, "name": "jean", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"3"); + + let (builder, removed) = builder.remove_documents(vec![S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"1"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 3, + number_of_documents: 2, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":1,"doggo":"kevin"} + {"id":3,"name":"jean","age":25} + "###); + } + + #[test] + fn add_update_and_delete_documents_in_single_transform() { + let index = TempIndex::new(); + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + { "id": 2, "doggo": { "name": "bob", "age": 20 } }, + { "id": 3, "name": "jean", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"3"); + + let documents = documents!([ + { "id": 2, "doggo": { "name": "jean", "age": 20 } }, + { "id": 3, "name": "bob", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"2"); + + let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"2"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 5, + number_of_documents: 1, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":3,"name":"bob","age":25} + "###); + } + + #[test] + fn add_document_and_in_another_transform_update_and_delete_documents() { + let index = TempIndex::new(); + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + { "id": 2, "doggo": { "name": "bob", "age": 20 } }, + { "id": 3, "name": "jean", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"3"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 3, + number_of_documents: 3, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":1,"doggo":"kevin"} + {"id":2,"doggo":{"name":"bob","age":20}} + {"id":3,"name":"jean","age":25} + "###); + + // A first batch of documents has been inserted + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 2, "doggo": { "name": "jean", "age": 20 } }, + { "id": 3, "name": "bob", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"2"); + + let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"2"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 2, + number_of_documents: 1, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":3,"name":"bob","age":25} + "###); + } + + #[test] + fn delete_document_and_then_add_documents_in_the_same_transform() { + let index = TempIndex::new(); + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"0"); + + let documents = documents!([ + { "id": 2, "doggo": { "name": "jean", "age": 20 } }, + { "id": 3, "name": "bob", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"2"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 2, + number_of_documents: 2, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":2,"doggo":{"name":"jean","age":20}} + {"id":3,"name":"bob","age":25} + "###); + } } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index fe8f06b6c..50d3d4248 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -159,9 +159,7 @@ impl<'a, 'i> Transform<'a, 'i> { FA: Fn() -> bool + Sync, { let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); - let external_documents_ids = self.index.external_documents_ids(wtxn)?; - let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; let primary_key = cursor.primary_key().to_string(); @@ -322,6 +320,64 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(documents_count) } + /// The counter part of `read_documents` that removes documents that may have been inserted into the transform previously. + pub fn remove_documents( + &mut self, + mut to_remove: Vec, + wtxn: &mut heed::RwTxn, + should_abort: FA, + ) -> Result + where + FA: Fn() -> bool + Sync, + { + // there may be duplicates in the documents to remove. + to_remove.sort_unstable(); + to_remove.dedup(); + + let external_documents_ids = self.index.external_documents_ids(wtxn)?; + + let mut documents_deleted = 0; + for to_remove in to_remove { + if should_abort() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + + match self.new_external_documents_ids_builder.entry((*to_remove).into()) { + // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. + Entry::Occupied(entry) => { + let doc_id = *entry.get() as u32; + self.original_sorter + .insert(doc_id.to_be_bytes(), &[Operation::Deletion as u8])?; + self.flattened_sorter + .insert(doc_id.to_be_bytes(), &[Operation::Deletion as u8])?; + + // we must NOT update the list of replaced_documents_ids + // Either: + // 1. It's already in it and there is nothing to do + // 2. It wasn't in it because the document was created by a previous batch and since + // we're removing it there is nothing to do. + self.new_documents_ids.remove(doc_id); + entry.remove_entry(); + } + Entry::Vacant(entry) => { + // If the document was already in the db we mark it as a `to_delete` document. + // It'll be deleted later. We don't need to push anything to the sorters. + if let Some(docid) = external_documents_ids.get(entry.key()) { + self.replaced_documents_ids.insert(docid); + } else { + // if the document is nowehere to be found, there is nothing to do and we must NOT + // increment the count of documents_deleted + continue; + } + } + }; + + documents_deleted += 1; + } + + Ok(documents_deleted) + } + // Flatten a document from the fields ids map contained in self and insert the new // created fields. Returns `None` if the document doesn't need to be flattened. fn flatten_from_fields_ids_map(&mut self, obkv: KvReader) -> Result>> {