mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-25 14:10:06 +01:00
Merge #414
414: improve update result types r=ManyTheFish a=MarinPostma Inprove the returned meta when performing document additions and deletions: - On document addition return the number of indexed documents and the total number of documents in the index after the indexion - On document deletion return the number of deleted documents, and the remaining number of documents in the index after the deletion is performed I also fixed a potential bug when performing a document deletion and the primary key couldn't be found: before we assumed that the db was empty and returned that no documents were deleted, but since we checked before that the db wasn't empty, entering this branch is actually a bug, and now returns a 'MissingPrimaryKey' error. Co-authored-by: Marin Postma <postma.marin@protonmail.com>
This commit is contained in:
commit
21b78f3926
@ -6,6 +6,7 @@ use fst::IntoStreamer;
|
|||||||
use heed::types::ByteSlice;
|
use heed::types::ByteSlice;
|
||||||
use heed::{BytesDecode, BytesEncode};
|
use heed::{BytesDecode, BytesEncode};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use super::ClearDocuments;
|
use super::ClearDocuments;
|
||||||
@ -25,6 +26,12 @@ pub struct DeleteDocuments<'t, 'u, 'i> {
|
|||||||
update_id: u64,
|
update_id: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct DocumentDeletionResult {
|
||||||
|
pub deleted_documents: u64,
|
||||||
|
pub remaining_documents: u64,
|
||||||
|
}
|
||||||
|
|
||||||
impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
wtxn: &'t mut heed::RwTxn<'i, 'u>,
|
wtxn: &'t mut heed::RwTxn<'i, 'u>,
|
||||||
@ -56,26 +63,34 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
|||||||
Some(docid)
|
Some(docid)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn execute(self) -> Result<u64> {
|
pub fn execute(self) -> Result<DocumentDeletionResult> {
|
||||||
self.index.set_updated_at(self.wtxn, &Utc::now())?;
|
self.index.set_updated_at(self.wtxn, &Utc::now())?;
|
||||||
// We retrieve the current documents ids that are in the database.
|
// We retrieve the current documents ids that are in the database.
|
||||||
let mut documents_ids = self.index.documents_ids(self.wtxn)?;
|
let mut documents_ids = self.index.documents_ids(self.wtxn)?;
|
||||||
|
let current_documents_ids_len = documents_ids.len();
|
||||||
|
|
||||||
// We can and must stop removing documents in a database that is empty.
|
// We can and must stop removing documents in a database that is empty.
|
||||||
if documents_ids.is_empty() {
|
if documents_ids.is_empty() {
|
||||||
return Ok(0);
|
return Ok(DocumentDeletionResult {
|
||||||
|
deleted_documents: 0,
|
||||||
|
remaining_documents: current_documents_ids_len,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// We remove the documents ids that we want to delete
|
// We remove the documents ids that we want to delete
|
||||||
// from the documents in the database and write them back.
|
// from the documents in the database and write them back.
|
||||||
let current_documents_ids_len = documents_ids.len();
|
|
||||||
documents_ids -= &self.documents_ids;
|
documents_ids -= &self.documents_ids;
|
||||||
self.index.put_documents_ids(self.wtxn, &documents_ids)?;
|
self.index.put_documents_ids(self.wtxn, &documents_ids)?;
|
||||||
|
|
||||||
// We can execute a ClearDocuments operation when the number of documents
|
// We can execute a ClearDocuments operation when the number of documents
|
||||||
// to delete is exactly the number of documents in the database.
|
// to delete is exactly the number of documents in the database.
|
||||||
if current_documents_ids_len == self.documents_ids.len() {
|
if current_documents_ids_len == self.documents_ids.len() {
|
||||||
return ClearDocuments::new(self.wtxn, self.index, self.update_id).execute();
|
let remaining_documents =
|
||||||
|
ClearDocuments::new(self.wtxn, self.index, self.update_id).execute()?;
|
||||||
|
return Ok(DocumentDeletionResult {
|
||||||
|
deleted_documents: current_documents_ids_len,
|
||||||
|
remaining_documents,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
|
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
|
||||||
@ -86,11 +101,11 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
|||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// If we can't find the id of the primary key it means that the database
|
// Since we already checked if the DB was empty, if we can't find the primary key, then
|
||||||
// is empty and it should be safe to return that we deleted 0 documents.
|
// something is wrong, and we must return an error.
|
||||||
let id_field = match fields_ids_map.id(primary_key) {
|
let id_field = match fields_ids_map.id(primary_key) {
|
||||||
Some(field) => field,
|
Some(field) => field,
|
||||||
None => return Ok(0),
|
None => return Err(UserError::MissingPrimaryKey.into()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let Index {
|
let Index {
|
||||||
@ -439,7 +454,10 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
|||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(self.documents_ids.len())
|
Ok(DocumentDeletionResult {
|
||||||
|
deleted_documents: self.documents_ids.len(),
|
||||||
|
remaining_documents: documents_ids.len(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,9 +35,12 @@ static MERGED_DATABASE_COUNT: usize = 7;
|
|||||||
static PREFIX_DATABASE_COUNT: usize = 5;
|
static PREFIX_DATABASE_COUNT: usize = 5;
|
||||||
static TOTAL_POSTING_DATABASE_COUNT: usize = MERGED_DATABASE_COUNT + PREFIX_DATABASE_COUNT;
|
static TOTAL_POSTING_DATABASE_COUNT: usize = MERGED_DATABASE_COUNT + PREFIX_DATABASE_COUNT;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
pub struct DocumentAdditionResult {
|
pub struct DocumentAdditionResult {
|
||||||
pub nb_documents: usize,
|
/// The number of documents that were indexed during the update
|
||||||
|
pub indexed_documents: u64,
|
||||||
|
/// The total number of documents in the index after the update
|
||||||
|
pub number_of_documents: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
@ -137,7 +140,10 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
{
|
{
|
||||||
// Early return when there is no document to add
|
// Early return when there is no document to add
|
||||||
if reader.is_empty() {
|
if reader.is_empty() {
|
||||||
return Ok(DocumentAdditionResult { nb_documents: 0 });
|
return Ok(DocumentAdditionResult {
|
||||||
|
indexed_documents: 0,
|
||||||
|
number_of_documents: self.index.number_of_documents(self.wtxn)?,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
self.index.set_updated_at(self.wtxn, &Utc::now())?;
|
self.index.set_updated_at(self.wtxn, &Utc::now())?;
|
||||||
@ -157,16 +163,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let output = transform.read_documents(reader, progress_callback)?;
|
let output = transform.read_documents(reader, progress_callback)?;
|
||||||
let nb_documents = output.documents_count;
|
let indexed_documents = output.documents_count as u64;
|
||||||
|
|
||||||
info!("Update transformed in {:.02?}", before_transform.elapsed());
|
info!("Update transformed in {:.02?}", before_transform.elapsed());
|
||||||
|
|
||||||
self.execute_raw(output, progress_callback)?;
|
let number_of_documents = self.execute_raw(output, progress_callback)?;
|
||||||
Ok(DocumentAdditionResult { nb_documents })
|
|
||||||
}
|
|
||||||
|
|
||||||
|
Ok(DocumentAdditionResult { indexed_documents, number_of_documents })
|
||||||
|
}
|
||||||
|
/// Returns the total number of documents in the index after the update.
|
||||||
#[logging_timer::time("IndexDocuments::{}")]
|
#[logging_timer::time("IndexDocuments::{}")]
|
||||||
pub fn execute_raw<F>(self, output: TransformOutput, progress_callback: F) -> Result<()>
|
pub fn execute_raw<F>(self, output: TransformOutput, progress_callback: F) -> Result<u64>
|
||||||
where
|
where
|
||||||
F: Fn(UpdateIndexingStep) + Sync,
|
F: Fn(UpdateIndexingStep) + Sync,
|
||||||
{
|
{
|
||||||
@ -294,7 +301,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
debug!("documents to delete {:?}", replaced_documents_ids);
|
debug!("documents to delete {:?}", replaced_documents_ids);
|
||||||
deletion_builder.delete_documents(&replaced_documents_ids);
|
deletion_builder.delete_documents(&replaced_documents_ids);
|
||||||
let deleted_documents_count = deletion_builder.execute()?;
|
let deleted_documents_count = deletion_builder.execute()?;
|
||||||
debug!("{} documents actually deleted", deleted_documents_count);
|
debug!("{} documents actually deleted", deleted_documents_count.deleted_documents);
|
||||||
}
|
}
|
||||||
|
|
||||||
let index_documents_ids = self.index.documents_ids(self.wtxn)?;
|
let index_documents_ids = self.index.documents_ids(self.wtxn)?;
|
||||||
@ -325,7 +332,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
if is_merged_database {
|
if is_merged_database {
|
||||||
databases_seen += 1;
|
databases_seen += 1;
|
||||||
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
|
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
|
||||||
databases_seen: databases_seen,
|
databases_seen,
|
||||||
total_databases: TOTAL_POSTING_DATABASE_COUNT,
|
total_databases: TOTAL_POSTING_DATABASE_COUNT,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -343,7 +350,9 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids;
|
let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids;
|
||||||
self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
|
self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
|
||||||
|
|
||||||
self.execute_prefix_databases(progress_callback)
|
self.execute_prefix_databases(progress_callback)?;
|
||||||
|
|
||||||
|
Ok(all_documents_ids.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[logging_timer::time("IndexDocuments::{}")]
|
#[logging_timer::time("IndexDocuments::{}")]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user