Add index scheduler tests for task cancelation

This commit is contained in:
Loïc Lecrenier 2022-10-25 11:42:14 +02:00 committed by Clément Renault
parent 12669bf07c
commit d92425658e
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
13 changed files with 511 additions and 6 deletions

View file

@ -298,6 +298,7 @@ pub enum Breakpoint {
AbortedIndexation,
ProcessBatchSucceeded,
ProcessBatchFailed,
InsideProcessBatch,
}
impl IndexScheduler {
@ -1545,6 +1546,136 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler));
}
#[test]
fn cancel_enqueued_task() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let to_enqueue = [
replace_document_import_task("catto", None, 0, documents_count0),
KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0]),
},
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[test]
fn cancel_succeeded_task() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let _ = index_scheduler
.register(replace_document_import_task("catto", None, 0, documents_count0))
.unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_task_processed");
index_scheduler
.register(KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0]),
})
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[test]
fn cancel_processing_task() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let _ = index_scheduler
.register(replace_document_import_task("catto", None, 0, documents_count0))
.unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::InsideProcessBatch);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_task_processing");
index_scheduler
.register(KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0]),
})
.unwrap();
index_scheduler.assert_internally_consistent();
// Now we check that we can reach the AbortedIndexation error handling
handle.wait_till(Breakpoint::AbortedIndexation);
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[test]
fn cancel_mix_of_tasks() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);
file1.persist().unwrap();
let (file2, documents_count2) = sample_documents(&index_scheduler, 2, 2);
file2.persist().unwrap();
let to_enqueue = [
replace_document_import_task("catto", None, 0, documents_count0),
replace_document_import_task("beavero", None, 1, documents_count1),
replace_document_import_task("wolfo", None, 2, documents_count2),
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
index_scheduler.assert_internally_consistent();
}
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_task_processed");
handle.wait_till(Breakpoint::InsideProcessBatch);
index_scheduler
.register(KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0, 1, 2]),
})
.unwrap();
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processing_second_task_cancel_enqueued");
handle.wait_till(Breakpoint::AbortedIndexation);
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[macro_export]
macro_rules! debug_snapshot {
($value:expr, @$snapshot:literal) => {{