Move index swap error handling from meilisearch-http to index-scheduler

And make index_not_found error asynchronous, since we can't know
whether the index will exist by the time the index swap task is
processed.

Improve the index-swap test to verify that future tasks are not swapped
and to test the new error messages that were introduced.
This commit is contained in:
Loïc Lecrenier 2022-10-27 09:41:32 +02:00
parent b44cc62320
commit 78ffa00f98
11 changed files with 305 additions and 63 deletions

View file

@ -54,6 +54,7 @@ use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, m
use uuid::Uuid;
use crate::index_mapper::IndexMapper;
use crate::utils::check_index_swap_validity;
pub(crate) type BEI128 =
meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
@ -609,6 +610,10 @@ impl IndexScheduler {
// For deletion and cancelation tasks, we want to make extra sure that they
// don't attempt to delete/cancel tasks that are newer than themselves.
filter_out_references_to_newer_tasks(&mut task);
// If the register task is an index swap task, verify that it is well-formed
// (that it does not contain duplicate indexes).
check_index_swap_validity(&task)?;
// Get rid of the mutability.
let task = task;
@ -988,7 +993,7 @@ mod tests {
autobatching_enabled: bool,
planned_failures: Vec<(usize, FailureLocation)>,
) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap();
let tempdir = TempDir::new_in(".").unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
let options = IndexSchedulerOptions {
@ -1543,17 +1548,17 @@ mod tests {
})
.unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_processed");
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![IndexSwap { indexes: ("a".to_owned(), "c".to_owned()) }],
})
.unwrap();
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "two_swaps_registered");
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_processed");
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
@ -1564,6 +1569,57 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_empty_swap_processed");
}
#[test]
fn swap_indexes_errors() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let to_enqueue = [
index_creation_task("a", "id"),
index_creation_task("b", "id"),
index_creation_task("c", "id"),
index_creation_task("d", "id"),
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
index_scheduler.assert_internally_consistent();
}
handle.advance_n_batch(4);
index_scheduler.assert_internally_consistent();
let first_snap = snapshot_index_scheduler(&index_scheduler);
snapshot!(first_snap, name: "initial_tasks_processed");
let err = index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![
IndexSwap { indexes: ("a".to_owned(), "b".to_owned()) },
IndexSwap { indexes: ("b".to_owned(), "a".to_owned()) },
],
})
.unwrap_err();
snapshot!(format!("{err}"), @"Indexes must be declared only once during a swap. `a`, `b` were specified several times.");
index_scheduler.assert_internally_consistent();
let second_snap = snapshot_index_scheduler(&index_scheduler);
assert_eq!(first_snap, second_snap);
// Index `e` does not exist, but we don't check its existence yet
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![
IndexSwap { indexes: ("a".to_owned(), "b".to_owned()) },
IndexSwap { indexes: ("c".to_owned(), "e".to_owned()) },
IndexSwap { indexes: ("d".to_owned(), "f".to_owned()) },
],
})
.unwrap();
handle.advance_n_batch(1);
// Now the first swap should have an error message saying `e` and `f` do not exist
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_failed");
}
#[test]
fn document_addition_and_index_deletion_on_unexisting_index() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);