diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index f822421cf..0fb5570b0 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -141,6 +141,9 @@ pub enum KindDump { instance_uid: Option, }, SnapshotCreation, + UpgradeDatabase { + from: (u32, u32, u32), + }, } impl From for TaskDump { @@ -210,6 +213,9 @@ impl From for KindDump { KindDump::DumpCreation { keys, instance_uid } } KindWithContent::SnapshotCreation => KindDump::SnapshotCreation, + KindWithContent::UpgradeDatabase { from: version } => { + KindDump::UpgradeDatabase { from: version } + } } } } diff --git a/crates/index-scheduler/src/dump.rs b/crates/index-scheduler/src/dump.rs index 643255ac2..e8f55d5c7 100644 --- a/crates/index-scheduler/src/dump.rs +++ b/crates/index-scheduler/src/dump.rs @@ -132,6 +132,7 @@ impl<'a> Dump<'a> { KindWithContent::DumpCreation { keys, instance_uid } } KindDump::SnapshotCreation => KindWithContent::SnapshotCreation, + KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from }, }, }; diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index f6ee1f685..e9fa9bb59 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use meilisearch_types::batches::BatchId; use meilisearch_types::error::{Code, ErrorCode}; use meilisearch_types::tasks::{Kind, Status}; -use meilisearch_types::{heed, milli}; +use meilisearch_types::{heed, milli, versioning}; use thiserror::Error; use crate::TaskId; diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index de79cd7c0..3a9009504 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -279,6 +279,9 @@ fn snapshot_details(d: &Details) -> String { Details::IndexSwap { swaps } => { format!("{{ swaps: {swaps:?} }}") } + Details::UpgradeDatabase { from } => { + format!("{{ from: v{}.{}.{} }}", from.0, from.1, from.2) + } } } diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 3c50283d9..cc9436a54 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -30,6 +30,7 @@ mod queue; mod scheduler; #[cfg(test)] mod test_utils; +pub mod upgrade; mod utils; pub mod uuid_codec; @@ -120,6 +121,8 @@ pub struct IndexSchedulerOptions { pub batched_tasks_size_limit: u64, /// The experimental features enabled for this instance. pub instance_features: InstanceTogglableFeatures, + /// The experimental features enabled for this instance. + pub auto_upgrade: bool, } /// Structure which holds meilisearch's indexes and schedules the tasks diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index d0382a81b..929749411 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -129,6 +129,12 @@ make_enum_progress! { } } +make_enum_progress! { + pub enum UpgradeDatabaseProgress { + EnsuringCorrectnessOfTheSwap, + } +} + make_enum_progress! { pub enum InnerSwappingTwoIndexes { RetrieveTheTasks, @@ -173,32 +179,6 @@ make_atomic_progress!(Document alias AtomicDocumentStep => "document" ); make_atomic_progress!(Batch alias AtomicBatchStep => "batch" ); make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" ); -pub struct VariableNameStep { - name: String, - current: u32, - total: u32, -} - -impl VariableNameStep { - pub fn new(name: impl Into, current: u32, total: u32) -> Self { - Self { name: name.into(), current, total } - } -} - -impl Step for VariableNameStep { - fn name(&self) -> Cow<'static, str> { - self.name.clone().into() - } - - fn current(&self) -> u32 { - self.current - } - - fn total(&self) -> u32 { - self.total - } -} - #[cfg(test)] mod test { use std::sync::atomic::Ordering; diff --git a/crates/index-scheduler/src/queue/mod.rs b/crates/index-scheduler/src/queue/mod.rs index 4921d05e6..f97218a21 100644 --- a/crates/index-scheduler/src/queue/mod.rs +++ b/crates/index-scheduler/src/queue/mod.rs @@ -20,8 +20,8 @@ use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use uuid::Uuid; -use self::batches::BatchQueue; -use self::tasks::TaskQueue; +pub(crate) use self::batches::BatchQueue; +pub(crate) use self::tasks::TaskQueue; use crate::processing::ProcessingTasks; use crate::utils::{ check_index_swap_validity, filter_out_references_to_newer_tasks, ProcessingBatch, diff --git a/crates/index-scheduler/src/queue/tasks.rs b/crates/index-scheduler/src/queue/tasks.rs index c88192e17..9bd63c595 100644 --- a/crates/index-scheduler/src/queue/tasks.rs +++ b/crates/index-scheduler/src/queue/tasks.rs @@ -59,7 +59,7 @@ impl TaskQueue { } } - pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { + pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { Ok(Self { all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?, status: env.create_database(wtxn, Some(db_name::STATUS))?, diff --git a/crates/index-scheduler/src/scheduler/autobatcher.rs b/crates/index-scheduler/src/scheduler/autobatcher.rs index 3363b2c8f..7f55a9254 100644 --- a/crates/index-scheduler/src/scheduler/autobatcher.rs +++ b/crates/index-scheduler/src/scheduler/autobatcher.rs @@ -85,6 +85,7 @@ impl From for AutobatchKind { KindWithContent::TaskCancelation { .. } | KindWithContent::TaskDeletion { .. } | KindWithContent::DumpCreation { .. } + | KindWithContent::UpgradeDatabase { .. } | KindWithContent::SnapshotCreation => { panic!("The autobatcher should never be called with tasks that don't apply to an index.") } diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index b224ee6eb..58bc5c9fc 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -47,6 +47,9 @@ pub(crate) enum Batch { IndexSwap { task: Task, }, + UpgradeDatabase { + tasks: Vec, + }, } #[derive(Debug)] @@ -105,6 +108,7 @@ impl Batch { } Batch::SnapshotCreation(tasks) | Batch::TaskDeletions(tasks) + | Batch::UpgradeDatabase { tasks } | Batch::IndexDeletion { tasks, .. } => { RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid)) } @@ -138,6 +142,7 @@ impl Batch { | TaskDeletions(_) | SnapshotCreation(_) | Dump(_) + | UpgradeDatabase { .. } | IndexSwap { .. } => None, IndexOperation { op, .. } => Some(op.index_uid()), IndexCreation { index_uid, .. } @@ -162,6 +167,7 @@ impl fmt::Display for Batch { Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?, Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?, Batch::IndexSwap { .. } => f.write_str("IndexSwap")?, + Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?, }; match index_uid { Some(name) => f.write_fmt(format_args!(" on {name:?} from tasks: {tasks:?}")), @@ -427,9 +433,18 @@ impl IndexScheduler { let mut current_batch = ProcessingBatch::new(batch_id); let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; - let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; + + // 0. The priority over everything is to upgrade the instance + let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & enqueued; + // There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time + if !upgrade.is_empty() { + let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, upgrade)?; + current_batch.processing(&mut tasks); + return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch))); + } // 1. we get the last task to cancel. + let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; if let Some(task_id) = to_cancel.max() { let mut task = self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 2d20c4d55..6478cf07a 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -6,6 +6,7 @@ mod process_batch; mod process_dump_creation; mod process_index_operation; mod process_snapshot_creation; +mod process_upgrade; #[cfg(test)] mod test; #[cfg(test)] diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 9a86939a4..f6699ae87 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -3,7 +3,7 @@ use std::sync::atomic::Ordering; use meilisearch_types::batches::BatchId; use meilisearch_types::heed::{RoTxn, RwTxn}; -use meilisearch_types::milli::progress::Progress; +use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self}; use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task}; use milli::update::Settings as MilliSettings; @@ -13,7 +13,7 @@ use super::create_batch::Batch; use crate::processing::{ AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, - UpdateIndexProgress, VariableNameStep, + UpdateIndexProgress, }; use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; use crate::{Error, IndexScheduler, Result, TaskId}; @@ -297,7 +297,7 @@ impl IndexScheduler { } progress.update_progress(SwappingTheIndexes::SwappingTheIndexes); for (step, swap) in swaps.iter().enumerate() { - progress.update_progress(VariableNameStep::new( + progress.update_progress(VariableNameStep::::new( format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1), step as u32, swaps.len() as u32, @@ -314,6 +314,7 @@ impl IndexScheduler { task.status = Status::Succeeded; Ok(vec![task]) } + Batch::UpgradeDatabase { tasks } => self.process_upgrade(progress, tasks), } } diff --git a/crates/index-scheduler/src/scheduler/process_dump_creation.rs b/crates/index-scheduler/src/scheduler/process_dump_creation.rs index 3fd5c795b..5c10e7f40 100644 --- a/crates/index-scheduler/src/scheduler/process_dump_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_dump_creation.rs @@ -5,16 +5,14 @@ use std::sync::atomic::Ordering; use dump::IndexMetadata; use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; -use meilisearch_types::milli::progress::Progress; +use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use meilisearch_types::milli::{self}; use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; use time::macros::format_description; use time::OffsetDateTime; -use crate::processing::{ - AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress, VariableNameStep, -}; +use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress}; use crate::{Error, IndexScheduler, Result}; impl IndexScheduler { @@ -106,8 +104,12 @@ impl IndexScheduler { progress.update_progress(DumpCreationProgress::DumpTheIndexes); let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32; let mut count = 0; - let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> { - progress.update_progress(VariableNameStep::new(uid.to_string(), count, nb_indexes)); + self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> { + progress.update_progress(VariableNameStep::::new( + uid.to_string(), + count, + nb_indexes, + )); count += 1; let rtxn = index.read_txn()?; diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index c6d6e2dc8..3e1a63ce3 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -3,12 +3,12 @@ use std::fs; use std::sync::atomic::Ordering; use meilisearch_types::heed::CompactionOption; -use meilisearch_types::milli::progress::Progress; +use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::{compression, VERSION_FILE_NAME}; -use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress, VariableNameStep}; +use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress}; use crate::{Error, IndexScheduler, Result}; impl IndexScheduler { @@ -74,7 +74,9 @@ impl IndexScheduler { for (i, result) in index_mapping.iter(&rtxn)?.enumerate() { let (name, uuid) = result?; - progress.update_progress(VariableNameStep::new(name, i as u32, nb_indexes)); + progress.update_progress(VariableNameStep::::new( + name, i as u32, nb_indexes, + )); let index = self.index_mapper.index(&rtxn, name)?; let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); fs::create_dir_all(&dst)?; diff --git a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs new file mode 100644 index 000000000..e01958902 --- /dev/null +++ b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs @@ -0,0 +1,42 @@ +use meilisearch_types::{ + milli, + milli::progress::{Progress, VariableNameStep}, + tasks::{KindWithContent, Status, Task}, + versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}, +}; + +use crate::{processing::UpgradeDatabaseProgress, Error, IndexScheduler, Result}; + +impl IndexScheduler { + pub(super) fn process_upgrade( + &self, + progress: Progress, + mut tasks: Vec, + ) -> Result> { + progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap); + + // Since we should not have multiple upgrade tasks, we're only going to process the latest one: + let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { + unreachable!() + }; + + enum UpgradeIndex {} + let indexes = self.index_names()?; + + for (i, uid) in indexes.iter().enumerate() { + progress.update_progress(VariableNameStep::::new( + format!("Upgrading index `{uid}`"), + i as u32, + indexes.len() as u32, + )); + let index = self.index(uid)?; + milli::update::upgrade::upgrade(&index, from, progress.clone()); + } + + for task in tasks.iter_mut() { + task.status = Status::Succeeded; + } + + Ok(tasks) + } +} diff --git a/crates/index-scheduler/src/scheduler/test.rs b/crates/index-scheduler/src/scheduler/test.rs index de12cb25d..e2b3666b2 100644 --- a/crates/index-scheduler/src/scheduler/test.rs +++ b/crates/index-scheduler/src/scheduler/test.rs @@ -713,68 +713,70 @@ fn basic_get_stats() { let kind = index_creation_task("whalo", "fish"); let _task = index_scheduler.register(kind, None, false).unwrap(); - snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" - { - "indexes": { - "catto": 1, - "doggo": 1, - "whalo": 1 - }, - "statuses": { - "canceled": 0, - "enqueued": 3, - "failed": 0, - "processing": 0, - "succeeded": 0 - }, - "types": { - "documentAdditionOrUpdate": 0, - "documentDeletion": 0, - "documentEdition": 0, - "dumpCreation": 0, - "indexCreation": 3, - "indexDeletion": 0, - "indexSwap": 0, - "indexUpdate": 0, - "settingsUpdate": 0, - "snapshotCreation": 0, - "taskCancelation": 0, - "taskDeletion": 0 - } - } - "###); + snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#" + { + "indexes": { + "catto": 1, + "doggo": 1, + "whalo": 1 + }, + "statuses": { + "canceled": 0, + "enqueued": 3, + "failed": 0, + "processing": 0, + "succeeded": 0 + }, + "types": { + "documentAdditionOrUpdate": 0, + "documentDeletion": 0, + "documentEdition": 0, + "dumpCreation": 0, + "indexCreation": 3, + "indexDeletion": 0, + "indexSwap": 0, + "indexUpdate": 0, + "settingsUpdate": 0, + "snapshotCreation": 0, + "taskCancelation": 0, + "taskDeletion": 0, + "upgradeDatabase": 0 + } + } + "#); handle.advance_till([Start, BatchCreated]); - snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" - { - "indexes": { - "catto": 1, - "doggo": 1, - "whalo": 1 - }, - "statuses": { - "canceled": 0, - "enqueued": 2, - "failed": 0, - "processing": 1, - "succeeded": 0 - }, - "types": { - "documentAdditionOrUpdate": 0, - "documentDeletion": 0, - "documentEdition": 0, - "dumpCreation": 0, - "indexCreation": 3, - "indexDeletion": 0, - "indexSwap": 0, - "indexUpdate": 0, - "settingsUpdate": 0, - "snapshotCreation": 0, - "taskCancelation": 0, - "taskDeletion": 0 - } - } - "###); + snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#" + { + "indexes": { + "catto": 1, + "doggo": 1, + "whalo": 1 + }, + "statuses": { + "canceled": 0, + "enqueued": 2, + "failed": 0, + "processing": 1, + "succeeded": 0 + }, + "types": { + "documentAdditionOrUpdate": 0, + "documentDeletion": 0, + "documentEdition": 0, + "dumpCreation": 0, + "indexCreation": 3, + "indexDeletion": 0, + "indexSwap": 0, + "indexUpdate": 0, + "settingsUpdate": 0, + "snapshotCreation": 0, + "taskCancelation": 0, + "taskDeletion": 0, + "upgradeDatabase": 0 + } + } + "#); handle.advance_till([ InsideProcessBatch, @@ -784,36 +786,37 @@ fn basic_get_stats() { Start, BatchCreated, ]); - snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" - { - "indexes": { - "catto": 1, - "doggo": 1, - "whalo": 1 - }, - "statuses": { - "canceled": 0, - "enqueued": 1, - "failed": 0, - "processing": 1, - "succeeded": 1 - }, - "types": { - "documentAdditionOrUpdate": 0, - "documentDeletion": 0, - "documentEdition": 0, - "dumpCreation": 0, - "indexCreation": 3, - "indexDeletion": 0, - "indexSwap": 0, - "indexUpdate": 0, - "settingsUpdate": 0, - "snapshotCreation": 0, - "taskCancelation": 0, - "taskDeletion": 0 - } - } - "###); + snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#" + { + "indexes": { + "catto": 1, + "doggo": 1, + "whalo": 1 + }, + "statuses": { + "canceled": 0, + "enqueued": 1, + "failed": 0, + "processing": 1, + "succeeded": 1 + }, + "types": { + "documentAdditionOrUpdate": 0, + "documentDeletion": 0, + "documentEdition": 0, + "dumpCreation": 0, + "indexCreation": 3, + "indexDeletion": 0, + "indexSwap": 0, + "indexUpdate": 0, + "settingsUpdate": 0, + "snapshotCreation": 0, + "taskCancelation": 0, + "taskDeletion": 0, + "upgradeDatabase": 0 + } + } + "#); // now we make one more batch, the started_at field of the new tasks will be past `second_start_time` handle.advance_till([ @@ -824,36 +827,37 @@ fn basic_get_stats() { Start, BatchCreated, ]); - snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" - { - "indexes": { - "catto": 1, - "doggo": 1, - "whalo": 1 - }, - "statuses": { - "canceled": 0, - "enqueued": 0, - "failed": 0, - "processing": 1, - "succeeded": 2 - }, - "types": { - "documentAdditionOrUpdate": 0, - "documentDeletion": 0, - "documentEdition": 0, - "dumpCreation": 0, - "indexCreation": 3, - "indexDeletion": 0, - "indexSwap": 0, - "indexUpdate": 0, - "settingsUpdate": 0, - "snapshotCreation": 0, - "taskCancelation": 0, - "taskDeletion": 0 - } - } - "###); + snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#" + { + "indexes": { + "catto": 1, + "doggo": 1, + "whalo": 1 + }, + "statuses": { + "canceled": 0, + "enqueued": 0, + "failed": 0, + "processing": 1, + "succeeded": 2 + }, + "types": { + "documentAdditionOrUpdate": 0, + "documentDeletion": 0, + "documentEdition": 0, + "dumpCreation": 0, + "indexCreation": 3, + "indexDeletion": 0, + "indexSwap": 0, + "indexUpdate": 0, + "settingsUpdate": 0, + "snapshotCreation": 0, + "taskCancelation": 0, + "taskDeletion": 0, + "upgradeDatabase": 0 + } + } + "#); } #[test] diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index 4be944037..c83a8ab0b 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -109,6 +109,7 @@ impl IndexScheduler { max_number_of_batched_tasks: usize::MAX, batched_tasks_size_limit: u64::MAX, instance_features: Default::default(), + auto_upgrade: true, // Don't cost much and will ensure the happy path works }; configuration(&mut options); diff --git a/crates/index-scheduler/src/upgrade/mod.rs b/crates/index-scheduler/src/upgrade/mod.rs new file mode 100644 index 000000000..cd4adef52 --- /dev/null +++ b/crates/index-scheduler/src/upgrade/mod.rs @@ -0,0 +1,43 @@ +use std::path::Path; + +use meilisearch_types::{ + heed, + tasks::{KindWithContent, Status, Task}, +}; +use time::OffsetDateTime; +use tracing::info; + +use crate::queue::TaskQueue; + +pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow::Result<()> { + info!("Upgrading the task queue"); + let env = unsafe { + heed::EnvOpenOptions::new() + .max_dbs(19) + // Since that's the only database memory-mapped currently we don't need to check the budget yet + .map_size(100 * 1024 * 1024) + .open(tasks_path) + }?; + let mut wtxn = env.write_txn()?; + let queue = TaskQueue::new(&env, &mut wtxn)?; + let uid = queue.next_task_id(&wtxn)?; + queue.register( + &mut wtxn, + &Task { + uid, + batch_uid: None, + enqueued_at: OffsetDateTime::now_utc(), + started_at: None, + finished_at: None, + error: None, + canceled_by: None, + details: None, + status: Status::Enqueued, + kind: KindWithContent::UpgradeDatabase { from: version }, + }, + )?; + wtxn.commit()?; + // Should be pretty much instantaneous since we're the only one reading this env + env.prepare_for_closing().wait(); + Ok(()) +} diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 1f861776f..9b77c478e 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -234,6 +234,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { K::TaskCancelation { .. } | K::TaskDeletion { .. } | K::DumpCreation { .. } + | K::UpgradeDatabase { .. } | K::SnapshotCreation => (), }; if let Some(Details::IndexSwap { swaps }) = &mut task.details { @@ -547,6 +548,9 @@ impl crate::IndexScheduler { Details::Dump { dump_uid: _ } => { assert_eq!(kind.as_kind(), Kind::DumpCreation); } + Details::UpgradeDatabase { from: _ } => { + assert_eq!(kind.as_kind(), Kind::UpgradeDatabase); + } } } diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 8caeb70c2..54b9c8474 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -371,7 +371,8 @@ VectorEmbeddingError , InvalidRequest , BAD_REQUEST ; NotFoundSimilarId , InvalidRequest , BAD_REQUEST ; InvalidDocumentEditionContext , InvalidRequest , BAD_REQUEST ; InvalidDocumentEditionFunctionFilter , InvalidRequest , BAD_REQUEST ; -EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST +EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST ; +CouldNotUpgrade , InvalidRequest , BAD_REQUEST } impl ErrorCode for JoinError { @@ -455,6 +456,9 @@ impl ErrorCode for milli::Error { | UserError::DocumentEditionCompilationError(_) => { Code::EditDocumentsByFunctionError } + UserError::TooOldForUpgrade(_, _, _) + | UserError::CannotDowngrade(_, _, _) + | UserError::CannotUpgradeToUnknownVersion(_, _, _) => Code::CouldNotUpgrade, } } } diff --git a/crates/meilisearch-types/src/task_view.rs b/crates/meilisearch-types/src/task_view.rs index 6032843aa..6224b326c 100644 --- a/crates/meilisearch-types/src/task_view.rs +++ b/crates/meilisearch-types/src/task_view.rs @@ -114,6 +114,8 @@ pub struct DetailsView { pub settings: Option>>, #[serde(skip_serializing_if = "Option::is_none")] pub swaps: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub upgrade_from: Option, } impl DetailsView { @@ -234,6 +236,11 @@ impl DetailsView { Some(left) } }, + upgrade_from: match (self.upgrade_from.clone(), other.upgrade_from.clone()) { + (None, None) => None, + (None, Some(from)) | (Some(from), None) => Some(from), + (Some(_), Some(from)) => Some(from), + }, } } } @@ -311,6 +318,10 @@ impl From
for DetailsView { Details::IndexSwap { swaps } => { DetailsView { swaps: Some(swaps), ..Default::default() } } + Details::UpgradeDatabase { from } => DetailsView { + upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)), + ..Default::default() + }, } } } diff --git a/crates/meilisearch-types/src/tasks.rs b/crates/meilisearch-types/src/tasks.rs index 167cfcd80..0caad08fb 100644 --- a/crates/meilisearch-types/src/tasks.rs +++ b/crates/meilisearch-types/src/tasks.rs @@ -50,6 +50,7 @@ impl Task { | SnapshotCreation | TaskCancelation { .. } | TaskDeletion { .. } + | UpgradeDatabase { .. } | IndexSwap { .. } => None, DocumentAdditionOrUpdate { index_uid, .. } | DocumentEdition { index_uid, .. } @@ -84,7 +85,8 @@ impl Task { | KindWithContent::TaskCancelation { .. } | KindWithContent::TaskDeletion { .. } | KindWithContent::DumpCreation { .. } - | KindWithContent::SnapshotCreation => None, + | KindWithContent::SnapshotCreation + | KindWithContent::UpgradeDatabase { .. } => None, } } } @@ -150,6 +152,9 @@ pub enum KindWithContent { instance_uid: Option, }, SnapshotCreation, + UpgradeDatabase { + from: (u32, u32, u32), + }, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] @@ -175,6 +180,7 @@ impl KindWithContent { KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion, KindWithContent::DumpCreation { .. } => Kind::DumpCreation, KindWithContent::SnapshotCreation => Kind::SnapshotCreation, + KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase, } } @@ -185,7 +191,8 @@ impl KindWithContent { DumpCreation { .. } | SnapshotCreation | TaskCancelation { .. } - | TaskDeletion { .. } => vec![], + | TaskDeletion { .. } + | UpgradeDatabase { .. } => vec![], DocumentAdditionOrUpdate { index_uid, .. } | DocumentEdition { index_uid, .. } | DocumentDeletion { index_uid, .. } @@ -262,6 +269,7 @@ impl KindWithContent { }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, + KindWithContent::UpgradeDatabase { .. } => None, } } @@ -320,6 +328,7 @@ impl KindWithContent { }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, + KindWithContent::UpgradeDatabase { .. } => None, } } } @@ -360,6 +369,7 @@ impl From<&KindWithContent> for Option
{ }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, + KindWithContent::UpgradeDatabase { .. } => None, } } } @@ -468,6 +478,7 @@ pub enum Kind { TaskDeletion, DumpCreation, SnapshotCreation, + UpgradeDatabase, } impl Kind { @@ -484,6 +495,7 @@ impl Kind { | Kind::TaskCancelation | Kind::TaskDeletion | Kind::DumpCreation + | Kind::UpgradeDatabase | Kind::SnapshotCreation => false, } } @@ -503,6 +515,7 @@ impl Display for Kind { Kind::TaskDeletion => write!(f, "taskDeletion"), Kind::DumpCreation => write!(f, "dumpCreation"), Kind::SnapshotCreation => write!(f, "snapshotCreation"), + Kind::UpgradeDatabase => write!(f, "upgradeDatabase"), } } } @@ -607,6 +620,9 @@ pub enum Details { IndexSwap { swaps: Vec, }, + UpgradeDatabase { + from: (usize, usize, usize), + }, } impl Details { @@ -627,6 +643,7 @@ impl Details { Self::SettingsUpdate { .. } | Self::IndexInfo { .. } | Self::Dump { .. } + | Self::UpgradeDatabase { .. } | Self::IndexSwap { .. } => (), } diff --git a/crates/meilisearch-types/src/versioning.rs b/crates/meilisearch-types/src/versioning.rs index 2ec9d9b0c..081c95c6e 100644 --- a/crates/meilisearch-types/src/versioning.rs +++ b/crates/meilisearch-types/src/versioning.rs @@ -5,9 +5,9 @@ use std::path::Path; /// The name of the file that contains the version of the database. pub const VERSION_FILE_NAME: &str = "VERSION"; -static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR"); -static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR"); -static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); +pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR"); +pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR"); +pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); /// Persists the version of the current Meilisearch binary to a VERSION file pub fn create_current_version_file(db_path: &Path) -> io::Result<()> { @@ -24,17 +24,6 @@ pub fn create_version_file( fs::write(version_path, format!("{}.{}.{}", major, minor, patch)) } -/// Ensures Meilisearch version is compatible with the database, returns an error versions mismatch. -pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> { - let (major, minor, patch) = get_version(db_path)?; - - if major != VERSION_MAJOR || minor != VERSION_MINOR { - return Err(VersionFileError::VersionMismatch { major, minor, patch }.into()); - } - - Ok(()) -} - pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFileError> { let version_path = db_path.join(VERSION_FILE_NAME); @@ -48,7 +37,7 @@ pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFi } pub fn parse_version(version: &str) -> Result<(String, String, String), VersionFileError> { - let version_components = version.split('.').collect::>(); + let version_components = version.trim().split('.').collect::>(); let (major, minor, patch) = match &version_components[..] { [major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()), _ => return Err(VersionFileError::MalformedVersionFile), diff --git a/crates/meilisearch/src/analytics/segment_analytics.rs b/crates/meilisearch/src/analytics/segment_analytics.rs index a97813089..9fc212cc4 100644 --- a/crates/meilisearch/src/analytics/segment_analytics.rs +++ b/crates/meilisearch/src/analytics/segment_analytics.rs @@ -189,6 +189,7 @@ struct Infos { experimental_drop_search_after: usize, experimental_nb_searches_per_core: usize, experimental_logs_mode: LogMode, + experimental_dumpless_upgrade: bool, experimental_replication_parameters: bool, experimental_enable_logs_route: bool, experimental_reduce_indexing_memory_usage: bool, @@ -235,6 +236,7 @@ impl Infos { experimental_drop_search_after, experimental_nb_searches_per_core, experimental_logs_mode, + experimental_dumpless_upgrade, experimental_replication_parameters, experimental_enable_logs_route, experimental_reduce_indexing_memory_usage, @@ -296,6 +298,7 @@ impl Infos { experimental_drop_search_after: experimental_drop_search_after.into(), experimental_nb_searches_per_core: experimental_nb_searches_per_core.into(), experimental_logs_mode, + experimental_dumpless_upgrade, experimental_replication_parameters, experimental_enable_logs_route: experimental_enable_logs_route | logs_route, experimental_reduce_indexing_memory_usage, diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index a8b8b8eba..ebdaab7b6 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -32,13 +32,16 @@ use analytics::Analytics; use anyhow::bail; use error::PayloadError; use extractors::payload::PayloadConfig; +use index_scheduler::upgrade::upgrade_task_queue; use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::AuthController; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; -use meilisearch_types::versioning::{check_version_file, create_current_version_file}; +use meilisearch_types::versioning::{ + create_current_version_file, get_version, VersionFileError, VERSION_MAJOR, VERSION_MINOR, +}; use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; pub use option::Opt; use option::ScheduleSnapshot; @@ -316,6 +319,7 @@ fn open_or_create_database_unchecked( index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_count: DEFAULT_INDEX_COUNT, instance_features, + auto_upgrade: opt.experimental_dumpless_upgrade, })?) }; @@ -334,13 +338,36 @@ fn open_or_create_database_unchecked( } } +/// Ensures Meilisearch version is compatible with the database, returns an error versions mismatch. +fn check_version_and_update_task_queue( + db_path: &Path, + experimental_dumpless_upgrade: bool, +) -> anyhow::Result<()> { + let (major, minor, patch) = get_version(db_path)?; + + if major != VERSION_MAJOR || minor != VERSION_MINOR { + if experimental_dumpless_upgrade { + let version = ( + major.parse().map_err(|_| VersionFileError::MalformedVersionFile)?, + minor.parse().map_err(|_| VersionFileError::MalformedVersionFile)?, + patch.parse().map_err(|_| VersionFileError::MalformedVersionFile)?, + ); + return upgrade_task_queue(&db_path.join("tasks"), version); + } else { + return Err(VersionFileError::VersionMismatch { major, minor, patch }.into()); + } + } + + Ok(()) +} + /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. fn open_or_create_database( opt: &Opt, empty_db: bool, ) -> anyhow::Result<(IndexScheduler, AuthController)> { if !empty_db { - check_version_file(&opt.db_path)?; + check_version_and_update_task_queue(&opt.db_path, opt.experimental_dumpless_upgrade)?; } open_or_create_database_unchecked(opt, OnFailure::KeepDb) diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index b5aa6b9e7..8403017c3 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -49,6 +49,7 @@ const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS"; const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR"; const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL"; const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE"; +const MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE: &str = "MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE"; const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS"; const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE"; const MEILI_EXPERIMENTAL_CONTAINS_FILTER: &str = "MEILI_EXPERIMENTAL_CONTAINS_FILTER"; @@ -400,6 +401,13 @@ pub struct Opt { #[serde(default)] pub experimental_logs_mode: LogMode, + /// Experimental dumpless upgrade. For more information, see: + /// + /// When set, Meilisearch will auto-update its database without using a dump. + #[clap(long, env = MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE, default_value_t)] + #[serde(default)] + pub experimental_dumpless_upgrade: bool, + /// Experimental logs route feature. For more information, /// see: /// @@ -535,6 +543,7 @@ impl Opt { experimental_drop_search_after, experimental_nb_searches_per_core, experimental_logs_mode, + experimental_dumpless_upgrade, experimental_enable_logs_route, experimental_replication_parameters, experimental_reduce_indexing_memory_usage, @@ -608,6 +617,10 @@ impl Opt { MEILI_EXPERIMENTAL_LOGS_MODE, experimental_logs_mode.to_string(), ); + export_to_env_if_not_present( + MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE, + experimental_dumpless_upgrade.to_string(), + ); export_to_env_if_not_present( MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS, experimental_replication_parameters.to_string(), diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index fce2bc8bf..90fdc9c16 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -912,14 +912,14 @@ mod tests { { let params = "types=createIndex"; let err = deserr_query_params::(params).unwrap_err(); - snapshot!(meili_snap::json_string!(err), @r###" + snapshot!(meili_snap::json_string!(err), @r#" { - "message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", + "message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.", "code": "invalid_task_types", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#invalid_task_types" } - "###); + "#); } } #[test] diff --git a/crates/meilisearch/src/upgrade/mod.rs b/crates/meilisearch/src/upgrade/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/crates/meilisearch/src/upgrade/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/meilisearch/tests/batches/errors.rs b/crates/meilisearch/tests/batches/errors.rs index 2c3484bc1..7f5fedb6a 100644 --- a/crates/meilisearch/tests/batches/errors.rs +++ b/crates/meilisearch/tests/batches/errors.rs @@ -42,7 +42,7 @@ async fn batch_bad_types() { snapshot!(code, @"400 Bad Request"); snapshot!(json_string!(response), @r#" { - "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", + "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.", "code": "invalid_task_types", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#invalid_task_types" diff --git a/crates/meilisearch/tests/tasks/errors.rs b/crates/meilisearch/tests/tasks/errors.rs index 932dd19d4..759531d42 100644 --- a/crates/meilisearch/tests/tasks/errors.rs +++ b/crates/meilisearch/tests/tasks/errors.rs @@ -95,36 +95,36 @@ async fn task_bad_types() { let (response, code) = server.tasks_filter("types=doggo").await; snapshot!(code, @"400 Bad Request"); - snapshot!(json_string!(response), @r###" + snapshot!(json_string!(response), @r#" { - "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", + "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.", "code": "invalid_task_types", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#invalid_task_types" } - "###); + "#); let (response, code) = server.cancel_tasks("types=doggo").await; snapshot!(code, @"400 Bad Request"); - snapshot!(json_string!(response), @r###" + snapshot!(json_string!(response), @r#" { - "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", + "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.", "code": "invalid_task_types", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#invalid_task_types" } - "###); + "#); let (response, code) = server.delete_tasks("types=doggo").await; snapshot!(code, @"400 Bad Request"); - snapshot!(json_string!(response), @r###" + snapshot!(json_string!(response), @r#" { - "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", + "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.", "code": "invalid_task_types", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#invalid_task_types" } - "###); + "#); } #[actix_rt::test] diff --git a/crates/milli/src/constants.rs b/crates/milli/src/constants.rs index 3dd787f1c..39b449661 100644 --- a/crates/milli/src/constants.rs +++ b/crates/milli/src/constants.rs @@ -1,2 +1,6 @@ +pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR"); +pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR"); +pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); + pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors"; pub const RESERVED_GEO_FIELD_NAME: &str = "_geo"; diff --git a/crates/milli/src/error.rs b/crates/milli/src/error.rs index 79e7770f0..29e02b9f1 100644 --- a/crates/milli/src/error.rs +++ b/crates/milli/src/error.rs @@ -10,7 +10,7 @@ use rhai::EvalAltResult; use serde_json::Value; use thiserror::Error; -use crate::constants::RESERVED_GEO_FIELD_NAME; +use crate::constants::{RESERVED_GEO_FIELD_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use crate::documents::{self, DocumentsBatchCursorError}; use crate::thread_pool_no_abort::PanicCatched; use crate::{CriterionError, DocumentId, FieldId, Object, SortError}; @@ -288,6 +288,12 @@ and can not be more than 511 bytes.", .document_id.to_string() DocumentEditionCompilationError(rhai::ParseError), #[error("{0}")] DocumentEmbeddingError(String), + #[error("Upgrade could not be processed because v{0}.{1}.{2} of the database is too old. Please re-open the v{0}.{1}.{2} and use a dump to upgrade your version. The oldest version meilisearch can upgrade from is v1.12.0.")] + TooOldForUpgrade(u32, u32, u32), + #[error("Upgrade could not be processed because the database version (v{0}.{1}.{2}) is newer than the targeted version (v{}.{}.{})", VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)] + CannotDowngrade(u32, u32, u32), + #[error("Cannot upgrade to unknown version v{0}.{1}.{2}.")] + CannotUpgradeToUnknownVersion(u32, u32, u32), } impl From for Error { diff --git a/crates/milli/src/heed_codec/mod.rs b/crates/milli/src/heed_codec/mod.rs index 575b886bd..45f7f7075 100644 --- a/crates/milli/src/heed_codec/mod.rs +++ b/crates/milli/src/heed_codec/mod.rs @@ -10,6 +10,7 @@ mod roaring_bitmap_length; mod str_beu32_codec; mod str_ref; mod str_str_u8_codec; +pub mod version; pub use byte_slice_ref::BytesRefCodec; use heed::BoxedError; diff --git a/crates/milli/src/heed_codec/version.rs b/crates/milli/src/heed_codec/version.rs new file mode 100644 index 000000000..d63ae91d4 --- /dev/null +++ b/crates/milli/src/heed_codec/version.rs @@ -0,0 +1,44 @@ +use std::mem::size_of; +use std::{borrow::Cow, mem::size_of_val}; + +use byteorder::{BigEndian, ByteOrder}; +use heed::{BoxedError, BytesDecode, BytesEncode}; + +const VERSION_SIZE: usize = std::mem::size_of::() * 3; + +#[derive(thiserror::Error, Debug)] +#[error( + "Could not decode the version: Expected {} bytes but instead received {0} bytes", + VERSION_SIZE +)] +pub struct DecodeVersionError(usize); + +pub struct VersionCodec; +impl<'a> BytesEncode<'a> for VersionCodec { + type EItem = (u32, u32, u32); + + fn bytes_encode(item: &'a Self::EItem) -> Result, BoxedError> { + let mut ret = Vec::with_capacity(size_of::() * 3); + ret.extend(&item.0.to_be_bytes()); + ret.extend(&item.1.to_be_bytes()); + ret.extend(&item.2.to_be_bytes()); + Ok(Cow::Owned(ret)) + } +} +impl<'a> BytesDecode<'a> for VersionCodec { + type DItem = (u32, u32, u32); + + fn bytes_decode(bytes: &'a [u8]) -> Result { + if bytes.len() != VERSION_SIZE { + Err(Box::new(DecodeVersionError(bytes.len()))) + } else { + let major = BigEndian::read_u32(bytes); + let bytes = &bytes[size_of_val(&major)..]; + let minor = BigEndian::read_u32(bytes); + let bytes = &bytes[size_of_val(&major)..]; + let patch = BigEndian::read_u32(bytes); + + Ok((major, minor, patch)) + } + } +} diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 9829df2ee..bda57b531 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -10,7 +10,7 @@ use roaring::RoaringBitmap; use rstar::RTree; use serde::{Deserialize, Serialize}; -use crate::constants::RESERVED_VECTORS_FIELD_NAME; +use crate::constants::{self, RESERVED_VECTORS_FIELD_NAME}; use crate::documents::PrimaryKey; use crate::error::{InternalError, UserError}; use crate::fields_ids_map::FieldsIdsMap; @@ -18,6 +18,7 @@ use crate::heed_codec::facet::{ FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec, FieldIdCodec, OrderedF64Codec, }; +use crate::heed_codec::version::VersionCodec; use crate::heed_codec::{BEU16StrCodec, FstSetCodec, StrBEU16Codec, StrRefCodec}; use crate::order_by_map::OrderByMap; use crate::proximity::ProximityPrecision; @@ -33,6 +34,7 @@ pub const DEFAULT_MIN_WORD_LEN_ONE_TYPO: u8 = 5; pub const DEFAULT_MIN_WORD_LEN_TWO_TYPOS: u8 = 9; pub mod main_key { + pub const VERSION_KEY: &str = "version"; pub const CRITERIA_KEY: &str = "criteria"; pub const DISPLAYED_FIELDS_KEY: &str = "displayed-fields"; pub const DISTINCT_FIELD_KEY: &str = "distinct-field-key"; @@ -223,12 +225,9 @@ impl Index { let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?; let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?; - wtxn.commit()?; - Index::set_creation_dates(&env, main, created_at, updated_at)?; - - Ok(Index { - env, + let this = Index { + env: env.clone(), main, external_documents_ids, word_docids, @@ -253,7 +252,22 @@ impl Index { vector_arroy, embedder_category_id, documents, - }) + }; + if this.get_version(&wtxn)?.is_none() { + this.put_version( + &mut wtxn, + ( + constants::VERSION_MAJOR.parse().unwrap(), + constants::VERSION_MINOR.parse().unwrap(), + constants::VERSION_PATCH.parse().unwrap(), + ), + )?; + } + wtxn.commit()?; + + Index::set_creation_dates(&this.env, this.main, created_at, updated_at)?; + + Ok(this) } pub fn new>(options: heed::EnvOpenOptions, path: P) -> Result { @@ -331,6 +345,26 @@ impl Index { self.env.prepare_for_closing() } + /* version */ + + /// Writes the version of the database. + pub(crate) fn put_version( + &self, + wtxn: &mut RwTxn<'_>, + (major, minor, patch): (u32, u32, u32), + ) -> heed::Result<()> { + self.main.remap_types::().put( + wtxn, + main_key::VERSION_KEY, + &(major, minor, patch), + ) + } + + /// Get the version of the database. `None` if it was never set. + pub(crate) fn get_version(&self, rtxn: &RoTxn<'_>) -> heed::Result> { + self.main.remap_types::().get(rtxn, main_key::VERSION_KEY) + } + /* documents ids */ /// Writes the documents ids that corresponds to the user-ids-documents-ids FST. diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index 622ec9842..870277bad 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -1,5 +1,6 @@ use std::any::TypeId; use std::borrow::Cow; +use std::marker::PhantomData; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; @@ -153,3 +154,41 @@ pub struct ProgressStepView { pub finished: u32, pub total: u32, } + +/// Used when the name can change but it's still the same step. +/// To avoid conflicts on the `TypeId`, create a unique type every time you use this step: +/// ```text +/// enum UpgradeVersion {} +/// +/// progress.update_progress(VariableNameStep::::new( +/// "v1 to v2", +/// 0, +/// 10, +/// )); +/// ``` +pub struct VariableNameStep { + name: String, + current: u32, + total: u32, + phantom: PhantomData, +} + +impl VariableNameStep { + pub fn new(name: impl Into, current: u32, total: u32) -> Self { + Self { name: name.into(), current, total, phantom: PhantomData } + } +} + +impl Step for VariableNameStep { + fn name(&self) -> Cow<'static, str> { + self.name.clone().into() + } + + fn current(&self) -> u32 { + self.current + } + + fn total(&self) -> u32 { + self.total + } +} diff --git a/crates/milli/src/update/mod.rs b/crates/milli/src/update/mod.rs index 5888a20db..68268db35 100644 --- a/crates/milli/src/update/mod.rs +++ b/crates/milli/src/update/mod.rs @@ -21,6 +21,7 @@ mod indexer_config; pub mod new; pub(crate) mod settings; mod update_step; +pub mod upgrade; mod word_prefix_docids; mod words_prefix_integer_docids; mod words_prefixes_fst; diff --git a/crates/milli/src/update/upgrade/mod.rs b/crates/milli/src/update/upgrade/mod.rs new file mode 100644 index 000000000..aab160b38 --- /dev/null +++ b/crates/milli/src/update/upgrade/mod.rs @@ -0,0 +1,65 @@ +use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; +use crate::progress::{Progress, VariableNameStep}; +use crate::{Index, Result, UserError}; + +pub fn upgrade(index: &Index, base_version: (u32, u32, u32), progress: Progress) -> Result<()> { + let wtxn = index.env.write_txn()?; + let from = index.get_version(&wtxn)?; + let upgrade_functions = + [(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")]; + + let current_major: u32 = VERSION_MAJOR.parse().unwrap(); + let current_minor: u32 = VERSION_MINOR.parse().unwrap(); + let current_patch: u32 = VERSION_PATCH.parse().unwrap(); + + let start = match from { + // If there was no version it means we're coming from the base version specified by the index-scheduler + None if base_version.0 == 1 && base_version.1 == 12 => 0, + Some((1, 12, _)) => 0, + + // --- Error handling + None => { + return Err(UserError::TooOldForUpgrade( + base_version.0, + base_version.1, + base_version.2, + ) + .into()); + } + Some((major, minor, patch)) if major == 0 || (major == 1 && minor < 12) => { + return Err(UserError::TooOldForUpgrade(major, minor, patch).into()); + } + Some((major, minor, patch)) if major > current_major => { + return Err(UserError::CannotDowngrade(major, minor, patch).into()); + } + Some((major, minor, patch)) if major == current_major && minor > current_minor => { + return Err(UserError::CannotDowngrade(major, minor, patch).into()); + } + Some((major, minor, patch)) + if major == current_major && minor == current_minor && patch > current_patch => + { + return Err(UserError::CannotDowngrade(major, minor, patch).into()); + } + Some((major, minor, patch)) => { + return Err(UserError::CannotUpgradeToUnknownVersion(major, minor, patch).into()) + } + }; + + enum UpgradeVersion {} + let upgrade_path = &upgrade_functions[start..]; + + for (i, (upgrade_function, upgrade_msg)) in upgrade_path.iter().enumerate() { + progress.update_progress(VariableNameStep::::new( + upgrade_msg.to_string(), + i as u32, + upgrade_path.len() as u32, + )); + (upgrade_function)(index, progress.clone())?; + } + + Ok(()) +} + +fn v1_12_to_v1_13(_index: &Index, _progress: Progress) -> Result<()> { + Ok(()) +}