diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index 09ce46884..f23b811e5 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -74,6 +74,7 @@ make_enum_progress! { make_enum_progress! { pub enum TaskCancelationProgress { RetrievingTasks, + CancelingUpgrade, UpdatingTasks, } } diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index 29d352fe8..8460a4b8f 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -423,7 +423,8 @@ impl IndexScheduler { } /// Create the next batch to be processed; - /// 1. We get the *last* task to cancel. + /// 0. We get the *last* task to cancel. + /// 1. We get the tasks to upgrade. /// 2. We get the *next* task to delete. /// 3. We get the *next* snapshot to process. /// 4. We get the *next* dump to process. @@ -443,7 +444,20 @@ impl IndexScheduler { let count_total_enqueued = enqueued.len(); let failed = &self.queue.tasks.get_status(rtxn, Status::Failed)?; - // 0. The priority over everything is to upgrade the instance + // 0. we get the last task to cancel. + let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; + if let Some(task_id) = to_cancel.max() { + let mut task = + self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); + current_batch.reason(BatchStopReason::TaskCannotBeBatched { + kind: Kind::TaskCancelation, + id: task_id, + }); + return Ok(Some((Batch::TaskCancelation { task }, current_batch))); + } + + // 1. We upgrade the instance // There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & (enqueued | failed); if !upgrade.is_empty() {