Add the new tasks with most of the job done

This commit is contained in:
Tamo 2025-01-14 14:54:00 +01:00 committed by Louis Dureuil
parent b15de68831
commit d3654906bf
No known key found for this signature in database
38 changed files with 572 additions and 204 deletions

View file

@ -144,6 +144,7 @@ impl<'a> Dump<'a> {
KindWithContent::DumpCreation { keys, instance_uid }
}
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
},
};

View file

@ -3,7 +3,7 @@ use std::fmt::Display;
use meilisearch_types::batches::BatchId;
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::tasks::{Kind, Status};
use meilisearch_types::{heed, milli};
use meilisearch_types::{heed, milli, versioning};
use thiserror::Error;
use crate::TaskId;

View file

@ -279,6 +279,9 @@ fn snapshot_details(d: &Details) -> String {
Details::IndexSwap { swaps } => {
format!("{{ swaps: {swaps:?} }}")
}
Details::UpgradeDatabase { from } => {
format!("{{ from: v{}.{}.{} }}", from.0, from.1, from.2)
}
}
}

View file

@ -30,6 +30,7 @@ mod queue;
mod scheduler;
#[cfg(test)]
mod test_utils;
pub mod upgrade;
mod utils;
pub mod uuid_codec;
@ -120,6 +121,8 @@ pub struct IndexSchedulerOptions {
pub batched_tasks_size_limit: u64,
/// The experimental features enabled for this instance.
pub instance_features: InstanceTogglableFeatures,
/// The experimental features enabled for this instance.
pub auto_upgrade: bool,
}
/// Structure which holds meilisearch's indexes and schedules the tasks

View file

