diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs index e1a658218..d9c3fba14 100644 --- a/milli/src/update/delete_documents.rs +++ b/milli/src/update/delete_documents.rs @@ -6,6 +6,7 @@ use fst::IntoStreamer; use heed::types::ByteSlice; use heed::{BytesDecode, BytesEncode}; use roaring::RoaringBitmap; +use serde::{Deserialize, Serialize}; use serde_json::Value; use super::ClearDocuments; @@ -25,6 +26,12 @@ pub struct DeleteDocuments<'t, 'u, 'i> { update_id: u64, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DocumentDeletionResult { + pub deleted_documents: u64, + pub remaining_documents: u64, +} + impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { pub fn new( wtxn: &'t mut heed::RwTxn<'i, 'u>, @@ -56,26 +63,34 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { Some(docid) } - pub fn execute(self) -> Result { + pub fn execute(self) -> Result { self.index.set_updated_at(self.wtxn, &Utc::now())?; // We retrieve the current documents ids that are in the database. let mut documents_ids = self.index.documents_ids(self.wtxn)?; + let current_documents_ids_len = documents_ids.len(); // We can and must stop removing documents in a database that is empty. if documents_ids.is_empty() { - return Ok(0); + return Ok(DocumentDeletionResult { + deleted_documents: 0, + remaining_documents: current_documents_ids_len, + }); } // We remove the documents ids that we want to delete // from the documents in the database and write them back. - let current_documents_ids_len = documents_ids.len(); documents_ids -= &self.documents_ids; self.index.put_documents_ids(self.wtxn, &documents_ids)?; // We can execute a ClearDocuments operation when the number of documents // to delete is exactly the number of documents in the database. if current_documents_ids_len == self.documents_ids.len() { - return ClearDocuments::new(self.wtxn, self.index, self.update_id).execute(); + let remaining_documents = + ClearDocuments::new(self.wtxn, self.index, self.update_id).execute()?; + return Ok(DocumentDeletionResult { + deleted_documents: current_documents_ids_len, + remaining_documents, + }); } let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; @@ -86,11 +101,11 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { } })?; - // If we can't find the id of the primary key it means that the database - // is empty and it should be safe to return that we deleted 0 documents. + // Since we already checked if the DB was empty, if we can't find the primary key, then + // something is wrong, and we must return an error. let id_field = match fields_ids_map.id(primary_key) { Some(field) => field, - None => return Ok(0), + None => return Err(UserError::MissingPrimaryKey.into()), }; let Index { @@ -439,7 +454,10 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { )?; } - Ok(self.documents_ids.len()) + Ok(DocumentDeletionResult { + deleted_documents: self.documents_ids.len(), + remaining_documents: documents_ids.len(), + }) } } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 440546b10..cb3c1a75c 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -35,9 +35,12 @@ static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 5; static TOTAL_POSTING_DATABASE_COUNT: usize = MERGED_DATABASE_COUNT + PREFIX_DATABASE_COUNT; -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DocumentAdditionResult { - pub nb_documents: usize, + /// The number of documents that were indexed during the update + pub indexed_documents: u64, + /// The total number of documents in the index after the update + pub number_of_documents: u64, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -137,7 +140,10 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { { // Early return when there is no document to add if reader.is_empty() { - return Ok(DocumentAdditionResult { nb_documents: 0 }); + return Ok(DocumentAdditionResult { + indexed_documents: 0, + number_of_documents: self.index.number_of_documents(self.wtxn)?, + }); } self.index.set_updated_at(self.wtxn, &Utc::now())?; @@ -157,16 +163,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { }; let output = transform.read_documents(reader, progress_callback)?; - let nb_documents = output.documents_count; + let indexed_documents = output.documents_count as u64; info!("Update transformed in {:.02?}", before_transform.elapsed()); - self.execute_raw(output, progress_callback)?; - Ok(DocumentAdditionResult { nb_documents }) - } + let number_of_documents = self.execute_raw(output, progress_callback)?; + Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) + } + /// Returns the total number of documents in the index after the update. #[logging_timer::time("IndexDocuments::{}")] - pub fn execute_raw(self, output: TransformOutput, progress_callback: F) -> Result<()> + pub fn execute_raw(self, output: TransformOutput, progress_callback: F) -> Result where F: Fn(UpdateIndexingStep) + Sync, { @@ -294,7 +301,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { debug!("documents to delete {:?}", replaced_documents_ids); deletion_builder.delete_documents(&replaced_documents_ids); let deleted_documents_count = deletion_builder.execute()?; - debug!("{} documents actually deleted", deleted_documents_count); + debug!("{} documents actually deleted", deleted_documents_count.deleted_documents); } let index_documents_ids = self.index.documents_ids(self.wtxn)?; @@ -325,7 +332,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { if is_merged_database { databases_seen += 1; progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen: databases_seen, + databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); } @@ -343,7 +350,9 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; - self.execute_prefix_databases(progress_callback) + self.execute_prefix_databases(progress_callback)?; + + Ok(all_documents_ids.len()) } #[logging_timer::time("IndexDocuments::{}")]