Add the new tasks with most of the job done

This commit is contained in:
Tamo 2025-01-14 14:54:00 +01:00
parent 0776217801
commit c21e6f7469
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
38 changed files with 572 additions and 204 deletions

View File

@ -141,6 +141,9 @@ pub enum KindDump {
instance_uid: Option<InstanceUid>, instance_uid: Option<InstanceUid>,
}, },
SnapshotCreation, SnapshotCreation,
UpgradeDatabase {
from: (u32, u32, u32),
},
} }
impl From<Task> for TaskDump { impl From<Task> for TaskDump {
@ -210,6 +213,9 @@ impl From<KindWithContent> for KindDump {
KindDump::DumpCreation { keys, instance_uid } KindDump::DumpCreation { keys, instance_uid }
} }
KindWithContent::SnapshotCreation => KindDump::SnapshotCreation, KindWithContent::SnapshotCreation => KindDump::SnapshotCreation,
KindWithContent::UpgradeDatabase { from: version } => {
KindDump::UpgradeDatabase { from: version }
}
} }
} }
} }

View File

@ -132,6 +132,7 @@ impl<'a> Dump<'a> {
KindWithContent::DumpCreation { keys, instance_uid } KindWithContent::DumpCreation { keys, instance_uid }
} }
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation, KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
}, },
}; };

View File

@ -3,7 +3,7 @@ use std::fmt::Display;
use meilisearch_types::batches::BatchId; use meilisearch_types::batches::BatchId;
use meilisearch_types::error::{Code, ErrorCode}; use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::tasks::{Kind, Status}; use meilisearch_types::tasks::{Kind, Status};
use meilisearch_types::{heed, milli}; use meilisearch_types::{heed, milli, versioning};
use thiserror::Error; use thiserror::Error;
use crate::TaskId; use crate::TaskId;

View File

@ -279,6 +279,9 @@ fn snapshot_details(d: &Details) -> String {
Details::IndexSwap { swaps } => { Details::IndexSwap { swaps } => {
format!("{{ swaps: {swaps:?} }}") format!("{{ swaps: {swaps:?} }}")
} }
Details::UpgradeDatabase { from } => {
format!("{{ from: v{}.{}.{} }}", from.0, from.1, from.2)
}
} }
} }

View File

@ -30,6 +30,7 @@ mod queue;
mod scheduler; mod scheduler;
#[cfg(test)] #[cfg(test)]
mod test_utils; mod test_utils;
pub mod upgrade;
mod utils; mod utils;
pub mod uuid_codec; pub mod uuid_codec;
@ -120,6 +121,8 @@ pub struct IndexSchedulerOptions {
pub batched_tasks_size_limit: u64, pub batched_tasks_size_limit: u64,
/// The experimental features enabled for this instance. /// The experimental features enabled for this instance.
pub instance_features: InstanceTogglableFeatures, pub instance_features: InstanceTogglableFeatures,
/// Whether the database may be upgraded automatically (presumably when it was created by an older version — TODO confirm).
pub auto_upgrade: bool,
} }
/// Structure which holds meilisearch's indexes and schedules the tasks /// Structure which holds meilisearch's indexes and schedules the tasks

View File

@ -129,6 +129,12 @@ make_enum_progress! {
} }
} }
// Progress steps reported while a database upgrade batch is being processed.
// NOTE(review): the only step is named `EnsuringCorrectnessOfTheSwap`, which looks
// copied from the index-swap progress — confirm whether an upgrade-specific name is wanted.
make_enum_progress! {
pub enum UpgradeDatabaseProgress {
EnsuringCorrectnessOfTheSwap,
}
}
make_enum_progress! { make_enum_progress! {
pub enum InnerSwappingTwoIndexes { pub enum InnerSwappingTwoIndexes {
RetrieveTheTasks, RetrieveTheTasks,
@ -173,32 +179,6 @@ make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" ); make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" ); make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" );
/// A progress step whose name is supplied at runtime instead of being a fixed identifier.
pub struct VariableNameStep {
// Name returned by `Step::name`.
name: String,
// Value returned by `Step::current`.
current: u32,
// Value returned by `Step::total`.
total: u32,
}
impl VariableNameStep {
/// Builds a step from anything convertible into a `String`, plus the `current` and `total` counters.
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
Self { name: name.into(), current, total }
}
}
impl Step for VariableNameStep {
/// Returns an owned copy of the stored name (the `String` is cloned on every call).
fn name(&self) -> Cow<'static, str> {
self.name.clone().into()
}
/// Returns the `current` value given at construction.
fn current(&self) -> u32 {
self.current
}
/// Returns the `total` value given at construction.
fn total(&self) -> u32 {
self.total
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;

View File

@ -20,8 +20,8 @@ use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use self::batches::BatchQueue; pub(crate) use self::batches::BatchQueue;
use self::tasks::TaskQueue; pub(crate) use self::tasks::TaskQueue;
use crate::processing::ProcessingTasks; use crate::processing::ProcessingTasks;
use crate::utils::{ use crate::utils::{
check_index_swap_validity, filter_out_references_to_newer_tasks, ProcessingBatch, check_index_swap_validity, filter_out_references_to_newer_tasks, ProcessingBatch,

View File

@ -59,7 +59,7 @@ impl TaskQueue {
} }
} }
pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> { pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self { Ok(Self {
all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?, all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?,
status: env.create_database(wtxn, Some(db_name::STATUS))?, status: env.create_database(wtxn, Some(db_name::STATUS))?,

View File

@ -85,6 +85,7 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::TaskCancelation { .. } KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. } | KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. } | KindWithContent::DumpCreation { .. }
| KindWithContent::UpgradeDatabase { .. }
| KindWithContent::SnapshotCreation => { | KindWithContent::SnapshotCreation => {
panic!("The autobatcher should never be called with tasks that don't apply to an index.") panic!("The autobatcher should never be called with tasks that don't apply to an index.")
} }

View File

@ -47,6 +47,9 @@ pub(crate) enum Batch {
IndexSwap { IndexSwap {
task: Task, task: Task,
}, },
UpgradeDatabase {
tasks: Vec<Task>,
},
} }
#[derive(Debug)] #[derive(Debug)]
@ -105,6 +108,7 @@ impl Batch {
} }
Batch::SnapshotCreation(tasks) Batch::SnapshotCreation(tasks)
| Batch::TaskDeletions(tasks) | Batch::TaskDeletions(tasks)
| Batch::UpgradeDatabase { tasks }
| Batch::IndexDeletion { tasks, .. } => { | Batch::IndexDeletion { tasks, .. } => {
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid)) RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
} }
@ -138,6 +142,7 @@ impl Batch {
| TaskDeletions(_) | TaskDeletions(_)
| SnapshotCreation(_) | SnapshotCreation(_)
| Dump(_) | Dump(_)
| UpgradeDatabase { .. }
| IndexSwap { .. } => None, | IndexSwap { .. } => None,
IndexOperation { op, .. } => Some(op.index_uid()), IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. } IndexCreation { index_uid, .. }
@ -162,6 +167,7 @@ impl fmt::Display for Batch {
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?, Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?, Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?, Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
}; };
match index_uid { match index_uid {
Some(name) => f.write_fmt(format_args!(" on {name:?} from tasks: {tasks:?}")), Some(name) => f.write_fmt(format_args!(" on {name:?} from tasks: {tasks:?}")),
@ -427,9 +433,18 @@ impl IndexScheduler {
let mut current_batch = ProcessingBatch::new(batch_id); let mut current_batch = ProcessingBatch::new(batch_id);
let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
// 0. The priority over everything is to upgrade the instance
let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & enqueued;
// There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time
if !upgrade.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, upgrade)?;
current_batch.processing(&mut tasks);
return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch)));
}
// 1. we get the last task to cancel. // 1. we get the last task to cancel.
let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
if let Some(task_id) = to_cancel.max() { if let Some(task_id) = to_cancel.max() {
let mut task = let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;

View File

@ -6,6 +6,7 @@ mod process_batch;
mod process_dump_creation; mod process_dump_creation;
mod process_index_operation; mod process_index_operation;
mod process_snapshot_creation; mod process_snapshot_creation;
mod process_upgrade;
#[cfg(test)] #[cfg(test)]
mod test; mod test;
#[cfg(test)] #[cfg(test)]

View File

@ -3,7 +3,7 @@ use std::sync::atomic::Ordering;
use meilisearch_types::batches::BatchId; use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self}; use meilisearch_types::milli::{self};
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task};
use milli::update::Settings as MilliSettings; use milli::update::Settings as MilliSettings;
@ -13,7 +13,7 @@ use super::create_batch::Batch;
use crate::processing::{ use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress, VariableNameStep, UpdateIndexProgress,
}; };
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, Result, TaskId}; use crate::{Error, IndexScheduler, Result, TaskId};
@ -297,7 +297,7 @@ impl IndexScheduler {
} }
progress.update_progress(SwappingTheIndexes::SwappingTheIndexes); progress.update_progress(SwappingTheIndexes::SwappingTheIndexes);
for (step, swap) in swaps.iter().enumerate() { for (step, swap) in swaps.iter().enumerate() {
progress.update_progress(VariableNameStep::new( progress.update_progress(VariableNameStep::<SwappingTheIndexes>::new(
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1), format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
step as u32, step as u32,
swaps.len() as u32, swaps.len() as u32,
@ -314,6 +314,7 @@ impl IndexScheduler {
task.status = Status::Succeeded; task.status = Status::Succeeded;
Ok(vec![task]) Ok(vec![task])
} }
Batch::UpgradeDatabase { tasks } => self.process_upgrade(progress, tasks),
} }
} }

