Stop using delete documents pipeline in batch runner

This commit is contained in:
Louis Dureuil 2023-10-25 13:41:11 +02:00
parent 2263dff02b
commit c534a1b687
No known key found for this signature in database
2 changed files with 41 additions and 29 deletions

View File

@ -30,8 +30,7 @@ use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::{ use meilisearch_types::milli::update::{
DeleteDocuments, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, IndexDocumentsConfig, IndexDocumentsMethod, Settings as MilliSettings,
Settings as MilliSettings,
}; };
use meilisearch_types::milli::{self, Filter, BEU32}; use meilisearch_types::milli::{self, Filter, BEU32};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
@ -1238,7 +1237,8 @@ impl IndexScheduler {
let (new_builder, user_result) = let (new_builder, user_result) =
builder.remove_documents(document_ids)?; builder.remove_documents(document_ids)?;
builder = new_builder; builder = new_builder;
// Uses Invariant: remove documents actually always returns Ok for the inner result
let count = user_result.unwrap();
let provided_ids = let provided_ids =
if let Some(Details::DocumentDeletion { provided_ids, .. }) = if let Some(Details::DocumentDeletion { provided_ids, .. }) =
task.details task.details
@ -1249,23 +1249,11 @@ impl IndexScheduler {
unreachable!(); unreachable!();
}; };
match user_result { task.status = Status::Succeeded;
Ok(count) => { task.details = Some(Details::DocumentDeletion {
task.status = Status::Succeeded; provided_ids,
task.details = Some(Details::DocumentDeletion { deleted_documents: Some(count),
provided_ids, });
deleted_documents: Some(count),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletion {
provided_ids,
deleted_documents: Some(0),
});
task.error = Some(milli::Error::from(e).into());
}
}
} }
} }
} }
@ -1288,21 +1276,42 @@ impl IndexScheduler {
Ok(tasks) Ok(tasks)
} }
IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => { IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => {
let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?; let indexer_config = self.index_mapper.indexer_config();
documents.iter().flatten().for_each(|id| { let config = IndexDocumentsConfig {
builder.delete_external_id(id); update_method: IndexDocumentsMethod::ReplaceDocuments,
}); ..Default::default()
};
let must_stop_processing = self.must_stop_processing.clone();
let DocumentDeletionResult { deleted_documents, .. } = builder.execute()?; let mut builder = milli::update::IndexDocuments::new(
index_wtxn,
index,
indexer_config,
config,
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop_processing.get(),
)?;
let document_ids = documents.iter().cloned().flatten().collect();
let (new_builder, user_result) = builder.remove_documents(document_ids)?;
builder = new_builder;
// Uses Invariant: remove documents actually always returns Ok for the inner result
let count = user_result.unwrap();
for (task, documents) in tasks.iter_mut().zip(documents) { for (task, documents) in tasks.iter_mut().zip(documents) {
task.status = Status::Succeeded; task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletion { task.details = Some(Details::DocumentDeletion {
provided_ids: documents.len(), provided_ids: documents.len(),
deleted_documents: Some(deleted_documents.min(documents.len() as u64)), deleted_documents: Some(count.min(documents.len() as u64)),
}); });
} }
if !tasks.iter().all(|res| res.error.is_some()) {
let addition = builder.execute()?;
info!("document deletion done: {:?}", addition);
}
Ok(tasks) Ok(tasks)
} }
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
@ -1558,9 +1567,10 @@ fn delete_document_by_filter<'a>(
} }
e => e.into(), e => e.into(),
})?; })?;
let mut delete_operation = DeleteDocuments::new(wtxn, index)?; todo!("need a way to get back the external ids from the internal ids");
delete_operation.delete_documents(&candidates); // let mut delete_operation = DeleteDocuments::new(wtxn, index)?;
delete_operation.execute().map(|result| result.deleted_documents)? // delete_operation.delete_documents(&candidates);
// delete_operation.execute().map(|result| result.deleted_documents)?
} else { } else {
0 0
}) })

View File

@ -180,6 +180,7 @@ where
// Early return when there is no document to add // Early return when there is no document to add
if to_delete.is_empty() { if to_delete.is_empty() {
// Maintains Invariant: remove documents actually always returns Ok for the inner result
return Ok((self, Ok(0))); return Ok((self, Ok(0)));
} }
@ -192,6 +193,7 @@ where
self.deleted_documents += deleted_documents; self.deleted_documents += deleted_documents;
// Maintains Invariant: remove documents actually always returns Ok for the inner result
Ok((self, Ok(deleted_documents))) Ok((self, Ok(deleted_documents)))
} }