Start testing unexpected errors and panics in index scheduler

This commit is contained in:
Loïc Lecrenier 2022-10-20 17:11:44 +02:00 committed by Clément Renault
parent e3848b5f28
commit 4de445d386
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
12 changed files with 617 additions and 94 deletions

View File

@ -384,6 +384,9 @@ impl IndexScheduler {
/// 4. We get the *next* dump to process. /// 4. We get the *next* dump to process.
/// 5. We get the *next* tasks to process for a specific index. /// 5. We get the *next* tasks to process for a specific index.
pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> { pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> {
#[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?;
let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
@ -465,6 +468,11 @@ impl IndexScheduler {
/// list is updated accordingly, with the exception of the its date fields /// list is updated accordingly, with the exception of the its date fields
/// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at).
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> { pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
#[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::InsideProcessBatch)?;
#[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::PanicInsideProcessBatch)?;
match batch { match batch {
Batch::TaskCancelation(mut task) => { Batch::TaskCancelation(mut task) => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.

View File

@ -28,6 +28,8 @@ pub enum Error {
Heed(#[from] heed::Error), Heed(#[from] heed::Error),
#[error(transparent)] #[error(transparent)]
Milli(#[from] milli::Error), Milli(#[from] milli::Error),
#[error("An unexpected crash occurred when processing the task")]
MilliPanic,
#[error(transparent)] #[error(transparent)]
FileStore(#[from] file_store::Error), FileStore(#[from] file_store::Error),
#[error(transparent)] #[error(transparent)]
@ -48,6 +50,7 @@ impl ErrorCode for Error {
Error::Dump(e) => e.error_code(), Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(), Error::Milli(e) => e.error_code(),
Error::MilliPanic => Code::Internal,
// TODO: TAMO: are all these errors really internal? // TODO: TAMO: are all these errors really internal?
Error::Heed(_) => Code::Internal, Error::Heed(_) => Code::Internal,
Error::FileStore(_) => Code::Internal, Error::FileStore(_) => Code::Internal,

View File

@ -29,26 +29,28 @@ mod utils;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32; pub type TaskId = u32;
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use utils::keep_tasks_within_datetimes;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use file_store::FileStore; use file_store::FileStore;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env}; use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::milli; use meilisearch_types::milli;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use time::OffsetDateTime; use time::OffsetDateTime;
use utils::keep_tasks_within_datetimes;
use uuid::Uuid; use uuid::Uuid;
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
@ -244,7 +246,44 @@ pub struct IndexScheduler {
/// The next entry is dedicated to the tests. /// The next entry is dedicated to the tests.
/// It provide a way to break in multiple part of the scheduler. /// It provide a way to break in multiple part of the scheduler.
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>, test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)]
/// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler.
///
/// The first field is the iteration index and the second field identifies a location in the code.
planned_failures: Vec<(usize, tests::FailureLocation)>,
#[cfg(test)]
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
run_loop_iteration: Arc<RwLock<usize>>,
}
impl IndexScheduler {
fn private_clone(&self) -> Self {
Self {
env: self.env.clone(),
must_stop_processing: self.must_stop_processing.clone(),
processing_tasks: self.processing_tasks.clone(),
file_store: self.file_store.clone(),
all_tasks: self.all_tasks.clone(),
status: self.status.clone(),
kind: self.kind.clone(),
index_tasks: self.index_tasks.clone(),
enqueued_at: self.enqueued_at.clone(),
started_at: self.started_at.clone(),
finished_at: self.finished_at.clone(),
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled.clone(),
dumps_path: self.dumps_path.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
#[cfg(test)]
planned_failures: self.planned_failures.clone(),
#[cfg(test)]
run_loop_iteration: self.run_loop_iteration.clone(),
}
}
} }
#[cfg(test)] #[cfg(test)]
@ -278,7 +317,8 @@ impl IndexScheduler {
index_size: usize, index_size: usize,
indexer_config: IndexerConfig, indexer_config: IndexerConfig,
autobatching_enabled: bool, autobatching_enabled: bool,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> { ) -> Result<Self> {
std::fs::create_dir_all(&tasks_path)?; std::fs::create_dir_all(&tasks_path)?;
std::fs::create_dir_all(&update_file_path)?; std::fs::create_dir_all(&update_file_path)?;
@ -313,6 +353,10 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr, test_breakpoint_sdr,
#[cfg(test)]
planned_failures,
#[cfg(test)]
run_loop_iteration: Arc::new(RwLock::new(0)),
}; };
this.run(); this.run();
@ -324,26 +368,7 @@ impl IndexScheduler {
/// This function will execute in a different thread and must be called /// This function will execute in a different thread and must be called
/// only once per index scheduler. /// only once per index scheduler.
fn run(&self) { fn run(&self) {
let run = Self { let run = self.private_clone();
must_stop_processing: MustStopProcessing::default(),
processing_tasks: self.processing_tasks.clone(),
file_store: self.file_store.clone(),
env: self.env.clone(),
all_tasks: self.all_tasks,
status: self.status,
kind: self.kind,
index_tasks: self.index_tasks,
enqueued_at: self.enqueued_at,
started_at: self.started_at,
finished_at: self.finished_at,
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
dumps_path: self.dumps_path.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
};
std::thread::spawn(move || loop { std::thread::spawn(move || loop {
run.wake_up.wait(); run.wake_up.wait();
@ -682,7 +707,10 @@ impl IndexScheduler {
/// Returns the number of processed tasks. /// Returns the number of processed tasks.
fn tick(&self) -> Result<usize> { fn tick(&self) -> Result<usize> {
#[cfg(test)] #[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap(); {
*self.run_loop_iteration.write().unwrap() += 1;
self.breakpoint(Breakpoint::Start);
}
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let batch = match self.create_next_batch(&rtxn)? { let batch = match self.create_next_batch(&rtxn)? {
@ -703,21 +731,35 @@ impl IndexScheduler {
self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks); self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks);
#[cfg(test)] #[cfg(test)]
{ self.breakpoint(Breakpoint::BatchCreated);
self.test_breakpoint_sdr.send(Breakpoint::BatchCreated).unwrap();
self.test_breakpoint_sdr.send(Breakpoint::BeforeProcessing).unwrap();
}
// 2. Process the tasks // 2. Process the tasks
let res = self.process_batch(batch); let res = {
let cloned_index_scheduler = self.private_clone();
let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch));
handle.join().unwrap_or_else(|_| Err(Error::MilliPanic))
};
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let finished_at = OffsetDateTime::now_utc(); let finished_at = OffsetDateTime::now_utc();
match res { match res {
Ok(tasks) => { Ok(tasks) => {
for mut task in tasks { #[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() {
task.started_at = Some(started_at); task.started_at = Some(started_at);
task.finished_at = Some(finished_at); task.finished_at = Some(finished_at);
#[cfg(test)]
self.maybe_fail(
tests::FailureLocation::UpdatingTaskAfterProcessBatchSuccess {
task_uid: i as u32,
},
)?;
self.update_task(&mut wtxn, &task)?; self.update_task(&mut wtxn, &task)?;
self.delete_persisted_task_data(&task)?; self.delete_persisted_task_data(&task)?;
} }
@ -741,15 +783,23 @@ impl IndexScheduler {
task.status = Status::Failed; task.status = Status::Failed;
task.error = Some(error.clone()); task.error = Some(error.clone());
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?;
self.delete_persisted_task_data(&task)?;
self.update_task(&mut wtxn, &task)?; self.update_task(&mut wtxn, &task)?;
} }
} }
} }
self.processing_tasks.write().unwrap().stop_processing_at(finished_at); self.processing_tasks.write().unwrap().stop_processing_at(finished_at);
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
wtxn.commit()?; wtxn.commit()?;
#[cfg(test)] #[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::AfterProcessing).unwrap(); self.breakpoint(Breakpoint::AfterProcessing);
Ok(processed_tasks) Ok(processed_tasks)
} }
@ -760,6 +810,18 @@ impl IndexScheduler {
None => Ok(()), None => Ok(()),
} }
} }
#[cfg(test)]
fn breakpoint(&self, b: Breakpoint) {
// We send two messages. The first one will sync with the call
// to `handle.wait_until(b)`. The second one will block until the
// the next call to `handle.wait_until(..)`.
self.test_breakpoint_sdr.send((b, false)).unwrap();
// This one will only be able to be sent if the test handle stays alive.
// If it fails, then it means that we have exited the test.
// By crashing with `unwrap`, we kill the run loop.
self.test_breakpoint_sdr.send((b, true)).unwrap();
}
} }
#[cfg(test)] #[cfg(test)]
@ -771,8 +833,65 @@ mod tests {
use tempfile::TempDir; use tempfile::TempDir;
use uuid::Uuid; use uuid::Uuid;
use crate::snapshot::{snapshot_bitmap, snapshot_index_scheduler};
use super::*; use super::*;
use crate::snapshot::snapshot_index_scheduler;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailureLocation {
InsideCreateBatch,
InsideProcessBatch,
PanicInsideProcessBatch,
AcquiringWtxn,
UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 },
UpdatingTaskAfterProcessBatchFailure,
CommittingWtxn,
}
impl IndexScheduler {
pub fn test(
autobatching: bool,
planned_failures: Vec<(usize, FailureLocation)>,
) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
let index_scheduler = Self::new(
tempdir.path().join("db_path"),
tempdir.path().join("file_store"),
tempdir.path().join("indexes"),
tempdir.path().join("dumps"),
1024 * 1024,
1024 * 1024,
IndexerConfig::default(),
autobatching, // enable autobatching
sender,
planned_failures,
)
.unwrap();
let index_scheduler_handle =
IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver };
(index_scheduler, index_scheduler_handle)
}
/// Return a [`CorruptedTaskQueue`](Error::CorruptedTaskQueue) error if a failure is planned
/// for the given location and current run loop iteration.
pub fn maybe_fail(&self, location: FailureLocation) -> Result<()> {
if self.planned_failures.contains(&(*self.run_loop_iteration.read().unwrap(), location))
{
match location {
FailureLocation::PanicInsideProcessBatch => {
panic!("simulated panic")
}
_ => Err(Error::CorruptedTaskQueue),
}
} else {
Ok(())
}
}
}
/// Return a `KindWithContent::IndexCreation` task /// Return a `KindWithContent::IndexCreation` task
fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent { fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent {
@ -826,53 +945,22 @@ mod tests {
(file, documents_count) (file, documents_count)
} }
impl IndexScheduler {
pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
let index_scheduler = Self::new(
tempdir.path().join("db_path"),
tempdir.path().join("file_store"),
tempdir.path().join("indexes"),
tempdir.path().join("dumps"),
1024 * 1024,
1024 * 1024,
IndexerConfig::default(),
autobatching, // enable autobatching
sender,
)
.unwrap();
let index_scheduler_handle =
IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver };
(index_scheduler, index_scheduler_handle)
}
}
pub struct IndexSchedulerHandle { pub struct IndexSchedulerHandle {
_tempdir: TempDir, _tempdir: TempDir,
test_breakpoint_rcv: crossbeam::channel::Receiver<Breakpoint>, test_breakpoint_rcv: crossbeam::channel::Receiver<(Breakpoint, bool)>,
} }
impl IndexSchedulerHandle { impl IndexSchedulerHandle {
/// Wait until the provided breakpoint is reached. /// Wait until the provided breakpoint is reached.
fn wait_till(&self, breakpoint: Breakpoint) { fn wait_till(&self, breakpoint: Breakpoint) {
self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint); self.test_breakpoint_rcv.iter().find(|b| *b == (breakpoint, false));
}
#[allow(unused)]
/// Wait until the provided breakpoint is reached.
fn next_breakpoint(&self) -> Breakpoint {
self.test_breakpoint_rcv.recv().unwrap()
} }
} }
#[test] #[test]
fn register() { fn register() {
// In this test, the handle doesn't make any progress, we only check that the tasks are registered // In this test, the handle doesn't make any progress, we only check that the tasks are registered
let (index_scheduler, _handle) = IndexScheduler::test(true); let (index_scheduler, _handle) = IndexScheduler::test(true, vec![]);
let kinds = [ let kinds = [
index_creation_task("catto", "mouse"), index_creation_task("catto", "mouse"),
@ -902,7 +990,7 @@ mod tests {
#[test] #[test]
fn insert_task_while_another_task_is_processing() { fn insert_task_while_another_task_is_processing() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
index_scheduler.register(index_creation_task("index_a", "id")).unwrap(); index_scheduler.register(index_creation_task("index_a", "id")).unwrap();
index_scheduler.assert_internally_consistent(); index_scheduler.assert_internally_consistent();
@ -926,7 +1014,7 @@ mod tests {
/// we send them very fast, we must make sure that they are all processed. /// we send them very fast, we must make sure that they are all processed.
#[test] #[test]
fn process_tasks_inserted_without_new_signal() { fn process_tasks_inserted_without_new_signal() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
index_scheduler index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None })
@ -965,7 +1053,7 @@ mod tests {
#[test] #[test]
fn process_tasks_without_autobatching() { fn process_tasks_without_autobatching() {
let (index_scheduler, handle) = IndexScheduler::test(false); let (index_scheduler, handle) = IndexScheduler::test(false, vec![]);
index_scheduler index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None })
@ -1010,7 +1098,7 @@ mod tests {
#[test] #[test]
fn task_deletion_undeleteable() { fn task_deletion_undeleteable() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0); let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1); let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);
@ -1062,7 +1150,7 @@ mod tests {
#[test] #[test]
fn task_deletion_deleteable() { fn task_deletion_deleteable() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0); let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1); let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);
@ -1104,7 +1192,7 @@ mod tests {
#[test] #[test]
fn task_deletion_delete_same_task_twice() { fn task_deletion_delete_same_task_twice() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0); let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1); let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);
@ -1149,7 +1237,7 @@ mod tests {
#[test] #[test]
fn document_addition() { fn document_addition() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let content = r#" let content = r#"
{ {
@ -1190,7 +1278,7 @@ mod tests {
#[test] #[test]
fn document_addition_and_index_deletion() { fn document_addition_and_index_deletion() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let content = r#" let content = r#"
{ {
@ -1236,7 +1324,7 @@ mod tests {
#[test] #[test]
fn do_not_batch_task_of_different_indexes() { fn do_not_batch_task_of_different_indexes() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let index_names = ["doggos", "cattos", "girafos"]; let index_names = ["doggos", "cattos", "girafos"];
for name in index_names { for name in index_names {
@ -1274,7 +1362,7 @@ mod tests {
#[test] #[test]
fn swap_indexes() { fn swap_indexes() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let to_enqueue = [ let to_enqueue = [
index_creation_task("a", "id"), index_creation_task("a", "id"),
@ -1312,7 +1400,7 @@ mod tests {
#[test] #[test]
fn document_addition_and_index_deletion_on_unexisting_index() { fn document_addition_and_index_deletion_on_unexisting_index() {
let (index_scheduler, handle) = IndexScheduler::test(true); let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let content = r#" let content = r#"
{ {
@ -1357,6 +1445,164 @@ mod tests {
#[test] #[test]
fn simple_new() { fn simple_new() {
let (_index_scheduler, _handle) = crate::IndexScheduler::test(true); crate::IndexScheduler::test(true, vec![]);
}
#[test]
fn query_processing_tasks() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(1, FailureLocation::InsideCreateBatch)]);
let kind = index_creation_task("catto", "mouse");
let _task = index_scheduler.register(kind).unwrap();
handle.wait_till(Breakpoint::BatchCreated);
let query = Query { status: Some(vec![Status::Processing]), ..Default::default() };
let processing_tasks = index_scheduler.get_task_ids(&query).unwrap();
snapshot!(snapshot_bitmap(&processing_tasks), @"[0,]");
}
#[test]
fn fail_in_create_batch_for_index_creation() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(1, FailureLocation::InsideCreateBatch)]);
let kinds = [index_creation_task("catto", "mouse")];
for kind in kinds {
let _task = index_scheduler.register(kind).unwrap();
index_scheduler.assert_internally_consistent();
}
handle.wait_till(Breakpoint::BatchCreated);
// We skipped an iteration of `tick` to reach BatchCreated
assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 2);
// Otherwise nothing weird happened
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler));
}
#[test]
fn fail_in_process_batch_for_index_creation() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(1, FailureLocation::InsideProcessBatch)]);
let kind = index_creation_task("catto", "mouse");
let _task = index_scheduler.register(kind).unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
// Still in the first iteration
assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1);
// No matter what happens in process_batch, the index_scheduler should be internally consistent
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
}
#[test]
fn fail_in_process_batch_for_document_addition() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(1, FailureLocation::InsideProcessBatch)]);
let content = r#"
{
"id": 1,
"doggo": "bob"
}"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count =
meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
.unwrap() as u64;
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S("id")),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::BatchCreated);
snapshot!(
snapshot_index_scheduler(&index_scheduler),
name: "document_addition_batch_created"
);
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "document_addition_failed");
}
#[test]
fn fail_in_update_task_after_process_batch_success_for_document_addition() {
// TODO: this is not the correct behaviour, because we process the
// same tasks twice.
//
// I wonder whether the run loop should maybe be paused if we encounter a failure
// at that point. Alternatively, maybe we should preemptively mark the tasks
// as failed at the beginning of `IndexScheduler::tick`.
let (index_scheduler, handle) = IndexScheduler::test(
true,
vec![(1, FailureLocation::UpdatingTaskAfterProcessBatchSuccess { task_uid: 0 })],
);
let content = r#"
{
"id": 1,
"doggo": "bob"
}"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count =
meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
.unwrap() as u64;
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S("id")),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::Start);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "document_addition_succeeded_but_index_scheduler_not_updated");
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteratino");
}
#[test]
fn panic_in_process_batch_for_index_creation() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(1, FailureLocation::PanicInsideProcessBatch)]);
let kind = index_creation_task("catto", "mouse");
let _task = index_scheduler.register(kind).unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
// Still in the first iteration
assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1);
// No matter what happens in process_batch, the index_scheduler should be internally consistent
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
} }
} }

View File

@ -27,6 +27,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
wake_up: _, wake_up: _,
dumps_path: _, dumps_path: _,
test_breakpoint_sdr: _, test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,
} = scheduler; } = scheduler;
let rtxn = env.read_txn().unwrap(); let rtxn = env.read_txn().unwrap();
@ -78,7 +80,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snap snap
} }
fn snapshot_file_store(file_store: &file_store::FileStore) -> String { pub fn snapshot_file_store(file_store: &file_store::FileStore) -> String {
let mut snap = String::new(); let mut snap = String::new();
for uuid in file_store.__all_uuids() { for uuid in file_store.__all_uuids() {
snap.push_str(&format!("{uuid}\n")); snap.push_str(&format!("{uuid}\n"));
@ -86,7 +88,7 @@ fn snapshot_file_store(file_store: &file_store::FileStore) -> String {
snap snap
} }
fn snapshot_bitmap(r: &RoaringBitmap) -> String { pub fn snapshot_bitmap(r: &RoaringBitmap) -> String {
let mut snap = String::new(); let mut snap = String::new();
snap.push('['); snap.push('[');
for x in r { for x in r {
@ -96,7 +98,7 @@ fn snapshot_bitmap(r: &RoaringBitmap) -> String {
snap snap
} }
fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Task>>) -> String { pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Task>>) -> String {
let mut snap = String::new(); let mut snap = String::new();
let iter = db.iter(rtxn).unwrap(); let iter = db.iter(rtxn).unwrap();
for next in iter { for next in iter {
@ -106,7 +108,7 @@ fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Tas
snap snap
} }
fn snapshot_date_db( pub fn snapshot_date_db(
rtxn: &RoTxn, rtxn: &RoTxn,
db: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>, db: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
) -> String { ) -> String {
@ -119,7 +121,7 @@ fn snapshot_date_db(
snap snap
} }
fn snapshot_task(task: &Task) -> String { pub fn snapshot_task(task: &Task) -> String {
let mut snap = String::new(); let mut snap = String::new();
let Task { let Task {
uid, uid,
@ -191,7 +193,10 @@ fn snaphsot_details(d: &Details) -> String {
} }
} }
fn snapshot_status(rtxn: &RoTxn, db: Database<SerdeBincode<Status>, RoaringBitmapCodec>) -> String { pub fn snapshot_status(
rtxn: &RoTxn,
db: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
) -> String {
let mut snap = String::new(); let mut snap = String::new();
let iter = db.iter(rtxn).unwrap(); let iter = db.iter(rtxn).unwrap();
for next in iter { for next in iter {
@ -200,8 +205,7 @@ fn snapshot_status(rtxn: &RoTxn, db: Database<SerdeBincode<Status>, RoaringBitma
} }
snap snap
} }
pub fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
let mut snap = String::new(); let mut snap = String::new();
let iter = db.iter(rtxn).unwrap(); let iter = db.iter(rtxn).unwrap();
for next in iter { for next in iter {
@ -212,7 +216,7 @@ fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCod
snap snap
} }
fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>) -> String { pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>) -> String {
let mut snap = String::new(); let mut snap = String::new();
let iter = db.iter(rtxn).unwrap(); let iter = db.iter(rtxn).unwrap();
for next in iter { for next in iter {
@ -222,7 +226,12 @@ fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>) ->
snap snap
} }
fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String { pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String {
let names = mapper.indexes(rtxn).unwrap().into_iter().map(|(n, _)| n).collect::<Vec<_>>(); let names = mapper
.indexes(rtxn)
.unwrap()
.into_iter()
.map(|(n, _)| n)
.collect::<Vec<_>>();
format!("{names:?}") format!("{names:?}")
} }

View File

@ -0,0 +1,33 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[0,]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,34 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[0,]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@ -0,0 +1,36 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Corrupted task queue.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued []
failed [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,36 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Corrupted task queue.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
enqueued []
failed [0,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,34 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@ -0,0 +1,36 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,36 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
enqueued []
failed [0,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -114,7 +114,19 @@ impl IndexScheduler {
} }
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> { pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
Ok(self.status.get(rtxn, &status)?.unwrap_or_default()) match status {
Status::Processing => {
let tasks = self
.processing_tasks
.read()
.map_err(|_| Error::CorruptedTaskQueue)?
.processing
.clone();
Ok(tasks)
}
status => Ok(self.status.get(rtxn, &status)?.unwrap_or_default()),
}
} }
pub(crate) fn put_status( pub(crate) fn put_status(