diff --git a/crates/index-scheduler/src/queue/tasks.rs b/crates/index-scheduler/src/queue/tasks.rs index 9bd63c595..00e745e71 100644 --- a/crates/index-scheduler/src/queue/tasks.rs +++ b/crates/index-scheduler/src/queue/tasks.rs @@ -9,7 +9,9 @@ use time::OffsetDateTime; use super::{Query, Queue}; use crate::processing::ProcessingTasks; -use crate::utils::{self, insert_task_datetime, keep_ids_within_datetimes, map_bound}; +use crate::utils::{ + self, insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime, +}; use crate::{Error, Result, TaskId, BEI128}; /// Database const names for the `IndexScheduler`. @@ -90,12 +92,14 @@ impl TaskQueue { pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?; + let reprocessing = old_task.status != Status::Enqueued; debug_assert!(old_task != *task); debug_assert_eq!(old_task.uid, task.uid); - debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some()); + + // If we're processing a task that failed, it may already contain a batch_uid debug_assert!( - old_task.batch_uid.is_none() && task.batch_uid.is_some(), + reprocessing || (old_task.batch_uid.is_none() && task.batch_uid.is_some()), "\n==> old: {old_task:?}\n==> new: {task:?}" ); @@ -122,13 +126,25 @@ impl TaskQueue { "Cannot update a task's enqueued_at time" ); if old_task.started_at != task.started_at { - assert!(old_task.started_at.is_none(), "Cannot update a task's started_at time"); + assert!( + reprocessing || old_task.started_at.is_none(), + "Cannot update a task's started_at time" + ); + if let Some(started_at) = old_task.started_at { + remove_task_datetime(wtxn, self.started_at, started_at, task.uid)?; + } if let Some(started_at) = task.started_at { insert_task_datetime(wtxn, self.started_at, started_at, task.uid)?; } } if old_task.finished_at != task.finished_at { - assert!(old_task.finished_at.is_none(), "Cannot update a task's 
finished_at time"); + assert!( + reprocessing || old_task.finished_at.is_none(), + "Cannot update a task's finished_at time" + ); + if let Some(finished_at) = old_task.finished_at { + remove_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?; + } if let Some(finished_at) = task.finished_at { insert_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?; } diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index 58bc5c9fc..41bc46a11 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -433,9 +433,10 @@ impl IndexScheduler { let mut current_batch = ProcessingBatch::new(batch_id); let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; + let failed = &self.queue.tasks.get_status(rtxn, Status::Failed)?; // 0. The priority over everything is to upgrade the instance - let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & enqueued; + let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? 
& (enqueued | failed); // There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time if !upgrade.is_empty() { let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, upgrade)?; diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index ae98dc83c..2f4a8e7da 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -329,6 +329,8 @@ impl IndexScheduler { for task in tasks.iter_mut() { task.status = Status::Succeeded; + // Since this task can be retried we must reset its error status + task.error = None; } Ok(tasks) diff --git a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs index 1471723ef..8472f2dba 100644 --- a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs +++ b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs @@ -9,6 +9,9 @@ impl IndexScheduler { pub(super) fn process_upgrade(&self, progress: Progress) -> Result<()> { progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap); + #[cfg(test)] + self.maybe_fail(crate::test_utils::FailureLocation::ProcessUpgrade)?; + enum UpgradeIndex {} let indexes = self.index_names()?; @@ -20,9 +23,9 @@ impl IndexScheduler { )); let index = self.index(uid)?; let mut wtxn = index.write_txn()?; - let regenerate = milli::update::upgrade::upgrade(&mut wtxn, &index, progress.clone()) + let regen_stats = milli::update::upgrade::upgrade(&mut wtxn, &index, progress.clone()) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; - if regenerate { + if regen_stats { let stats = crate::index_mapper::IndexStats::new(&index, &wtxn) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; // Release wtxn as soon as possible because it stops us from registering tasks diff --git 
a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_processing_everything.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_processing_everything.snap new file mode 100644 index 000000000..543ddf384 --- /dev/null +++ b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_processing_everything.snap @@ -0,0 +1,118 @@ +--- +source: crates/index-scheduler/src/scheduler/test_failure.rs +snapshot_kind: text +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, batch_uid: 3, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, batch_uid: 2, status: succeeded, kind: UpgradeDatabase { from: (1, 12, 0) }} +2 {uid: 2, batch_uid: 4, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +3 {uid: 3, batch_uid: 5, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +4 {uid: 4, batch_uid: 6, status: succeeded, details: { primary_key: Some("leaves") }, kind: IndexCreation { index_uid: "girafo", primary_key: Some("leaves") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,4,] +failed [3,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,2,3,4,] +"upgradeDatabase" [1,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [2,3,] +girafo 
[4,] +---------------------------------------------------------------------- +### Index Mapper: +catto: { number_of_documents: 0, field_distribution: {} } +doggo: { number_of_documents: 0, field_distribution: {} } +girafo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [1,] +[timestamp] [0,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [1,] +[timestamp] [0,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### All Batches: +0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +1 {uid: 1, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +2 {uid: 2, details: {}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +3 {uid: 3, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, } +4 {uid: 4, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, } +5 {uid: 5, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, } +6 {uid: 6, details: {"primaryKey":"leaves"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"girafo":1}}, } 
+---------------------------------------------------------------------- +### Batch to tasks mapping: +0 [1,] +1 [1,] +2 [1,] +3 [0,] +4 [2,] +5 [3,] +6 [4,] +---------------------------------------------------------------------- +### Batches Status: +succeeded [2,3,4,6,] +failed [0,1,5,] +---------------------------------------------------------------------- +### Batches Kind: +"indexCreation" [3,4,5,6,] +"upgradeDatabase" [0,1,2,] +---------------------------------------------------------------------- +### Batches Index Tasks: +catto [3,] +doggo [4,5,] +girafo [6,] +---------------------------------------------------------------------- +### Batches Enqueued At: +[timestamp] [3,] +[timestamp] [0,1,2,] +[timestamp] [4,] +[timestamp] [5,] +[timestamp] [6,] +---------------------------------------------------------------------- +### Batches Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +[timestamp] [6,] +---------------------------------------------------------------------- +### Batches Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +[timestamp] [6,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/registered_a_task_and_upgrade_task.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/registered_a_task_and_upgrade_task.snap new file mode 100644 index 000000000..2294e7845 --- /dev/null +++ b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/registered_a_task_and_upgrade_task.snap @@ -0,0 +1,55 @@ +--- +source: crates/index-scheduler/src/scheduler/test_failure.rs +snapshot_kind: text +--- +### Autobatching Enabled = true +### Processing batch None: +[] 
+---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, status: enqueued, kind: UpgradeDatabase { from: (1, 12, 0) }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,] +"upgradeDatabase" [1,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### All Batches: +---------------------------------------------------------------------- +### Batch to tasks mapping: +---------------------------------------------------------------------- +### Batches Status: +---------------------------------------------------------------------- +### Batches Kind: +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Enqueued At: +---------------------------------------------------------------------- +### Batches Started At: +---------------------------------------------------------------------- +### Batches Finished At: +---------------------------------------------------------------------- +### File Store: + 
+---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_failed.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_failed.snap new file mode 100644 index 000000000..0ec4f1057 --- /dev/null +++ b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_failed.snap @@ -0,0 +1,65 @@ +--- +source: crates/index-scheduler/src/scheduler/test_failure.rs +snapshot_kind: text +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, kind: UpgradeDatabase { from: (1, 12, 0) }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +failed [1,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,] +"upgradeDatabase" [1,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: 
+[timestamp] [1,] +---------------------------------------------------------------------- +### All Batches: +0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +---------------------------------------------------------------------- +### Batch to tasks mapping: +0 [1,] +---------------------------------------------------------------------- +### Batches Status: +failed [0,] +---------------------------------------------------------------------- +### Batches Kind: +"upgradeDatabase" [0,] +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Batches Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Batches Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_failed_again.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_failed_again.snap new file mode 100644 index 000000000..7c2e5d427 --- /dev/null +++ b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_failed_again.snap @@ -0,0 +1,72 @@ +--- +source: crates/index-scheduler/src/scheduler/test_failure.rs +snapshot_kind: text +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, 
batch_uid: 1, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, kind: UpgradeDatabase { from: (1, 12, 0) }} +2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,2,] +failed [1,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,2,] +"upgradeDatabase" [1,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [2,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [1,] +---------------------------------------------------------------------- +### All Batches: +0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +1 {uid: 1, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +---------------------------------------------------------------------- +### Batch to tasks mapping: +0 [1,] +1 [1,] +---------------------------------------------------------------------- +### Batches Status: +failed [0,1,] +---------------------------------------------------------------------- +### Batches Kind: +"upgradeDatabase" [0,1,] 
+---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Enqueued At: +[timestamp] [0,1,] +---------------------------------------------------------------------- +### Batches Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Batches Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_succeeded.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_succeeded.snap new file mode 100644 index 000000000..a641048a0 --- /dev/null +++ b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/upgrade_task_succeeded.snap @@ -0,0 +1,80 @@ +--- +source: crates/index-scheduler/src/scheduler/test_failure.rs +snapshot_kind: text +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, batch_uid: 2, status: succeeded, kind: UpgradeDatabase { from: (1, 12, 0) }} +2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +3 {uid: 3, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,2,3,] +succeeded [1,] +failed [] 
+---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,2,3,] +"upgradeDatabase" [1,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [2,3,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [1,] +---------------------------------------------------------------------- +### All Batches: +0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +1 {uid: 1, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +2 {uid: 2, details: {}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, } +---------------------------------------------------------------------- +### Batch to tasks mapping: +0 [1,] +1 [1,] +2 [1,] +---------------------------------------------------------------------- +### Batches Status: +succeeded [2,] +failed [0,1,] +---------------------------------------------------------------------- +### Batches Kind: +"upgradeDatabase" [0,1,2,] +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Enqueued At: +[timestamp] [0,1,2,] +---------------------------------------------------------------------- +### Batches Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] 
[2,] +---------------------------------------------------------------------- +### Batches Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/scheduler/test_failure.rs b/crates/index-scheduler/src/scheduler/test_failure.rs index cf835daa3..b215083fa 100644 --- a/crates/index-scheduler/src/scheduler/test_failure.rs +++ b/crates/index-scheduler/src/scheduler/test_failure.rs @@ -7,8 +7,10 @@ use meilisearch_types::milli::obkv_to_json; use meilisearch_types::milli::update::IndexDocumentsMethod::*; use meilisearch_types::milli::update::Setting; use meilisearch_types::tasks::KindWithContent; +use roaring::RoaringBitmap; use crate::insta_snapshot::snapshot_index_scheduler; +use crate::test_utils::Breakpoint; use crate::test_utils::Breakpoint::*; use crate::test_utils::{index_creation_task, read_json, FailureLocation}; use crate::IndexScheduler; @@ -249,3 +251,68 @@ fn panic_in_process_batch_for_index_creation() { // No matter what happens in process_batch, the index_scheduler should be internally consistent snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); } + +#[test] +fn upgrade_failure() { + let (index_scheduler, mut handle) = + IndexScheduler::test(true, vec![(1, FailureLocation::ProcessUpgrade)]); + + let kind = index_creation_task("catto", "mouse"); + let _task = index_scheduler.register(kind, None, false).unwrap(); + let upgrade_database_task = index_scheduler + .register(KindWithContent::UpgradeDatabase { from: (1, 12, 0) }, None, false) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_a_task_and_upgrade_task"); + + handle.advance_one_failed_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "upgrade_task_failed"); + + // We can still register tasks + let 
kind = index_creation_task("doggo", "bone"); + let _task = index_scheduler.register(kind, None, false).unwrap(); + + // But the scheduler is down and won't process anything ever again + handle.scheduler_is_down(); + + // =====> After a restart is it still working as expected? + let (index_scheduler, mut handle) = + handle.restart(index_scheduler, true, vec![(1, FailureLocation::ProcessUpgrade)]); + + handle.advance_one_failed_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "upgrade_task_failed_again"); + // We can still register tasks + let kind = index_creation_task("doggo", "bone"); + let _task = index_scheduler.register(kind, None, false).unwrap(); + // And the scheduler is still down and won't process anything ever again + handle.scheduler_is_down(); + + // =====> After a rerestart and without failure can we upgrade the indexes and process the tasks + let (index_scheduler, mut handle) = handle.restart(index_scheduler, true, vec![]); + + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "upgrade_task_succeeded"); + // We can still register tasks + let kind = index_creation_task("girafo", "leaves"); + let _task = index_scheduler.register(kind, None, false).unwrap(); + // The scheduler is up and running + handle.advance_one_successful_batch(); + handle.advance_one_successful_batch(); + handle.advance_one_failed_batch(); // doggo already exists + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_everything"); + + // When deleting the single upgrade task it should remove the three associated batches + let _task = index_scheduler + .register( + KindWithContent::TaskDeletion { + query: String::from("test"), + tasks: RoaringBitmap::from_iter([upgrade_database_task.uid]), + }, + None, + false, + ) + .unwrap(); + + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: 
"after_removing_the_upgrade"); +} diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index c83a8ab0b..a63825104 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -1,10 +1,18 @@ use std::io::{BufWriter, Write}; use std::sync::Arc; +use std::time::Duration; +use big_s::S; +use crossbeam_channel::RecvTimeoutError; use file_store::File; use meilisearch_types::document_formats::DocumentFormatError; use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments; +use meilisearch_types::milli::update::IndexerConfig; +use meilisearch_types::tasks::KindWithContent; +use meilisearch_types::VERSION_FILE_NAME; +use tempfile::{NamedTempFile, TempDir}; use uuid::Uuid; +use Breakpoint::*; use crate::insta_snapshot::snapshot_index_scheduler; use crate::{Error, IndexScheduler, IndexSchedulerOptions}; @@ -28,20 +36,13 @@ pub(crate) enum FailureLocation { InsideCreateBatch, InsideProcessBatch, PanicInsideProcessBatch, + ProcessUpgrade, AcquiringWtxn, UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 }, UpdatingTaskAfterProcessBatchFailure, CommittingWtxn, } -use big_s::S; -use crossbeam_channel::RecvTimeoutError; -use meilisearch_types::milli::update::IndexerConfig; -use meilisearch_types::tasks::KindWithContent; -use meilisearch_types::VERSION_FILE_NAME; -use tempfile::{NamedTempFile, TempDir}; -use Breakpoint::*; - impl IndexScheduler { /// Blocks the thread until the test handle asks to progress to/through this breakpoint. /// @@ -55,7 +56,6 @@ impl IndexScheduler { /// As soon as we find it, the index scheduler is unblocked but then wait again on the call to /// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the /// test asks to progress to the next `(b2, false)`. - #[cfg(test)] pub(crate) fn breakpoint(&self, b: Breakpoint) { // We send two messages. The first one will sync with the call // to `handle.wait_until(b)`. 
The second one will block until the @@ -225,6 +225,46 @@ pub struct IndexSchedulerHandle { } impl IndexSchedulerHandle { + /// Restarts the index-scheduler on the same database. + /// To use this function you must give back the index-scheduler that was given to you when + /// creating the handle the first time. + /// If the index-scheduler has been cloned in the meantime you must drop all copies otherwise + /// the function will panic. + pub(crate) fn restart( + self, + index_scheduler: IndexScheduler, + autobatching_enabled: bool, + planned_failures: Vec<(usize, FailureLocation)>, + ) -> (IndexScheduler, Self) { + drop(index_scheduler); + let Self { _tempdir: tempdir, index_scheduler, test_breakpoint_rcv, last_breakpoint: _ } = + self; + drop(index_scheduler); + + // We must ensure that the `run` function has stopped running before restarting the index scheduler + loop { + match test_breakpoint_rcv.recv_timeout(Duration::from_secs(5)) { + Ok(_) => continue, + Err(RecvTimeoutError::Timeout) => panic!("The indexing loop is stuck somewhere"), + Err(RecvTimeoutError::Disconnected) => break, + } + } + + let (scheduler, mut handle) = + IndexScheduler::test_with_custom_config(planned_failures, |config| { + config.autobatching_enabled = autobatching_enabled; + config.version_file_path = tempdir.path().join(VERSION_FILE_NAME); + config.auth_path = tempdir.path().join("auth"); + config.tasks_path = tempdir.path().join("db_path"); + config.update_file_path = tempdir.path().join("file_store"); + config.indexes_path = tempdir.path().join("indexes"); + config.snapshots_path = tempdir.path().join("snapshots"); + config.dumps_path = tempdir.path().join("dumps"); + }); + handle._tempdir = tempdir; + (scheduler, handle) + } + /// Advance the scheduler to the next tick. /// Panic /// * If the scheduler is waiting for a task to be registered. @@ -350,4 +390,18 @@ impl IndexSchedulerHandle { } self.advance_till([AfterProcessing]); } + + // Asserts that the scheduler is down and will not process any further batches. 
+ #[track_caller] + pub(crate) fn scheduler_is_down(&mut self) { + loop { + match self + .test_breakpoint_rcv + .recv_timeout(std::time::Duration::from_secs(1)) { + Ok((_, true)) => continue, + Ok((b, false)) => panic!("The scheduler was supposed to be down but successfully moved to the next breakpoint: {b:?}"), + Err(RecvTimeoutError::Timeout | RecvTimeoutError::Disconnected) => break, + } + } + } }