From 721fc294bef5563c86b183875413cad18fe2c9ba Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Tue, 9 Nov 2021 20:19:49 +0100 Subject: [PATCH 1/2] improve document deletion returned meta returns both the remaining number of documents and the number of deleted documents. --- milli/src/update/delete_documents.rs | 34 +++++++++++++++++++++------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs index e1a658218..d9c3fba14 100644 --- a/milli/src/update/delete_documents.rs +++ b/milli/src/update/delete_documents.rs @@ -6,6 +6,7 @@ use fst::IntoStreamer; use heed::types::ByteSlice; use heed::{BytesDecode, BytesEncode}; use roaring::RoaringBitmap; +use serde::{Deserialize, Serialize}; use serde_json::Value; use super::ClearDocuments; @@ -25,6 +26,12 @@ pub struct DeleteDocuments<'t, 'u, 'i> { update_id: u64, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DocumentDeletionResult { + pub deleted_documents: u64, + pub remaining_documents: u64, +} + impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { pub fn new( wtxn: &'t mut heed::RwTxn<'i, 'u>, @@ -56,26 +63,34 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { Some(docid) } - pub fn execute(self) -> Result { + pub fn execute(self) -> Result { self.index.set_updated_at(self.wtxn, &Utc::now())?; // We retrieve the current documents ids that are in the database. let mut documents_ids = self.index.documents_ids(self.wtxn)?; + let current_documents_ids_len = documents_ids.len(); // We can and must stop removing documents in a database that is empty. if documents_ids.is_empty() { - return Ok(0); + return Ok(DocumentDeletionResult { + deleted_documents: 0, + remaining_documents: current_documents_ids_len, + }); } // We remove the documents ids that we want to delete // from the documents in the database and write them back. - let current_documents_ids_len = documents_ids.len(); documents_ids -= &self.documents_ids; self.index.put_documents_ids(self.wtxn, &documents_ids)?; // We can execute a ClearDocuments operation when the number of documents // to delete is exactly the number of documents in the database. if current_documents_ids_len == self.documents_ids.len() { - return ClearDocuments::new(self.wtxn, self.index, self.update_id).execute(); + let remaining_documents = + ClearDocuments::new(self.wtxn, self.index, self.update_id).execute()?; + return Ok(DocumentDeletionResult { + deleted_documents: current_documents_ids_len, + remaining_documents, + }); } let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; @@ -86,11 +101,11 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { } })?; - // If we can't find the id of the primary key it means that the database - // is empty and it should be safe to return that we deleted 0 documents. + // Since we already checked if the DB was empty, if we can't find the primary key, then + // something is wrong, and we must return an error. let id_field = match fields_ids_map.id(primary_key) { Some(field) => field, - None => return Ok(0), + None => return Err(UserError::MissingPrimaryKey.into()), }; let Index { @@ -439,7 +454,10 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { )?; } - Ok(self.documents_ids.len()) + Ok(DocumentDeletionResult { + deleted_documents: self.documents_ids.len(), + remaining_documents: documents_ids.len(), + }) } } From 09b4281cff5f72821461a04f92be4c4c52686fb8 Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Wed, 10 Nov 2021 14:08:36 +0100 Subject: [PATCH 2/2] improve document addition returned metaimprove document addition returned metaimprove document addition returned metaimprove document addition returned metaimprove document addition returned metaimprove document addition returned metaimprove document addition returned metaimprove document addition returned meta --- milli/src/update/index_documents/mod.rs | 31 ++++++++++++++++--------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 440546b10..cb3c1a75c 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -35,9 +35,12 @@ static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 5; static TOTAL_POSTING_DATABASE_COUNT: usize = MERGED_DATABASE_COUNT + PREFIX_DATABASE_COUNT; -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DocumentAdditionResult { - pub nb_documents: usize, + /// The number of documents that were indexed during the update + pub indexed_documents: u64, + /// The total number of documents in the index after the update + pub number_of_documents: u64, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -137,7 +140,10 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { { // Early return when there is no document to add if reader.is_empty() { - return Ok(DocumentAdditionResult { nb_documents: 0 }); + return Ok(DocumentAdditionResult { + indexed_documents: 0, + number_of_documents: self.index.number_of_documents(self.wtxn)?, + }); } self.index.set_updated_at(self.wtxn, &Utc::now())?; @@ -157,16 +163,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { }; let output = transform.read_documents(reader, progress_callback)?; - let nb_documents = output.documents_count; + let indexed_documents = output.documents_count as u64; info!("Update transformed in {:.02?}", before_transform.elapsed()); - self.execute_raw(output, progress_callback)?; - Ok(DocumentAdditionResult { nb_documents }) - } + let number_of_documents = self.execute_raw(output, progress_callback)?; + Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) + } + /// Returns the total number of documents in the index after the update. #[logging_timer::time("IndexDocuments::{}")] - pub fn execute_raw(self, output: TransformOutput, progress_callback: F) -> Result<()> + pub fn execute_raw(self, output: TransformOutput, progress_callback: F) -> Result where F: Fn(UpdateIndexingStep) + Sync, { @@ -294,7 +301,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { debug!("documents to delete {:?}", replaced_documents_ids); deletion_builder.delete_documents(&replaced_documents_ids); let deleted_documents_count = deletion_builder.execute()?; - debug!("{} documents actually deleted", deleted_documents_count); + debug!("{} documents actually deleted", deleted_documents_count.deleted_documents); } let index_documents_ids = self.index.documents_ids(self.wtxn)?; @@ -325,7 +332,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { if is_merged_database { databases_seen += 1; progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen: databases_seen, + databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); } @@ -343,7 +350,9 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; - self.execute_prefix_databases(progress_callback) + self.execute_prefix_databases(progress_callback)?; + + Ok(all_documents_ids.len()) } #[logging_timer::time("IndexDocuments::{}")]