@ -129,6 +129,12 @@ make_enum_progress! {
}
}
make_enum_progress! {
pub enum UpgradeDatabaseProgress {
EnsuringCorrectnessOfTheSwap,
}
}
make_enum_progress! {
pub enum InnerSwappingTwoIndexes {
RetrieveTheTasks,
@ -173,32 +179,6 @@ make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" );
pub struct VariableNameStep {
name: String,
current: u32,
total: u32,
}
impl VariableNameStep {
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
Self { name: name.into(), current, total }
}
}
impl Step for VariableNameStep {
fn name(&self) -> Cow<'static, str> {
self.name.clone().into()
}
fn current(&self) -> u32 {
self.current
}
fn total(&self) -> u32 {
self.total
}
}
#[cfg(test)]
mod test {
use std::sync::atomic::Ordering;

View file

@ -20,8 +20,8 @@ use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use uuid::Uuid;
use self::batches::BatchQueue;
use self::tasks::TaskQueue;
pub(crate) use self::batches::BatchQueue;
pub(crate) use self::tasks::TaskQueue;
use crate::processing::ProcessingTasks;
use crate::utils::{
check_index_swap_validity, filter_out_references_to_newer_tasks, ProcessingBatch,

View file

@ -59,7 +59,7 @@ impl TaskQueue {
}
}
pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?,
status: env.create_database(wtxn, Some(db_name::STATUS))?,

View file

@ -85,6 +85,7 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
| KindWithContent::UpgradeDatabase { .. }
| KindWithContent::SnapshotCreation => {
panic!("The autobatcher should never be called with tasks that don't apply to an index.")
}

View file

@ -47,6 +47,9 @@ pub(crate) enum Batch {
IndexSwap {
task: Task,
},
UpgradeDatabase {
tasks: Vec<Task>,
},
}
#[derive(Debug)]
@ -105,6 +108,7 @@ impl Batch {
}
Batch::SnapshotCreation(tasks)
| Batch::TaskDeletions(tasks)
| Batch::UpgradeDatabase { tasks }
| Batch::IndexDeletion { tasks, .. } => {
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
}
@ -138,6 +142,7 @@ impl Batch {
| TaskDeletions(_)
| SnapshotCreation(_)
| Dump(_)
| UpgradeDatabase { .. }
| IndexSwap { .. } => None,
IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. }
@ -162,6 +167,7 @@ impl fmt::Display for Batch {
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
};
match index_uid {
Some(name) => f.write_fmt(format_args!(" on {name:?} from tasks: {tasks:?}")),
@ -427,9 +433,18 @@ impl IndexScheduler {
let mut current_batch = ProcessingBatch::new(batch_id);
let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
// 0. The priority over everything is to upgrade the instance
let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & enqueued;
// There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time
if !upgrade.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, upgrade)?;
current_batch.processing(&mut tasks);
return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch)));
}
// 1. we get the last task to cancel.
let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
if let Some(task_id) = to_cancel.max() {
let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;

View file

@ -6,6 +6,7 @@ mod process_batch;
mod process_dump_creation;
mod process_index_operation;
mod process_snapshot_creation;
mod process_upgrade;
#[cfg(test)]
mod test;
#[cfg(test)]

View file

@ -3,7 +3,7 @@ use std::sync::atomic::Ordering;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self};
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task};
use milli::update::Settings as MilliSettings;
@ -13,7 +13,7 @@ use super::create_batch::Batch;
use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress, VariableNameStep,
UpdateIndexProgress,
};
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, Result, TaskId};
@ -297,7 +297,7 @@ impl IndexScheduler {
}
progress.update_progress(SwappingTheIndexes::SwappingTheIndexes);
for (step, swap) in swaps.iter().enumerate() {
progress.update_progress(VariableNameStep::new(
progress.update_progress(VariableNameStep::<SwappingTheIndexes>::new(
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
step as u32,
swaps.len() as u32,
@ -314,6 +314,7 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok(vec![task])
}
Batch::UpgradeDatabase { tasks } => self.process_upgrade(progress, tasks),
}
}

View file

@ -4,16 +4,14 @@ use std::sync::atomic::Ordering;
use dump::IndexMetadata;
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use meilisearch_types::milli::{self};
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use time::macros::format_description;
use time::OffsetDateTime;
use crate::processing::{
AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress, VariableNameStep,
};
use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress};
use crate::{Error, IndexScheduler, Result};
impl IndexScheduler {
@ -108,8 +106,12 @@ impl IndexScheduler {
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
let mut count = 0;
let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
progress.update_progress(VariableNameStep::new(uid.to_string(), count, nb_indexes));
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
progress.update_progress(VariableNameStep::<DumpCreationProgress>::new(
uid.to_string(),
count,
nb_indexes,
));
count += 1;
let rtxn = index.read_txn()?;

View file

@ -3,12 +3,12 @@ use std::fs;
use std::sync::atomic::Ordering;
use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress, VariableNameStep};
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
use crate::{Error, IndexScheduler, Result};
impl IndexScheduler {
@ -74,7 +74,9 @@ impl IndexScheduler {
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, uuid) = result?;
progress.update_progress(VariableNameStep::new(name, i as u32, nb_indexes));
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
));
let index = self.index_mapper.index(&rtxn, name)?;
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?;

View file

@ -0,0 +1,42 @@
use meilisearch_types::{
milli,
milli::progress::{Progress, VariableNameStep},
tasks::{KindWithContent, Status, Task},
versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH},
};
use crate::{processing::UpgradeDatabaseProgress, Error, IndexScheduler, Result};
impl IndexScheduler {
pub(super) fn process_upgrade(
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap);
// Since we should not have multiple upgrade tasks, we're only going to process the latest one:
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
unreachable!()
};
enum UpgradeIndex {}
let indexes = self.index_names()?;
for (i, uid) in indexes.iter().enumerate() {
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
format!("Upgrading index `{uid}`"),
i as u32,
indexes.len() as u32,
));
let index = self.index(uid)?;
milli::update::upgrade::upgrade(&index, from, progress.clone());
}
for task in tasks.iter_mut() {
task.status = Status::Succeeded;
}
Ok(tasks)
}
}

