Implement POST /indexes-swap

This commit is contained in:
Loïc Lecrenier 2022-10-17 16:30:18 +02:00 committed by Clément Renault
parent 28bd8b6c6b
commit 17cd2a4aa0
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
18 changed files with 463 additions and 64 deletions

View file

@ -496,8 +496,7 @@ mod tests {
fn idx_swap() -> KindWithContent {
KindWithContent::IndexSwap {
lhs: String::from("doggo"),
rhs: String::from("catto"),
swaps: vec![(String::from("doggo"), String::from("catto"))],
}
}

View file

@ -21,7 +21,8 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
use crate::utils;
use crate::utils::{self, swap_index_uid_in_task};
use crate::Query;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata;
@ -71,6 +72,9 @@ pub(crate) enum Batch {
index_uid: String,
tasks: Vec<Task>,
},
IndexSwap {
task: Task,
},
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
@ -156,6 +160,7 @@ impl Batch {
..
} => tasks.iter().chain(other).map(|task| task.uid).collect(),
},
Batch::IndexSwap { task } => vec![task.uid],
}
}
}
@ -399,7 +404,10 @@ impl IndexScheduler {
index_uid,
tasks: self.get_existing_tasks(rtxn, ids)?,
})),
BatchKind::IndexSwap { id: _ } => todo!(),
BatchKind::IndexSwap { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
Ok(Some(Batch::IndexSwap { task }))
}
}
}
@ -564,7 +572,6 @@ impl IndexScheduler {
}
_ => unreachable!(),
}
wtxn.commit()?;
Ok(vec![task])
}
@ -679,7 +686,8 @@ impl IndexScheduler {
| IndexOperation::DocumentClear { ref index_uid, .. } => {
// only get the index, don't create it
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, index_uid)?
let r = self.index_mapper.index(&rtxn, index_uid)?;
r
}
IndexOperation::DocumentImport { ref index_uid, allow_index_creation, .. }
| IndexOperation::Settings { ref index_uid, allow_index_creation, .. }
@ -693,7 +701,8 @@ impl IndexScheduler {
index
} else {
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, index_uid)?
let r = self.index_mapper.index(&rtxn, index_uid)?;
r
}
}
};
@ -742,7 +751,6 @@ impl IndexScheduler {
)?;
index_wtxn.commit()?;
}
task.status = Status::Succeeded;
task.details = Some(Details::IndexInfo { primary_key });
@ -776,9 +784,69 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::IndexSwap { mut task } => {
let mut wtxn = self.env.write_txn()?;
let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind {
swaps
} else {
unreachable!()
};
for (lhs, rhs) in swaps {
self.apply_index_swap(&mut wtxn, task.uid, lhs, rhs)?;
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok(vec![task])
}
}
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
// 1. Verify that both lhs and rhs are existing indexes
let index_lhs_exists = self.index_mapper.index_exists(&wtxn, lhs)?;
if !index_lhs_exists {
return Err(Error::IndexNotFound(lhs.to_owned()));
}
let index_rhs_exists = self.index_mapper.index_exists(&wtxn, rhs)?;
if !index_rhs_exists {
return Err(Error::IndexNotFound(rhs.to_owned()));
}
// 2. Get the task set for index = name.
let mut index_lhs_task_ids =
self.get_task_ids(&Query::default().with_index(lhs.to_owned()))?;
index_lhs_task_ids.remove_range(task_id..);
let mut index_rhs_task_ids =
self.get_task_ids(&Query::default().with_index(rhs.to_owned()))?;
index_rhs_task_ids.remove_range(task_id..);
// 3. before_name -> new_name in the task's KindWithContent
for task_id in &index_lhs_task_ids | &index_rhs_task_ids {
let mut task = self
.get_task(&wtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
swap_index_uid_in_task(&mut task, (lhs, rhs));
self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?;
}
// 4. remove the task from indexuid = before_name
// 5. add the task to indexuid = after_name
self.update_index(wtxn, lhs, |lhs_tasks| {
*lhs_tasks -= &index_lhs_task_ids;
*lhs_tasks |= &index_rhs_task_ids;
})?;
self.update_index(wtxn, rhs, |lhs_tasks| {
*lhs_tasks -= &index_rhs_task_ids;
*lhs_tasks |= &index_lhs_task_ids;
})?;
// 6. Swap in the index mapper
self.index_mapper.swap(wtxn, lhs, rhs)?;
Ok(())
}
/// Process the index operation on the given index.
///
/// ## Return

