From c5360bcdbf6ffdf4e4fa2631782d508a8db46b7d Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 22 Apr 2025 09:37:56 +0200 Subject: [PATCH] When canceling an upgrade task, execute the rollback code --- .../src/scheduler/process_batch.rs | 86 +++++++++++++++++-- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 42de1d137..44a136ba0 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -6,7 +6,8 @@ use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self, ChannelCongestion}; -use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task}; +use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; +use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use milli::update::Settings as MilliSettings; use roaring::RoaringBitmap; @@ -144,11 +145,22 @@ impl IndexScheduler { self.index_mapper.index(&rtxn, &index_uid)? 
}; + let mut index_wtxn = index.write_txn()?; + + let index_version = index.get_version(&index_wtxn)?.unwrap_or((1, 12, 0)); + let package_version = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH); + if index_version != (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) { + return Err(Error::IndexVersionMismatch { + index: index_uid, + index_version, + package_version, + }); + } + // the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick self.index_mapper .set_currently_updating_index(Some((index_uid.clone(), index.clone()))); - let mut index_wtxn = index.write_txn()?; let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?; let (tasks, congestion) = self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?; @@ -353,9 +365,11 @@ impl IndexScheduler { let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { unreachable!(); }; + let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(from, progress))); match ret { Ok(Ok(())) => (), + Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask), Ok(Err(e)) => return Err(Error::DatabaseUpgrade(Box::new(e))), Err(e) => { let msg = match e.downcast_ref::<&'static str>() { @@ -653,17 +667,79 @@ impl IndexScheduler { progress: &Progress, ) -> Result> { progress.update_progress(TaskCancelationProgress::RetrievingTasks); + let mut tasks_to_cancel = RoaringBitmap::new(); + let enqueued_tasks = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; + + // 0. Check if any upgrade task was matched. + // If so, we cancel all the failed or enqueued upgrade tasks. + let upgrade_tasks = &self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)?; + let is_canceling_upgrade = !matched_tasks.is_disjoint(upgrade_tasks); + if is_canceling_upgrade { + let failed_tasks = self.queue.tasks.get_status(rtxn, Status::Failed)?; + tasks_to_cancel |= upgrade_tasks & (enqueued_tasks | failed_tasks); + } // 1. 
Remove from this list the tasks that we are not allowed to cancel // Notice that only the _enqueued_ ones are cancelable and we should // have already aborted the indexation of the _processing_ ones - let cancelable_tasks = self.queue.tasks.get_status(rtxn, Status::Enqueued)?; - let tasks_to_cancel = cancelable_tasks & matched_tasks; + tasks_to_cancel |= enqueued_tasks & matched_tasks; + // 2. If we're canceling an upgrade, attempt the rollback + if let Some(latest_upgrade_task) = (&tasks_to_cancel & upgrade_tasks).max() { + progress.update_progress(TaskCancelationProgress::CancelingUpgrade); + + let task = self.queue.tasks.get_task(rtxn, latest_upgrade_task)?.unwrap(); + let Some(Details::UpgradeDatabase { from, to }) = task.details else { + unreachable!("wrong details for upgrade task {latest_upgrade_task}") + }; + + // check that we are rollbacking an upgrade to the current Meilisearch + let bin_major: u32 = meilisearch_types::versioning::VERSION_MAJOR; + let bin_minor: u32 = meilisearch_types::versioning::VERSION_MINOR; + let bin_patch: u32 = meilisearch_types::versioning::VERSION_PATCH; + + if to == (bin_major, bin_minor, bin_patch) { + tracing::warn!( + "Rollbacking from v{}.{}.{} to v{}.{}.{}", + to.0, + to.1, + to.2, + from.0, + from.1, + from.2 + ); + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + self.process_rollback(from, progress) + })) { + Ok(Ok(())) => {} + Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))), + Err(e) => { + let msg = match e.downcast_ref::<&'static str>() { + Some(s) => *s, + None => match e.downcast_ref::<String>() { + Some(s) => &s[..], + None => "Box<dyn Any>", + }, + }; + return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked( + msg.to_string(), + )))); + } + } + } else { + tracing::debug!( + "Not rollbacking an upgrade targetting the earlier version v{}.{}.{}", + bin_major, + bin_minor, + bin_patch + ) + } + } + + // 3. 
We now have a list of tasks to cancel, cancel them let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32); progress.update_progress(progress_obj); - // 2. We now have a list of tasks to cancel, cancel them let mut tasks = self.queue.tasks.get_existing_tasks( rtxn, tasks_to_cancel.iter().inspect(|_| {