View file

@ -713,68 +713,70 @@ fn basic_get_stats() {
let kind = index_creation_task("whalo", "fish");
let _task = index_scheduler.register(kind, None, false).unwrap();
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 3,
"failed": 0,
"processing": 0,
"succeeded": 0
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0
}
}
"###);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 3,
"failed": 0,
"processing": 0,
"succeeded": 0
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0,
"upgradeDatabase": 0
}
}
"#);
handle.advance_till([Start, BatchCreated]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 2,
"failed": 0,
"processing": 1,
"succeeded": 0
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0
}
}
"###);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 2,
"failed": 0,
"processing": 1,
"succeeded": 0
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0,
"upgradeDatabase": 0
}
}
"#);
handle.advance_till([
InsideProcessBatch,
@ -784,36 +786,37 @@ fn basic_get_stats() {
Start,
BatchCreated,
]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 1,
"failed": 0,
"processing": 1,
"succeeded": 1
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0
}
}
"###);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 1,
"failed": 0,
"processing": 1,
"succeeded": 1
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0,
"upgradeDatabase": 0
}
}
"#);
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
handle.advance_till([
@ -824,36 +827,37 @@ fn basic_get_stats() {
Start,
BatchCreated,
]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 0,
"failed": 0,
"processing": 1,
"succeeded": 2
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0
}
}
"###);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
"doggo": 1,
"whalo": 1
},
"statuses": {
"canceled": 0,
"enqueued": 0,
"failed": 0,
"processing": 1,
"succeeded": 2
},
"types": {
"documentAdditionOrUpdate": 0,
"documentDeletion": 0,
"documentEdition": 0,
"dumpCreation": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
"indexUpdate": 0,
"settingsUpdate": 0,
"snapshotCreation": 0,
"taskCancelation": 0,
"taskDeletion": 0,
"upgradeDatabase": 0
}
}
"#);
}
#[test]

View file

@ -109,6 +109,7 @@ impl IndexScheduler {
max_number_of_batched_tasks: usize::MAX,
batched_tasks_size_limit: u64::MAX,
instance_features: Default::default(),
auto_upgrade: true, // Don't cost much and will ensure the happy path works
};
configuration(&mut options);

View file

@ -0,0 +1,43 @@
use std::path::Path;
use meilisearch_types::{
heed,
tasks::{KindWithContent, Status, Task},
};
use time::OffsetDateTime;
use tracing::info;
use crate::queue::TaskQueue;
pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow::Result<()> {
info!("Upgrading the task queue");
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(19)
// Since that's the only database memory-mapped currently we don't need to check the budget yet
.map_size(100 * 1024 * 1024)
.open(tasks_path)
}?;
let mut wtxn = env.write_txn()?;
let queue = TaskQueue::new(&env, &mut wtxn)?;
let uid = queue.next_task_id(&wtxn)?;
queue.register(
&mut wtxn,
&Task {
uid,
batch_uid: None,
enqueued_at: OffsetDateTime::now_utc(),
started_at: None,
finished_at: None,
error: None,
canceled_by: None,
details: None,
status: Status::Enqueued,
kind: KindWithContent::UpgradeDatabase { from: version },
},
)?;
wtxn.commit()?;
// Should be pretty much instantaneous since we're the only one reading this env
env.prepare_for_closing().wait();
Ok(())
}

View file

@ -234,6 +234,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
K::TaskCancelation { .. }
| K::TaskDeletion { .. }
| K::DumpCreation { .. }
| K::UpgradeDatabase { .. }
| K::SnapshotCreation => (),
};
if let Some(Details::IndexSwap { swaps }) = &mut task.details {
@ -547,6 +548,9 @@ impl crate::IndexScheduler {
Details::Dump { dump_uid: _ } => {
assert_eq!(kind.as_kind(), Kind::DumpCreation);
}
Details::UpgradeDatabase { from: _ } => {
assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
}
}
}