From 4de445d386789de7aa2c73baa8b8de2016056d89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?= Date: Thu, 20 Oct 2022 17:11:44 +0200 Subject: [PATCH] Start testing unexpected errors and panics in index scheduler --- index-scheduler/src/batch.rs | 8 + index-scheduler/src/error.rs | 3 + index-scheduler/src/lib.rs | 410 ++++++++++++++---- index-scheduler/src/snapshot.rs | 31 +- .../1.snap | 33 ++ .../document_addition_batch_created.snap | 34 ++ .../document_addition_failed.snap | 36 ++ .../index_creation_failed.snap | 36 ++ ...eeded_but_index_scheduler_not_updated.snap | 34 ++ .../second_iteratino.snap | 36 ++ .../index_creation_failed.snap | 36 ++ index-scheduler/src/utils.rs | 14 +- 12 files changed, 617 insertions(+), 94 deletions(-) create mode 100644 index-scheduler/src/snapshots/lib.rs/fail_in_create_batch_for_index_creation/1.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_batch_created.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_failed.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_index_creation/index_creation_failed.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/document_addition_succeeded_but_index_scheduler_not_updated.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteratino.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index bbd05365d..c1e41c552 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -384,6 +384,9 @@ impl IndexScheduler { /// 4. We get the *next* dump to process. /// 5. 
We get the *next* tasks to process for a specific index. pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result> { + #[cfg(test)] + self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?; + let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; @@ -465,6 +468,11 @@ impl IndexScheduler { /// list is updated accordingly, with the exception of the its date fields /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). pub(crate) fn process_batch(&self, batch: Batch) -> Result> { + #[cfg(test)] + self.maybe_fail(crate::tests::FailureLocation::InsideProcessBatch)?; + #[cfg(test)] + self.maybe_fail(crate::tests::FailureLocation::PanicInsideProcessBatch)?; + match batch { Batch::TaskCancelation(mut task) => { // 1. Retrieve the tasks that matched the query at enqueue-time. diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index caa5539d8..97b15904f 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -28,6 +28,8 @@ pub enum Error { Heed(#[from] heed::Error), #[error(transparent)] Milli(#[from] milli::Error), + #[error("An unexpected crash occurred when processing the task")] + MilliPanic, #[error(transparent)] FileStore(#[from] file_store::Error), #[error(transparent)] @@ -48,6 +50,7 @@ impl ErrorCode for Error { Error::Dump(e) => e.error_code(), Error::Milli(e) => e.error_code(), + Error::MilliPanic => Code::Internal, // TODO: TAMO: are all these errors really internal? 
Error::Heed(_) => Code::Internal, Error::FileStore(_) => Code::Internal, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index d02201606..b4a38f49c 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -29,26 +29,28 @@ mod utils; pub type Result = std::result::Result; pub type TaskId = u32; +use dump::{KindDump, TaskDump, UpdateFile}; +pub use error::Error; +use meilisearch_types::milli::documents::DocumentsBatchBuilder; +use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; + +use utils::keep_tasks_within_datetimes; + use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, RwLock}; -use dump::{KindDump, TaskDump, UpdateFile}; -pub use error::Error; use file_store::FileStore; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{self, Database, Env}; use meilisearch_types::milli; -use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; -use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::RoaringBitmap; use synchronoise::SignalEvent; use time::OffsetDateTime; -use utils::keep_tasks_within_datetimes; use uuid::Uuid; use crate::index_mapper::IndexMapper; @@ -244,7 +246,44 @@ pub struct IndexScheduler { /// The next entry is dedicated to the tests. /// It provide a way to break in multiple part of the scheduler. #[cfg(test)] - test_breakpoint_sdr: crossbeam::channel::Sender, + test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, + + #[cfg(test)] + /// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler. + /// + /// The first field is the iteration index and the second field identifies a location in the code. 
+ planned_failures: Vec<(usize, tests::FailureLocation)>, + + #[cfg(test)] + /// A counter that is incremented before every call to [`tick`](IndexScheduler::tick) + run_loop_iteration: Arc>, +} +impl IndexScheduler { + fn private_clone(&self) -> Self { + Self { + env: self.env.clone(), + must_stop_processing: self.must_stop_processing.clone(), + processing_tasks: self.processing_tasks.clone(), + file_store: self.file_store.clone(), + all_tasks: self.all_tasks.clone(), + status: self.status.clone(), + kind: self.kind.clone(), + index_tasks: self.index_tasks.clone(), + enqueued_at: self.enqueued_at.clone(), + started_at: self.started_at.clone(), + finished_at: self.finished_at.clone(), + index_mapper: self.index_mapper.clone(), + wake_up: self.wake_up.clone(), + autobatching_enabled: self.autobatching_enabled.clone(), + dumps_path: self.dumps_path.clone(), + #[cfg(test)] + test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), + #[cfg(test)] + planned_failures: self.planned_failures.clone(), + #[cfg(test)] + run_loop_iteration: self.run_loop_iteration.clone(), + } + } } #[cfg(test)] @@ -278,7 +317,8 @@ impl IndexScheduler { index_size: usize, indexer_config: IndexerConfig, autobatching_enabled: bool, - #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender, + #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, + #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, ) -> Result { std::fs::create_dir_all(&tasks_path)?; std::fs::create_dir_all(&update_file_path)?; @@ -313,6 +353,10 @@ impl IndexScheduler { #[cfg(test)] test_breakpoint_sdr, + #[cfg(test)] + planned_failures, + #[cfg(test)] + run_loop_iteration: Arc::new(RwLock::new(0)), }; this.run(); @@ -324,26 +368,7 @@ impl IndexScheduler { /// This function will execute in a different thread and must be called /// only once per index scheduler. 
fn run(&self) { - let run = Self { - must_stop_processing: MustStopProcessing::default(), - processing_tasks: self.processing_tasks.clone(), - file_store: self.file_store.clone(), - env: self.env.clone(), - all_tasks: self.all_tasks, - status: self.status, - kind: self.kind, - index_tasks: self.index_tasks, - enqueued_at: self.enqueued_at, - started_at: self.started_at, - finished_at: self.finished_at, - index_mapper: self.index_mapper.clone(), - wake_up: self.wake_up.clone(), - autobatching_enabled: self.autobatching_enabled, - dumps_path: self.dumps_path.clone(), - - #[cfg(test)] - test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), - }; + let run = self.private_clone(); std::thread::spawn(move || loop { run.wake_up.wait(); @@ -682,7 +707,10 @@ impl IndexScheduler { /// Returns the number of processed tasks. fn tick(&self) -> Result { #[cfg(test)] - self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap(); + { + *self.run_loop_iteration.write().unwrap() += 1; + self.breakpoint(Breakpoint::Start); + } let rtxn = self.env.read_txn()?; let batch = match self.create_next_batch(&rtxn)? { @@ -703,21 +731,35 @@ impl IndexScheduler { self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks); #[cfg(test)] - { - self.test_breakpoint_sdr.send(Breakpoint::BatchCreated).unwrap(); - self.test_breakpoint_sdr.send(Breakpoint::BeforeProcessing).unwrap(); - } + self.breakpoint(Breakpoint::BatchCreated); // 2. 
Process the tasks - let res = self.process_batch(batch); + let res = { + let cloned_index_scheduler = self.private_clone(); + let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch)); + handle.join().unwrap_or_else(|_| Err(Error::MilliPanic)) + }; + + #[cfg(test)] + self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; + let mut wtxn = self.env.write_txn()?; let finished_at = OffsetDateTime::now_utc(); match res { Ok(tasks) => { - for mut task in tasks { + #[allow(unused_variables)] + for (i, mut task) in tasks.into_iter().enumerate() { task.started_at = Some(started_at); task.finished_at = Some(finished_at); + + #[cfg(test)] + self.maybe_fail( + tests::FailureLocation::UpdatingTaskAfterProcessBatchSuccess { + task_uid: i as u32, + }, + )?; + self.update_task(&mut wtxn, &task)?; self.delete_persisted_task_data(&task)?; } @@ -741,15 +783,23 @@ impl IndexScheduler { task.status = Status::Failed; task.error = Some(error.clone()); + #[cfg(test)] + self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?; + + self.delete_persisted_task_data(&task)?; self.update_task(&mut wtxn, &task)?; } } } self.processing_tasks.write().unwrap().stop_processing_at(finished_at); + + #[cfg(test)] + self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; + wtxn.commit()?; #[cfg(test)] - self.test_breakpoint_sdr.send(Breakpoint::AfterProcessing).unwrap(); + self.breakpoint(Breakpoint::AfterProcessing); Ok(processed_tasks) } @@ -760,6 +810,18 @@ impl IndexScheduler { None => Ok(()), } } + + #[cfg(test)] + fn breakpoint(&self, b: Breakpoint) { + // We send two messages. The first one will sync with the call + // to `handle.wait_till(b)`. The second one will block until + // the next call to `handle.wait_till(..)`. + self.test_breakpoint_sdr.send((b, false)).unwrap(); + // This one will only be able to be sent if the test handle stays alive. + // If it fails, then it means that we have exited the test.
+ // By crashing with `unwrap`, we kill the run loop. + self.test_breakpoint_sdr.send((b, true)).unwrap(); + } } #[cfg(test)] @@ -771,8 +833,65 @@ mod tests { use tempfile::TempDir; use uuid::Uuid; + use crate::snapshot::{snapshot_bitmap, snapshot_index_scheduler}; + use super::*; - use crate::snapshot::snapshot_index_scheduler; + + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum FailureLocation { + InsideCreateBatch, + InsideProcessBatch, + PanicInsideProcessBatch, + AcquiringWtxn, + UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 }, + UpdatingTaskAfterProcessBatchFailure, + CommittingWtxn, + } + + impl IndexScheduler { + pub fn test( + autobatching: bool, + planned_failures: Vec<(usize, FailureLocation)>, + ) -> (Self, IndexSchedulerHandle) { + let tempdir = TempDir::new().unwrap(); + let (sender, receiver) = crossbeam::channel::bounded(0); + + let index_scheduler = Self::new( + tempdir.path().join("db_path"), + tempdir.path().join("file_store"), + tempdir.path().join("indexes"), + tempdir.path().join("dumps"), + 1024 * 1024, + 1024 * 1024, + IndexerConfig::default(), + autobatching, // enable autobatching + sender, + planned_failures, + ) + .unwrap(); + + let index_scheduler_handle = + IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver }; + + (index_scheduler, index_scheduler_handle) + } + + /// Return a [`CorruptedTaskQueue`](Error::CorruptedTaskQueue) error if a failure is planned + /// for the given location and current run loop iteration. 
+ pub fn maybe_fail(&self, location: FailureLocation) -> Result<()> { + if self.planned_failures.contains(&(*self.run_loop_iteration.read().unwrap(), location)) + { + match location { + FailureLocation::PanicInsideProcessBatch => { + panic!("simulated panic") + } + _ => Err(Error::CorruptedTaskQueue), + } + } else { + Ok(()) + } + } + } /// Return a `KindWithContent::IndexCreation` task fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent { @@ -826,53 +945,22 @@ mod tests { (file, documents_count) } - impl IndexScheduler { - pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) { - let tempdir = TempDir::new().unwrap(); - let (sender, receiver) = crossbeam::channel::bounded(0); - - let index_scheduler = Self::new( - tempdir.path().join("db_path"), - tempdir.path().join("file_store"), - tempdir.path().join("indexes"), - tempdir.path().join("dumps"), - 1024 * 1024, - 1024 * 1024, - IndexerConfig::default(), - autobatching, // enable autobatching - sender, - ) - .unwrap(); - - let index_scheduler_handle = - IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver }; - - (index_scheduler, index_scheduler_handle) - } - } - pub struct IndexSchedulerHandle { _tempdir: TempDir, - test_breakpoint_rcv: crossbeam::channel::Receiver, + test_breakpoint_rcv: crossbeam::channel::Receiver<(Breakpoint, bool)>, } impl IndexSchedulerHandle { /// Wait until the provided breakpoint is reached. fn wait_till(&self, breakpoint: Breakpoint) { - self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint); - } - - #[allow(unused)] - /// Wait until the provided breakpoint is reached. 
- fn next_breakpoint(&self) -> Breakpoint { - self.test_breakpoint_rcv.recv().unwrap() + self.test_breakpoint_rcv.iter().find(|b| *b == (breakpoint, false)); } } #[test] fn register() { // In this test, the handle doesn't make any progress, we only check that the tasks are registered - let (index_scheduler, _handle) = IndexScheduler::test(true); + let (index_scheduler, _handle) = IndexScheduler::test(true, vec![]); let kinds = [ index_creation_task("catto", "mouse"), @@ -902,7 +990,7 @@ mod tests { #[test] fn insert_task_while_another_task_is_processing() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); index_scheduler.register(index_creation_task("index_a", "id")).unwrap(); index_scheduler.assert_internally_consistent(); @@ -926,7 +1014,7 @@ mod tests { /// we send them very fast, we must make sure that they are all processed. #[test] fn process_tasks_inserted_without_new_signal() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); index_scheduler .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) @@ -965,7 +1053,7 @@ mod tests { #[test] fn process_tasks_without_autobatching() { - let (index_scheduler, handle) = IndexScheduler::test(false); + let (index_scheduler, handle) = IndexScheduler::test(false, vec![]); index_scheduler .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) @@ -1010,7 +1098,7 @@ mod tests { #[test] fn task_deletion_undeleteable() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0); let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1); @@ -1062,7 +1150,7 @@ mod tests { #[test] fn task_deletion_deleteable() { - let (index_scheduler, 
handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0); let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1); @@ -1104,7 +1192,7 @@ mod tests { #[test] fn task_deletion_delete_same_task_twice() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0); let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1); @@ -1149,7 +1237,7 @@ mod tests { #[test] fn document_addition() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let content = r#" { @@ -1190,7 +1278,7 @@ mod tests { #[test] fn document_addition_and_index_deletion() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let content = r#" { @@ -1236,7 +1324,7 @@ mod tests { #[test] fn do_not_batch_task_of_different_indexes() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let index_names = ["doggos", "cattos", "girafos"]; for name in index_names { @@ -1274,7 +1362,7 @@ mod tests { #[test] fn swap_indexes() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let to_enqueue = [ index_creation_task("a", "id"), @@ -1312,7 +1400,7 @@ mod tests { #[test] fn document_addition_and_index_deletion_on_unexisting_index() { - let (index_scheduler, handle) = IndexScheduler::test(true); + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); let content = r#" { @@ -1357,6 +1445,164 @@ mod tests { #[test] fn simple_new() { - let (_index_scheduler, _handle) = 
crate::IndexScheduler::test(true); + crate::IndexScheduler::test(true, vec![]); + } + + #[test] + fn query_processing_tasks() { + let (index_scheduler, handle) = + IndexScheduler::test(true, vec![(1, FailureLocation::InsideCreateBatch)]); + + let kind = index_creation_task("catto", "mouse"); + let _task = index_scheduler.register(kind).unwrap(); + + handle.wait_till(Breakpoint::BatchCreated); + let query = Query { status: Some(vec![Status::Processing]), ..Default::default() }; + let processing_tasks = index_scheduler.get_task_ids(&query).unwrap(); + snapshot!(snapshot_bitmap(&processing_tasks), @"[0,]"); + } + + #[test] + fn fail_in_create_batch_for_index_creation() { + let (index_scheduler, handle) = + IndexScheduler::test(true, vec![(1, FailureLocation::InsideCreateBatch)]); + + let kinds = [index_creation_task("catto", "mouse")]; + + for kind in kinds { + let _task = index_scheduler.register(kind).unwrap(); + index_scheduler.assert_internally_consistent(); + } + handle.wait_till(Breakpoint::BatchCreated); + + // We skipped an iteration of `tick` to reach BatchCreated + assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 2); + // Otherwise nothing weird happened + index_scheduler.assert_internally_consistent(); + snapshot!(snapshot_index_scheduler(&index_scheduler)); + } + + #[test] + fn fail_in_process_batch_for_index_creation() { + let (index_scheduler, handle) = + IndexScheduler::test(true, vec![(1, FailureLocation::InsideProcessBatch)]); + + let kind = index_creation_task("catto", "mouse"); + + let _task = index_scheduler.register(kind).unwrap(); + index_scheduler.assert_internally_consistent(); + + handle.wait_till(Breakpoint::AfterProcessing); + + // Still in the first iteration + assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1); + // No matter what happens in process_batch, the index_scheduler should be internally consistent + index_scheduler.assert_internally_consistent(); + 
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); + } + + #[test] + fn fail_in_process_batch_for_document_addition() { + let (index_scheduler, handle) = + IndexScheduler::test(true, vec![(1, FailureLocation::InsideProcessBatch)]); + + let content = r#" + { + "id": 1, + "doggo": "bob" + }"#; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); + let documents_count = + meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + handle.wait_till(Breakpoint::BatchCreated); + + snapshot!( + snapshot_index_scheduler(&index_scheduler), + name: "document_addition_batch_created" + ); + + handle.wait_till(Breakpoint::AfterProcessing); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "document_addition_failed"); + } + + #[test] + fn fail_in_update_task_after_process_batch_success_for_document_addition() { + // TODO: this is not the correct behaviour, because we process the + // same tasks twice. + // + // I wonder whether the run loop should maybe be paused if we encounter a failure + // at that point. Alternatively, maybe we should preemptively mark the tasks + // as failed at the beginning of `IndexScheduler::tick`. 
+ + let (index_scheduler, handle) = IndexScheduler::test( + true, + vec![(1, FailureLocation::UpdatingTaskAfterProcessBatchSuccess { task_uid: 0 })], + ); + + let content = r#" + { + "id": 1, + "doggo": "bob" + }"#; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); + let documents_count = + meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + handle.wait_till(Breakpoint::Start); + + index_scheduler.assert_internally_consistent(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "document_addition_succeeded_but_index_scheduler_not_updated"); + + handle.wait_till(Breakpoint::AfterProcessing); + index_scheduler.assert_internally_consistent(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteratino"); + } + + #[test] + fn panic_in_process_batch_for_index_creation() { + let (index_scheduler, handle) = + IndexScheduler::test(true, vec![(1, FailureLocation::PanicInsideProcessBatch)]); + + let kind = index_creation_task("catto", "mouse"); + + let _task = index_scheduler.register(kind).unwrap(); + index_scheduler.assert_internally_consistent(); + + handle.wait_till(Breakpoint::AfterProcessing); + + // Still in the first iteration + assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1); + // No matter what happens in process_batch, the index_scheduler should be internally consistent + index_scheduler.assert_internally_consistent(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); } } diff --git a/index-scheduler/src/snapshot.rs b/index-scheduler/src/snapshot.rs index 
b065301b0..83a547451 100644 --- a/index-scheduler/src/snapshot.rs +++ b/index-scheduler/src/snapshot.rs @@ -27,6 +27,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { wake_up: _, dumps_path: _, test_breakpoint_sdr: _, + planned_failures: _, + run_loop_iteration: _, } = scheduler; let rtxn = env.read_txn().unwrap(); @@ -78,7 +80,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { snap } -fn snapshot_file_store(file_store: &file_store::FileStore) -> String { +pub fn snapshot_file_store(file_store: &file_store::FileStore) -> String { let mut snap = String::new(); for uuid in file_store.__all_uuids() { snap.push_str(&format!("{uuid}\n")); @@ -86,7 +88,7 @@ fn snapshot_file_store(file_store: &file_store::FileStore) -> String { snap } -fn snapshot_bitmap(r: &RoaringBitmap) -> String { +pub fn snapshot_bitmap(r: &RoaringBitmap) -> String { let mut snap = String::new(); snap.push('['); for x in r { @@ -96,7 +98,7 @@ fn snapshot_bitmap(r: &RoaringBitmap) -> String { snap } -fn snapshot_all_tasks(rtxn: &RoTxn, db: Database, SerdeJson>) -> String { +pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database, SerdeJson>) -> String { let mut snap = String::new(); let iter = db.iter(rtxn).unwrap(); for next in iter { @@ -106,7 +108,7 @@ fn snapshot_all_tasks(rtxn: &RoTxn, db: Database, SerdeJson, CboRoaringBitmapCodec>, ) -> String { @@ -119,7 +121,7 @@ fn snapshot_date_db( snap } -fn snapshot_task(task: &Task) -> String { +pub fn snapshot_task(task: &Task) -> String { let mut snap = String::new(); let Task { uid, @@ -191,7 +193,10 @@ fn snaphsot_details(d: &Details) -> String { } } -fn snapshot_status(rtxn: &RoTxn, db: Database, RoaringBitmapCodec>) -> String { +pub fn snapshot_status( + rtxn: &RoTxn, + db: Database, RoaringBitmapCodec>, +) -> String { let mut snap = String::new(); let iter = db.iter(rtxn).unwrap(); for next in iter { @@ -200,8 +205,7 @@ fn snapshot_status(rtxn: &RoTxn, db: Database, RoaringBitma } snap } - -fn 
snapshot_kind(rtxn: &RoTxn, db: Database, RoaringBitmapCodec>) -> String { +pub fn snapshot_kind(rtxn: &RoTxn, db: Database, RoaringBitmapCodec>) -> String { let mut snap = String::new(); let iter = db.iter(rtxn).unwrap(); for next in iter { @@ -212,7 +216,7 @@ fn snapshot_kind(rtxn: &RoTxn, db: Database, RoaringBitmapCod snap } -fn snapshot_index_tasks(rtxn: &RoTxn, db: Database) -> String { +pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database) -> String { let mut snap = String::new(); let iter = db.iter(rtxn).unwrap(); for next in iter { @@ -222,7 +226,12 @@ fn snapshot_index_tasks(rtxn: &RoTxn, db: Database) -> snap } -fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String { - let names = mapper.indexes(rtxn).unwrap().into_iter().map(|(n, _)| n).collect::>(); +pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String { + let names = mapper + .indexes(rtxn) + .unwrap() + .into_iter() + .map(|(n, _)| n) + .collect::>(); format!("{names:?}") } diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_create_batch_for_index_creation/1.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_create_batch_for_index_creation/1.snap new file mode 100644 index 000000000..b78d63444 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_create_batch_for_index_creation/1.snap @@ -0,0 +1,33 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[0,] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] 
+---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_batch_created.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_batch_created.snap new file mode 100644 index 000000000..b9e745cf0 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_batch_created.snap @@ -0,0 +1,34 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[0,] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Enqueued At: 
+[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_failed.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_failed.snap new file mode 100644 index 000000000..750edbbf2 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_addition/document_addition_failed.snap @@ -0,0 +1,36 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Corrupted task queue.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [] +failed [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- 
+### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_index_creation/index_creation_failed.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_index_creation/index_creation_failed.snap new file mode 100644 index 000000000..11bfb09c1 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_index_creation/index_creation_failed.snap @@ -0,0 +1,36 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Corrupted task queue.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +failed [0,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] 
+---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/document_addition_succeeded_but_index_scheduler_not_updated.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/document_addition_succeeded_but_index_scheduler_not_updated.snap new file mode 100644 index 000000000..6abb00f81 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/document_addition_succeeded_but_index_scheduler_not_updated.snap @@ -0,0 +1,34 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: 
+---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteratino.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteratino.snap new file mode 100644 index 000000000..2bcc9368d --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteratino.snap @@ -0,0 +1,36 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +["doggos"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- 
+### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap b/index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap new file mode 100644 index 000000000..d9a406c26 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap @@ -0,0 +1,36 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +failed [0,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: + 
+---------------------------------------------------------------------- + diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 3611f3a17..f95d5c782 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -114,7 +114,19 @@ impl IndexScheduler { } pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result { - Ok(self.status.get(rtxn, &status)?.unwrap_or_default()) + match status { + Status::Processing => { + let tasks = self + .processing_tasks + .read() + .map_err(|_| Error::CorruptedTaskQueue)? + .processing + .clone(); + + Ok(tasks) + } + status => Ok(self.status.get(rtxn, &status)?.unwrap_or_default()), + } } pub(crate) fn put_status(