View file

@ -93,7 +93,6 @@ impl IndexMapper {
assert!(self.index_mapping.delete(&mut wtxn, name)?);
wtxn.commit()?;
// We remove the index from the in-memory index map.
let mut lock = self.index_map.write().unwrap();
let closing_event = match lock.insert(uuid, BeingDeleted) {
@ -186,7 +185,7 @@ impl IndexMapper {
.collect()
}
/// Swap two index name.
/// Swap two index names.
pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> {
let lhs_uuid = self
.index_mapping
@ -203,6 +202,10 @@ impl IndexMapper {
Ok(())
}
pub fn index_exists(&self, rtxn: &RoTxn, name: &str) -> Result<bool> {
Ok(self.index_mapping.get(rtxn, name)?.is_some())
}
pub fn indexer_config(&self) -> &IndexerConfig {
&self.indexer_config
}

View file

@ -412,6 +412,7 @@ impl IndexScheduler {
}
tasks &= index_tasks;
}
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
@ -436,7 +437,6 @@ impl IndexScheduler {
query.before_finished_at,
)?;
rtxn.commit().unwrap();
Ok(tasks)
}
@ -622,7 +622,7 @@ impl IndexScheduler {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs },
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
@ -718,8 +718,7 @@ impl IndexScheduler {
Some(batch) => batch,
None => return Ok(0),
};
// we don't need this transaction any longer.
drop(rtxn);
drop(rtxn); //rtxn.commit()?;
// 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids();
@ -748,6 +747,7 @@ impl IndexScheduler {
// 2. Process the tasks
let res = self.process_batch(batch);
let mut wtxn = self.env.write_txn()?;
let finished_at = OffsetDateTime::now_utc();
match res {
Ok(tasks) => {
@ -933,10 +933,6 @@ mod tests {
let kinds = [
index_creation_task("catto", "mouse"),
replace_document_import_task("catto", None, 0, 12),
KindWithContent::TaskCancelation {
query: format!("uid=0,1"),
tasks: RoaringBitmap::from_iter([0, 1]),
},
replace_document_import_task("catto", None, 1, 50),
replace_document_import_task("doggo", Some("bone"), 2, 5000),
];
@ -956,13 +952,17 @@ mod tests {
fn insert_task_while_another_task_is_processing() {
let (index_scheduler, handle) = IndexScheduler::test(true);
index_scheduler.register(KindWithContent::Snapshot).unwrap();
index_scheduler
.register(index_creation_task("index_a", "id"))
.unwrap();
handle.wait_till(Breakpoint::BatchCreated);
// while the task is processing can we register another task?
index_scheduler.register(KindWithContent::Snapshot).unwrap();
index_scheduler
.register(index_creation_task("index_b", "id"))
.unwrap();
index_scheduler
.register(KindWithContent::IndexDeletion {
index_uid: S("doggos"),
index_uid: S("index_a"),
})
.unwrap();
@ -1284,6 +1284,49 @@ mod tests {
assert_eq!(tasks[5].status, Status::Succeeded);
}
#[test]
fn swap_indexes() {
let (index_scheduler, handle) = IndexScheduler::test(true);
let to_enqueue = [
index_creation_task("a", "id"),
index_creation_task("b", "id"),
index_creation_task("c", "id"),
index_creation_task("d", "id"),
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
}
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![
("a".to_owned(), "b".to_owned()),
("c".to_owned(), "d".to_owned()),
],
})
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_processed");
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![("a".to_owned(), "c".to_owned())],
})
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed");
}
#[macro_export]
macro_rules! debug_snapshot {
($value:expr, @$snapshot:literal) => {{
@ -1294,6 +1337,6 @@ mod tests {
#[test]
fn simple_new() {
crate::IndexScheduler::test(true);
let (_index_scheduler, _handle) = crate::IndexScheduler::test(true);
}
}

View file

@ -191,6 +191,10 @@ fn snaphsot_details(d: &Details) -> String {
Details::Dump { dump_uid } => {
format!("{{ dump_uid: {dump_uid:?} }}")
},
Details::IndexSwap { swaps } => {
format!("{{ indexes: {swaps:?} }}")
},
}
}

View file

@ -6,19 +6,20 @@ source: index-scheduler/src/lib.rs
[0,]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, kind: Snapshot}
1 {uid: 1, status: enqueued, kind: Snapshot}
2 {uid: 2, status: enqueued, kind: IndexDeletion { index_uid: "doggos" }}
0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_b", primary_key: Some("id") }}
2 {uid: 2, status: enqueued, kind: IndexDeletion { index_uid: "index_a" }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,]
"indexDeletion" [2,]
"snapshot" [0,1,]
----------------------------------------------------------------------
### Index Tasks:
doggos [2,]
index_a [0,2,]
index_b [1,]
----------------------------------------------------------------------
### Index Mapper:
[]

View file

@ -8,21 +8,19 @@ source: index-scheduler/src/lib.rs
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { received_documents: 12, indexed_documents: None }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 12, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { matched_tasks: 2, canceled_tasks: None, original_query: "uid=0,1" }, kind: TaskCancelation { query: "uid=0,1", tasks: RoaringBitmap<[0, 1]> }}
3 {uid: 3, status: enqueued, details: { received_documents: 50, indexed_documents: None }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 50, allow_index_creation: true }}
4 {uid: 4, status: enqueued, details: { received_documents: 5000, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 5000, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 50, indexed_documents: None }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 50, allow_index_creation: true }}
3 {uid: 3, status: enqueued, details: { received_documents: 5000, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 5000, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,3,4,]
enqueued [0,1,2,3,]
----------------------------------------------------------------------
### Kind:
"documentImport" [1,3,4,]
"documentImport" [1,2,3,]
"indexCreation" [0,]
"taskCancelation" [2,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,1,3,]
doggo [4,]
catto [0,1,2,]
doggo [3,]
----------------------------------------------------------------------
### Index Mapper:
[]
@ -32,7 +30,6 @@ doggo [4,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------

View file

@ -0,0 +1,56 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }}
1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }}
2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }}
3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }}
4 {uid: 4, status: succeeded, details: { indexes: [("a", "b"), ("c", "d")] }, kind: IndexSwap { swaps: [("a", "b"), ("c", "d")] }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,3,4,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,3,]
"indexSwap" [4,]
----------------------------------------------------------------------
### Index Tasks:
a [1,4,]
b [0,4,]
c [3,4,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -0,0 +1,51 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }}
1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }}
2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }}
3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,3,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,3,]
----------------------------------------------------------------------
### Index Tasks:
a [0,]
b [1,]
c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -0,0 +1,60 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }}
1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }}
2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }}
3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }}
4 {uid: 4, status: succeeded, details: { indexes: [("a", "b"), ("c", "d")] }, kind: IndexSwap { swaps: [("c", "b"), ("a", "d")] }}
5 {uid: 5, status: succeeded, details: { indexes: [("a", "c")] }, kind: IndexSwap { swaps: [("a", "c")] }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,3,4,5,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,3,]
"indexSwap" [4,5,]
----------------------------------------------------------------------
### Index Tasks:
a [3,4,5,]
b [0,4,]
c [1,4,5,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -10,7 +10,7 @@ use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
use meilisearch_types::tasks::{Kind, Status};
use meilisearch_types::tasks::{Kind, KindWithContent, Status};
impl IndexScheduler {
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
@ -247,3 +247,35 @@ fn map_bound<T, U>(bound: Bound<T>, map: impl FnOnce(T) -> U) -> Bound<U> {
Bound::Unbounded => Bound::Unbounded,
}
}
pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
use KindWithContent as K;
let mut index_uids = vec![];
match &mut task.kind {
K::DocumentImport { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid),
K::DocumentClear { index_uid } => index_uids.push(index_uid),
K::Settings { index_uid, .. } => index_uids.push(index_uid),
K::IndexDeletion { index_uid } => index_uids.push(index_uid),
K::IndexCreation { index_uid, .. } => index_uids.push(index_uid),
K::IndexUpdate { index_uid, .. } => index_uids.push(index_uid),
K::IndexSwap { swaps } => {
for (lhs, rhs) in swaps.iter_mut() {
if lhs == &swap.0 || lhs == &swap.1 {
index_uids.push(lhs);
}
if rhs == &swap.0 || rhs == &swap.1 {
index_uids.push(rhs);
}
}
}
K::TaskCancelation { .. } | K::TaskDeletion { .. } | K::DumpExport { .. } | K::Snapshot => (),
};
for index_uid in index_uids {
if index_uid == &swap.0 {
*index_uid = swap.1.to_owned();
} else if index_uid == &swap.1 {
*index_uid = swap.0.to_owned();
}
}
}