From 3ef7a478cd43786d63c8b2e60ecb03e7c3812a66 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 16 Jan 2025 11:00:29 +0100 Subject: [PATCH] move the version check to the task queue --- crates/index-scheduler/src/error.rs | 6 +-- crates/index-scheduler/src/lib.rs | 3 ++ crates/index-scheduler/src/scheduler/mod.rs | 21 +++++++-- .../src/scheduler/process_batch.rs | 20 +++++++- .../src/scheduler/process_upgrade/mod.rs | 20 ++------ crates/index-scheduler/src/upgrade/mod.rs | 46 ++++++++++++++++++- crates/meilisearch-types/src/error.rs | 3 -- crates/milli/src/error.rs | 10 ++-- crates/milli/src/update/upgrade/mod.rs | 38 ++------------- 9 files changed, 95 insertions(+), 72 deletions(-) diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index f6ee1f685..a41672995 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -147,7 +147,7 @@ pub enum Error { #[error("Corrupted task queue.")] CorruptedTaskQueue, #[error(transparent)] - TaskDatabaseUpdate(Box), + TaskDatabaseUpgrade(Box), #[error(transparent)] HeedTransaction(heed::Error), @@ -202,7 +202,7 @@ impl Error { | Error::Anyhow(_) => true, Error::CreateBatch(_) | Error::CorruptedTaskQueue - | Error::TaskDatabaseUpdate(_) + | Error::TaskDatabaseUpgrade(_) | Error::HeedTransaction(_) => false, #[cfg(test)] Error::PlannedFailure => false, @@ -266,7 +266,7 @@ impl ErrorCode for Error { Error::Anyhow(_) => Code::Internal, Error::CorruptedTaskQueue => Code::Internal, Error::CorruptedDump => Code::Internal, - Error::TaskDatabaseUpdate(_) => Code::Internal, + Error::TaskDatabaseUpgrade(_) => Code::Internal, Error::CreateBatch(_) => Code::Internal, // This one should never be seen by the end user diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index cc9436a54..b423c47d4 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -369,6 +369,7 @@ impl IndexScheduler { match ret { Ok(Ok(TickOutcome::TickAgain(_))) => (), Ok(Ok(TickOutcome::WaitForSignal)) => run.scheduler.wake_up.wait(), + Ok(Ok(TickOutcome::StopProcessingForever)) => break, Ok(Err(e)) => { tracing::error!("{e}"); // Wait one second when an irrecoverable error occurs. @@ -816,6 +817,8 @@ pub enum TickOutcome { TickAgain(u64), /// The scheduler should wait for an external signal before attempting another `tick`. WaitForSignal, + /// The scheduler exits the run-loop and will never process tasks again + StopProcessingForever, } /// How many indexes we can afford to have open simultaneously. diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 6478cf07a..7a55c9f54 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -184,6 +184,7 @@ impl IndexScheduler { progress.update_progress(BatchProgress::WritingTasksToDisk); processing_batch.finished(); + let mut stop_scheduler_forever = false; let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let mut canceled = RoaringBitmap::new(); @@ -222,7 +223,7 @@ impl IndexScheduler { self.queue .tasks .update_task(&mut wtxn, &task) - .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; + .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?; } if let Some(canceled_by) = canceled_by { self.queue.tasks.canceled_by.put(&mut wtxn, &canceled_by, &canceled)?; @@ -273,6 +274,12 @@ impl IndexScheduler { let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32); progress.update_progress(task_progress_obj); + if matches!(err, Error::TaskDatabaseUpgrade(_)) { + tracing::error!( + "Upgrade task failed, tasks won't be processed until the following issue is fixed: {err}" + ); + stop_scheduler_forever = true; + } let error: ResponseError = err.into(); for id in ids.iter() { task_progress.fetch_add(1, Ordering::Relaxed); @@ -280,7 +287,7 @@ impl IndexScheduler { .queue .tasks .get_task(&wtxn, id) - .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? + .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))? .ok_or(Error::CorruptedTaskQueue)?; task.status = Status::Failed; task.error = Some(error.clone()); @@ -297,7 +304,7 @@ impl IndexScheduler { self.queue .tasks .update_task(&mut wtxn, &task) - .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; + .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?; } } } @@ -327,7 +334,7 @@ impl IndexScheduler { .queue .tasks .get_task(&rtxn, id) - .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? + .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))? .ok_or(Error::CorruptedTaskQueue)?; if let Err(e) = self.queue.delete_persisted_task_data(&task) { tracing::error!( @@ -345,6 +352,10 @@ impl IndexScheduler { #[cfg(test)] self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing); - Ok(TickOutcome::TickAgain(processed_tasks)) + if stop_scheduler_forever { + Ok(TickOutcome::StopProcessingForever) + } else { + Ok(TickOutcome::TickAgain(processed_tasks)) + } } } diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index f6699ae87..ae98dc83c 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeSet, HashMap, HashSet}; +use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; use meilisearch_types::batches::BatchId; @@ -314,7 +315,24 @@ impl IndexScheduler { task.status = Status::Succeeded; Ok(vec![task]) } - Batch::UpgradeDatabase { tasks } => self.process_upgrade(progress, tasks), + Batch::UpgradeDatabase { mut tasks } => { + let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(progress))); + match ret { + Ok(Ok(())) => (), + Ok(Err(e)) => return Err(Error::TaskDatabaseUpgrade(Box::new(e))), + Err(_e) => { + return Err(Error::TaskDatabaseUpgrade(Box::new( + Error::ProcessBatchPanicked, + ))); + } + } + + for task in tasks.iter_mut() { + task.status = Status::Succeeded; + } + + Ok(tasks) + } } } diff --git a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs index 0c3af1386..f3038d343 100644 --- a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs +++ b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs @@ -1,24 +1,14 @@ use meilisearch_types::{ milli, milli::progress::{Progress, VariableNameStep}, - tasks::{KindWithContent, Status, Task}, }; use crate::{processing::UpgradeDatabaseProgress, Error, IndexScheduler, Result}; impl IndexScheduler { - pub(super) fn process_upgrade( - &self, - progress: Progress, - mut tasks: Vec, - ) -> Result> { + pub(super) fn process_upgrade(&self, progress: Progress) -> Result<()> { progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap); - // Since we should not have multiple upgrade tasks, we're only going to process the latest one: - let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { - unreachable!() - }; - enum UpgradeIndex {} let indexes = self.index_names()?; @@ -29,14 +19,10 @@ impl IndexScheduler { indexes.len() as u32, )); let index = self.index(uid)?; - milli::update::upgrade::upgrade(&index, from, progress.clone()) + milli::update::upgrade::upgrade(&index, progress.clone()) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; } - for task in tasks.iter_mut() { - task.status = Status::Succeeded; - } - - Ok(tasks) + Ok(()) } } diff --git a/crates/index-scheduler/src/upgrade/mod.rs b/crates/index-scheduler/src/upgrade/mod.rs index cd4adef52..a0ad32f57 100644 --- a/crates/index-scheduler/src/upgrade/mod.rs +++ b/crates/index-scheduler/src/upgrade/mod.rs @@ -1,16 +1,53 @@ use std::path::Path; +use anyhow::bail; use meilisearch_types::{ heed, tasks::{KindWithContent, Status, Task}, + versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}, }; use time::OffsetDateTime; use tracing::info; use crate::queue::TaskQueue; -pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow::Result<()> { +pub fn upgrade_task_queue(tasks_path: &Path, from: (u32, u32, u32)) -> anyhow::Result<()> { + let current_major: u32 = VERSION_MAJOR.parse().unwrap(); + let current_minor: u32 = VERSION_MINOR.parse().unwrap(); + let current_patch: u32 = VERSION_PATCH.parse().unwrap(); + + let upgrade_functions = + [(v1_12_to_current as fn(&Path) -> anyhow::Result<()>, "Upgrading from v1.12 to v1.13")]; + + let start = match from { + (1, 12, _) => 0, + (major, minor, patch) => { + if major > current_major + || (major == current_major && minor > current_minor) + || (major == current_major && minor == current_minor && patch > current_patch) + { + bail!( + "Database version {major}.{minor}.{patch} is higher than the binary version {current_major}.{current_minor}.{current_patch}. Downgrade is not supported", + ); + } else if major < current_major + || (major == current_major && minor < current_minor) + || (major == current_major && minor == current_minor && patch < current_patch) + { + bail!( + "Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and imports it in the v{current_major}.{current_minor}.{current_patch}", + ); + } else { + bail!("Unknown database version: v{major}.{minor}.{patch}"); + } + } + }; + info!("Upgrading the task queue"); + for (upgrade, upgrade_name) in upgrade_functions[start..].iter() { + info!("{upgrade_name}"); + (upgrade)(tasks_path)?; + } + let env = unsafe { heed::EnvOpenOptions::new() .max_dbs(19) @@ -33,7 +70,7 @@ pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow canceled_by: None, details: None, status: Status::Enqueued, - kind: KindWithContent::UpgradeDatabase { from: version }, + kind: KindWithContent::UpgradeDatabase { from }, }, )?; wtxn.commit()?; @@ -41,3 +78,8 @@ pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow env.prepare_for_closing().wait(); Ok(()) } + +/// The task queue is 100% compatible with the previous versions +fn v1_12_to_current(_path: &Path) -> anyhow::Result<()> { + Ok(()) +} diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 54b9c8474..fa1d4a7d3 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -456,9 +456,6 @@ impl ErrorCode for milli::Error { | UserError::DocumentEditionCompilationError(_) => { Code::EditDocumentsByFunctionError } - UserError::TooOldForUpgrade(_, _, _) - | UserError::CannotDowngrade(_, _, _) - | UserError::CannotUpgradeToUnknownVersion(_, _, _) => Code::CouldNotUpgrade, } } } diff --git a/crates/milli/src/error.rs b/crates/milli/src/error.rs index aad9b9a9f..c8ed1912f 100644 --- a/crates/milli/src/error.rs +++ b/crates/milli/src/error.rs @@ -10,7 +10,7 @@ use rhai::EvalAltResult; use serde_json::Value; use thiserror::Error; -use crate::constants::{RESERVED_GEO_FIELD_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; +use crate::constants::RESERVED_GEO_FIELD_NAME; use crate::documents::{self, DocumentsBatchCursorError}; use crate::thread_pool_no_abort::PanicCatched; use crate::{CriterionError, DocumentId, FieldId, Object, SortError}; @@ -74,6 +74,8 @@ pub enum InternalError { AbortedIndexation, #[error("The matching words list contains at least one invalid member")] InvalidMatchingWords, + #[error("Cannot upgrade to the following version: v{0}.{1}.{2}.")] + CannotUpgradeToVersion(u32, u32, u32), #[error(transparent)] ArroyError(#[from] arroy::Error), #[error(transparent)] @@ -288,12 +290,6 @@ and can not be more than 511 bytes.", .document_id.to_string() DocumentEditionCompilationError(rhai::ParseError), #[error("{0}")] DocumentEmbeddingError(String), - #[error("Upgrade could not be processed because v{0}.{1}.{2} of the database is too old. Please re-open the v{0}.{1}.{2} and use a dump to upgrade your version. The oldest version meilisearch can upgrade from is v1.12.0.")] - TooOldForUpgrade(u32, u32, u32), - #[error("Upgrade could not be processed because the database version (v{0}.{1}.{2}) is newer than the targeted version (v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH})")] - CannotDowngrade(u32, u32, u32), - #[error("Cannot upgrade to unknown version v{0}.{1}.{2}.")] - CannotUpgradeToUnknownVersion(u32, u32, u32), } impl From for Error { diff --git a/crates/milli/src/update/upgrade/mod.rs b/crates/milli/src/update/upgrade/mod.rs index aab160b38..e06f3657e 100644 --- a/crates/milli/src/update/upgrade/mod.rs +++ b/crates/milli/src/update/upgrade/mod.rs @@ -1,47 +1,17 @@ -use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use crate::progress::{Progress, VariableNameStep}; -use crate::{Index, Result, UserError}; +use crate::{Index, InternalError, Result}; -pub fn upgrade(index: &Index, base_version: (u32, u32, u32), progress: Progress) -> Result<()> { +pub fn upgrade(index: &Index, progress: Progress) -> Result<()> { let wtxn = index.env.write_txn()?; let from = index.get_version(&wtxn)?; let upgrade_functions = [(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")]; - let current_major: u32 = VERSION_MAJOR.parse().unwrap(); - let current_minor: u32 = VERSION_MINOR.parse().unwrap(); - let current_patch: u32 = VERSION_PATCH.parse().unwrap(); - let start = match from { // If there was no version it means we're coming from the base version specified by the index-scheduler - None if base_version.0 == 1 && base_version.1 == 12 => 0, - Some((1, 12, _)) => 0, - - // --- Error handling - None => { - return Err(UserError::TooOldForUpgrade( - base_version.0, - base_version.1, - base_version.2, - ) - .into()); - } - Some((major, minor, patch)) if major == 0 || (major == 1 && minor < 12) => { - return Err(UserError::TooOldForUpgrade(major, minor, patch).into()); - } - Some((major, minor, patch)) if major > current_major => { - return Err(UserError::CannotDowngrade(major, minor, patch).into()); - } - Some((major, minor, patch)) if major == current_major && minor > current_minor => { - return Err(UserError::CannotDowngrade(major, minor, patch).into()); - } - Some((major, minor, patch)) - if major == current_major && minor == current_minor && patch > current_patch => - { - return Err(UserError::CannotDowngrade(major, minor, patch).into()); - } + None | Some((1, 12, _)) => 0, Some((major, minor, patch)) => { - return Err(UserError::CannotUpgradeToUnknownVersion(major, minor, patch).into()) + return Err(InternalError::CannotUpgradeToVersion(major, minor, patch).into()) } };