Delete the tasks content file once the transaction has been successfully committed

This commit is contained in:
Kerollmops 2022-10-19 11:48:35 +02:00 committed by Clément Renault
parent ec0a5a9f01
commit 50b8b9df6a
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -6,7 +6,7 @@ use std::sync::atomic::Ordering::Relaxed;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata; use dump::IndexMetadata;
use log::{debug, info}; use log::{debug, error, info};
use meilisearch_types::milli::documents::obkv_to_object; use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::milli::update::IndexDocumentsConfig; use meilisearch_types::milli::update::IndexDocumentsConfig;
@ -468,7 +468,7 @@ impl IndexScheduler {
}; };
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let canceled_tasks_count = let canceled_tasks_content_uuids =
self.cancel_matched_tasks(&mut wtxn, task.uid, matched_tasks)?; self.cancel_matched_tasks(&mut wtxn, task.uid, matched_tasks)?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
@ -478,12 +478,29 @@ impl IndexScheduler {
canceled_tasks, canceled_tasks,
original_query: _, original_query: _,
}) => { }) => {
*canceled_tasks = Some(canceled_tasks_count); *canceled_tasks = Some(canceled_tasks_content_uuids.len() as u64);
} }
_ => unreachable!(), _ => unreachable!(),
} }
wtxn.commit()?; // We must only remove the content files if the transaction is successfully committed
// and if errors occur when we are deleting files we must do our best to delete
// everything. We do not return the encountered errors when deleting the content
// files as it is not a breaking operation and we can safely continue our job.
match wtxn.commit() {
Ok(()) => {
for content_uuid in canceled_tasks_content_uuids {
if let Err(error) = self.delete_update_file(content_uuid) {
error!(
"We failed deleting the content file indentified as {}: {}",
content_uuid, error
)
}
}
}
Err(e) => return Err(e.into()),
}
Ok(vec![task]) Ok(vec![task])
} }
Batch::TaskDeletion(mut task) => { Batch::TaskDeletion(mut task) => {
@ -1022,13 +1039,13 @@ impl IndexScheduler {
/// Cancel each given task from all the databases (if it is cancelable). /// Cancel each given task from all the databases (if it is cancelable).
/// ///
/// Return the number of tasks that were actually canceled. /// Returns the content files that the transaction owner must delete if the commit is successful.
fn cancel_matched_tasks( fn cancel_matched_tasks(
&self, &self,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
cancel_task_id: TaskId, cancel_task_id: TaskId,
matched_tasks: &RoaringBitmap, matched_tasks: &RoaringBitmap,
) -> Result<u64> { ) -> Result<Vec<Uuid>> {
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
// 1. Remove from this list the tasks that we are not allowed to cancel // 1. Remove from this list the tasks that we are not allowed to cancel
@ -1038,14 +1055,17 @@ impl IndexScheduler {
let tasks_to_cancel = cancelable_tasks & matched_tasks; let tasks_to_cancel = cancelable_tasks & matched_tasks;
// 2. We now have a list of tasks to cancel, cancel them // 2. We now have a list of tasks to cancel, cancel them
let mut content_files_to_delete = Vec::new();
for mut task in self.get_existing_tasks(wtxn, tasks_to_cancel.iter())? { for mut task in self.get_existing_tasks(wtxn, tasks_to_cancel.iter())? {
self.delete_persisted_task_data(&task)?; if let Some(uuid) = task.content_uuid() {
content_files_to_delete.push(*uuid);
}
task.status = Status::Canceled; task.status = Status::Canceled;
task.canceled_by = Some(cancel_task_id); task.canceled_by = Some(cancel_task_id);
task.finished_at = Some(now); task.finished_at = Some(now);
self.update_task(wtxn, &task)?; self.update_task(wtxn, &task)?;
} }
Ok(tasks_to_cancel.len()) Ok(content_files_to_delete)
} }
} }