View File

@ -5,16 +5,14 @@ use std::sync::atomic::Ordering;
use dump::IndexMetadata; use dump::IndexMetadata;
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use meilisearch_types::milli::{self}; use meilisearch_types::milli::{self};
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use time::macros::format_description; use time::macros::format_description;
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::processing::{ use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress};
AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress, VariableNameStep,
};
use crate::{Error, IndexScheduler, Result}; use crate::{Error, IndexScheduler, Result};
impl IndexScheduler { impl IndexScheduler {
@ -106,8 +104,12 @@ impl IndexScheduler {
progress.update_progress(DumpCreationProgress::DumpTheIndexes); progress.update_progress(DumpCreationProgress::DumpTheIndexes);
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32; let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
let mut count = 0; let mut count = 0;
let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> { self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
progress.update_progress(VariableNameStep::new(uid.to_string(), count, nb_indexes)); progress.update_progress(VariableNameStep::<DumpCreationProgress>::new(
uid.to_string(),
count,
nb_indexes,
));
count += 1; count += 1;
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;

View File

@ -3,12 +3,12 @@ use std::fs;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use meilisearch_types::heed::CompactionOption; use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self}; use meilisearch_types::milli::{self};
use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME}; use meilisearch_types::{compression, VERSION_FILE_NAME};
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress, VariableNameStep}; use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
use crate::{Error, IndexScheduler, Result}; use crate::{Error, IndexScheduler, Result};
impl IndexScheduler { impl IndexScheduler {
@ -74,7 +74,9 @@ impl IndexScheduler {
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() { for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, uuid) = result?; let (name, uuid) = result?;
progress.update_progress(VariableNameStep::new(name, i as u32, nb_indexes)); progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
));
let index = self.index_mapper.index(&rtxn, name)?; let index = self.index_mapper.index(&rtxn, name)?;
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?; fs::create_dir_all(&dst)?;

View File

@ -0,0 +1,42 @@
use meilisearch_types::{
milli,
milli::progress::{Progress, VariableNameStep},
tasks::{KindWithContent, Status, Task},
versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH},
};
use crate::{processing::UpgradeDatabaseProgress, Error, IndexScheduler, Result};
impl IndexScheduler {
pub(super) fn process_upgrade(
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap);
// Since we should not have multiple upgrade tasks, we're only going to process the latest one:
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
unreachable!()
};
enum UpgradeIndex {}
let indexes = self.index_names()?;
for (i, uid) in indexes.iter().enumerate() {
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
format!("Upgrading index `{uid}`"),
i as u32,
indexes.len() as u32,
));
let index = self.index(uid)?;
milli::update::upgrade::upgrade(&index, from, progress.clone());
}
for task in tasks.iter_mut() {
task.status = Status::Succeeded;
}
Ok(tasks)
}
}

View File

