diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 3d7a65c8c..033fdf206 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -50,7 +50,7 @@ use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::RoaringBitmap; use synchronoise::SignalEvent; use time::OffsetDateTime; -use utils::{keep_tasks_within_datetimes, map_bound}; +use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use uuid::Uuid; use crate::index_mapper::IndexMapper; @@ -565,7 +565,7 @@ impl IndexScheduler { pub fn register(&self, kind: KindWithContent) -> Result { let mut wtxn = self.env.write_txn()?; - let task = Task { + let mut task = Task { uid: self.next_task_id(&wtxn)?, enqueued_at: time::OffsetDateTime::now_utc(), started_at: None, @@ -576,6 +576,12 @@ impl IndexScheduler { status: Status::Enqueued, kind: kind.clone(), }; + // For deletion and cancelation tasks, we want to make extra sure that they + // don't attempt to delete/cancel tasks that are newer than themselves. + filter_out_references_to_newer_tasks(&mut task); + // Get rid of the mutability. + let task = task; + self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?; for index in task.indexes() { diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 1b22f609c..faf7e0e04 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -276,6 +276,28 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { } } } + +/// Remove references to task ids that are greater than the id of the given task. +pub(crate) fn filter_out_references_to_newer_tasks(task: &mut Task) { + let new_nbr_of_matched_tasks = match &mut task.kind { + KindWithContent::TaskCancelation { tasks, .. } + | KindWithContent::TaskDeletion { tasks, .. } => { + tasks.remove_range(task.uid..); + tasks.len() + } + _ => return, + }; + match &mut task.details { + Some( + Details::TaskCancelation { matched_tasks, .. } + | Details::TaskDeletion { matched_tasks, .. }, + ) => { + *matched_tasks = new_nbr_of_matched_tasks; + } + _ => (), + } +} + #[cfg(test)] impl IndexScheduler { /// Asserts that the index scheduler's content is internally consistent.