Implement POST /indexes-swap

This commit is contained in:
Loïc Lecrenier 2022-10-17 16:30:18 +02:00 committed by Clément Renault
parent bdb3702510
commit 14a44776f6
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
18 changed files with 463 additions and 64 deletions

View file

@ -412,6 +412,7 @@ impl IndexScheduler {
}
tasks &= index_tasks;
}
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
@ -436,7 +437,6 @@ impl IndexScheduler {
query.before_finished_at,
)?;
rtxn.commit().unwrap();
Ok(tasks)
}
@ -622,7 +622,7 @@ impl IndexScheduler {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs },
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
@ -718,8 +718,7 @@ impl IndexScheduler {
Some(batch) => batch,
None => return Ok(0),
};
// we don't need this transaction any longer.
drop(rtxn);
drop(rtxn); //rtxn.commit()?;
// 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids();
@ -748,6 +747,7 @@ impl IndexScheduler {
// 2. Process the tasks
let res = self.process_batch(batch);
let mut wtxn = self.env.write_txn()?;
let finished_at = OffsetDateTime::now_utc();
match res {
Ok(tasks) => {
@ -933,10 +933,6 @@ mod tests {
let kinds = [
index_creation_task("catto", "mouse"),
replace_document_import_task("catto", None, 0, 12),
KindWithContent::TaskCancelation {
query: format!("uid=0,1"),
tasks: RoaringBitmap::from_iter([0, 1]),
},
replace_document_import_task("catto", None, 1, 50),
replace_document_import_task("doggo", Some("bone"), 2, 5000),
];
@ -956,13 +952,17 @@ mod tests {
fn insert_task_while_another_task_is_processing() {
let (index_scheduler, handle) = IndexScheduler::test(true);
index_scheduler.register(KindWithContent::Snapshot).unwrap();
index_scheduler
.register(index_creation_task("index_a", "id"))
.unwrap();
handle.wait_till(Breakpoint::BatchCreated);
// while the task is processing can we register another task?
index_scheduler.register(KindWithContent::Snapshot).unwrap();
index_scheduler
.register(index_creation_task("index_b", "id"))
.unwrap();
index_scheduler
.register(KindWithContent::IndexDeletion {
index_uid: S("doggos"),
index_uid: S("index_a"),
})
.unwrap();
@ -1284,6 +1284,49 @@ mod tests {
assert_eq!(tasks[5].status, Status::Succeeded);
}
#[test]
fn swap_indexes() {
let (index_scheduler, handle) = IndexScheduler::test(true);
let to_enqueue = [
index_creation_task("a", "id"),
index_creation_task("b", "id"),
index_creation_task("c", "id"),
index_creation_task("d", "id"),
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
}
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![
("a".to_owned(), "b".to_owned()),
("c".to_owned(), "d".to_owned()),
],
})
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_processed");
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![("a".to_owned(), "c".to_owned())],
})
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed");
}
#[macro_export]
macro_rules! debug_snapshot {
($value:expr, @$snapshot:literal) => {{
@ -1294,6 +1337,6 @@ mod tests {
#[test]
fn simple_new() {
crate::IndexScheduler::test(true);
let (_index_scheduler, _handle) = crate::IndexScheduler::test(true);
}
}