diff --git a/crates/index-scheduler/src/features.rs b/crates/index-scheduler/src/features.rs
index 80da67f3e..c6c17b2d5 100644
--- a/crates/index-scheduler/src/features.rs
+++ b/crates/index-scheduler/src/features.rs
@@ -93,15 +93,16 @@ impl FeatureData {
         NUMBER_OF_DATABASES
     }
 
-    pub fn new(env: &Env, instance_features: InstanceTogglableFeatures) -> Result<Self> {
-        let mut wtxn = env.write_txn()?;
+    pub fn new(
+        env: &Env,
+        wtxn: &mut RwTxn,
+        instance_features: InstanceTogglableFeatures,
+    ) -> Result<Self> {
         let runtime_features_db =
-            env.create_database(&mut wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?;
-        wtxn.commit()?;
+            env.create_database(wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?;
 
-        let txn = env.read_txn()?;
         let persisted_features: RuntimeTogglableFeatures =
-            runtime_features_db.get(&txn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default();
+            runtime_features_db.get(wtxn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default();
 
         let InstanceTogglableFeatures { metrics, logs_route, contains_filter } = instance_features;
         let runtime = Arc::new(RwLock::new(RuntimeTogglableFeatures {
             metrics: metrics || persisted_features.metrics,
diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs
index 3a9009504..db506e58e 100644
--- a/crates/index-scheduler/src/insta_snapshot.rs
+++ b/crates/index-scheduler/src/insta_snapshot.rs
@@ -21,6 +21,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         cleanup_enabled: _,
         processing_tasks,
         env,
+        version: _,
         queue,
         scheduler,
 
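`FeatureData::new` now borrows the caller's write transaction instead of opening and committing its own, so all of the scheduler's databases can be created under one atomic commit (see the `Queue::new` and `IndexMapper::new` calls later in this diff). A minimal sketch of that pattern with heed 0.20+, where `open` is unsafe; the database names and sizes here are illustrative:

```rust
use heed::types::Str;
use heed::{Database, EnvOpenOptions};

fn main() -> anyhow::Result<()> {
    let dir = tempfile::tempdir()?;
    // The environment must know the total number of named databases up front.
    let env = unsafe {
        EnvOpenOptions::new()
            .max_dbs(2)
            .map_size(10 * 1024 * 1024)
            .open(dir.path())?
    };

    // One transaction shared by every sub-component: either every database
    // is created and initialized, or none of them are.
    let mut wtxn = env.write_txn()?;
    let features: Database<Str, Str> = env.create_database(&mut wtxn, Some("features"))?;
    let queue: Database<Str, Str> = env.create_database(&mut wtxn, Some("queue"))?;
    features.put(&mut wtxn, "experimental-features", "{}")?;
    queue.put(&mut wtxn, "next-task-id", "0")?;
    wtxn.commit()?; // a single atomic commit

    Ok(())
}
```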
diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs
index 2c7b3e075..530b7bedc 100644
--- a/crates/index-scheduler/src/lib.rs
+++ b/crates/index-scheduler/src/lib.rs
@@ -33,6 +33,7 @@ mod test_utils;
 pub mod upgrade;
 mod utils;
 pub mod uuid_codec;
+mod versioning;
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 pub type TaskId = u32;
@@ -66,6 +67,7 @@ use queue::Queue;
 use roaring::RoaringBitmap;
 use scheduler::Scheduler;
 use time::OffsetDateTime;
+use versioning::Versioning;
 
 use crate::index_mapper::IndexMapper;
 use crate::utils::clamp_to_page_size;
@@ -134,17 +136,18 @@ pub struct IndexScheduler {
     /// The list of tasks currently processing
     pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
 
+    /// A database containing only the version of the index-scheduler
+    pub version: versioning::Versioning,
     /// The queue containing both the tasks and the batches.
     pub queue: queue::Queue,
-
-    pub scheduler: scheduler::Scheduler,
-
     /// In charge of creating, opening, storing and returning indexes.
     pub(crate) index_mapper: IndexMapper,
-
     /// In charge of fetching and setting the status of experimental features.
     features: features::FeatureData,
 
+    /// Everything related to the processing of the tasks
+    pub scheduler: scheduler::Scheduler,
+
     /// Whether we should automatically cleanup the task queue or not.
     pub(crate) cleanup_enabled: bool,
@@ -179,6 +182,7 @@ impl IndexScheduler {
         IndexScheduler {
             env: self.env.clone(),
             processing_tasks: self.processing_tasks.clone(),
+            version: self.version.clone(),
             queue: self.queue.private_clone(),
             scheduler: self.scheduler.private_clone(),
 
@@ -198,13 +202,14 @@ impl IndexScheduler {
     }
 
     pub(crate) const fn nb_db() -> u32 {
-        Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db()
+        Versioning::nb_db() + Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db()
     }
 
     /// Create an index scheduler and start its run loop.
     #[allow(private_interfaces)] // because test_utils is private
     pub fn new(
         options: IndexSchedulerOptions,
+        from_db_version: (u32, u32, u32),
         #[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>,
         #[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>,
     ) -> Result<Self> {
@@ -241,9 +246,11 @@ impl IndexScheduler {
                 .open(&options.tasks_path)
         }?;
 
-        let features = features::FeatureData::new(&env, options.instance_features)?;
+        // We **must** start by upgrading the version because it'll also upgrade the required databases before we can open them
+        let version = versioning::Versioning::new(&env, from_db_version)?;
 
         let mut wtxn = env.write_txn()?;
+        let features = features::FeatureData::new(&env, &mut wtxn, options.instance_features)?;
         let queue = Queue::new(&env, &mut wtxn, &options)?;
         let index_mapper = IndexMapper::new(&env, &mut wtxn, &options, budget)?;
         wtxn.commit()?;
@@ -251,6 +258,7 @@ impl IndexScheduler {
         // allow unreachable_code to get rids of the warning in the case of a test build.
         let this = Self {
             processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
+            version,
             queue,
             scheduler: Scheduler::new(&options),
diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs
index 024d56622..b1e44e32c 100644
--- a/crates/index-scheduler/src/test_utils.rs
+++ b/crates/index-scheduler/src/test_utils.rs
@@ -9,7 +9,7 @@ use meilisearch_types::document_formats::DocumentFormatError;
 use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments;
 use meilisearch_types::milli::update::IndexerConfig;
 use meilisearch_types::tasks::KindWithContent;
-use meilisearch_types::VERSION_FILE_NAME;
+use meilisearch_types::{versioning, VERSION_FILE_NAME};
 use tempfile::{NamedTempFile, TempDir};
 use uuid::Uuid;
 use Breakpoint::*;
@@ -113,7 +113,13 @@ impl IndexScheduler {
         };
         configuration(&mut options);
 
-        let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
+        let version = (
+            versioning::VERSION_MAJOR.parse().unwrap(),
+            versioning::VERSION_MINOR.parse().unwrap(),
+            versioning::VERSION_PATCH.parse().unwrap(),
+        );
+
+        let index_scheduler = Self::new(options, version, sender, planned_failures).unwrap();
 
         // To be 100% consistent between all test we're going to start the scheduler right now
         // and ensure it's in the expected starting state.
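Because LMDB needs `max_dbs` before the environment opens, every sub-component advertises its database count through a `const fn nb_db()` and `IndexScheduler::nb_db()` sums them, which is why `Versioning::nb_db()` joins the sum above. A compile-checked sketch of that convention, with made-up counts:

```rust
// Each component owns a fixed number of named LMDB databases and exposes
// the count as a const fn, so the total is known before the env opens.
mod versioning { pub const fn nb_db() -> u32 { 1 } }
mod queue { pub const fn nb_db() -> u32 { 13 } } // illustrative counts
mod index_mapper { pub const fn nb_db() -> u32 { 2 } }
mod feature_data { pub const fn nb_db() -> u32 { 1 } }

const NB_DB: u32 =
    versioning::nb_db() + queue::nb_db() + index_mapper::nb_db() + feature_data::nb_db();

fn main() {
    // This total would be handed to heed's EnvOpenOptions::max_dbs().
    println!("opening the tasks environment with max_dbs = {NB_DB}");
}
```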
@@ -406,8 +412,7 @@ impl IndexSchedulerHandle {
                 .recv_timeout(std::time::Duration::from_secs(1)) {
                 Ok((_, true)) => continue,
                 Ok((b, false)) => panic!("The scheduler was supposed to be down but successfully moved to the next breakpoint: {b:?}"),
-                Err(RecvTimeoutError::Timeout) => panic!(),
-                Err(RecvTimeoutError::Disconnected) => break,
+                Err(RecvTimeoutError::Timeout | RecvTimeoutError::Disconnected) => break,
             }
         }
     }
diff --git a/crates/index-scheduler/src/upgrade/mod.rs b/crates/index-scheduler/src/upgrade/mod.rs
index db4a9352a..ab124013d 100644
--- a/crates/index-scheduler/src/upgrade/mod.rs
+++ b/crates/index-scheduler/src/upgrade/mod.rs
@@ -1,25 +1,28 @@
-use std::path::Path;
-
 use anyhow::bail;
-use meilisearch_types::heed;
+use meilisearch_types::heed::{Env, RwTxn};
 use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
 use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
 use time::OffsetDateTime;
 use tracing::info;
 
 use crate::queue::TaskQueue;
-use crate::IndexSchedulerOptions;
 
-pub fn upgrade_task_queue(
-    opt: &IndexSchedulerOptions,
+trait UpgradeIndexScheduler {
+    fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32))
+        -> anyhow::Result<()>;
+    fn target_version(&self) -> (u32, u32, u32);
+}
+
+pub fn upgrade_index_scheduler(
+    env: &Env,
     from: (u32, u32, u32),
+    to: (u32, u32, u32),
 ) -> anyhow::Result<()> {
-    let current_major: u32 = VERSION_MAJOR.parse().unwrap();
-    let current_minor: u32 = VERSION_MINOR.parse().unwrap();
-    let current_patch: u32 = VERSION_PATCH.parse().unwrap();
+    let current_major = to.0;
+    let current_minor = to.1;
+    let current_patch = to.2;
 
-    let upgrade_functions =
-        [(v1_12_to_current as fn(&Path) -> anyhow::Result<()>, "Upgrading from v1.12 to v1.13")];
+    let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[&V1_12_ToCurrent {}];
 
     let start = match from {
         (1, 12, _) => 0,
@@ -41,20 +44,23 @@ pub fn upgrade_task_queue(
         }
     };
 
+    let mut current_version = from;
+
     info!("Upgrading the task queue");
-    for (upgrade, upgrade_name) in upgrade_functions[start..].iter() {
-        info!("{upgrade_name}");
-        (upgrade)(&opt.tasks_path)?;
+    for upgrade in upgrade_functions[start..].iter() {
+        let target = upgrade.target_version();
+        info!(
+            "Upgrading from v{}.{}.{} to v{}.{}.{}",
+            current_version.0, current_version.1, current_version.2, target.0, target.1, target.2
+        );
+        let mut wtxn = env.write_txn()?;
+        upgrade.upgrade(env, &mut wtxn, from)?;
+        wtxn.commit()?;
+        current_version = target;
     }
 
-    let env = unsafe {
-        heed::EnvOpenOptions::new()
-            .max_dbs(TaskQueue::nb_db())
-            .map_size(opt.task_db_size)
-            .open(&opt.tasks_path)
-    }?;
     let mut wtxn = env.write_txn()?;
-    let queue = TaskQueue::new(&env, &mut wtxn)?;
+    let queue = TaskQueue::new(env, &mut wtxn)?;
     let uid = queue.next_task_id(&wtxn)?;
     queue.register(
         &mut wtxn,
@@ -72,12 +78,28 @@ pub fn upgrade_task_queue(
         },
     )?;
     wtxn.commit()?;
-    // Should be pretty much instantaneous since we're the only one reading this env
-    env.prepare_for_closing().wait();
+
     Ok(())
 }
 
-/// The task queue is 100% compatible with the previous versions
-fn v1_12_to_current(_path: &Path) -> anyhow::Result<()> {
-    Ok(())
+#[allow(non_camel_case_types)]
+struct V1_12_ToCurrent {}
+
+impl UpgradeIndexScheduler for V1_12_ToCurrent {
+    fn upgrade(
+        &self,
+        _env: &Env,
+        _wtxn: &mut RwTxn,
+        _original: (u32, u32, u32),
+    ) -> anyhow::Result<()> {
+        Ok(())
+    }
+
+    fn target_version(&self) -> (u32, u32, u32) {
+        (
+            VERSION_MAJOR.parse().unwrap(),
+            VERSION_MINOR.parse().unwrap(),
+            VERSION_PATCH.parse().unwrap(),
+        )
+    }
 }
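The driver above walks a slice of trait objects, each migration reporting the version it produces, so new steps chain without touching the loop. A runnable toy of the same dispatch, with simplified types that are not the crate's:

```rust
type Version = (u32, u32, u32);

trait Migration {
    fn upgrade(&self, from: Version) -> Result<(), String>;
    fn target_version(&self) -> Version;
}

#[allow(non_camel_case_types)]
struct V1_12_ToCurrent;

impl Migration for V1_12_ToCurrent {
    fn upgrade(&self, _from: Version) -> Result<(), String> {
        Ok(()) // nothing to do: the task queue layout did not change
    }
    fn target_version(&self) -> Version {
        (1, 13, 0)
    }
}

fn upgrade(from: Version) -> Result<Version, String> {
    let migrations: &[&dyn Migration] = &[&V1_12_ToCurrent];
    // Pick the first migration to run from the version we start at.
    let start = match from {
        (1, 12, _) => 0,
        (major, minor, patch) => {
            return Err(format!("cannot upgrade from v{major}.{minor}.{patch}"))
        }
    };

    let mut current = from;
    for migration in &migrations[start..] {
        let target = migration.target_version();
        println!(
            "upgrading v{}.{}.{} to v{}.{}.{}",
            current.0, current.1, current.2, target.0, target.1, target.2
        );
        migration.upgrade(current)?;
        current = target; // each step feeds the next one
    }
    Ok(current)
}

fn main() {
    assert_eq!(upgrade((1, 12, 3)), Ok((1, 13, 0)));
    assert!(upgrade((1, 11, 0)).is_err());
}
```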
diff --git a/crates/index-scheduler/src/versioning.rs b/crates/index-scheduler/src/versioning.rs
new file mode 100644
index 000000000..63cb57002
--- /dev/null
+++ b/crates/index-scheduler/src/versioning.rs
@@ -0,0 +1,61 @@
+use crate::{upgrade::upgrade_index_scheduler, Result};
+use meilisearch_types::{
+    heed::{types::Str, Database, Env, RoTxn, RwTxn},
+    milli::heed_codec::version::VersionCodec,
+    versioning,
+};
+
+/// The number of databases used by the versioning itself
+const NUMBER_OF_DATABASES: u32 = 1;
+/// Database const names for the `IndexScheduler`.
+mod db_name {
+    pub const VERSION: &str = "version";
+}
+mod entry_name {
+    pub const MAIN: &str = "main";
+}
+
+#[derive(Clone)]
+pub struct Versioning {
+    pub version: Database<Str, VersionCodec>,
+}
+
+impl Versioning {
+    pub(crate) const fn nb_db() -> u32 {
+        NUMBER_OF_DATABASES
+    }
+
+    pub fn get_version(&self, rtxn: &RoTxn) -> Result<Option<(u32, u32, u32)>> {
+        Ok(self.version.get(rtxn, entry_name::MAIN)?)
+    }
+
+    pub fn set_version(&self, wtxn: &mut RwTxn, version: (u32, u32, u32)) -> Result<()> {
+        Ok(self.version.put(wtxn, entry_name::MAIN, &version)?)
+    }
+
+    pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<()> {
+        let major = versioning::VERSION_MAJOR.parse().unwrap();
+        let minor = versioning::VERSION_MINOR.parse().unwrap();
+        let patch = versioning::VERSION_PATCH.parse().unwrap();
+        self.set_version(wtxn, (major, minor, patch))
+    }
+
+    /// Create the versioning database, then upgrade the index scheduler if the binary is newer than the database.
+    pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
+        let mut wtxn = env.write_txn()?;
+        let version = env.create_database(&mut wtxn, Some(db_name::VERSION))?;
+        let this = Self { version };
+        let from = this.get_version(&wtxn)?.unwrap_or(db_version);
+        wtxn.commit()?;
+
+        let bin_major: u32 = versioning::VERSION_MAJOR.parse().unwrap();
+        let bin_minor: u32 = versioning::VERSION_MINOR.parse().unwrap();
+        let bin_patch: u32 = versioning::VERSION_PATCH.parse().unwrap();
+        let to = (bin_major, bin_minor, bin_patch);
+
+        if from != to {
+            upgrade_index_scheduler(env, from, to)?;
+        }
+        Ok(this)
+    }
+}
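On first boot the `version` database is empty, so `Versioning::new` falls back to the version the caller read from the VERSION file; on later boots the database entry wins. A self-contained sketch of that resolution, with a plain struct standing in for the heed-backed database:

```rust
type Version = (u32, u32, u32);

// Stand-in for the LMDB-backed "version" database, which holds one entry.
struct Versioning {
    stored: Option<Version>,
}

impl Versioning {
    // Mirrors the `from` computation in `Versioning::new`: prefer what the
    // database records, otherwise trust the VERSION file on disk.
    fn resolve_from(&self, db_version: Version) -> Version {
        self.stored.unwrap_or(db_version)
    }
}

fn main() {
    let bin: Version = (1, 13, 0);

    // Database created by an older binary and never stamped: the VERSION
    // file (here 1.12.2) decides, and an upgrade must run.
    let fresh = Versioning { stored: None };
    assert_ne!(fresh.resolve_from((1, 12, 2)), bin);

    // Database already stamped at the binary's version: nothing to do.
    let stamped = Versioning { stored: Some(bin) };
    assert_eq!(stamped.resolve_from((1, 12, 2)), bin);
}
```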
diff --git a/crates/meilisearch-types/src/versioning.rs b/crates/meilisearch-types/src/versioning.rs
index 054d2e312..f009002d1 100644
--- a/crates/meilisearch-types/src/versioning.rs
+++ b/crates/meilisearch-types/src/versioning.rs
@@ -9,6 +9,36 @@ pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
 pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
 pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
 
+/// Checks that a dumpless upgrade from `from` to `to` is possible, then persists
+/// the version of the current Meilisearch binary to a VERSION file
+pub fn update_version_file_for_dumpless_upgrade(
+    db_path: &Path,
+    from: (u32, u32, u32),
+    to: (u32, u32, u32),
+) -> Result<(), VersionFileError> {
+    let (from_major, from_minor, from_patch) = from;
+    let (to_major, to_minor, to_patch) = to;
+
+    if from_major > to_major
+        || (from_major == to_major && from_minor > to_minor)
+        || (from_major == to_major && from_minor == to_minor && from_patch > to_patch)
+    {
+        Err(VersionFileError::DowngradeNotSupported {
+            major: from_major,
+            minor: from_minor,
+            patch: from_patch,
+        })
+    } else if from_major < 1 || (from_major == 1 && from_minor < 12) {
+        Err(VersionFileError::TooOldForAutomaticUpgrade {
+            major: from_major,
+            minor: from_minor,
+            patch: from_patch,
+        })
+    } else {
+        create_current_version_file(db_path)?;
+        Ok(())
+    }
+}
+
 /// Persists the version of the current Meilisearch binary to a VERSION file
 pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
     create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
@@ -78,6 +108,10 @@ pub enum VersionFileError {
         env!("CARGO_PKG_VERSION").to_string()
     )]
     VersionMismatch { major: u32, minor: u32, patch: u32 },
+    #[error("Database version {major}.{minor}.{patch} is higher than the Meilisearch version {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}. Downgrade is not supported")]
+    DowngradeNotSupported { major: u32, minor: u32, patch: u32 },
+    #[error("Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using v{major}.{minor}.{patch} and import it into v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}")]
+    TooOldForAutomaticUpgrade { major: u32, minor: u32, patch: u32 },
 
     #[error(transparent)]
     IoError(#[from] std::io::Error),
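Two guards run before the VERSION file is rewritten: no downgrade, and nothing older than v1.12, the oldest layout the dumpless upgrade understands. Since Rust tuples compare lexicographically, the three-clause downgrade test above collapses into a single tuple comparison; a runnable sketch of the same gate (the names are illustrative, not the crate's):

```rust
type Version = (u32, u32, u32);

#[derive(Debug, PartialEq)]
enum Gate {
    Upgrade,
    DowngradeNotSupported,
    TooOldForAutomaticUpgrade,
}

fn gate(from: Version, to: Version) -> Gate {
    // (major, minor, patch) tuples order field by field, which is exactly
    // the three-clause comparison written out in the diff above.
    if from > to {
        Gate::DowngradeNotSupported
    } else if from < (1, 12, 0) {
        Gate::TooOldForAutomaticUpgrade
    } else {
        Gate::Upgrade
    }
}

fn main() {
    let bin = (1, 13, 0);
    assert_eq!(gate((1, 14, 0), bin), Gate::DowngradeNotSupported);
    assert_eq!(gate((1, 11, 9), bin), Gate::TooOldForAutomaticUpgrade);
    assert_eq!(gate((0, 30, 0), bin), Gate::TooOldForAutomaticUpgrade);
    assert_eq!(gate((1, 12, 7), bin), Gate::Upgrade);
}
```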
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index ef270670b..4d41c63ea 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -32,7 +32,6 @@ use analytics::Analytics;
 use anyhow::bail;
 use error::PayloadError;
 use extractors::payload::PayloadConfig;
-use index_scheduler::upgrade::upgrade_task_queue;
 use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
 use meilisearch_auth::AuthController;
 use meilisearch_types::milli::constants::VERSION_MAJOR;
@@ -41,7 +40,8 @@ use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
 use meilisearch_types::settings::apply_settings_to_builder;
 use meilisearch_types::tasks::KindWithContent;
 use meilisearch_types::versioning::{
-    create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH,
+    create_current_version_file, get_version, update_version_file_for_dumpless_upgrade,
+    VersionFileError, VERSION_MINOR, VERSION_PATCH,
 };
 use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
 pub use option::Opt;
@@ -234,6 +234,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
         instance_features: opt.to_instance_features(),
         auto_upgrade: opt.experimental_dumpless_upgrade,
     };
+    let bin_major: u32 = VERSION_MAJOR.parse().unwrap();
+    let bin_minor: u32 = VERSION_MINOR.parse().unwrap();
+    let bin_patch: u32 = VERSION_PATCH.parse().unwrap();
+    let binary_version = (bin_major, bin_minor, bin_patch);
 
     let empty_db = is_empty_db(&opt.db_path);
     let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot {
@@ -245,6 +249,7 @@
                 opt,
                 index_scheduler_opt,
                 OnFailure::RemoveDb,
+                binary_version, // the db is empty
             )?,
             Err(e) => {
                 std::fs::remove_dir_all(&opt.db_path)?;
@@ -262,14 +267,18 @@
             bail!("snapshot doesn't exist at {}", snapshot_path.display())
         // the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
         } else {
-            open_or_create_database(opt, index_scheduler_opt, empty_db)?
+            open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
         }
     } else if let Some(ref path) = opt.import_dump {
         let src_path_exists = path.exists();
         // the db is empty and the dump exists, import it
         if empty_db && src_path_exists {
-            let (mut index_scheduler, mut auth_controller) =
-                open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::RemoveDb)?;
+            let (mut index_scheduler, mut auth_controller) = open_or_create_database_unchecked(
+                opt,
+                index_scheduler_opt,
+                OnFailure::RemoveDb,
+                binary_version, // the db is empty
+            )?;
             match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
                 Ok(()) => (index_scheduler, auth_controller),
                 Err(e) => {
@@ -289,10 +298,10 @@
         // the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
         // or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
         } else {
-            open_or_create_database(opt, index_scheduler_opt, empty_db)?
+            open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
        }
    } else {
-        open_or_create_database(opt, index_scheduler_opt, empty_db)?
+        open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
    };
 
     // We create a loop in a thread that registers snapshotCreation tasks
@@ -322,12 +331,13 @@ fn open_or_create_database_unchecked(
     opt: &Opt,
     index_scheduler_opt: IndexSchedulerOptions,
     on_failure: OnFailure,
+    version: (u32, u32, u32),
 ) -> anyhow::Result<(IndexScheduler, AuthController)> {
     // we don't want to create anything in the data.ms yet, thus we
     // wrap our two builders in a closure that'll be executed later.
     let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
     let index_scheduler_builder =
-        || -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt)?) };
+        || -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt, version)?) };
 
     match (
         index_scheduler_builder(),
@@ -345,25 +355,29 @@ fn open_or_create_database_unchecked(
 }
 
 /// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch.
-fn check_version_and_update_task_queue(
-    opt: &Opt,
-    index_scheduler_opt: &IndexSchedulerOptions,
-) -> anyhow::Result<()> {
-    let (major, minor, patch) = get_version(&opt.db_path)?;
+/// Returns the version that was contained in the version file.
+fn check_version(opt: &Opt, binary_version: (u32, u32, u32)) -> anyhow::Result<(u32, u32, u32)> {
+    let (bin_major, bin_minor, bin_patch) = binary_version;
+    let (db_major, db_minor, db_patch) = get_version(&opt.db_path)?;
 
-    let version_major: u32 = VERSION_MAJOR.parse().unwrap();
-    let version_minor: u32 = VERSION_MINOR.parse().unwrap();
-    let version_patch: u32 = VERSION_PATCH.parse().unwrap();
-
-    if major != version_major || minor != version_minor || patch > version_patch {
+    if db_major != bin_major || db_minor != bin_minor || db_patch > bin_patch {
         if opt.experimental_dumpless_upgrade {
-            return upgrade_task_queue(index_scheduler_opt, (major, minor, patch));
+            update_version_file_for_dumpless_upgrade(
+                &opt.db_path,
+                (db_major, db_minor, db_patch),
+                (bin_major, bin_minor, bin_patch),
+            )?;
         } else {
-            return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
+            return Err(VersionFileError::VersionMismatch {
+                major: db_major,
+                minor: db_minor,
+                patch: db_patch,
+            }
+            .into());
         }
     }
 
-    Ok(())
+    Ok((db_major, db_minor, db_patch))
 }
 
 /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you.
@@ -371,12 +385,11 @@ fn open_or_create_database(
     opt: &Opt,
     index_scheduler_opt: IndexSchedulerOptions,
     empty_db: bool,
+    binary_version: (u32, u32, u32),
 ) -> anyhow::Result<(IndexScheduler, AuthController)> {
-    if !empty_db {
-        check_version_and_update_task_queue(opt, &index_scheduler_opt)?;
-    }
+    let version = if !empty_db { check_version(opt, binary_version)? } else { binary_version };
 
-    open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb)
+    open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
 }
 
 fn import_dump(
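Taken together, the boot path is: read the VERSION file, gate it against the binary version, rewrite the file when a dumpless upgrade is allowed, and hand the old on-disk version to `IndexScheduler::new` so `Versioning` can drive the migrations. A condensed sketch of that flow, with hypothetical stand-ins for the functions wired together in this diff:

```rust
type Version = (u32, u32, u32);

// Hypothetical stand-ins; the real functions live in the files patched above.
fn get_version() -> Version {
    (1, 12, 5) // as read from the VERSION file
}
fn binary_version() -> Version {
    (1, 13, 0) // as parsed from the CARGO_PKG_VERSION_* constants
}
fn update_version_file(_to: Version) {
    // rewrite the VERSION file once the gate accepts the upgrade
}
fn index_scheduler_new(from_db_version: Version) {
    // Versioning::new compares `from_db_version` with the binary version
    // and, when they differ, runs upgrade_index_scheduler before the
    // other components open the now-upgraded databases.
    println!("opening the scheduler, db was at {from_db_version:?}");
}

fn main() {
    let db = get_version();
    let bin = binary_version();
    let version = if db != bin {
        // check_version path with the dumpless upgrade enabled and allowed.
        update_version_file(bin);
        db // keep the *old* version so the scheduler knows what to upgrade from
    } else {
        bin
    };
    index_scheduler_new(version);
}
```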