@ -713,68 +713,70 @@ fn basic_get_stats() {
let kind = index_creation_task("whalo", "fish"); let kind = index_creation_task("whalo", "fish");
let _task = index_scheduler.register(kind, None, false).unwrap(); let _task = index_scheduler.register(kind, None, false).unwrap();
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{ {
"indexes": { "indexes": {
"catto": 1, "catto": 1,
"doggo": 1, "doggo": 1,
"whalo": 1 "whalo": 1
}, },
"statuses": { "statuses": {
"canceled": 0, "canceled": 0,
"enqueued": 3, "enqueued": 3,
"failed": 0, "failed": 0,
"processing": 0, "processing": 0,
"succeeded": 0 "succeeded": 0
}, },
"types": { "types": {
"documentAdditionOrUpdate": 0, "documentAdditionOrUpdate": 0,
"documentDeletion": 0, "documentDeletion": 0,
"documentEdition": 0, "documentEdition": 0,
"dumpCreation": 0, "dumpCreation": 0,
"indexCreation": 3, "indexCreation": 3,
"indexDeletion": 0, "indexDeletion": 0,
"indexSwap": 0, "indexSwap": 0,
"indexUpdate": 0, "indexUpdate": 0,
"settingsUpdate": 0, "settingsUpdate": 0,
"snapshotCreation": 0, "snapshotCreation": 0,
"taskCancelation": 0, "taskCancelation": 0,
"taskDeletion": 0 "taskDeletion": 0,
} "upgradeDatabase": 0
} }
"###); }
"#);
handle.advance_till([Start, BatchCreated]); handle.advance_till([Start, BatchCreated]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{ {
"indexes": { "indexes": {
"catto": 1, "catto": 1,
"doggo": 1, "doggo": 1,
"whalo": 1 "whalo": 1
}, },
"statuses": { "statuses": {
"canceled": 0, "canceled": 0,
"enqueued": 2, "enqueued": 2,
"failed": 0, "failed": 0,
"processing": 1, "processing": 1,
"succeeded": 0 "succeeded": 0
}, },
"types": { "types": {
"documentAdditionOrUpdate": 0, "documentAdditionOrUpdate": 0,
"documentDeletion": 0, "documentDeletion": 0,
"documentEdition": 0, "documentEdition": 0,
"dumpCreation": 0, "dumpCreation": 0,
"indexCreation": 3, "indexCreation": 3,
"indexDeletion": 0, "indexDeletion": 0,
"indexSwap": 0, "indexSwap": 0,
"indexUpdate": 0, "indexUpdate": 0,
"settingsUpdate": 0, "settingsUpdate": 0,
"snapshotCreation": 0, "snapshotCreation": 0,
"taskCancelation": 0, "taskCancelation": 0,
"taskDeletion": 0 "taskDeletion": 0,
} "upgradeDatabase": 0
} }
"###); }
"#);
handle.advance_till([ handle.advance_till([
InsideProcessBatch, InsideProcessBatch,
@ -784,36 +786,37 @@ fn basic_get_stats() {
Start, Start,
BatchCreated, BatchCreated,
]); ]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{ {
"indexes": { "indexes": {
"catto": 1, "catto": 1,
"doggo": 1, "doggo": 1,
"whalo": 1 "whalo": 1
}, },
"statuses": { "statuses": {
"canceled": 0, "canceled": 0,
"enqueued": 1, "enqueued": 1,
"failed": 0, "failed": 0,
"processing": 1, "processing": 1,
"succeeded": 1 "succeeded": 1
}, },
"types": { "types": {
"documentAdditionOrUpdate": 0, "documentAdditionOrUpdate": 0,
"documentDeletion": 0, "documentDeletion": 0,
"documentEdition": 0, "documentEdition": 0,
"dumpCreation": 0, "dumpCreation": 0,
"indexCreation": 3, "indexCreation": 3,
"indexDeletion": 0, "indexDeletion": 0,
"indexSwap": 0, "indexSwap": 0,
"indexUpdate": 0, "indexUpdate": 0,
"settingsUpdate": 0, "settingsUpdate": 0,
"snapshotCreation": 0, "snapshotCreation": 0,
"taskCancelation": 0, "taskCancelation": 0,
"taskDeletion": 0 "taskDeletion": 0,
} "upgradeDatabase": 0
} }
"###); }
"#);
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time` // now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
handle.advance_till([ handle.advance_till([
@ -824,36 +827,37 @@ fn basic_get_stats() {
Start, Start,
BatchCreated, BatchCreated,
]); ]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{ {
"indexes": { "indexes": {
"catto": 1, "catto": 1,
"doggo": 1, "doggo": 1,
"whalo": 1 "whalo": 1
}, },
"statuses": { "statuses": {
"canceled": 0, "canceled": 0,
"enqueued": 0, "enqueued": 0,
"failed": 0, "failed": 0,
"processing": 1, "processing": 1,
"succeeded": 2 "succeeded": 2
}, },
"types": { "types": {
"documentAdditionOrUpdate": 0, "documentAdditionOrUpdate": 0,
"documentDeletion": 0, "documentDeletion": 0,
"documentEdition": 0, "documentEdition": 0,
"dumpCreation": 0, "dumpCreation": 0,
"indexCreation": 3, "indexCreation": 3,
"indexDeletion": 0, "indexDeletion": 0,
"indexSwap": 0, "indexSwap": 0,
"indexUpdate": 0, "indexUpdate": 0,
"settingsUpdate": 0, "settingsUpdate": 0,
"snapshotCreation": 0, "snapshotCreation": 0,
"taskCancelation": 0, "taskCancelation": 0,
"taskDeletion": 0 "taskDeletion": 0,
} "upgradeDatabase": 0
} }
"###); }
"#);
} }
#[test] #[test]

View File

@ -109,6 +109,7 @@ impl IndexScheduler {
max_number_of_batched_tasks: usize::MAX, max_number_of_batched_tasks: usize::MAX,
batched_tasks_size_limit: u64::MAX, batched_tasks_size_limit: u64::MAX,
instance_features: Default::default(), instance_features: Default::default(),
auto_upgrade: true, // Don't cost much and will ensure the happy path works
}; };
configuration(&mut options); configuration(&mut options);

View File

@ -0,0 +1,43 @@
use std::path::Path;
use meilisearch_types::{
heed,
tasks::{KindWithContent, Status, Task},
};
use time::OffsetDateTime;
use tracing::info;
use crate::queue::TaskQueue;
/// Opens the task queue environment stored at `tasks_path` and registers a new enqueued
/// `UpgradeDatabase` task carrying `version` as the version to upgrade from.
///
/// The environment is closed again before returning.
///
/// # Errors
///
/// Returns an error if the environment cannot be opened, or if creating the write
/// transaction, opening the queue databases, computing the next task id, registering
/// the task or committing fails.
pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow::Result<()> {
    info!("Upgrading the task queue");

    // SAFETY: NOTE(review): heed marks `open` as unsafe; this presumably relies on the
    // environment not being opened anywhere else in the process at this point — confirm.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            // NOTE(review): presumably has to cover every database `TaskQueue::new` creates — confirm.
            .max_dbs(19)
            // Only this environment is memory-mapped for now, so the map size budget
            // does not need to be checked yet.
            .map_size(100 * 1024 * 1024)
            .open(tasks_path)
    }?;

    let mut txn = env.write_txn()?;
    let task_queue = TaskQueue::new(&env, &mut txn)?;
    let next_uid = task_queue.next_task_id(&txn)?;

    let upgrade_task = Task {
        uid: next_uid,
        batch_uid: None,
        enqueued_at: OffsetDateTime::now_utc(),
        started_at: None,
        finished_at: None,
        error: None,
        canceled_by: None,
        details: None,
        status: Status::Enqueued,
        kind: KindWithContent::UpgradeDatabase { from: version },
    };
    task_queue.register(&mut txn, &upgrade_task)?;
    txn.commit()?;

    // Expected to return almost immediately: nobody else is reading this env.
    env.prepare_for_closing().wait();
    Ok(())
}

View File

@ -234,6 +234,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
K::TaskCancelation { .. } K::TaskCancelation { .. }
| K::TaskDeletion { .. } | K::TaskDeletion { .. }
| K::DumpCreation { .. } | K::DumpCreation { .. }
| K::UpgradeDatabase { .. }
| K::SnapshotCreation => (), | K::SnapshotCreation => (),
}; };
if let Some(Details::IndexSwap { swaps }) = &mut task.details { if let Some(Details::IndexSwap { swaps }) = &mut task.details {
@ -547,6 +548,9 @@ impl crate::IndexScheduler {
Details::Dump { dump_uid: _ } => { Details::Dump { dump_uid: _ } => {
assert_eq!(kind.as_kind(), Kind::DumpCreation); assert_eq!(kind.as_kind(), Kind::DumpCreation);
} }
Details::UpgradeDatabase { from: _ } => {
assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
}
} }
} }

View File

@ -371,7 +371,8 @@ VectorEmbeddingError , InvalidRequest , BAD_REQUEST ;
NotFoundSimilarId , InvalidRequest , BAD_REQUEST ; NotFoundSimilarId , InvalidRequest , BAD_REQUEST ;
InvalidDocumentEditionContext , InvalidRequest , BAD_REQUEST ; InvalidDocumentEditionContext , InvalidRequest , BAD_REQUEST ;
InvalidDocumentEditionFunctionFilter , InvalidRequest , BAD_REQUEST ; InvalidDocumentEditionFunctionFilter , InvalidRequest , BAD_REQUEST ;
EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST ;
CouldNotUpgrade , InvalidRequest , BAD_REQUEST
} }
impl ErrorCode for JoinError { impl ErrorCode for JoinError {
@ -455,6 +456,9 @@ impl ErrorCode for milli::Error {
| UserError::DocumentEditionCompilationError(_) => { | UserError::DocumentEditionCompilationError(_) => {
Code::EditDocumentsByFunctionError Code::EditDocumentsByFunctionError
} }
UserError::TooOldForUpgrade(_, _, _)
| UserError::CannotDowngrade(_, _, _)
| UserError::CannotUpgradeToUnknownVersion(_, _, _) => Code::CouldNotUpgrade,
} }
} }
} }

View File

@ -114,6 +114,8 @@ pub struct DetailsView {
pub settings: Option<Box<Settings<Unchecked>>>, pub settings: Option<Box<Settings<Unchecked>>>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub swaps: Option<Vec<IndexSwap>>, pub swaps: Option<Vec<IndexSwap>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub upgrade_from: Option<String>,
} }
impl DetailsView { impl DetailsView {
@ -234,6 +236,11 @@ impl DetailsView {
Some(left) Some(left)
} }
}, },
upgrade_from: match (self.upgrade_from.clone(), other.upgrade_from.clone()) {
(None, None) => None,
(None, Some(from)) | (Some(from), None) => Some(from),
(Some(_), Some(from)) => Some(from),
},
} }
} }
} }
@ -311,6 +318,10 @@ impl From<Details> for DetailsView {
Details::IndexSwap { swaps } => { Details::IndexSwap { swaps } => {
DetailsView { swaps: Some(swaps), ..Default::default() } DetailsView { swaps: Some(swaps), ..Default::default() }
} }
Details::UpgradeDatabase { from } => DetailsView {
upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)),
..Default::default()
},
} }
} }
} }

View File

@ -50,6 +50,7 @@ impl Task {
| SnapshotCreation | SnapshotCreation
| TaskCancelation { .. } | TaskCancelation { .. }
| TaskDeletion { .. } | TaskDeletion { .. }
| UpgradeDatabase { .. }
| IndexSwap { .. } => None, | IndexSwap { .. } => None,
DocumentAdditionOrUpdate { index_uid, .. } DocumentAdditionOrUpdate { index_uid, .. }
| DocumentEdition { index_uid, .. } | DocumentEdition { index_uid, .. }
@ -84,7 +85,8 @@ impl Task {
| KindWithContent::TaskCancelation { .. } | KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. } | KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. } | KindWithContent::DumpCreation { .. }
| KindWithContent::SnapshotCreation => None, | KindWithContent::SnapshotCreation
| KindWithContent::UpgradeDatabase { .. } => None,
} }
} }
} }
@ -150,6 +152,9 @@ pub enum KindWithContent {
instance_uid: Option<InstanceUid>, instance_uid: Option<InstanceUid>,
}, },
SnapshotCreation, SnapshotCreation,
UpgradeDatabase {
from: (u32, u32, u32),
},
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
@ -175,6 +180,7 @@ impl KindWithContent {
KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion, KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion,
KindWithContent::DumpCreation { .. } => Kind::DumpCreation, KindWithContent::DumpCreation { .. } => Kind::DumpCreation,
KindWithContent::SnapshotCreation => Kind::SnapshotCreation, KindWithContent::SnapshotCreation => Kind::SnapshotCreation,
KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase,
} }
} }
@ -185,7 +191,8 @@ impl KindWithContent {
DumpCreation { .. } DumpCreation { .. }
| SnapshotCreation | SnapshotCreation
| TaskCancelation { .. } | TaskCancelation { .. }
| TaskDeletion { .. } => vec![], | TaskDeletion { .. }
| UpgradeDatabase { .. } => vec![],
DocumentAdditionOrUpdate { index_uid, .. } DocumentAdditionOrUpdate { index_uid, .. }
| DocumentEdition { index_uid, .. } | DocumentEdition { index_uid, .. }
| DocumentDeletion { index_uid, .. } | DocumentDeletion { index_uid, .. }
@ -262,6 +269,7 @@ impl KindWithContent {
}), }),
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
KindWithContent::SnapshotCreation => None, KindWithContent::SnapshotCreation => None,
KindWithContent::UpgradeDatabase { .. } => None,
} }
} }
@ -320,6 +328,7 @@ impl KindWithContent {
}), }),
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
KindWithContent::SnapshotCreation => None, KindWithContent::SnapshotCreation => None,
KindWithContent::UpgradeDatabase { .. } => None,
} }
} }
} }
@ -360,6 +369,7 @@ impl From<&KindWithContent> for Option<Details> {
}), }),
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
KindWithContent::SnapshotCreation => None, KindWithContent::SnapshotCreation => None,
KindWithContent::UpgradeDatabase { .. } => None,
} }
} }
} }
@ -468,6 +478,7 @@ pub enum Kind {
TaskDeletion, TaskDeletion,
DumpCreation, DumpCreation,
SnapshotCreation, SnapshotCreation,
UpgradeDatabase,
} }
impl Kind { impl Kind {
@ -484,6 +495,7 @@ impl Kind {
| Kind::TaskCancelation | Kind::TaskCancelation
| Kind::TaskDeletion | Kind::TaskDeletion
| Kind::DumpCreation | Kind::DumpCreation
| Kind::UpgradeDatabase
| Kind::SnapshotCreation => false, | Kind::SnapshotCreation => false,
} }
} }
@ -503,6 +515,7 @@ impl Display for Kind {
Kind::TaskDeletion => write!(f, "taskDeletion"), Kind::TaskDeletion => write!(f, "taskDeletion"),
Kind::DumpCreation => write!(f, "dumpCreation"), Kind::DumpCreation => write!(f, "dumpCreation"),
Kind::SnapshotCreation => write!(f, "snapshotCreation"), Kind::SnapshotCreation => write!(f, "snapshotCreation"),
Kind::UpgradeDatabase => write!(f, "upgradeDatabase"),
} }
} }
} }
@ -607,6 +620,9 @@ pub enum Details {
IndexSwap { IndexSwap {
swaps: Vec<IndexSwap>, swaps: Vec<IndexSwap>,
}, },
UpgradeDatabase {
from: (usize, usize, usize),
},
} }
impl Details { impl Details {
@ -627,6 +643,7 @@ impl Details {
Self::SettingsUpdate { .. } Self::SettingsUpdate { .. }
| Self::IndexInfo { .. } | Self::IndexInfo { .. }
| Self::Dump { .. } | Self::Dump { .. }
| Self::UpgradeDatabase { .. }
| Self::IndexSwap { .. } => (), | Self::IndexSwap { .. } => (),
} }

View File

@ -5,9 +5,9 @@ use std::path::Path;
/// The name of the file that contains the version of the database. /// The name of the file that contains the version of the database.
pub const VERSION_FILE_NAME: &str = "VERSION"; pub const VERSION_FILE_NAME: &str = "VERSION";
static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR"); pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR"); pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
/// Persists the version of the current Meilisearch binary to a VERSION file /// Persists the version of the current Meilisearch binary to a VERSION file
pub fn create_current_version_file(db_path: &Path) -> io::Result<()> { pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
@ -24,17 +24,6 @@ pub fn create_version_file(
fs::write(version_path, format!("{}.{}.{}", major, minor, patch)) fs::write(version_path, format!("{}.{}.{}", major, minor, patch))
} }
/// Ensures Meilisearch version is compatible with the database, returns an error versions mismatch.
pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
let (major, minor, patch) = get_version(db_path)?;
if major != VERSION_MAJOR || minor != VERSION_MINOR {
return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
}
Ok(())
}
pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFileError> { pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFileError> {
let version_path = db_path.join(VERSION_FILE_NAME); let version_path = db_path.join(VERSION_FILE_NAME);
@ -48,7 +37,7 @@ pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFi
} }
pub fn parse_version(version: &str) -> Result<(String, String, String), VersionFileError> { pub fn parse_version(version: &str) -> Result<(String, String, String), VersionFileError> {
let version_components = version.split('.').collect::<Vec<_>>(); let version_components = version.trim().split('.').collect::<Vec<_>>();
let (major, minor, patch) = match &version_components[..] { let (major, minor, patch) = match &version_components[..] {
[major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()), [major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()),
_ => return Err(VersionFileError::MalformedVersionFile), _ => return Err(VersionFileError::MalformedVersionFile),

View File

@ -189,6 +189,7 @@ struct Infos {
experimental_drop_search_after: usize, experimental_drop_search_after: usize,
experimental_nb_searches_per_core: usize, experimental_nb_searches_per_core: usize,
experimental_logs_mode: LogMode, experimental_logs_mode: LogMode,
experimental_dumpless_upgrade: bool,
experimental_replication_parameters: bool, experimental_replication_parameters: bool,
experimental_enable_logs_route: bool, experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool, experimental_reduce_indexing_memory_usage: bool,
@ -235,6 +236,7 @@ impl Infos {
experimental_drop_search_after, experimental_drop_search_after,
experimental_nb_searches_per_core, experimental_nb_searches_per_core,
experimental_logs_mode, experimental_logs_mode,
experimental_dumpless_upgrade,
experimental_replication_parameters, experimental_replication_parameters,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
@ -296,6 +298,7 @@ impl Infos {
experimental_drop_search_after: experimental_drop_search_after.into(), experimental_drop_search_after: experimental_drop_search_after.into(),
experimental_nb_searches_per_core: experimental_nb_searches_per_core.into(), experimental_nb_searches_per_core: experimental_nb_searches_per_core.into(),
experimental_logs_mode, experimental_logs_mode,
experimental_dumpless_upgrade,
experimental_replication_parameters, experimental_replication_parameters,
experimental_enable_logs_route: experimental_enable_logs_route | logs_route, experimental_enable_logs_route: experimental_enable_logs_route | logs_route,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,

View File

@ -32,13 +32,16 @@ use analytics::Analytics;
use anyhow::bail; use anyhow::bail;
use error::PayloadError; use error::PayloadError;
use extractors::payload::PayloadConfig; use extractors::payload::PayloadConfig;
use index_scheduler::upgrade::upgrade_task_queue;
use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::KindWithContent; use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::versioning::{check_version_file, create_current_version_file}; use meilisearch_types::versioning::{
create_current_version_file, get_version, VersionFileError, VERSION_MAJOR, VERSION_MINOR,
};
use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
pub use option::Opt; pub use option::Opt;
use option::ScheduleSnapshot; use option::ScheduleSnapshot;
@ -316,6 +319,7 @@ fn open_or_create_database_unchecked(
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
index_count: DEFAULT_INDEX_COUNT, index_count: DEFAULT_INDEX_COUNT,
instance_features, instance_features,
auto_upgrade: opt.experimental_dumpless_upgrade,
})?) })?)
}; };
@ -334,13 +338,36 @@ fn open_or_create_database_unchecked(
} }
} }
/// Checks that the version stored in the database matches the running binary.
///
/// Only the major and minor components are compared. When they differ:
/// - if `experimental_dumpless_upgrade` is set, the stored version is parsed into numeric
///   components and handed to `upgrade_task_queue` together with the `tasks` directory
///   located under `db_path`;
/// - otherwise a `VersionFileError::VersionMismatch` error is returned.
///
/// # Errors
///
/// Returns an error when the version cannot be read, when one of its components is not a
/// valid number (`VersionFileError::MalformedVersionFile`), on a version mismatch without
/// dumpless upgrade, or when `upgrade_task_queue` itself fails.
fn check_version_and_update_task_queue(
    db_path: &Path,
    experimental_dumpless_upgrade: bool,
) -> anyhow::Result<()> {
    let (major, minor, patch) = get_version(db_path)?;

    // Nothing to do when the database was written by the same major.minor release.
    let same_release = major == VERSION_MAJOR && minor == VERSION_MINOR;
    if same_release {
        return Ok(());
    }

    if !experimental_dumpless_upgrade {
        return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
    }

    let from = (
        major.parse().map_err(|_| VersionFileError::MalformedVersionFile)?,
        minor.parse().map_err(|_| VersionFileError::MalformedVersionFile)?,
        patch.parse().map_err(|_| VersionFileError::MalformedVersionFile)?,
    );
    let tasks_path = db_path.join("tasks");
    upgrade_task_queue(&tasks_path, from)
}
/// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you.
fn open_or_create_database( fn open_or_create_database(
opt: &Opt, opt: &Opt,
empty_db: bool, empty_db: bool,
) -> anyhow::Result<(IndexScheduler, AuthController)> { ) -> anyhow::Result<(IndexScheduler, AuthController)> {
if !empty_db { if !empty_db {
check_version_file(&opt.db_path)?; check_version_and_update_task_queue(&opt.db_path, opt.experimental_dumpless_upgrade)?;
} }
open_or_create_database_unchecked(opt, OnFailure::KeepDb) open_or_create_database_unchecked(opt, OnFailure::KeepDb)

View File

@ -49,6 +49,7 @@ const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR"; const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL"; const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE"; const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE";
const MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE: &str = "MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE";
const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS"; const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE"; const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
const MEILI_EXPERIMENTAL_CONTAINS_FILTER: &str = "MEILI_EXPERIMENTAL_CONTAINS_FILTER"; const MEILI_EXPERIMENTAL_CONTAINS_FILTER: &str = "MEILI_EXPERIMENTAL_CONTAINS_FILTER";
@ -400,6 +401,13 @@ pub struct Opt {
#[serde(default)] #[serde(default)]
pub experimental_logs_mode: LogMode, pub experimental_logs_mode: LogMode,
/// Experimental dumpless upgrade. For more information, see: <https://github.com/orgs/meilisearch/discussions/723>
///
/// When set, Meilisearch will auto-update its database without using a dump.
#[clap(long, env = MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE, default_value_t)]
#[serde(default)]
pub experimental_dumpless_upgrade: bool,
/// Experimental logs route feature. For more information, /// Experimental logs route feature. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/721> /// see: <https://github.com/orgs/meilisearch/discussions/721>
/// ///
@ -535,6 +543,7 @@ impl Opt {
experimental_drop_search_after, experimental_drop_search_after,
experimental_nb_searches_per_core, experimental_nb_searches_per_core,
experimental_logs_mode, experimental_logs_mode,
experimental_dumpless_upgrade,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_replication_parameters, experimental_replication_parameters,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
@ -608,6 +617,10 @@ impl Opt {
MEILI_EXPERIMENTAL_LOGS_MODE, MEILI_EXPERIMENTAL_LOGS_MODE,
experimental_logs_mode.to_string(), experimental_logs_mode.to_string(),
); );
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE,
experimental_dumpless_upgrade.to_string(),
);
export_to_env_if_not_present( export_to_env_if_not_present(
MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS, MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS,
experimental_replication_parameters.to_string(), experimental_replication_parameters.to_string(),

View File

@ -912,14 +912,14 @@ mod tests {
{ {
let params = "types=createIndex"; let params = "types=createIndex";
let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err(); let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err();
snapshot!(meili_snap::json_string!(err), @r###" snapshot!(meili_snap::json_string!(err), @r#"
{ {
"message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", "message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
"code": "invalid_task_types", "code": "invalid_task_types",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types" "link": "https://docs.meilisearch.com/errors#invalid_task_types"
} }
"###); "#);
} }
} }
#[test] #[test]

View File

@ -0,0 +1 @@

View File

@ -42,7 +42,7 @@ async fn batch_bad_types() {
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#" snapshot!(json_string!(response), @r#"
{ {
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
"code": "invalid_task_types", "code": "invalid_task_types",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types" "link": "https://docs.meilisearch.com/errors#invalid_task_types"

View File

@ -95,36 +95,36 @@ async fn task_bad_types() {
let (response, code) = server.tasks_filter("types=doggo").await; let (response, code) = server.tasks_filter("types=doggo").await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r#"
{ {
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
"code": "invalid_task_types", "code": "invalid_task_types",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types" "link": "https://docs.meilisearch.com/errors#invalid_task_types"
} }
"###); "#);
let (response, code) = server.cancel_tasks("types=doggo").await; let (response, code) = server.cancel_tasks("types=doggo").await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r#"
{ {
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
"code": "invalid_task_types", "code": "invalid_task_types",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types" "link": "https://docs.meilisearch.com/errors#invalid_task_types"
} }
"###); "#);
let (response, code) = server.delete_tasks("types=doggo").await; let (response, code) = server.delete_tasks("types=doggo").await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r#"
{ {
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.", "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
"code": "invalid_task_types", "code": "invalid_task_types",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types" "link": "https://docs.meilisearch.com/errors#invalid_task_types"
} }
"###); "#);
} }
#[actix_rt::test] #[actix_rt::test]

View File

@ -1,2 +1,6 @@
pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors"; pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors";
pub const RESERVED_GEO_FIELD_NAME: &str = "_geo"; pub const RESERVED_GEO_FIELD_NAME: &str = "_geo";

View File

@ -10,7 +10,7 @@ use rhai::EvalAltResult;
use serde_json::Value; use serde_json::Value;
use thiserror::Error; use thiserror::Error;
use crate::constants::RESERVED_GEO_FIELD_NAME; use crate::constants::{RESERVED_GEO_FIELD_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::documents::{self, DocumentsBatchCursorError}; use crate::documents::{self, DocumentsBatchCursorError};
use crate::thread_pool_no_abort::PanicCatched; use crate::thread_pool_no_abort::PanicCatched;
use crate::{CriterionError, DocumentId, FieldId, Object, SortError}; use crate::{CriterionError, DocumentId, FieldId, Object, SortError};
@ -288,6 +288,12 @@ and can not be more than 511 bytes.", .document_id.to_string()
DocumentEditionCompilationError(rhai::ParseError), DocumentEditionCompilationError(rhai::ParseError),
#[error("{0}")] #[error("{0}")]
DocumentEmbeddingError(String), DocumentEmbeddingError(String),
#[error("Upgrade could not be processed because v{0}.{1}.{2} of the database is too old. Please re-open the v{0}.{1}.{2} and use a dump to upgrade your version. The oldest version meilisearch can upgrade from is v1.12.0.")]
TooOldForUpgrade(u32, u32, u32),
#[error("Upgrade could not be processed because the database version (v{0}.{1}.{2}) is newer than the targeted version (v{}.{}.{})", VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)]
CannotDowngrade(u32, u32, u32),
#[error("Cannot upgrade to unknown version v{0}.{1}.{2}.")]
CannotUpgradeToUnknownVersion(u32, u32, u32),
} }
impl From<crate::vector::Error> for Error { impl From<crate::vector::Error> for Error {

View File

@ -10,6 +10,7 @@ mod roaring_bitmap_length;
mod str_beu32_codec; mod str_beu32_codec;
mod str_ref; mod str_ref;
mod str_str_u8_codec; mod str_str_u8_codec;
pub mod version;
pub use byte_slice_ref::BytesRefCodec; pub use byte_slice_ref::BytesRefCodec;
use heed::BoxedError; use heed::BoxedError;

View File

@ -0,0 +1,44 @@
use std::mem::size_of;
use std::{borrow::Cow, mem::size_of_val};
use byteorder::{BigEndian, ByteOrder};
use heed::{BoxedError, BytesDecode, BytesEncode};
const VERSION_SIZE: usize = std::mem::size_of::<u32>() * 3;
/// Error returned when the bytes handed to the version decoder do not have the expected
/// length of `VERSION_SIZE` bytes.
///
/// The wrapped value is the number of bytes that were actually received.
#[derive(thiserror::Error, Debug)]
#[error(
    "Could not decode the version: Expected {} bytes but instead received {0} bytes",
    VERSION_SIZE
)]
pub struct DecodeVersionError(usize);
/// Codec for a `(major, minor, patch)` version triple stored as three consecutive
/// big-endian `u32` values.
pub struct VersionCodec;
impl<'a> BytesEncode<'a> for VersionCodec {
    type EItem = (u32, u32, u32);

    /// Serializes the version as `major`, `minor`, `patch`, each written as a big-endian
    /// `u32`, for a total of twelve bytes.
    fn bytes_encode(item: &'a Self::EItem) -> Result<Cow<'a, [u8]>, BoxedError> {
        let (major, minor, patch) = *item;
        let mut encoded = Vec::with_capacity(size_of::<u32>() * 3);
        for component in [major, minor, patch] {
            encoded.extend_from_slice(&component.to_be_bytes());
        }
        Ok(Cow::Owned(encoded))
    }
}
impl<'a> BytesDecode<'a> for VersionCodec {
    type DItem = (u32, u32, u32);

    /// Reads back a version written as three consecutive big-endian `u32` values.
    ///
    /// Fails with a `DecodeVersionError` when `bytes` is not exactly `VERSION_SIZE` long.
    fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
        if bytes.len() != VERSION_SIZE {
            return Err(Box::new(DecodeVersionError(bytes.len())));
        }

        let major = BigEndian::read_u32(bytes);
        // Every component has the same width, so it also serves as the stride.
        let width = size_of_val(&major);
        let minor = BigEndian::read_u32(&bytes[width..]);
        let patch = BigEndian::read_u32(&bytes[2 * width..]);
        Ok((major, minor, patch))
    }
}

View File

@ -10,7 +10,7 @@ use roaring::RoaringBitmap;
use rstar::RTree; use rstar::RTree;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::constants::RESERVED_VECTORS_FIELD_NAME; use crate::constants::{self, RESERVED_VECTORS_FIELD_NAME};
use crate::documents::PrimaryKey; use crate::documents::PrimaryKey;
use crate::error::{InternalError, UserError}; use crate::error::{InternalError, UserError};
use crate::fields_ids_map::FieldsIdsMap; use crate::fields_ids_map::FieldsIdsMap;
@ -18,6 +18,7 @@ use crate::heed_codec::facet::{
FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec, FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec,
FieldIdCodec, OrderedF64Codec, FieldIdCodec, OrderedF64Codec,
}; };
use crate::heed_codec::version::VersionCodec;
use crate::heed_codec::{BEU16StrCodec, FstSetCodec, StrBEU16Codec, StrRefCodec}; use crate::heed_codec::{BEU16StrCodec, FstSetCodec, StrBEU16Codec, StrRefCodec};
use crate::order_by_map::OrderByMap; use crate::order_by_map::OrderByMap;
use crate::proximity::ProximityPrecision; use crate::proximity::ProximityPrecision;
@ -33,6 +34,7 @@ pub const DEFAULT_MIN_WORD_LEN_ONE_TYPO: u8 = 5;
pub const DEFAULT_MIN_WORD_LEN_TWO_TYPOS: u8 = 9; pub const DEFAULT_MIN_WORD_LEN_TWO_TYPOS: u8 = 9;
pub mod main_key { pub mod main_key {
pub const VERSION_KEY: &str = "version";
pub const CRITERIA_KEY: &str = "criteria"; pub const CRITERIA_KEY: &str = "criteria";
pub const DISPLAYED_FIELDS_KEY: &str = "displayed-fields"; pub const DISPLAYED_FIELDS_KEY: &str = "displayed-fields";
pub const DISTINCT_FIELD_KEY: &str = "distinct-field-key"; pub const DISTINCT_FIELD_KEY: &str = "distinct-field-key";
@ -223,12 +225,9 @@ impl Index {
let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?; let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?;
let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?; let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;
wtxn.commit()?;
Index::set_creation_dates(&env, main, created_at, updated_at)?; let this = Index {
env: env.clone(),
Ok(Index {
env,
main, main,
external_documents_ids, external_documents_ids,
word_docids, word_docids,
@ -253,7 +252,22 @@ impl Index {
vector_arroy, vector_arroy,
embedder_category_id, embedder_category_id,
documents, documents,
}) };
if this.get_version(&wtxn)?.is_none() {
this.put_version(
&mut wtxn,
(
constants::VERSION_MAJOR.parse().unwrap(),
constants::VERSION_MINOR.parse().unwrap(),
constants::VERSION_PATCH.parse().unwrap(),
),
)?;
}
wtxn.commit()?;
Index::set_creation_dates(&this.env, this.main, created_at, updated_at)?;
Ok(this)
} }
pub fn new<P: AsRef<Path>>(options: heed::EnvOpenOptions, path: P) -> Result<Index> { pub fn new<P: AsRef<Path>>(options: heed::EnvOpenOptions, path: P) -> Result<Index> {
@ -331,6 +345,26 @@ impl Index {
self.env.prepare_for_closing() self.env.prepare_for_closing()
} }
/* version */
/// Stores the `(major, minor, patch)` version of the database under the
/// `main_key::VERSION_KEY` entry of the main database, encoded with `VersionCodec`.
pub(crate) fn put_version(
    &self,
    wtxn: &mut RwTxn<'_>,
    version: (u32, u32, u32),
) -> heed::Result<()> {
    let versions = self.main.remap_types::<Str, VersionCodec>();
    versions.put(wtxn, main_key::VERSION_KEY, &version)
}
/// Reads the `(major, minor, patch)` version of the database from the
/// `main_key::VERSION_KEY` entry. Returns `None` if it was never set.
pub(crate) fn get_version(&self, rtxn: &RoTxn<'_>) -> heed::Result<Option<(u32, u32, u32)>> {
    let versions = self.main.remap_types::<Str, VersionCodec>();
    versions.get(rtxn, main_key::VERSION_KEY)
}
/* documents ids */ /* documents ids */
/// Writes the documents ids that corresponds to the user-ids-documents-ids FST. /// Writes the documents ids that corresponds to the user-ids-documents-ids FST.

View File

@ -1,5 +1,6 @@
use std::any::TypeId; use std::any::TypeId;
use std::borrow::Cow; use std::borrow::Cow;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -153,3 +154,41 @@ pub struct ProgressStepView {
pub finished: u32, pub finished: u32,
pub total: u32, pub total: u32,
} }
/// A progress step whose name is provided at runtime.
///
/// Used when the name can change but it's still the same step.
/// To avoid conflicts on the `TypeId`, create a unique type every time you use this step:
/// ```text
/// enum UpgradeVersion {}
///
/// progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
///     "v1 to v2",
///     0,
///     10,
/// ));
/// ```
pub struct VariableNameStep<U: Send + Sync + 'static> {
    /// Name returned by `Step::name`.
    name: String,
    /// Value returned by `Step::current`.
    current: u32,
    /// Value returned by `Step::total`.
    total: u32,
    /// Marker tying the step to the caller-provided type `U`; no `U` value is stored.
    phantom: PhantomData<U>,
}
impl<U: Send + Sync + 'static> VariableNameStep<U> {
    /// Builds a step named `name`, positioned at `current` out of `total`.
    pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
        let name: String = name.into();
        Self { phantom: PhantomData, name, current, total }
    }
}
impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
    /// Returns an owned copy of the name given at construction.
    fn name(&self) -> Cow<'static, str> {
        Cow::Owned(self.name.clone())
    }

    /// Returns the `current` value given at construction.
    fn current(&self) -> u32 {
        let Self { current, .. } = self;
        *current
    }

    /// Returns the `total` value given at construction.
    fn total(&self) -> u32 {
        let Self { total, .. } = self;
        *total
    }
}

View File

@ -21,6 +21,7 @@ mod indexer_config;
pub mod new; pub mod new;
pub(crate) mod settings; pub(crate) mod settings;
mod update_step; mod update_step;
pub mod upgrade;
mod word_prefix_docids; mod word_prefix_docids;
mod words_prefix_integer_docids; mod words_prefix_integer_docids;
mod words_prefixes_fst; mod words_prefixes_fst;

View File

@ -0,0 +1,65 @@
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::{Progress, VariableNameStep};
use crate::{Index, Result, UserError};
/// Runs, in order, every upgrade step needed to bring `index` to the version of the
/// running binary.
///
/// The starting point is the version stored in the index. When no version is stored,
/// `base_version` (the version provided by the caller) is used to decide whether the
/// upgrade is possible. Each step is reported through `progress` before it runs.
///
/// # Errors
///
/// - `UserError::TooOldForUpgrade` when the starting version is older than v1.12.0.
/// - `UserError::CannotDowngrade` when the stored version is newer than the binary.
/// - `UserError::CannotUpgradeToUnknownVersion` for any other stored version that has no
///   registered upgrade path.
/// - Any error returned by the storage layer or by an upgrade step.
///
/// # Panics
///
/// Panics if the crate version components provided by Cargo are not valid `u32` values.
//
// NOTE(review): the version stored in the index is not updated by this function; confirm
// that the caller persists the new version once the upgrade succeeds.
pub fn upgrade(index: &Index, base_version: (u32, u32, u32), progress: Progress) -> Result<()> {
    // Reading the stored version needs no write access. Using a read transaction that is
    // released before the steps run keeps the environment's single writer slot free, so
    // an upgrade step can open its own write transaction on this index.
    let rtxn = index.env.read_txn()?;
    let from = index.get_version(&rtxn)?;
    drop(rtxn);

    let upgrade_functions =
        [(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")];

    let current: (u32, u32, u32) = (
        VERSION_MAJOR.parse().expect("CARGO_PKG_VERSION_MAJOR is a valid u32"),
        VERSION_MINOR.parse().expect("CARGO_PKG_VERSION_MINOR is a valid u32"),
        VERSION_PATCH.parse().expect("CARGO_PKG_VERSION_PATCH is a valid u32"),
    );

    let start = match from {
        // If there was no version it means we're coming from the base version specified by
        // the index-scheduler.
        None if base_version.0 == 1 && base_version.1 == 12 => 0,
        Some((1, 12, _)) => 0,

        // --- Error handling
        None => {
            return Err(UserError::TooOldForUpgrade(
                base_version.0,
                base_version.1,
                base_version.2,
            )
            .into());
        }
        Some((major, minor, patch)) if major == 0 || (major == 1 && minor < 12) => {
            return Err(UserError::TooOldForUpgrade(major, minor, patch).into());
        }
        // Tuples compare lexicographically: major first, then minor, then patch.
        Some((major, minor, patch)) if (major, minor, patch) > current => {
            return Err(UserError::CannotDowngrade(major, minor, patch).into());
        }
        Some((major, minor, patch)) => {
            return Err(UserError::CannotUpgradeToUnknownVersion(major, minor, patch).into())
        }
    };

    // Marker type giving this progress step its own `TypeId`.
    enum UpgradeVersion {}

    let upgrade_path = &upgrade_functions[start..];

    for (i, (upgrade_function, upgrade_msg)) in upgrade_path.iter().enumerate() {
        progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
            upgrade_msg.to_string(),
            i as u32,
            upgrade_path.len() as u32,
        ));
        (upgrade_function)(index, progress.clone())?;
    }

    Ok(())
}
/// Upgrade step from v1.12 to v1.13.
///
/// Currently a no-op: it does not touch the index or report progress, and always succeeds.
fn v1_12_to_v1_13(_index: &Index, _progress: Progress) -> Result<()> {
    Ok(())
}