mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-15 15:57:31 +01:00
Add the new tasks with most of the job done
This commit is contained in:
parent
e568dbbabb
commit
381b2f1504
@ -141,6 +141,9 @@ pub enum KindDump {
|
||||
instance_uid: Option<InstanceUid>,
|
||||
},
|
||||
SnapshotCreation,
|
||||
UpgradeDatabase {
|
||||
from: (u32, u32, u32),
|
||||
},
|
||||
}
|
||||
|
||||
impl From<Task> for TaskDump {
|
||||
@ -210,6 +213,9 @@ impl From<KindWithContent> for KindDump {
|
||||
KindDump::DumpCreation { keys, instance_uid }
|
||||
}
|
||||
KindWithContent::SnapshotCreation => KindDump::SnapshotCreation,
|
||||
KindWithContent::UpgradeDatabase { from: version } => {
|
||||
KindDump::UpgradeDatabase { from: version }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -132,6 +132,7 @@ impl<'a> Dump<'a> {
|
||||
KindWithContent::DumpCreation { keys, instance_uid }
|
||||
}
|
||||
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
|
||||
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -3,7 +3,7 @@ use std::fmt::Display;
|
||||
use meilisearch_types::batches::BatchId;
|
||||
use meilisearch_types::error::{Code, ErrorCode};
|
||||
use meilisearch_types::tasks::{Kind, Status};
|
||||
use meilisearch_types::{heed, milli};
|
||||
use meilisearch_types::{heed, milli, versioning};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::TaskId;
|
||||
|
@ -279,6 +279,9 @@ fn snapshot_details(d: &Details) -> String {
|
||||
Details::IndexSwap { swaps } => {
|
||||
format!("{{ swaps: {swaps:?} }}")
|
||||
}
|
||||
Details::UpgradeDatabase { from } => {
|
||||
format!("{{ from: v{}.{}.{} }}", from.0, from.1, from.2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@ mod queue;
|
||||
mod scheduler;
|
||||
#[cfg(test)]
|
||||
mod test_utils;
|
||||
pub mod upgrade;
|
||||
mod utils;
|
||||
pub mod uuid_codec;
|
||||
|
||||
@ -120,6 +121,8 @@ pub struct IndexSchedulerOptions {
|
||||
pub batched_tasks_size_limit: u64,
|
||||
/// The experimental features enabled for this instance.
|
||||
pub instance_features: InstanceTogglableFeatures,
|
||||
/// The experimental features enabled for this instance.
|
||||
pub auto_upgrade: bool,
|
||||
}
|
||||
|
||||
/// Structure which holds meilisearch's indexes and schedules the tasks
|
||||
|
@ -129,6 +129,12 @@ make_enum_progress! {
|
||||
}
|
||||
}
|
||||
|
||||
make_enum_progress! {
|
||||
pub enum UpgradeDatabaseProgress {
|
||||
EnsuringCorrectnessOfTheSwap,
|
||||
}
|
||||
}
|
||||
|
||||
make_enum_progress! {
|
||||
pub enum InnerSwappingTwoIndexes {
|
||||
RetrieveTheTasks,
|
||||
@ -173,32 +179,6 @@ make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
|
||||
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
|
||||
make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" );
|
||||
|
||||
pub struct VariableNameStep {
|
||||
name: String,
|
||||
current: u32,
|
||||
total: u32,
|
||||
}
|
||||
|
||||
impl VariableNameStep {
|
||||
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
||||
Self { name: name.into(), current, total }
|
||||
}
|
||||
}
|
||||
|
||||
impl Step for VariableNameStep {
|
||||
fn name(&self) -> Cow<'static, str> {
|
||||
self.name.clone().into()
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
self.current
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
self.total
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::atomic::Ordering;
|
||||
|
@ -20,8 +20,8 @@ use time::format_description::well_known::Rfc3339;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
use self::batches::BatchQueue;
|
||||
use self::tasks::TaskQueue;
|
||||
pub(crate) use self::batches::BatchQueue;
|
||||
pub(crate) use self::tasks::TaskQueue;
|
||||
use crate::processing::ProcessingTasks;
|
||||
use crate::utils::{
|
||||
check_index_swap_validity, filter_out_references_to_newer_tasks, ProcessingBatch,
|
||||
|
@ -59,7 +59,7 @@ impl TaskQueue {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
|
||||
pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
|
||||
Ok(Self {
|
||||
all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?,
|
||||
status: env.create_database(wtxn, Some(db_name::STATUS))?,
|
||||
|
@ -85,6 +85,7 @@ impl From<KindWithContent> for AutobatchKind {
|
||||
KindWithContent::TaskCancelation { .. }
|
||||
| KindWithContent::TaskDeletion { .. }
|
||||
| KindWithContent::DumpCreation { .. }
|
||||
| KindWithContent::UpgradeDatabase { .. }
|
||||
| KindWithContent::SnapshotCreation => {
|
||||
panic!("The autobatcher should never be called with tasks that don't apply to an index.")
|
||||
}
|
||||
|
@ -47,6 +47,9 @@ pub(crate) enum Batch {
|
||||
IndexSwap {
|
||||
task: Task,
|
||||
},
|
||||
UpgradeDatabase {
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -105,6 +108,7 @@ impl Batch {
|
||||
}
|
||||
Batch::SnapshotCreation(tasks)
|
||||
| Batch::TaskDeletions(tasks)
|
||||
| Batch::UpgradeDatabase { tasks }
|
||||
| Batch::IndexDeletion { tasks, .. } => {
|
||||
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
|
||||
}
|
||||
@ -138,6 +142,7 @@ impl Batch {
|
||||
| TaskDeletions(_)
|
||||
| SnapshotCreation(_)
|
||||
| Dump(_)
|
||||
| UpgradeDatabase { .. }
|
||||
| IndexSwap { .. } => None,
|
||||
IndexOperation { op, .. } => Some(op.index_uid()),
|
||||
IndexCreation { index_uid, .. }
|
||||
@ -162,6 +167,7 @@ impl fmt::Display for Batch {
|
||||
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
|
||||
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
|
||||
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
|
||||
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
|
||||
};
|
||||
match index_uid {
|
||||
Some(name) => f.write_fmt(format_args!(" on {name:?} from tasks: {tasks:?}")),
|
||||
@ -427,9 +433,18 @@ impl IndexScheduler {
|
||||
let mut current_batch = ProcessingBatch::new(batch_id);
|
||||
|
||||
let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
|
||||
let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
|
||||
|
||||
// 0. The priority over everything is to upgrade the instance
|
||||
let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & enqueued;
|
||||
// There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time
|
||||
if !upgrade.is_empty() {
|
||||
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, upgrade)?;
|
||||
current_batch.processing(&mut tasks);
|
||||
return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch)));
|
||||
}
|
||||
|
||||
// 1. we get the last task to cancel.
|
||||
let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
|
||||
if let Some(task_id) = to_cancel.max() {
|
||||
let mut task =
|
||||
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
|
@ -6,6 +6,7 @@ mod process_batch;
|
||||
mod process_dump_creation;
|
||||
mod process_index_operation;
|
||||
mod process_snapshot_creation;
|
||||
mod process_upgrade;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
#[cfg(test)]
|
||||
|
@ -3,7 +3,7 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use meilisearch_types::batches::BatchId;
|
||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::progress::Progress;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
use meilisearch_types::milli::{self};
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task};
|
||||
use milli::update::Settings as MilliSettings;
|
||||
@ -13,7 +13,7 @@ use super::create_batch::Batch;
|
||||
use crate::processing::{
|
||||
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress,
|
||||
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
|
||||
UpdateIndexProgress, VariableNameStep,
|
||||
UpdateIndexProgress,
|
||||
};
|
||||
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
|
||||
use crate::{Error, IndexScheduler, Result, TaskId};
|
||||
@ -297,7 +297,7 @@ impl IndexScheduler {
|
||||
}
|
||||
progress.update_progress(SwappingTheIndexes::SwappingTheIndexes);
|
||||
for (step, swap) in swaps.iter().enumerate() {
|
||||
progress.update_progress(VariableNameStep::new(
|
||||
progress.update_progress(VariableNameStep::<SwappingTheIndexes>::new(
|
||||
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
|
||||
step as u32,
|
||||
swaps.len() as u32,
|
||||
@ -314,6 +314,7 @@ impl IndexScheduler {
|
||||
task.status = Status::Succeeded;
|
||||
Ok(vec![task])
|
||||
}
|
||||
Batch::UpgradeDatabase { tasks } => self.process_upgrade(progress, tasks),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,16 +5,14 @@ use std::sync::atomic::Ordering;
|
||||
use dump::IndexMetadata;
|
||||
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
|
||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
||||
use meilisearch_types::milli::progress::Progress;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
|
||||
use meilisearch_types::milli::{self};
|
||||
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
|
||||
use time::macros::format_description;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::processing::{
|
||||
AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress, VariableNameStep,
|
||||
};
|
||||
use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress};
|
||||
use crate::{Error, IndexScheduler, Result};
|
||||
|
||||
impl IndexScheduler {
|
||||
@ -106,8 +104,12 @@ impl IndexScheduler {
|
||||
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
|
||||
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
|
||||
let mut count = 0;
|
||||
let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
|
||||
progress.update_progress(VariableNameStep::new(uid.to_string(), count, nb_indexes));
|
||||
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
|
||||
progress.update_progress(VariableNameStep::<DumpCreationProgress>::new(
|
||||
uid.to_string(),
|
||||
count,
|
||||
nb_indexes,
|
||||
));
|
||||
count += 1;
|
||||
|
||||
let rtxn = index.read_txn()?;
|
||||
|
@ -3,12 +3,12 @@ use std::fs;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use meilisearch_types::heed::CompactionOption;
|
||||
use meilisearch_types::milli::progress::Progress;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
use meilisearch_types::milli::{self};
|
||||
use meilisearch_types::tasks::{Status, Task};
|
||||
use meilisearch_types::{compression, VERSION_FILE_NAME};
|
||||
|
||||
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress, VariableNameStep};
|
||||
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
|
||||
use crate::{Error, IndexScheduler, Result};
|
||||
|
||||
impl IndexScheduler {
|
||||
@ -74,7 +74,9 @@ impl IndexScheduler {
|
||||
|
||||
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
|
||||
let (name, uuid) = result?;
|
||||
progress.update_progress(VariableNameStep::new(name, i as u32, nb_indexes));
|
||||
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
||||
name, i as u32, nb_indexes,
|
||||
));
|
||||
let index = self.index_mapper.index(&rtxn, name)?;
|
||||
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
||||
fs::create_dir_all(&dst)?;
|
||||
|
42
crates/index-scheduler/src/scheduler/process_upgrade/mod.rs
Normal file
42
crates/index-scheduler/src/scheduler/process_upgrade/mod.rs
Normal file
@ -0,0 +1,42 @@
|
||||
use meilisearch_types::{
|
||||
milli,
|
||||
milli::progress::{Progress, VariableNameStep},
|
||||
tasks::{KindWithContent, Status, Task},
|
||||
versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH},
|
||||
};
|
||||
|
||||
use crate::{processing::UpgradeDatabaseProgress, Error, IndexScheduler, Result};
|
||||
|
||||
impl IndexScheduler {
|
||||
pub(super) fn process_upgrade(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>> {
|
||||
progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap);
|
||||
|
||||
// Since we should not have multiple upgrade tasks, we're only going to process the latest one:
|
||||
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
enum UpgradeIndex {}
|
||||
let indexes = self.index_names()?;
|
||||
|
||||
for (i, uid) in indexes.iter().enumerate() {
|
||||
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
|
||||
format!("Upgrading index `{uid}`"),
|
||||
i as u32,
|
||||
indexes.len() as u32,
|
||||
));
|
||||
let index = self.index(uid)?;
|
||||
milli::update::upgrade::upgrade(&index, from, progress.clone());
|
||||
}
|
||||
|
||||
for task in tasks.iter_mut() {
|
||||
task.status = Status::Succeeded;
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
}
|
@ -712,7 +712,7 @@ fn basic_get_stats() {
|
||||
let kind = index_creation_task("whalo", "fish");
|
||||
let _task = index_scheduler.register(kind, None, false).unwrap();
|
||||
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
|
||||
{
|
||||
"indexes": {
|
||||
"catto": 1,
|
||||
@ -738,13 +738,14 @@ fn basic_get_stats() {
|
||||
"settingsUpdate": 0,
|
||||
"snapshotCreation": 0,
|
||||
"taskCancelation": 0,
|
||||
"taskDeletion": 0
|
||||
"taskDeletion": 0,
|
||||
"upgradeDatabase": 0
|
||||
}
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
|
||||
handle.advance_till([Start, BatchCreated]);
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
|
||||
{
|
||||
"indexes": {
|
||||
"catto": 1,
|
||||
@ -770,10 +771,11 @@ fn basic_get_stats() {
|
||||
"settingsUpdate": 0,
|
||||
"snapshotCreation": 0,
|
||||
"taskCancelation": 0,
|
||||
"taskDeletion": 0
|
||||
"taskDeletion": 0,
|
||||
"upgradeDatabase": 0
|
||||
}
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
|
||||
handle.advance_till([
|
||||
InsideProcessBatch,
|
||||
@ -783,7 +785,7 @@ fn basic_get_stats() {
|
||||
Start,
|
||||
BatchCreated,
|
||||
]);
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
|
||||
{
|
||||
"indexes": {
|
||||
"catto": 1,
|
||||
@ -809,10 +811,11 @@ fn basic_get_stats() {
|
||||
"settingsUpdate": 0,
|
||||
"snapshotCreation": 0,
|
||||
"taskCancelation": 0,
|
||||
"taskDeletion": 0
|
||||
"taskDeletion": 0,
|
||||
"upgradeDatabase": 0
|
||||
}
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
|
||||
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
|
||||
handle.advance_till([
|
||||
@ -823,7 +826,7 @@ fn basic_get_stats() {
|
||||
Start,
|
||||
BatchCreated,
|
||||
]);
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
|
||||
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
|
||||
{
|
||||
"indexes": {
|
||||
"catto": 1,
|
||||
@ -849,10 +852,11 @@ fn basic_get_stats() {
|
||||
"settingsUpdate": 0,
|
||||
"snapshotCreation": 0,
|
||||
"taskCancelation": 0,
|
||||
"taskDeletion": 0
|
||||
"taskDeletion": 0,
|
||||
"upgradeDatabase": 0
|
||||
}
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -109,6 +109,7 @@ impl IndexScheduler {
|
||||
max_number_of_batched_tasks: usize::MAX,
|
||||
batched_tasks_size_limit: u64::MAX,
|
||||
instance_features: Default::default(),
|
||||
auto_upgrade: true, // Don't cost much and will ensure the happy path works
|
||||
};
|
||||
configuration(&mut options);
|
||||
|
||||
|
43
crates/index-scheduler/src/upgrade/mod.rs
Normal file
43
crates/index-scheduler/src/upgrade/mod.rs
Normal file
@ -0,0 +1,43 @@
|
||||
use std::path::Path;
|
||||
|
||||
use meilisearch_types::{
|
||||
heed,
|
||||
tasks::{KindWithContent, Status, Task},
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::info;
|
||||
|
||||
use crate::queue::TaskQueue;
|
||||
|
||||
pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow::Result<()> {
|
||||
info!("Upgrading the task queue");
|
||||
let env = unsafe {
|
||||
heed::EnvOpenOptions::new()
|
||||
.max_dbs(19)
|
||||
// Since that's the only database memory-mapped currently we don't need to check the budget yet
|
||||
.map_size(100 * 1024 * 1024)
|
||||
.open(tasks_path)
|
||||
}?;
|
||||
let mut wtxn = env.write_txn()?;
|
||||
let queue = TaskQueue::new(&env, &mut wtxn)?;
|
||||
let uid = queue.next_task_id(&wtxn)?;
|
||||
queue.register(
|
||||
&mut wtxn,
|
||||
&Task {
|
||||
uid,
|
||||
batch_uid: None,
|
||||
enqueued_at: OffsetDateTime::now_utc(),
|
||||
started_at: None,
|
||||
finished_at: None,
|
||||
error: None,
|
||||
canceled_by: None,
|
||||
details: None,
|
||||
status: Status::Enqueued,
|
||||
kind: KindWithContent::UpgradeDatabase { from: version },
|
||||
},
|
||||
)?;
|
||||
wtxn.commit()?;
|
||||
// Should be pretty much instantaneous since we're the only one reading this env
|
||||
env.prepare_for_closing().wait();
|
||||
Ok(())
|
||||
}
|
@ -234,6 +234,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
|
||||
K::TaskCancelation { .. }
|
||||
| K::TaskDeletion { .. }
|
||||
| K::DumpCreation { .. }
|
||||
| K::UpgradeDatabase { .. }
|
||||
| K::SnapshotCreation => (),
|
||||
};
|
||||
if let Some(Details::IndexSwap { swaps }) = &mut task.details {
|
||||
@ -547,6 +548,9 @@ impl crate::IndexScheduler {
|
||||
Details::Dump { dump_uid: _ } => {
|
||||
assert_eq!(kind.as_kind(), Kind::DumpCreation);
|
||||
}
|
||||
Details::UpgradeDatabase { from: _ } => {
|
||||
assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -370,7 +370,8 @@ VectorEmbeddingError , InvalidRequest , BAD_REQUEST ;
|
||||
NotFoundSimilarId , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidDocumentEditionContext , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidDocumentEditionFunctionFilter , InvalidRequest , BAD_REQUEST ;
|
||||
EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST
|
||||
EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST ;
|
||||
CouldNotUpgrade , InvalidRequest , BAD_REQUEST
|
||||
}
|
||||
|
||||
impl ErrorCode for JoinError {
|
||||
@ -453,6 +454,9 @@ impl ErrorCode for milli::Error {
|
||||
| UserError::DocumentEditionCompilationError(_) => {
|
||||
Code::EditDocumentsByFunctionError
|
||||
}
|
||||
UserError::TooOldForUpgrade(_, _, _)
|
||||
| UserError::CannotDowngrade(_, _, _)
|
||||
| UserError::CannotUpgradeToUnknownVersion(_, _, _) => Code::CouldNotUpgrade,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -114,6 +114,8 @@ pub struct DetailsView {
|
||||
pub settings: Option<Box<Settings<Unchecked>>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub swaps: Option<Vec<IndexSwap>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub upgrade_from: Option<String>,
|
||||
}
|
||||
|
||||
impl DetailsView {
|
||||
@ -234,6 +236,11 @@ impl DetailsView {
|
||||
Some(left)
|
||||
}
|
||||
},
|
||||
upgrade_from: match (self.upgrade_from.clone(), other.upgrade_from.clone()) {
|
||||
(None, None) => None,
|
||||
(None, Some(from)) | (Some(from), None) => Some(from),
|
||||
(Some(_), Some(from)) => Some(from),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -311,6 +318,10 @@ impl From<Details> for DetailsView {
|
||||
Details::IndexSwap { swaps } => {
|
||||
DetailsView { swaps: Some(swaps), ..Default::default() }
|
||||
}
|
||||
Details::UpgradeDatabase { from } => DetailsView {
|
||||
upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)),
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -50,6 +50,7 @@ impl Task {
|
||||
| SnapshotCreation
|
||||
| TaskCancelation { .. }
|
||||
| TaskDeletion { .. }
|
||||
| UpgradeDatabase { .. }
|
||||
| IndexSwap { .. } => None,
|
||||
DocumentAdditionOrUpdate { index_uid, .. }
|
||||
| DocumentEdition { index_uid, .. }
|
||||
@ -84,7 +85,8 @@ impl Task {
|
||||
| KindWithContent::TaskCancelation { .. }
|
||||
| KindWithContent::TaskDeletion { .. }
|
||||
| KindWithContent::DumpCreation { .. }
|
||||
| KindWithContent::SnapshotCreation => None,
|
||||
| KindWithContent::SnapshotCreation
|
||||
| KindWithContent::UpgradeDatabase { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -150,6 +152,9 @@ pub enum KindWithContent {
|
||||
instance_uid: Option<InstanceUid>,
|
||||
},
|
||||
SnapshotCreation,
|
||||
UpgradeDatabase {
|
||||
from: (u32, u32, u32),
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
|
||||
@ -175,6 +180,7 @@ impl KindWithContent {
|
||||
KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion,
|
||||
KindWithContent::DumpCreation { .. } => Kind::DumpCreation,
|
||||
KindWithContent::SnapshotCreation => Kind::SnapshotCreation,
|
||||
KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase,
|
||||
}
|
||||
}
|
||||
|
||||
@ -185,7 +191,8 @@ impl KindWithContent {
|
||||
DumpCreation { .. }
|
||||
| SnapshotCreation
|
||||
| TaskCancelation { .. }
|
||||
| TaskDeletion { .. } => vec![],
|
||||
| TaskDeletion { .. }
|
||||
| UpgradeDatabase { .. } => vec![],
|
||||
DocumentAdditionOrUpdate { index_uid, .. }
|
||||
| DocumentEdition { index_uid, .. }
|
||||
| DocumentDeletion { index_uid, .. }
|
||||
@ -262,6 +269,7 @@ impl KindWithContent {
|
||||
}),
|
||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||
KindWithContent::SnapshotCreation => None,
|
||||
KindWithContent::UpgradeDatabase { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
@ -320,6 +328,7 @@ impl KindWithContent {
|
||||
}),
|
||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||
KindWithContent::SnapshotCreation => None,
|
||||
KindWithContent::UpgradeDatabase { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -360,6 +369,7 @@ impl From<&KindWithContent> for Option<Details> {
|
||||
}),
|
||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||
KindWithContent::SnapshotCreation => None,
|
||||
KindWithContent::UpgradeDatabase { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -468,6 +478,7 @@ pub enum Kind {
|
||||
TaskDeletion,
|
||||
DumpCreation,
|
||||
SnapshotCreation,
|
||||
UpgradeDatabase,
|
||||
}
|
||||
|
||||
impl Kind {
|
||||
@ -484,6 +495,7 @@ impl Kind {
|
||||
| Kind::TaskCancelation
|
||||
| Kind::TaskDeletion
|
||||
| Kind::DumpCreation
|
||||
| Kind::UpgradeDatabase
|
||||
| Kind::SnapshotCreation => false,
|
||||
}
|
||||
}
|
||||
@ -503,6 +515,7 @@ impl Display for Kind {
|
||||
Kind::TaskDeletion => write!(f, "taskDeletion"),
|
||||
Kind::DumpCreation => write!(f, "dumpCreation"),
|
||||
Kind::SnapshotCreation => write!(f, "snapshotCreation"),
|
||||
Kind::UpgradeDatabase => write!(f, "upgradeDatabase"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -607,6 +620,9 @@ pub enum Details {
|
||||
IndexSwap {
|
||||
swaps: Vec<IndexSwap>,
|
||||
},
|
||||
UpgradeDatabase {
|
||||
from: (usize, usize, usize),
|
||||
},
|
||||
}
|
||||
|
||||
impl Details {
|
||||
@ -627,6 +643,7 @@ impl Details {
|
||||
Self::SettingsUpdate { .. }
|
||||
| Self::IndexInfo { .. }
|
||||
| Self::Dump { .. }
|
||||
| Self::UpgradeDatabase { .. }
|
||||
| Self::IndexSwap { .. } => (),
|
||||
}
|
||||
|
||||
|
@ -5,9 +5,9 @@ use std::path::Path;
|
||||
/// The name of the file that contains the version of the database.
|
||||
pub const VERSION_FILE_NAME: &str = "VERSION";
|
||||
|
||||
static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
|
||||
static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
|
||||
static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
|
||||
pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
|
||||
pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
|
||||
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
|
||||
|
||||
/// Persists the version of the current Meilisearch binary to a VERSION file
|
||||
pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
|
||||
@ -24,17 +24,6 @@ pub fn create_version_file(
|
||||
fs::write(version_path, format!("{}.{}.{}", major, minor, patch))
|
||||
}
|
||||
|
||||
/// Ensures Meilisearch version is compatible with the database, returns an error versions mismatch.
|
||||
pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
|
||||
let (major, minor, patch) = get_version(db_path)?;
|
||||
|
||||
if major != VERSION_MAJOR || minor != VERSION_MINOR {
|
||||
return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFileError> {
|
||||
let version_path = db_path.join(VERSION_FILE_NAME);
|
||||
|
||||
@ -48,7 +37,7 @@ pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFi
|
||||
}
|
||||
|
||||
pub fn parse_version(version: &str) -> Result<(String, String, String), VersionFileError> {
|
||||
let version_components = version.split('.').collect::<Vec<_>>();
|
||||
let version_components = version.trim().split('.').collect::<Vec<_>>();
|
||||
let (major, minor, patch) = match &version_components[..] {
|
||||
[major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()),
|
||||
_ => return Err(VersionFileError::MalformedVersionFile),
|
||||
|
@ -190,6 +190,7 @@ struct Infos {
|
||||
experimental_drop_search_after: usize,
|
||||
experimental_nb_searches_per_core: usize,
|
||||
experimental_logs_mode: LogMode,
|
||||
experimental_dumpless_upgrade: bool,
|
||||
experimental_replication_parameters: bool,
|
||||
experimental_enable_logs_route: bool,
|
||||
experimental_reduce_indexing_memory_usage: bool,
|
||||
@ -236,6 +237,7 @@ impl Infos {
|
||||
experimental_drop_search_after,
|
||||
experimental_nb_searches_per_core,
|
||||
experimental_logs_mode,
|
||||
experimental_dumpless_upgrade,
|
||||
experimental_replication_parameters,
|
||||
experimental_enable_logs_route,
|
||||
experimental_reduce_indexing_memory_usage,
|
||||
@ -299,6 +301,7 @@ impl Infos {
|
||||
experimental_drop_search_after: experimental_drop_search_after.into(),
|
||||
experimental_nb_searches_per_core: experimental_nb_searches_per_core.into(),
|
||||
experimental_logs_mode,
|
||||
experimental_dumpless_upgrade,
|
||||
experimental_replication_parameters,
|
||||
experimental_enable_logs_route: experimental_enable_logs_route | logs_route,
|
||||
experimental_reduce_indexing_memory_usage,
|
||||
|
@ -32,13 +32,16 @@ use analytics::Analytics;
|
||||
use anyhow::bail;
|
||||
use error::PayloadError;
|
||||
use extractors::payload::PayloadConfig;
|
||||
use index_scheduler::upgrade::upgrade_task_queue;
|
||||
use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
|
||||
use meilisearch_auth::AuthController;
|
||||
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
|
||||
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
|
||||
use meilisearch_types::settings::apply_settings_to_builder;
|
||||
use meilisearch_types::tasks::KindWithContent;
|
||||
use meilisearch_types::versioning::{check_version_file, create_current_version_file};
|
||||
use meilisearch_types::versioning::{
|
||||
create_current_version_file, get_version, VersionFileError, VERSION_MAJOR, VERSION_MINOR,
|
||||
};
|
||||
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
|
||||
pub use option::Opt;
|
||||
use option::ScheduleSnapshot;
|
||||
@ -316,6 +319,7 @@ fn open_or_create_database_unchecked(
|
||||
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
|
||||
index_count: DEFAULT_INDEX_COUNT,
|
||||
instance_features,
|
||||
auto_upgrade: opt.experimental_dumpless_upgrade,
|
||||
})?)
|
||||
};
|
||||
|
||||
@ -334,13 +338,36 @@ fn open_or_create_database_unchecked(
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensures Meilisearch version is compatible with the database, returns an error versions mismatch.
|
||||
fn check_version_and_update_task_queue(
|
||||
db_path: &Path,
|
||||
experimental_dumpless_upgrade: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let (major, minor, patch) = get_version(db_path)?;
|
||||
|
||||
if major != VERSION_MAJOR || minor != VERSION_MINOR {
|
||||
if experimental_dumpless_upgrade {
|
||||
let version = (
|
||||
major.parse().map_err(|_| VersionFileError::MalformedVersionFile)?,
|
||||
minor.parse().map_err(|_| VersionFileError::MalformedVersionFile)?,
|
||||
patch.parse().map_err(|_| VersionFileError::MalformedVersionFile)?,
|
||||
);
|
||||
return upgrade_task_queue(&db_path.join("tasks"), version);
|
||||
} else {
|
||||
return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensure you're in a valid state and open the IndexScheduler + AuthController for you.
|
||||
fn open_or_create_database(
|
||||
opt: &Opt,
|
||||
empty_db: bool,
|
||||
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
||||
if !empty_db {
|
||||
check_version_file(&opt.db_path)?;
|
||||
check_version_and_update_task_queue(&opt.db_path, opt.experimental_dumpless_upgrade)?;
|
||||
}
|
||||
|
||||
open_or_create_database_unchecked(opt, OnFailure::KeepDb)
|
||||
|
@ -49,6 +49,7 @@ const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
|
||||
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
|
||||
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
|
||||
const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE";
|
||||
const MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE: &str = "MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE";
|
||||
const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
|
||||
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
|
||||
const MEILI_EXPERIMENTAL_CONTAINS_FILTER: &str = "MEILI_EXPERIMENTAL_CONTAINS_FILTER";
|
||||
@ -400,6 +401,13 @@ pub struct Opt {
|
||||
#[serde(default)]
|
||||
pub experimental_logs_mode: LogMode,
|
||||
|
||||
/// Experimental dumpless upgrade. For more information, see: <https://github.com/orgs/meilisearch/discussions/723>
|
||||
///
|
||||
/// When set, Meilisearch will auto-update its database without using a dump.
|
||||
#[clap(long, env = MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE, default_value_t)]
|
||||
#[serde(default)]
|
||||
pub experimental_dumpless_upgrade: bool,
|
||||
|
||||
/// Experimental logs route feature. For more information,
|
||||
/// see: <https://github.com/orgs/meilisearch/discussions/721>
|
||||
///
|
||||
@ -535,6 +543,7 @@ impl Opt {
|
||||
experimental_drop_search_after,
|
||||
experimental_nb_searches_per_core,
|
||||
experimental_logs_mode,
|
||||
experimental_dumpless_upgrade,
|
||||
experimental_enable_logs_route,
|
||||
experimental_replication_parameters,
|
||||
experimental_reduce_indexing_memory_usage,
|
||||
@ -608,6 +617,10 @@ impl Opt {
|
||||
MEILI_EXPERIMENTAL_LOGS_MODE,
|
||||
experimental_logs_mode.to_string(),
|
||||
);
|
||||
export_to_env_if_not_present(
|
||||
MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE,
|
||||
experimental_dumpless_upgrade.to_string(),
|
||||
);
|
||||
export_to_env_if_not_present(
|
||||
MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS,
|
||||
experimental_replication_parameters.to_string(),
|
||||
|
@ -912,14 +912,14 @@ mod tests {
|
||||
{
|
||||
let params = "types=createIndex";
|
||||
let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err();
|
||||
snapshot!(meili_snap::json_string!(err), @r###"
|
||||
snapshot!(meili_snap::json_string!(err), @r#"
|
||||
{
|
||||
"message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
|
||||
"message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
|
||||
"code": "invalid_task_types",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
|
1
crates/meilisearch/src/upgrade/mod.rs
Normal file
1
crates/meilisearch/src/upgrade/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
|
@ -42,7 +42,7 @@ async fn batch_bad_types() {
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
snapshot!(json_string!(response), @r#"
|
||||
{
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
|
||||
"code": "invalid_task_types",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
|
||||
|
@ -95,36 +95,36 @@ async fn task_bad_types() {
|
||||
|
||||
let (response, code) = server.tasks_filter("types=doggo").await;
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
snapshot!(json_string!(response), @r#"
|
||||
{
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
|
||||
"code": "invalid_task_types",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
|
||||
let (response, code) = server.cancel_tasks("types=doggo").await;
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
snapshot!(json_string!(response), @r#"
|
||||
{
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
|
||||
"code": "invalid_task_types",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
|
||||
let (response, code) = server.delete_tasks("types=doggo").await;
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
snapshot!(json_string!(response), @r#"
|
||||
{
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
|
||||
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
|
||||
"code": "invalid_task_types",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
|
||||
}
|
||||
"###);
|
||||
"#);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
|
@ -1,2 +1,6 @@
|
||||
pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
|
||||
pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
|
||||
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
|
||||
|
||||
pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors";
|
||||
pub const RESERVED_GEO_FIELD_NAME: &str = "_geo";
|
||||
|
@ -10,7 +10,7 @@ use rhai::EvalAltResult;
|
||||
use serde_json::Value;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::constants::RESERVED_GEO_FIELD_NAME;
|
||||
use crate::constants::{RESERVED_GEO_FIELD_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
|
||||
use crate::documents::{self, DocumentsBatchCursorError};
|
||||
use crate::thread_pool_no_abort::PanicCatched;
|
||||
use crate::{CriterionError, DocumentId, FieldId, Object, SortError};
|
||||
@ -286,6 +286,12 @@ and can not be more than 511 bytes.", .document_id.to_string()
|
||||
DocumentEditionCompilationError(rhai::ParseError),
|
||||
#[error("{0}")]
|
||||
DocumentEmbeddingError(String),
|
||||
#[error("Upgrade could not be processed because v{0}.{1}.{2} of the database is too old. Please re-open the v{0}.{1}.{2} and use a dump to upgrade your version. The oldest version meilisearch can upgrade from is v1.12.0.")]
|
||||
TooOldForUpgrade(u32, u32, u32),
|
||||
#[error("Upgrade could not be processed because the database version (v{0}.{1}.{2}) is newer than the targeted version (v{}.{}.{})", VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)]
|
||||
CannotDowngrade(u32, u32, u32),
|
||||
#[error("Cannot upgrade to unknown version v{0}.{1}.{2}.")]
|
||||
CannotUpgradeToUnknownVersion(u32, u32, u32),
|
||||
}
|
||||
|
||||
impl From<crate::vector::Error> for Error {
|
||||
|
@ -10,6 +10,7 @@ mod roaring_bitmap_length;
|
||||
mod str_beu32_codec;
|
||||
mod str_ref;
|
||||
mod str_str_u8_codec;
|
||||
pub mod version;
|
||||
|
||||
pub use byte_slice_ref::BytesRefCodec;
|
||||
use heed::BoxedError;
|
||||
|
44
crates/milli/src/heed_codec/version.rs
Normal file
44
crates/milli/src/heed_codec/version.rs
Normal file
@ -0,0 +1,44 @@
|
||||
use std::mem::size_of;
|
||||
use std::{borrow::Cow, mem::size_of_val};
|
||||
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use heed::{BoxedError, BytesDecode, BytesEncode};
|
||||
|
||||
const VERSION_SIZE: usize = std::mem::size_of::<u32>() * 3;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error(
|
||||
"Could not decode the version: Expected {} bytes but instead received {0} bytes",
|
||||
VERSION_SIZE
|
||||
)]
|
||||
pub struct DecodeVersionError(usize);
|
||||
|
||||
pub struct VersionCodec;
|
||||
impl<'a> BytesEncode<'a> for VersionCodec {
|
||||
type EItem = (u32, u32, u32);
|
||||
|
||||
fn bytes_encode(item: &'a Self::EItem) -> Result<Cow<'a, [u8]>, BoxedError> {
|
||||
let mut ret = Vec::with_capacity(size_of::<u32>() * 3);
|
||||
ret.extend(&item.0.to_be_bytes());
|
||||
ret.extend(&item.1.to_be_bytes());
|
||||
ret.extend(&item.2.to_be_bytes());
|
||||
Ok(Cow::Owned(ret))
|
||||
}
|
||||
}
|
||||
impl<'a> BytesDecode<'a> for VersionCodec {
|
||||
type DItem = (u32, u32, u32);
|
||||
|
||||
fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
|
||||
if bytes.len() != VERSION_SIZE {
|
||||
Err(Box::new(DecodeVersionError(bytes.len())))
|
||||
} else {
|
||||
let major = BigEndian::read_u32(bytes);
|
||||
let bytes = &bytes[size_of_val(&major)..];
|
||||
let minor = BigEndian::read_u32(bytes);
|
||||
let bytes = &bytes[size_of_val(&major)..];
|
||||
let patch = BigEndian::read_u32(bytes);
|
||||
|
||||
Ok((major, minor, patch))
|
||||
}
|
||||
}
|
||||
}
|
@ -10,7 +10,7 @@ use roaring::RoaringBitmap;
|
||||
use rstar::RTree;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::constants::RESERVED_VECTORS_FIELD_NAME;
|
||||
use crate::constants::{self, RESERVED_VECTORS_FIELD_NAME};
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::error::{InternalError, UserError};
|
||||
use crate::fields_ids_map::FieldsIdsMap;
|
||||
@ -18,6 +18,7 @@ use crate::heed_codec::facet::{
|
||||
FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec,
|
||||
FieldIdCodec, OrderedF64Codec,
|
||||
};
|
||||
use crate::heed_codec::version::VersionCodec;
|
||||
use crate::heed_codec::{BEU16StrCodec, FstSetCodec, StrBEU16Codec, StrRefCodec};
|
||||
use crate::order_by_map::OrderByMap;
|
||||
use crate::proximity::ProximityPrecision;
|
||||
@ -33,6 +34,7 @@ pub const DEFAULT_MIN_WORD_LEN_ONE_TYPO: u8 = 5;
|
||||
pub const DEFAULT_MIN_WORD_LEN_TWO_TYPOS: u8 = 9;
|
||||
|
||||
pub mod main_key {
|
||||
pub const VERSION_KEY: &str = "version";
|
||||
pub const CRITERIA_KEY: &str = "criteria";
|
||||
pub const DISPLAYED_FIELDS_KEY: &str = "displayed-fields";
|
||||
pub const DISTINCT_FIELD_KEY: &str = "distinct-field-key";
|
||||
@ -223,12 +225,9 @@ impl Index {
|
||||
let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?;
|
||||
|
||||
let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;
|
||||
wtxn.commit()?;
|
||||
|
||||
Index::set_creation_dates(&env, main, created_at, updated_at)?;
|
||||
|
||||
Ok(Index {
|
||||
env,
|
||||
let this = Index {
|
||||
env: env.clone(),
|
||||
main,
|
||||
external_documents_ids,
|
||||
word_docids,
|
||||
@ -253,7 +252,22 @@ impl Index {
|
||||
vector_arroy,
|
||||
embedder_category_id,
|
||||
documents,
|
||||
})
|
||||
};
|
||||
if this.get_version(&wtxn)?.is_none() {
|
||||
this.put_version(
|
||||
&mut wtxn,
|
||||
(
|
||||
constants::VERSION_MAJOR.parse().unwrap(),
|
||||
constants::VERSION_MINOR.parse().unwrap(),
|
||||
constants::VERSION_PATCH.parse().unwrap(),
|
||||
),
|
||||
)?;
|
||||
}
|
||||
wtxn.commit()?;
|
||||
|
||||
Index::set_creation_dates(&this.env, this.main, created_at, updated_at)?;
|
||||
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
pub fn new<P: AsRef<Path>>(options: heed::EnvOpenOptions, path: P) -> Result<Index> {
|
||||
@ -331,6 +345,26 @@ impl Index {
|
||||
self.env.prepare_for_closing()
|
||||
}
|
||||
|
||||
/* version */
|
||||
|
||||
/// Writes the version of the database.
|
||||
pub(crate) fn put_version(
|
||||
&self,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
(major, minor, patch): (u32, u32, u32),
|
||||
) -> heed::Result<()> {
|
||||
self.main.remap_types::<Str, VersionCodec>().put(
|
||||
wtxn,
|
||||
main_key::VERSION_KEY,
|
||||
&(major, minor, patch),
|
||||
)
|
||||
}
|
||||
|
||||
/// Get the version of the database. `None` if it was never set.
|
||||
pub(crate) fn get_version(&self, rtxn: &RoTxn<'_>) -> heed::Result<Option<(u32, u32, u32)>> {
|
||||
self.main.remap_types::<Str, VersionCodec>().get(rtxn, main_key::VERSION_KEY)
|
||||
}
|
||||
|
||||
/* documents ids */
|
||||
|
||||
/// Writes the documents ids that corresponds to the user-ids-documents-ids FST.
|
||||
|
@ -1,5 +1,6 @@
|
||||
use std::any::TypeId;
|
||||
use std::borrow::Cow;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
@ -153,3 +154,41 @@ pub struct ProgressStepView {
|
||||
pub finished: u32,
|
||||
pub total: u32,
|
||||
}
|
||||
|
||||
/// Used when the name can change but it's still the same step.
|
||||
/// To avoid conflicts on the `TypeId`, create a unique type every time you use this step:
|
||||
/// ```text
|
||||
/// enum UpgradeVersion {}
|
||||
///
|
||||
/// progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
|
||||
/// "v1 to v2",
|
||||
/// 0,
|
||||
/// 10,
|
||||
/// ));
|
||||
/// ```
|
||||
pub struct VariableNameStep<U: Send + Sync + 'static> {
|
||||
name: String,
|
||||
current: u32,
|
||||
total: u32,
|
||||
phantom: PhantomData<U>,
|
||||
}
|
||||
|
||||
impl<U: Send + Sync + 'static> VariableNameStep<U> {
|
||||
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
||||
Self { name: name.into(), current, total, phantom: PhantomData }
|
||||
}
|
||||
}
|
||||
|
||||
impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
|
||||
fn name(&self) -> Cow<'static, str> {
|
||||
self.name.clone().into()
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
self.current
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
self.total
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ mod indexer_config;
|
||||
pub mod new;
|
||||
pub(crate) mod settings;
|
||||
mod update_step;
|
||||
pub mod upgrade;
|
||||
mod word_prefix_docids;
|
||||
mod words_prefix_integer_docids;
|
||||
mod words_prefixes_fst;
|
||||
|
65
crates/milli/src/update/upgrade/mod.rs
Normal file
65
crates/milli/src/update/upgrade/mod.rs
Normal file
@ -0,0 +1,65 @@
|
||||
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
|
||||
use crate::progress::{Progress, VariableNameStep};
|
||||
use crate::{Index, Result, UserError};
|
||||
|
||||
pub fn upgrade(index: &Index, base_version: (u32, u32, u32), progress: Progress) -> Result<()> {
|
||||
let wtxn = index.env.write_txn()?;
|
||||
let from = index.get_version(&wtxn)?;
|
||||
let upgrade_functions =
|
||||
[(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")];
|
||||
|
||||
let current_major: u32 = VERSION_MAJOR.parse().unwrap();
|
||||
let current_minor: u32 = VERSION_MINOR.parse().unwrap();
|
||||
let current_patch: u32 = VERSION_PATCH.parse().unwrap();
|
||||
|
||||
let start = match from {
|
||||
// If there was no version it means we're coming from the base version specified by the index-scheduler
|
||||
None if base_version.0 == 1 && base_version.1 == 12 => 0,
|
||||
Some((1, 12, _)) => 0,
|
||||
|
||||
// --- Error handling
|
||||
None => {
|
||||
return Err(UserError::TooOldForUpgrade(
|
||||
base_version.0,
|
||||
base_version.1,
|
||||
base_version.2,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
Some((major, minor, patch)) if major == 0 || (major == 1 && minor < 12) => {
|
||||
return Err(UserError::TooOldForUpgrade(major, minor, patch).into());
|
||||
}
|
||||
Some((major, minor, patch)) if major > current_major => {
|
||||
return Err(UserError::CannotDowngrade(major, minor, patch).into());
|
||||
}
|
||||
Some((major, minor, patch)) if major == current_major && minor > current_minor => {
|
||||
return Err(UserError::CannotDowngrade(major, minor, patch).into());
|
||||
}
|
||||
Some((major, minor, patch))
|
||||
if major == current_major && minor == current_minor && patch > current_patch =>
|
||||
{
|
||||
return Err(UserError::CannotDowngrade(major, minor, patch).into());
|
||||
}
|
||||
Some((major, minor, patch)) => {
|
||||
return Err(UserError::CannotUpgradeToUnknownVersion(major, minor, patch).into())
|
||||
}
|
||||
};
|
||||
|
||||
enum UpgradeVersion {}
|
||||
let upgrade_path = &upgrade_functions[start..];
|
||||
|
||||
for (i, (upgrade_function, upgrade_msg)) in upgrade_path.iter().enumerate() {
|
||||
progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
|
||||
upgrade_msg.to_string(),
|
||||
i as u32,
|
||||
upgrade_path.len() as u32,
|
||||
));
|
||||
(upgrade_function)(index, progress.clone())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn v1_12_to_v1_13(_index: &Index, _progress: Progress) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user