diff --git a/dump/src/lib.rs b/dump/src/lib.rs index dd3b90cad..ee0839ed4 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -113,8 +113,7 @@ pub enum KindDump { primary_key: Option, }, IndexSwap { - lhs: String, - rhs: String, + swaps: Vec<(String, String)>, }, TaskCancelation { query: String, @@ -185,7 +184,7 @@ impl From for KindDump { KindWithContent::IndexUpdate { primary_key, .. } => { KindDump::IndexUpdate { primary_key } } - KindWithContent::IndexSwap { lhs, rhs } => KindDump::IndexSwap { lhs, rhs }, + KindWithContent::IndexSwap { swaps } => KindDump::IndexSwap { swaps }, KindWithContent::TaskCancelation { query, tasks } => { KindDump::TaskCancelation { query, tasks } } diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 880dc1197..edd780965 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -496,8 +496,7 @@ mod tests { fn idx_swap() -> KindWithContent { KindWithContent::IndexSwap { - lhs: String::from("doggo"), - rhs: String::from("catto"), + swaps: vec![(String::from("doggo"), String::from("catto"))], } } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index e5c16b0b2..945f47eef 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -21,7 +21,8 @@ use std::collections::HashSet; use std::fs::File; use std::io::BufWriter; -use crate::utils; +use crate::utils::{self, swap_index_uid_in_task}; +use crate::Query; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use dump::IndexMetadata; @@ -71,6 +72,9 @@ pub(crate) enum Batch { index_uid: String, tasks: Vec, }, + IndexSwap { + task: Task, + }, } /// A [batch](Batch) that combines multiple tasks operating on an index. @@ -156,6 +160,7 @@ impl Batch { .. } => tasks.iter().chain(other).map(|task| task.uid).collect(), }, + Batch::IndexSwap { task } => vec![task.uid], } } } @@ -399,7 +404,10 @@ impl IndexScheduler { index_uid, tasks: self.get_existing_tasks(rtxn, ids)?, })), - BatchKind::IndexSwap { id: _ } => todo!(), + BatchKind::IndexSwap { id } => { + let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + Ok(Some(Batch::IndexSwap { task })) + } } } @@ -564,7 +572,6 @@ impl IndexScheduler { } _ => unreachable!(), } - wtxn.commit()?; Ok(vec![task]) } @@ -679,7 +686,8 @@ impl IndexScheduler { | IndexOperation::DocumentClear { ref index_uid, .. } => { // only get the index, don't create it let rtxn = self.env.read_txn()?; - self.index_mapper.index(&rtxn, index_uid)? + let r = self.index_mapper.index(&rtxn, index_uid)?; + r } IndexOperation::DocumentImport { ref index_uid, allow_index_creation, .. } | IndexOperation::Settings { ref index_uid, allow_index_creation, .. } @@ -693,7 +701,8 @@ impl IndexScheduler { index } else { let rtxn = self.env.read_txn()?; - self.index_mapper.index(&rtxn, index_uid)? + let r = self.index_mapper.index(&rtxn, index_uid)?; + r } } }; @@ -742,7 +751,6 @@ impl IndexScheduler { )?; index_wtxn.commit()?; } - task.status = Status::Succeeded; task.details = Some(Details::IndexInfo { primary_key }); @@ -776,9 +784,69 @@ impl IndexScheduler { Ok(tasks) } + Batch::IndexSwap { mut task } => { + let mut wtxn = self.env.write_txn()?; + let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind { + swaps + } else { + unreachable!() + }; + for (lhs, rhs) in swaps { + self.apply_index_swap(&mut wtxn, task.uid, lhs, rhs)?; + } + wtxn.commit()?; + task.status = Status::Succeeded; + Ok(vec![task]) + } } } + /// Swap the index `lhs` with the index `rhs`. + fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> { + // 1. Verify that both lhs and rhs are existing indexes + let index_lhs_exists = self.index_mapper.index_exists(&wtxn, lhs)?; + if !index_lhs_exists { + return Err(Error::IndexNotFound(lhs.to_owned())); + } + let index_rhs_exists = self.index_mapper.index_exists(&wtxn, rhs)?; + if !index_rhs_exists { + return Err(Error::IndexNotFound(rhs.to_owned())); + } + + // 2. Get the task set for index = name. + let mut index_lhs_task_ids = + self.get_task_ids(&Query::default().with_index(lhs.to_owned()))?; + index_lhs_task_ids.remove_range(task_id..); + let mut index_rhs_task_ids = + self.get_task_ids(&Query::default().with_index(rhs.to_owned()))?; + index_rhs_task_ids.remove_range(task_id..); + + // 3. before_name -> new_name in the task's KindWithContent + for task_id in &index_lhs_task_ids | &index_rhs_task_ids { + let mut task = self + .get_task(&wtxn, task_id)? + .ok_or(Error::CorruptedTaskQueue)?; + swap_index_uid_in_task(&mut task, (lhs, rhs)); + self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?; + } + + // 4. remove the task from indexuid = before_name + // 5. add the task to indexuid = after_name + self.update_index(wtxn, lhs, |lhs_tasks| { + *lhs_tasks -= &index_lhs_task_ids; + *lhs_tasks |= &index_rhs_task_ids; + })?; + self.update_index(wtxn, rhs, |lhs_tasks| { + *lhs_tasks -= &index_rhs_task_ids; + *lhs_tasks |= &index_lhs_task_ids; + })?; + + // 6. Swap in the index mapper + self.index_mapper.swap(wtxn, lhs, rhs)?; + + Ok(()) + } + /// Process the index operation on the given index. /// /// ## Return diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 0e80213c1..ef0552c9f 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -93,7 +93,6 @@ impl IndexMapper { assert!(self.index_mapping.delete(&mut wtxn, name)?); wtxn.commit()?; - // We remove the index from the in-memory index map. let mut lock = self.index_map.write().unwrap(); let closing_event = match lock.insert(uuid, BeingDeleted) { @@ -186,7 +185,7 @@ impl IndexMapper { .collect() } - /// Swap two index name. + /// Swap two index names. pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> { let lhs_uuid = self .index_mapping @@ -203,6 +202,10 @@ impl IndexMapper { Ok(()) } + pub fn index_exists(&self, rtxn: &RoTxn, name: &str) -> Result { + Ok(self.index_mapping.get(rtxn, name)?.is_some()) + } + pub fn indexer_config(&self) -> &IndexerConfig { &self.indexer_config } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 42a69ed2b..42a5f38b7 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -412,6 +412,7 @@ impl IndexScheduler { } tasks &= index_tasks; } + keep_tasks_within_datetimes( &rtxn, &mut tasks, @@ -436,7 +437,6 @@ impl IndexScheduler { query.before_finished_at, )?; - rtxn.commit().unwrap(); Ok(tasks) } @@ -622,7 +622,7 @@ impl IndexScheduler { index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, primary_key, }, - KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs }, + KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps }, KindDump::TaskCancelation { query, tasks } => { KindWithContent::TaskCancelation { query, tasks } } @@ -718,8 +718,7 @@ impl IndexScheduler { Some(batch) => batch, None => return Ok(0), }; - // we don't need this transaction any longer. - drop(rtxn); + drop(rtxn); //rtxn.commit()?; // 1. store the starting date with the bitmap of processing tasks. let mut ids = batch.ids(); @@ -748,6 +747,7 @@ impl IndexScheduler { // 2. Process the tasks let res = self.process_batch(batch); let mut wtxn = self.env.write_txn()?; + let finished_at = OffsetDateTime::now_utc(); match res { Ok(tasks) => { @@ -933,10 +933,6 @@ mod tests { let kinds = [ index_creation_task("catto", "mouse"), replace_document_import_task("catto", None, 0, 12), - KindWithContent::TaskCancelation { - query: format!("uid=0,1"), - tasks: RoaringBitmap::from_iter([0, 1]), - }, replace_document_import_task("catto", None, 1, 50), replace_document_import_task("doggo", Some("bone"), 2, 5000), ]; @@ -956,13 +952,17 @@ mod tests { fn insert_task_while_another_task_is_processing() { let (index_scheduler, handle) = IndexScheduler::test(true); - index_scheduler.register(KindWithContent::Snapshot).unwrap(); + index_scheduler + .register(index_creation_task("index_a", "id")) + .unwrap(); handle.wait_till(Breakpoint::BatchCreated); // while the task is processing can we register another task? - index_scheduler.register(KindWithContent::Snapshot).unwrap(); + index_scheduler + .register(index_creation_task("index_b", "id")) + .unwrap(); index_scheduler .register(KindWithContent::IndexDeletion { - index_uid: S("doggos"), + index_uid: S("index_a"), }) .unwrap(); @@ -1284,6 +1284,49 @@ mod tests { assert_eq!(tasks[5].status, Status::Succeeded); } + #[test] + fn swap_indexes() { + let (index_scheduler, handle) = IndexScheduler::test(true); + + let to_enqueue = [ + index_creation_task("a", "id"), + index_creation_task("b", "id"), + index_creation_task("c", "id"), + index_creation_task("d", "id"), + ]; + + for task in to_enqueue { + let _ = index_scheduler.register(task).unwrap(); + } + + handle.wait_till(Breakpoint::AfterProcessing); + handle.wait_till(Breakpoint::AfterProcessing); + handle.wait_till(Breakpoint::AfterProcessing); + handle.wait_till(Breakpoint::AfterProcessing); + + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed"); + + index_scheduler + .register(KindWithContent::IndexSwap { + swaps: vec![ + ("a".to_owned(), "b".to_owned()), + ("c".to_owned(), "d".to_owned()), + ], + }) + .unwrap(); + + handle.wait_till(Breakpoint::AfterProcessing); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_processed"); + + index_scheduler + .register(KindWithContent::IndexSwap { + swaps: vec![("a".to_owned(), "c".to_owned())], + }) + .unwrap(); + handle.wait_till(Breakpoint::AfterProcessing); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed"); + } + #[macro_export] macro_rules! debug_snapshot { ($value:expr, @$snapshot:literal) => {{ @@ -1294,6 +1337,6 @@ mod tests { #[test] fn simple_new() { - crate::IndexScheduler::test(true); + let (_index_scheduler, _handle) = crate::IndexScheduler::test(true); } } diff --git a/index-scheduler/src/snapshot.rs b/index-scheduler/src/snapshot.rs index d10a5c331..13ad68631 100644 --- a/index-scheduler/src/snapshot.rs +++ b/index-scheduler/src/snapshot.rs @@ -191,6 +191,10 @@ fn snaphsot_details(d: &Details) -> String { Details::Dump { dump_uid } => { format!("{{ dump_uid: {dump_uid:?} }}") }, + Details::IndexSwap { swaps } => { + format!("{{ indexes: {swaps:?} }}") + }, + } } diff --git a/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap b/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap index 4410dbcad..ddac65249 100644 --- a/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap +++ b/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap @@ -6,19 +6,20 @@ source: index-scheduler/src/lib.rs [0,] ---------------------------------------------------------------------- ### All Tasks: -0 {uid: 0, status: enqueued, kind: Snapshot} -1 {uid: 1, status: enqueued, kind: Snapshot} -2 {uid: 2, status: enqueued, kind: IndexDeletion { index_uid: "doggos" }} +0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }} +1 {uid: 1, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_b", primary_key: Some("id") }} +2 {uid: 2, status: enqueued, kind: IndexDeletion { index_uid: "index_a" }} ---------------------------------------------------------------------- ### Status: enqueued [0,1,2,] ---------------------------------------------------------------------- ### Kind: +"indexCreation" [0,1,] "indexDeletion" [2,] -"snapshot" [0,1,] ---------------------------------------------------------------------- ### Index Tasks: -doggos [2,] +index_a [0,2,] +index_b [1,] ---------------------------------------------------------------------- ### Index Mapper: [] diff --git a/index-scheduler/src/snapshots/lib.rs/register/1.snap b/index-scheduler/src/snapshots/lib.rs/register/1.snap index 894a440d6..929ee5609 100644 --- a/index-scheduler/src/snapshots/lib.rs/register/1.snap +++ b/index-scheduler/src/snapshots/lib.rs/register/1.snap @@ -8,21 +8,19 @@ source: index-scheduler/src/lib.rs ### All Tasks: 0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} 1 {uid: 1, status: enqueued, details: { received_documents: 12, indexed_documents: None }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 12, allow_index_creation: true }} -2 {uid: 2, status: enqueued, details: { matched_tasks: 2, canceled_tasks: None, original_query: "uid=0,1" }, kind: TaskCancelation { query: "uid=0,1", tasks: RoaringBitmap<[0, 1]> }} -3 {uid: 3, status: enqueued, details: { received_documents: 50, indexed_documents: None }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 50, allow_index_creation: true }} -4 {uid: 4, status: enqueued, details: { received_documents: 5000, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 5000, allow_index_creation: true }} +2 {uid: 2, status: enqueued, details: { received_documents: 50, indexed_documents: None }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 50, allow_index_creation: true }} +3 {uid: 3, status: enqueued, details: { received_documents: 5000, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 5000, allow_index_creation: true }} ---------------------------------------------------------------------- ### Status: -enqueued [0,1,2,3,4,] +enqueued [0,1,2,3,] ---------------------------------------------------------------------- ### Kind: -"documentImport" [1,3,4,] +"documentImport" [1,2,3,] "indexCreation" [0,] -"taskCancelation" [2,] ---------------------------------------------------------------------- ### Index Tasks: -catto [0,1,3,] -doggo [4,] +catto [0,1,2,] +doggo [3,] ---------------------------------------------------------------------- ### Index Mapper: [] @@ -32,7 +30,6 @@ doggo [4,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] -[timestamp] [4,] ---------------------------------------------------------------------- ### Started At: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/swap_indexes/first_swap_processed.snap b/index-scheduler/src/snapshots/lib.rs/swap_indexes/first_swap_processed.snap new file mode 100644 index 000000000..6744367c3 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/swap_indexes/first_swap_processed.snap @@ -0,0 +1,56 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }} +1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }} +2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }} +3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }} +4 {uid: 4, status: succeeded, details: { indexes: [("a", "b"), ("c", "d")] }, kind: IndexSwap { swaps: [("a", "b"), ("c", "d")] }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,3,4,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,3,] +"indexSwap" [4,] +---------------------------------------------------------------------- +### Index Tasks: +a [1,4,] +b [0,4,] +c [3,4,] +d [2,4,] +---------------------------------------------------------------------- +### Index Mapper: +["a", "b", "c", "d"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/swap_indexes/initial_tasks_processed.snap b/index-scheduler/src/snapshots/lib.rs/swap_indexes/initial_tasks_processed.snap new file mode 100644 index 000000000..073f280f3 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/swap_indexes/initial_tasks_processed.snap @@ -0,0 +1,51 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }} +1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }} +2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }} +3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,3,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,3,] +---------------------------------------------------------------------- +### Index Tasks: +a [0,] +b [1,] +c [2,] +d [3,] +---------------------------------------------------------------------- +### Index Mapper: +["a", "b", "c", "d"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/swap_indexes/second_swap_processed.snap b/index-scheduler/src/snapshots/lib.rs/swap_indexes/second_swap_processed.snap new file mode 100644 index 000000000..543a0afa4 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/swap_indexes/second_swap_processed.snap @@ -0,0 +1,60 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }} +1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }} +2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }} +3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }} +4 {uid: 4, status: succeeded, details: { indexes: [("a", "b"), ("c", "d")] }, kind: IndexSwap { swaps: [("c", "b"), ("a", "d")] }} +5 {uid: 5, status: succeeded, details: { indexes: [("a", "c")] }, kind: IndexSwap { swaps: [("a", "c")] }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,3,4,5,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,3,] +"indexSwap" [4,5,] +---------------------------------------------------------------------- +### Index Tasks: +a [3,4,5,] +b [0,4,] +c [1,4,5,] +d [2,4,] +---------------------------------------------------------------------- +### Index Mapper: +["a", "b", "c", "d"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 4bab8dc93..b8f6f0b38 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -10,7 +10,7 @@ use roaring::{MultiOps, RoaringBitmap}; use time::OffsetDateTime; use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128}; -use meilisearch_types::tasks::{Kind, Status}; +use meilisearch_types::tasks::{Kind, KindWithContent, Status}; impl IndexScheduler { pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result { @@ -247,3 +247,35 @@ fn map_bound(bound: Bound, map: impl FnOnce(T) -> U) -> Bound { Bound::Unbounded => Bound::Unbounded, } } + +pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { + use KindWithContent as K; + let mut index_uids = vec![]; + match &mut task.kind { + K::DocumentImport { index_uid, .. } => index_uids.push(index_uid), + K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid), + K::DocumentClear { index_uid } => index_uids.push(index_uid), + K::Settings { index_uid, .. } => index_uids.push(index_uid), + K::IndexDeletion { index_uid } => index_uids.push(index_uid), + K::IndexCreation { index_uid, .. } => index_uids.push(index_uid), + K::IndexUpdate { index_uid, .. } => index_uids.push(index_uid), + K::IndexSwap { swaps } => { + for (lhs, rhs) in swaps.iter_mut() { + if lhs == &swap.0 || lhs == &swap.1 { + index_uids.push(lhs); + } + if rhs == &swap.0 || rhs == &swap.1 { + index_uids.push(rhs); + } + } + } + K::TaskCancelation { .. } | K::TaskDeletion { .. } | K::DumpExport { .. } | K::Snapshot => (), + }; + for index_uid in index_uids { + if index_uid == &swap.0 { + *index_uid = swap.1.to_owned(); + } else if index_uid == &swap.1 { + *index_uid = swap.0.to_owned(); + } + } +} diff --git a/meilisearch-http/src/routes/indexes_swap.rs b/meilisearch-http/src/routes/indexes_swap.rs new file mode 100644 index 000000000..c3f81da08 --- /dev/null +++ b/meilisearch-http/src/routes/indexes_swap.rs @@ -0,0 +1,77 @@ +use std::collections::HashSet; + +use crate::extractors::authentication::{policies::*, GuardedData}; +use crate::extractors::sequential_extractor::SeqHandler; +use crate::routes::tasks::TaskView; +use actix_web::web::Data; +use actix_web::{web, HttpResponse}; +use index_scheduler::IndexScheduler; +use meilisearch_types::error::{Code, ResponseError}; +use meilisearch_types::tasks::KindWithContent; +use serde::Deserialize; + +pub fn configure(cfg: &mut web::ServiceConfig) { + cfg.service(web::resource("").route(web::post().to(SeqHandler(indexes_swap)))); +} + +// TODO: Lo: revisit this struct once we have decided on what the payload should be +#[derive(Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct IndexesSwapPayload { + indexes: (String, String), +} + +pub async fn indexes_swap( + index_scheduler: GuardedData, Data>, + params: web::Json>, +) -> Result { + let search_rules = &index_scheduler.filters().search_rules; + + // TODO: Lo: error when the params are empty + // TODO: Lo: error when the same index appears more than once + // TODO: Lo: error when not authorized to swap + + let mut swaps = vec![]; + let mut indexes_set = HashSet::::default(); + for IndexesSwapPayload { + indexes: (lhs, rhs), + } in params.into_inner().into_iter() + { + if !search_rules.is_index_authorized(&lhs) || !search_rules.is_index_authorized(&lhs) { + return Err(ResponseError::from_msg( + "TODO: error message when we swap with an index were not allowed to access" + .to_owned(), + Code::BadRequest, + )); + } + swaps.push((lhs.clone(), rhs.clone())); + // TODO: Lo: should this check be here or within the index scheduler? + let is_unique_index_lhs = indexes_set.insert(lhs); + if !is_unique_index_lhs { + return Err(ResponseError::from_msg( + "TODO: error message when same index is in more than one swap".to_owned(), + Code::BadRequest, + )); + } + let is_unique_index_rhs = indexes_set.insert(rhs); + if !is_unique_index_rhs { + return Err(ResponseError::from_msg( + "TODO: error message when same index is in more than one swap".to_owned(), + Code::BadRequest, + )); + } + } + if swaps.is_empty() { + return Err(ResponseError::from_msg( + "TODO: error message when swaps is empty".to_owned(), + Code::BadRequest, + )); + } + + let task = KindWithContent::IndexSwap { swaps }; + + let task = index_scheduler.register(task)?; + let task_view = TaskView::from_task(&task); + + Ok(HttpResponse::Accepted().json(task_view)) +} diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index b816fabb4..ec47c38fe 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -20,6 +20,7 @@ use self::indexes::IndexStats; mod api_key; mod dump; pub mod indexes; +mod indexes_swap; mod tasks; pub fn configure(cfg: &mut web::ServiceConfig) { @@ -29,7 +30,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::scope("/dumps").configure(dump::configure)) .service(web::resource("/stats").route(web::get().to(get_stats))) .service(web::resource("/version").route(web::get().to(get_version))) - .service(web::scope("/indexes").configure(indexes::configure)); + .service(web::scope("/indexes").configure(indexes::configure)) + .service(web::scope("indexes-swap").configure(indexes_swap::configure)); } /// Extracts the raw values from the `StarOr` types and diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index d05737ac7..e8eaa3a4f 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -71,12 +71,10 @@ pub struct TaskView { } impl TaskView { - fn from_task(task: &Task) -> TaskView { + pub fn from_task(task: &Task) -> TaskView { TaskView { uid: task.uid, - index_uid: task - .indexes() - .and_then(|vec| vec.first().map(|i| i.to_string())), + index_uid: task.index_uid().map(ToOwned::to_owned), status: task.status, kind: task.kind.as_kind(), canceled_by: task.canceled_by, @@ -119,6 +117,8 @@ pub struct DetailsView { #[serde(skip_serializing_if = "Option::is_none")] #[serde(flatten)] pub settings: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub indexes: Option>, } impl From
for DetailsView { @@ -176,6 +176,10 @@ impl From
for DetailsView { dump_uid: Some(dump_uid), ..DetailsView::default() }, + Details::IndexSwap { swaps } => DetailsView { + indexes: Some(swaps), + ..Default::default() + }, } } } diff --git a/meilisearch-http/tests/auth/authorization.rs b/meilisearch-http/tests/auth/authorization.rs index 51df6fb79..c488909da 100644 --- a/meilisearch-http/tests/auth/authorization.rs +++ b/meilisearch-http/tests/auth/authorization.rs @@ -22,6 +22,7 @@ pub static AUTHORIZATIONS: Lazy hashset!{"indexes.update", "indexes.*", "*"}, ("GET", "/indexes/products/") => hashset!{"indexes.get", "indexes.*", "*"}, ("DELETE", "/indexes/products/") => hashset!{"indexes.delete", "indexes.*", "*"}, + ("POST", "/indexes-swap") => hashset!{"indexes.swap", "indexes.*", "*"}, ("POST", "/indexes") => hashset!{"indexes.create", "indexes.*", "*"}, ("GET", "/indexes") => hashset!{"indexes.get", "indexes.*", "*"}, ("GET", "/indexes/products/settings") => hashset!{"settings.get", "settings.*", "*"}, diff --git a/meilisearch-types/src/keys.rs b/meilisearch-types/src/keys.rs index 7ccacf2b1..b8425db78 100644 --- a/meilisearch-types/src/keys.rs +++ b/meilisearch-types/src/keys.rs @@ -222,6 +222,8 @@ pub enum Action { IndexesUpdate, #[serde(rename = "indexes.delete")] IndexesDelete, + #[serde(rename = "indexes.swap")] + IndexesSwap, #[serde(rename = "tasks.*")] TasksAll, #[serde(rename = "tasks.cancel")] @@ -316,6 +318,7 @@ pub mod actions { pub const INDEXES_GET: u8 = IndexesGet.repr(); pub const INDEXES_UPDATE: u8 = IndexesUpdate.repr(); pub const INDEXES_DELETE: u8 = IndexesDelete.repr(); + pub const INDEXES_SWAP: u8 = IndexesSwap.repr(); pub const TASKS_ALL: u8 = TasksAll.repr(); pub const TASKS_CANCEL: u8 = TasksCancel.repr(); pub const TASKS_DELETE: u8 = TasksDelete.repr(); diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 374b2549a..ace338857 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -1,4 +1,5 @@ use std::fmt::{Display, Write}; +use std::collections::HashSet; use std::str::FromStr; use enum_iterator::Sequence; @@ -59,19 +60,7 @@ impl Task { /// Return the list of indexes updated by this tasks. pub fn indexes(&self) -> Option> { - use KindWithContent::*; - - match &self.kind { - DumpExport { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None, - DocumentImport { index_uid, .. } - | DocumentDeletion { index_uid, .. } - | DocumentClear { index_uid } - | Settings { index_uid, .. } - | IndexCreation { index_uid, .. } - | IndexUpdate { index_uid, .. } - | IndexDeletion { index_uid } => Some(vec![index_uid]), - IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), - } + self.kind.indexes() } /// Return the content-uuid if there is one @@ -131,8 +120,7 @@ pub enum KindWithContent { primary_key: Option, }, IndexSwap { - lhs: String, - rhs: String, + swaps: Vec<(String, String)>, }, TaskCancelation { query: String, @@ -180,7 +168,14 @@ impl KindWithContent { | IndexCreation { index_uid, .. } | IndexUpdate { index_uid, .. } | IndexDeletion { index_uid } => Some(vec![index_uid]), - IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), + IndexSwap { swaps } => { + let mut indexes = HashSet::<&str>::default(); + for (lhs, rhs) in swaps { + indexes.insert(lhs.as_str()); + indexes.insert(rhs.as_str()); + } + Some(indexes.into_iter().collect()) + } } } @@ -212,9 +207,9 @@ impl KindWithContent { | KindWithContent::IndexUpdate { primary_key, .. } => Some(Details::IndexInfo { primary_key: primary_key.clone(), }), - KindWithContent::IndexSwap { .. } => { - todo!() - } + KindWithContent::IndexSwap { swaps } => Some(Details::IndexSwap { + swaps: swaps.clone(), + }), KindWithContent::TaskCancelation { query, tasks } => Some(Details::TaskCancelation { matched_tasks: tasks.len(), canceled_tasks: None, @@ -460,6 +455,10 @@ pub enum Details { Dump { dump_uid: String, }, + // TODO: Lo: Revisit this variant once we have decided on what the POST payload of swapping indexes should be + IndexSwap { + swaps: Vec<(String, String)>, + }, } /// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for