diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index bbe526460..223b84762 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -48,6 +48,8 @@ impl From for Code { pub enum Error { #[error("{1}")] WithCustomErrorCode(Code, Box), + #[error("Received bad task id: {received} should be >= to {expected}.")] + BadTaskId { received: TaskId, expected: TaskId }, #[error("Index `{0}` not found.")] IndexNotFound(String), #[error("Index `{0}` already exists.")] @@ -161,6 +163,7 @@ impl Error { match self { Error::IndexNotFound(_) | Error::WithCustomErrorCode(_, _) + | Error::BadTaskId { .. } | Error::IndexAlreadyExists(_) | Error::SwapDuplicateIndexFound(_) | Error::SwapDuplicateIndexesFound(_) @@ -205,6 +208,7 @@ impl ErrorCode for Error { fn error_code(&self) -> Code { match self { Error::WithCustomErrorCode(code, _) => *code, + Error::BadTaskId { .. } => Code::BadRequest, Error::IndexNotFound(_) => Code::IndexNotFound, Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists, Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 7514a2a68..b1edaabe5 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -993,7 +993,7 @@ impl IndexScheduler { /// Register a new task in the scheduler. /// /// If it fails and data was associated with the task, it tries to delete the associated data. - pub fn register(&self, kind: KindWithContent) -> Result<Task> { + pub fn register(&self, kind: KindWithContent, task_id: Option<TaskId>) -> Result<Task> { let mut wtxn = self.env.write_txn()?; // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incoming task @@ -1003,8 +1003,16 @@ impl IndexScheduler { return Err(Error::NoSpaceLeftInTaskQueue); } + let next_task_id = self.next_task_id(&wtxn)?; + + if let Some(uid) = task_id { + if uid < next_task_id { + return Err(Error::BadTaskId { received: uid, expected: next_task_id }); + } + } + let mut task = Task { - uid: self.next_task_id(&wtxn)?, + uid: task_id.unwrap_or(next_task_id), enqueued_at: OffsetDateTime::now_utc(), started_at: None, finished_at: None, @@ -1386,13 +1394,16 @@ impl IndexScheduler { // increase time by one nanosecond so that the enqueuedAt of the last task to delete is also lower than that date.
let delete_before = last_task_to_delete.enqueued_at + Duration::from_nanos(1); - self.register(KindWithContent::TaskDeletion { - query: format!( - "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled", - delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, - ), - tasks: to_delete, - })?; + self.register( + KindWithContent::TaskDeletion { + query: format!( + "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled", + delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, + ), + tasks: to_delete, + }, + None, + )?; Ok(()) } @@ -2016,7 +2027,7 @@ mod tests { for (idx, kind) in kinds.into_iter().enumerate() { let k = kind.as_kind(); - let task = index_scheduler.register(kind).unwrap(); + let task = index_scheduler.register(kind, None).unwrap(); index_scheduler.assert_internally_consistent(); assert_eq!(task.uid, idx as u32); @@ -2031,18 +2042,18 @@ mod tests { fn insert_task_while_another_task_is_processing() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - index_scheduler.register(index_creation_task("index_a", "id")).unwrap(); + index_scheduler.register(index_creation_task("index_a", "id"), None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_till([Start, BatchCreated]); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_batch_creation"); // while the task is processing can we register another task? - index_scheduler.register(index_creation_task("index_b", "id")).unwrap(); + index_scheduler.register(index_creation_task("index_b", "id"), None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("index_a") }) + .register(KindWithContent::IndexDeletion { index_uid: S("index_a") }, None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); } @@ -2051,7 +2062,7 @@ mod tests { fn test_task_is_processing() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - index_scheduler.register(index_creation_task("index_a", "id")).unwrap(); + index_scheduler.register(index_creation_task("index_a", "id"), None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_a_task"); handle.advance_till([Start, BatchCreated]); @@ -2065,17 +2076,23 @@ mod tests { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("cattos"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("cattos"), primary_key: None }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }) + .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); @@ -2094,22 +2111,25 @@ mod tests { let (index_scheduler, mut handle) = IndexScheduler::test(false, vec![]); index_scheduler - 
.register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); index_scheduler - .register(KindWithContent::DocumentClear { index_uid: S("doggos") }) + .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::DocumentClear { index_uid: S("doggos") }) + .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); index_scheduler - .register(KindWithContent::DocumentClear { index_uid: S("doggos") }) + .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_fourth_task"); @@ -2142,7 +2162,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task).unwrap(); + let _ = index_scheduler.register(task, None).unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2151,10 +2171,13 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued"); index_scheduler - .register(KindWithContent::TaskDeletion { - query: "test_query".to_owned(), - tasks: RoaringBitmap::from_iter([0, 1]), - }) + .register( + KindWithContent::TaskDeletion { + query: "test_query".to_owned(), + tasks: RoaringBitmap::from_iter([0, 1]), + }, + None, + ) .unwrap(); // again, no progress made at all, but one more task is registered snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_enqueued"); @@ -2188,7 +2211,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task).unwrap(); + let _ = index_scheduler.register(task, None).unwrap(); index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued"); @@ -2199,10 +2222,13 @@ mod tests { // Now we delete the first task index_scheduler - .register(KindWithContent::TaskDeletion { - query: "test_query".to_owned(), - tasks: RoaringBitmap::from_iter([0]), - }) + .register( + KindWithContent::TaskDeletion { + query: "test_query".to_owned(), + tasks: RoaringBitmap::from_iter([0]), + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_task_deletion"); @@ -2225,7 +2251,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task).unwrap(); + let _ = index_scheduler.register(task, None).unwrap(); index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued"); @@ -2237,10 +2263,13 @@ mod tests { // Now we delete the first task multiple times in a row for _ in 0..2 { index_scheduler - .register(KindWithContent::TaskDeletion { - query: "test_query".to_owned(), - tasks: RoaringBitmap::from_iter([0]), - }) + .register( + KindWithContent::TaskDeletion { + query: "test_query".to_owned(), + tasks: RoaringBitmap::from_iter([0]), + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2263,14 +2292,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); 
index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_register"); @@ -2292,7 +2324,10 @@ mod tests { }"#; index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2300,19 +2335,22 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }) + .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); @@ -2336,21 +2374,27 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); index_scheduler - .register(KindWithContent::DocumentDeletion { - index_uid: S("doggos"), - documents_ids: vec![S("1"), S("2")], - }) + .register( + KindWithContent::DocumentDeletion { + index_uid: S("doggos"), + documents_ids: vec![S("1"), S("2")], + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); @@ -2373,10 +2417,13 @@ mod tests { fn document_deletion_and_document_addition() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); index_scheduler - .register(KindWithContent::DocumentDeletion { - index_uid: S("doggos"), - documents_ids: vec![S("1"), S("2")], - }) + .register( + KindWithContent::DocumentDeletion { + index_uid: S("doggos"), + documents_ids: vec![S("1"), S("2")], + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2390,14 +2437,17 @@ mod tests { let documents_count = 
read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); @@ -2428,17 +2478,20 @@ mod tests { for name in index_names { index_scheduler - .register(KindWithContent::IndexCreation { - index_uid: name.to_string(), - primary_key: None, - }) + .register( + KindWithContent::IndexCreation { + index_uid: name.to_string(), + primary_key: None, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } for name in index_names { index_scheduler - .register(KindWithContent::DocumentClear { index_uid: name.to_string() }) + .register(KindWithContent::DocumentClear { index_uid: name.to_string() }, None) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2463,7 +2516,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task).unwrap(); + let _ = index_scheduler.register(task, None).unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2477,18 +2530,24 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "create_d"); index_scheduler - .register(KindWithContent::IndexSwap { - swaps: vec![ - IndexSwap { indexes: ("a".to_owned(), "b".to_owned()) }, - IndexSwap { indexes: ("c".to_owned(), "d".to_owned()) }, - ], - }) + .register( + KindWithContent::IndexSwap { + swaps: vec![ + IndexSwap { indexes: ("a".to_owned(), "b".to_owned()) }, + IndexSwap { indexes: ("c".to_owned(), "d".to_owned()) }, + ], + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_registered"); index_scheduler - .register(KindWithContent::IndexSwap { - swaps: vec![IndexSwap { indexes: ("a".to_owned(), "c".to_owned()) }], - }) + .register( + KindWithContent::IndexSwap { + swaps: vec![IndexSwap { indexes: ("a".to_owned(), "c".to_owned()) }], + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "two_swaps_registered"); @@ -2498,7 +2557,7 @@ mod tests { handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed"); - index_scheduler.register(KindWithContent::IndexSwap { swaps: vec![] }).unwrap(); + index_scheduler.register(KindWithContent::IndexSwap { swaps: vec![] }, None).unwrap(); handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_empty_swap_processed"); } @@ -2515,7 +2574,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task).unwrap(); + let _ = index_scheduler.register(task, None).unwrap(); index_scheduler.assert_internally_consistent(); } handle.advance_n_successful_batches(4); @@ -2525,12 +2584,15 @@ mod tests { snapshot!(first_snap, name: "initial_tasks_processed"); let err = index_scheduler - .register(KindWithContent::IndexSwap { - swaps: vec![ - IndexSwap { indexes: ("a".to_owned(), "b".to_owned()) }, - IndexSwap { indexes: ("b".to_owned(), "a".to_owned()) }, - ], - }) + .register( + KindWithContent::IndexSwap { + swaps: vec![ + IndexSwap { 
indexes: ("a".to_owned(), "b".to_owned()) }, + IndexSwap { indexes: ("b".to_owned(), "a".to_owned()) }, + ], + }, + None, + ) .unwrap_err(); snapshot!(format!("{err}"), @"Indexes must be declared only once during a swap. `a`, `b` were specified several times."); @@ -2539,13 +2601,16 @@ mod tests { // Index `e` does not exist, but we don't check its existence yet index_scheduler - .register(KindWithContent::IndexSwap { - swaps: vec![ - IndexSwap { indexes: ("a".to_owned(), "b".to_owned()) }, - IndexSwap { indexes: ("c".to_owned(), "e".to_owned()) }, - IndexSwap { indexes: ("d".to_owned(), "f".to_owned()) }, - ], - }) + .register( + KindWithContent::IndexSwap { + swaps: vec![ + IndexSwap { indexes: ("a".to_owned(), "b".to_owned()) }, + IndexSwap { indexes: ("c".to_owned(), "e".to_owned()) }, + IndexSwap { indexes: ("d".to_owned(), "f".to_owned()) }, + ], + }, + None, + ) .unwrap(); handle.advance_one_failed_batch(); // Now the first swap should have an error message saying `e` and `f` do not exist @@ -2566,17 +2631,20 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }) + .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler)); @@ -2601,7 +2669,7 @@ mod tests { }, ]; for task in to_enqueue { - let _ = index_scheduler.register(task).unwrap(); + let _ = index_scheduler.register(task, None).unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2618,7 +2686,7 @@ mod tests { file0.persist().unwrap(); let _ = index_scheduler - .register(replace_document_import_task("catto", None, 0, documents_count0)) + .register(replace_document_import_task("catto", None, 0, documents_count0), None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2626,10 +2694,13 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_task_processed"); index_scheduler - .register(KindWithContent::TaskCancelation { - query: "test_query".to_owned(), - tasks: RoaringBitmap::from_iter([0]), - }) + .register( + KindWithContent::TaskCancelation { + query: "test_query".to_owned(), + tasks: RoaringBitmap::from_iter([0]), + }, + None, + ) .unwrap(); handle.advance_one_successful_batch(); @@ -2644,7 +2715,7 @@ mod tests { file0.persist().unwrap(); let _ = index_scheduler - .register(replace_document_import_task("catto", None, 0, documents_count0)) + .register(replace_document_import_task("catto", None, 0, documents_count0), None) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2652,10 +2723,13 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_task_processing"); index_scheduler - .register(KindWithContent::TaskCancelation { - query: "test_query".to_owned(), - tasks: RoaringBitmap::from_iter([0]), - }) + .register( + KindWithContent::TaskCancelation { + 
query: "test_query".to_owned(), + tasks: RoaringBitmap::from_iter([0]), + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_task_registered"); @@ -2685,7 +2759,7 @@ mod tests { replace_document_import_task("wolfo", None, 2, documents_count2), ]; for task in to_enqueue { - let _ = index_scheduler.register(task).unwrap(); + let _ = index_scheduler.register(task, None).unwrap(); index_scheduler.assert_internally_consistent(); } handle.advance_one_successful_batch(); @@ -2693,10 +2767,13 @@ mod tests { handle.advance_till([Start, BatchCreated, InsideProcessBatch]); index_scheduler - .register(KindWithContent::TaskCancelation { - query: "test_query".to_owned(), - tasks: RoaringBitmap::from_iter([0, 1, 2]), - }) + .register( + KindWithContent::TaskCancelation { + query: "test_query".to_owned(), + tasks: RoaringBitmap::from_iter([0, 1, 2]), + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processing_second_task_cancel_enqueued"); @@ -2724,14 +2801,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2771,14 +2851,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: UpdateDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: UpdateDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2820,14 +2903,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2870,14 +2956,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + 
allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2921,14 +3010,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: UpdateDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: UpdateDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2973,13 +3065,13 @@ mod tests { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); let kind = index_creation_task("doggo", "bone"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); let kind = index_creation_task("whalo", "plankton"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); let kind = index_creation_task("catto", "his_own_vomit"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); handle.advance_n_successful_batches(3); @@ -3037,11 +3129,11 @@ mod tests { IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); let kind = index_creation_task("whalo", "fish"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); @@ -3260,17 +3352,17 @@ mod tests { IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); let kind = KindWithContent::IndexSwap { swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }], }; - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); let kind = KindWithContent::IndexSwap { swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "whalo".to_owned()) }], }; - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); @@ -3346,20 +3438,20 @@ mod tests { IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); let kind = index_creation_task("catto", "mouse"); - let _ = 
index_scheduler.register(kind).unwrap(); + let _ = index_scheduler.register(kind, None).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _ = index_scheduler.register(kind).unwrap(); + let _ = index_scheduler.register(kind, None).unwrap(); let kind = KindWithContent::IndexSwap { swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }], }; - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); handle.advance_n_successful_batches(1); let kind = KindWithContent::TaskCancelation { query: "test_query".to_string(), tasks: [0, 1, 2, 3].into_iter().collect(), }; - let task_cancelation = index_scheduler.register(kind).unwrap(); + let task_cancelation = index_scheduler.register(kind, None).unwrap(); handle.advance_n_successful_batches(1); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); @@ -3394,7 +3486,7 @@ mod tests { let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_register"); handle.advance_one_failed_batch(); @@ -3419,14 +3511,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_till([Start, BatchCreated]); @@ -3457,14 +3552,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -3513,14 +3611,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3561,14 +3662,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - 
.register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3596,7 +3700,10 @@ mod tests { // Create the index. index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_one_successful_batch(); @@ -3615,14 +3722,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3655,7 +3765,10 @@ mod tests { // Create the index. index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_one_successful_batch(); @@ -3674,14 +3787,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3718,7 +3834,10 @@ mod tests { // Create the index. 
index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, + None, + ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_one_successful_batch(); @@ -3738,14 +3857,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3791,14 +3913,17 @@ mod tests { let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3843,14 +3968,17 @@ mod tests { file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S(primary_key)), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S(primary_key)), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3904,14 +4032,17 @@ mod tests { file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S(primary_key)), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S(primary_key)), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -3961,14 +4092,17 @@ mod tests { file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S(primary_key)), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S(primary_key)), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -4042,14 +4176,17 @@ mod tests { file.persist().unwrap(); index_scheduler - 
.register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: primary_key.map(|pk| pk.to_string()), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: primary_key.map(|pk| pk.to_string()), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -4125,14 +4262,17 @@ mod tests { file.persist().unwrap(); index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: primary_key.map(|pk| pk.to_string()), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: primary_key.map(|pk| pk.to_string()), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -4186,7 +4326,7 @@ mod tests { let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_till([Start, BatchCreated, ProcessBatchFailed, AfterProcessing]); @@ -4206,15 +4346,18 @@ mod tests { }); index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) .unwrap(); handle.advance_one_successful_batch(); // on average this task takes ~600 bytes loop { - let result = index_scheduler.register(KindWithContent::IndexCreation { - index_uid: S("doggo"), - primary_key: None, - }); + let result = index_scheduler.register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ); if result.is_err() { break; } @@ -4224,7 +4367,10 @@ mod tests { // at this point the task DB shoud have reached its limit and we should not be able to register new tasks let result = index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) .unwrap_err(); snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code @@ -4232,10 +4378,10 @@ mod tests { // Even the task deletion that doesn't delete anything shouldn't be accepted let result = index_scheduler - .register(KindWithContent::TaskDeletion { - query: S("test"), - tasks: RoaringBitmap::new(), - }) + .register( + KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() }, + None, + ) .unwrap_err(); snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. 
Please delete tasks to continue performing write operations."); // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code @@ -4243,13 +4389,19 @@ mod tests { // But a task deletion that delete something should works index_scheduler - .register(KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() }) + .register( + KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() }, + None, + ) .unwrap(); handle.advance_one_successful_batch(); // Now we should be able to enqueue a few tasks again index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) .unwrap(); handle.advance_one_failed_batch(); } @@ -4262,22 +4414,34 @@ mod tests { }); index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) .unwrap(); handle.advance_one_successful_batch(); index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) .unwrap(); handle.advance_one_failed_batch(); // at this point the max number of tasks is reached // we can still enqueue multiple tasks index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) .unwrap(); index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) .unwrap(); let rtxn = index_scheduler.env.read_txn().unwrap(); @@ -4325,11 +4489,11 @@ mod tests { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); let kind = index_creation_task("whalo", "fish"); - let _task = index_scheduler.register(kind).unwrap(); + let _task = index_scheduler.register(kind, None).unwrap(); snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" { @@ -4479,11 +4643,11 @@ mod tests { query: "cancel dump".to_owned(), tasks: RoaringBitmap::from_iter([0]), }; - let _ = index_scheduler.register(dump_creation).unwrap(); + let _ = index_scheduler.register(dump_creation, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_dump_register"); handle.advance_till([Start, BatchCreated, InsideProcessBatch]); - let _ = index_scheduler.register(dump_cancellation).unwrap(); + let _ = index_scheduler.register(dump_cancellation, None).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_registered"); snapshot!(format!("{:?}", handle.advance()), @"AbortedIndexation"); @@ -4491,4 +4655,21 @@ mod tests { handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed"); } + + #[test] + fn basic_set_taskid() { + 
let (index_scheduler, _handle) = IndexScheduler::test(true, vec![]); + + let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; + let task = index_scheduler.register(kind, None).unwrap(); + snapshot!(task.uid, @"0"); + + let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; + let task = index_scheduler.register(kind, Some(12)).unwrap(); + snapshot!(task.uid, @"12"); + + let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; + let error = index_scheduler.register(kind, Some(5)).unwrap_err(); + snapshot!(error, @"Received bad task id: 5 should be >= to 13."); + } } diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 01ca63857..b91edaf01 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -265,7 +265,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< .name(String::from("register-snapshot-tasks")) .spawn(move || loop { thread::sleep(snapshot_delay); - if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) { + if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation, None) { error!("Error while registering snapshot: {}", e); } }) diff --git a/meilisearch/src/routes/dump.rs b/meilisearch/src/routes/dump.rs index 071ae60b8..8f44070d8 100644 --- a/meilisearch/src/routes/dump.rs +++ b/meilisearch/src/routes/dump.rs @@ -11,7 +11,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::SummarizedTaskView; +use crate::routes::{get_task_id, SummarizedTaskView}; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump)))); @@ -29,8 +29,9 @@ pub async fn create_dump( keys: auth_controller.list_keys()?, instance_uid: analytics.instance_uid().cloned(), }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Create dump"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 1f41fa10c..492f039cf 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -7,7 +7,7 @@ use bstr::ByteSlice as _; use deserr::actix_web::{AwebJson, AwebQueryParameter}; use deserr::Deserr; use futures::StreamExt; -use index_scheduler::IndexScheduler; +use index_scheduler::{IndexScheduler, TaskId}; use meilisearch_types::deserr::query_params::Param; use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError}; use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; @@ -36,7 +36,7 @@ use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::payload::Payload; use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; +use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use crate::search::parse_filter; static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { @@ -130,9 +130,10 @@ pub async fn delete_document( index_uid: index_uid.to_string(), 
documents_ids: vec![document_id], }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Delete document"); Ok(HttpResponse::Accepted().json(task)) } @@ -277,6 +278,7 @@ pub async fn replace_documents( analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); + let uid = get_task_id(&req)?; let task = document_addition( extract_mime_type(&req)?, index_scheduler, @@ -285,6 +287,7 @@ pub async fn replace_documents( params.csv_delimiter, body, IndexDocumentsMethod::ReplaceDocuments, + uid, allow_index_creation, ) .await?; @@ -309,6 +312,7 @@ pub async fn update_documents( analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); + let uid = get_task_id(&req)?; let task = document_addition( extract_mime_type(&req)?, index_scheduler, @@ -317,6 +321,7 @@ pub async fn update_documents( params.csv_delimiter, body, IndexDocumentsMethod::UpdateDocuments, + uid, allow_index_creation, ) .await?; @@ -334,6 +339,7 @@ async fn document_addition( csv_delimiter: Option<u8>, mut body: Payload, method: IndexDocumentsMethod, + task_id: Option<TaskId>, allow_index_creation: bool, ) -> Result { let format = match ( @@ -450,7 +456,7 @@ async fn document_addition( }; let scheduler = index_scheduler.clone(); - let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? { + let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await?
{ Ok(task) => task, Err(e) => { index_scheduler.delete_update_file(uuid)?; @@ -480,8 +486,9 @@ pub async fn delete_documents_batch( let task = KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Delete documents by batch"); Ok(HttpResponse::Accepted().json(task)) @@ -516,8 +523,9 @@ pub async fn delete_documents_by_filter( .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Delete documents by filter"); Ok(HttpResponse::Accepted().json(task)) @@ -533,8 +541,9 @@ pub async fn clear_all_documents( analytics.delete_documents(DocumentDeletionKind::ClearAll, &req); let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Delete all documents"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs index d80bd9c61..6451d930d 100644 --- a/meilisearch/src/routes/indexes/mod.rs +++ b/meilisearch/src/routes/indexes/mod.rs @@ -17,7 +17,7 @@ use serde_json::json; use time::OffsetDateTime; use tracing::debug; -use super::{Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; +use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::{AuthenticationError, GuardedData}; @@ -137,8 +137,9 @@ pub async fn create_index( ); let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Create index"); Ok(HttpResponse::Accepted().json(task)) @@ -206,8 +207,9 @@ pub async fn update_index( primary_key: body.primary_key, }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Update index"); Ok(HttpResponse::Accepted().json(task)) @@ -216,11 +218,13 @@ pub async fn update_index( pub async fn delete_index( index_scheduler: GuardedData, Data>, index_uid: web::Path, + req: HttpRequest, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || 
index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Delete index"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/settings.rs b/meilisearch/src/routes/indexes/settings.rs index 23e8925c7..9fbd84161 100644 --- a/meilisearch/src/routes/indexes/settings.rs +++ b/meilisearch/src/routes/indexes/settings.rs @@ -15,7 +15,7 @@ use tracing::debug; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; -use crate::routes::SummarizedTaskView; +use crate::routes::{get_task_id, SummarizedTaskView}; #[macro_export] macro_rules! make_setting_route { @@ -34,7 +34,7 @@ macro_rules! make_setting_route { use $crate::extractors::authentication::policies::*; use $crate::extractors::authentication::GuardedData; use $crate::extractors::sequential_extractor::SeqHandler; - use $crate::routes::SummarizedTaskView; + use $crate::routes::{get_task_id, SummarizedTaskView}; pub async fn delete( index_scheduler: GuardedData< @@ -42,6 +42,7 @@ macro_rules! make_setting_route { Data, >, index_uid: web::Path, + req: HttpRequest, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -56,8 +57,9 @@ macro_rules! make_setting_route { is_deletion: true, allow_index_creation, }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)) + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) .await?? .into(); @@ -105,8 +107,9 @@ macro_rules! make_setting_route { is_deletion: false, allow_index_creation, }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)) + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) .await?? 
.into(); @@ -767,8 +770,9 @@ pub async fn update_all( is_deletion: false, allow_index_creation, }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Update all settings"); Ok(HttpResponse::Accepted().json(task)) @@ -790,6 +794,7 @@ pub async fn get_all( pub async fn delete_all( index_scheduler: GuardedData, Data>, index_uid: web::Path, + req: HttpRequest, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -803,8 +808,9 @@ pub async fn delete_all( is_deletion: true, allow_index_creation, }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Delete all settings"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs index 89cf63c50..61a9f3352 100644 --- a/meilisearch/src/routes/mod.rs +++ b/meilisearch/src/routes/mod.rs @@ -4,7 +4,7 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; use index_scheduler::IndexScheduler; use meilisearch_auth::AuthController; -use meilisearch_types::error::ResponseError; +use meilisearch_types::error::{Code, ResponseError}; use meilisearch_types::settings::{Settings, Unchecked}; use meilisearch_types::tasks::{Kind, Status, Task, TaskId}; use serde::{Deserialize, Serialize}; @@ -45,6 +45,34 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::scope("/experimental-features").configure(features::configure)); } +pub fn get_task_id(req: &HttpRequest) -> Result, ResponseError> { + let task_id = req + .headers() + .get("TaskId") + .map(|header| { + header.to_str().map_err(|e| { + ResponseError::from_msg( + format!("TaskId is not a valid utf-8 string: {e}"), + Code::BadRequest, + ) + }) + }) + .transpose()? 
+ .map(|s| { s.parse::<TaskId>().map_err(|e| { ResponseError::from_msg( format!( "Could not parse the TaskId as a {}: {e}", std::any::type_name::<TaskId>(), ), Code::BadRequest, ) }) }) .transpose()?; + Ok(task_id) +} + #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct SummarizedTaskView { diff --git a/meilisearch/src/routes/snapshot.rs b/meilisearch/src/routes/snapshot.rs index c94529932..28dbac85f 100644 --- a/meilisearch/src/routes/snapshot.rs +++ b/meilisearch/src/routes/snapshot.rs @@ -10,7 +10,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::SummarizedTaskView; +use crate::routes::{get_task_id, SummarizedTaskView}; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot)))); @@ -24,8 +24,9 @@ pub async fn create_snapshot( analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req)); let task = KindWithContent::SnapshotCreation; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Create snapshot"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/swap_indexes.rs b/meilisearch/src/routes/swap_indexes.rs index 79e619705..64268dbfa 100644 --- a/meilisearch/src/routes/swap_indexes.rs +++ b/meilisearch/src/routes/swap_indexes.rs @@ -10,7 +10,7 @@ use meilisearch_types::index_uid::IndexUid; use meilisearch_types::tasks::{IndexSwap, KindWithContent}; use serde_json::json; -use super::SummarizedTaskView; +use super::{get_task_id, SummarizedTaskView}; use crate::analytics::Analytics; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::*; @@ -60,7 +60,8 @@ pub async fn swap_indexes( } let task = KindWithContent::IndexSwap { swaps }; + let uid = get_task_id(&req)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); Ok(HttpResponse::Accepted().json(task)) } diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs index 03b63001d..26e1c43f8 100644 --- a/meilisearch/src/routes/tasks.rs +++ b/meilisearch/src/routes/tasks.rs @@ -18,7 +18,7 @@ use time::macros::format_description; use time::{Date, Duration, OffsetDateTime, Time}; use tokio::task; -use super::SummarizedTaskView; +use super::{get_task_id, SummarizedTaskView}; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; @@ -197,7 +197,9 @@ async fn cancel_tasks( let task_cancelation = KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; - let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??; + let uid = get_task_id(&req)?; + let task = + task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??; let task: SummarizedTaskView = task.into(); Ok(HttpResponse::Ok().json(task)) @@ -242,7 +244,8 @@ async fn delete_tasks( let task_deletion = KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks }; - let task =
task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??; + let uid = get_task_id(&req)?; + let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??; let task: SummarizedTaskView = task.into(); Ok(HttpResponse::Ok().json(task)) diff --git a/meilisearch/tests/index/create_index.rs b/meilisearch/tests/index/create_index.rs index 7ce56d440..b9f755f35 100644 --- a/meilisearch/tests/index/create_index.rs +++ b/meilisearch/tests/index/create_index.rs @@ -199,3 +199,74 @@ async fn error_create_with_invalid_index_uid() { } "###); } + +#[actix_rt::test] +async fn send_task_id() { + let server = Server::new().await; + let app = server.init_web_app().await; + let index = server.index("catto"); + let (response, code) = index.create(None).await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###" + { + "taskUid": 0, + "indexUid": "catto", + "status": "enqueued", + "type": "indexCreation", + "enqueuedAt": "[date]" + } + "###); + + let body = serde_json::to_string(&json!({ + "uid": "doggo", + "primaryKey": None::<&str>, + })) + .unwrap(); + let req = test::TestRequest::post() + .uri("/indexes") + .insert_header(("TaskId", "25")) + .insert_header(ContentType::json()) + .set_payload(body) + .to_request(); + + let res = test::call_service(&app, req).await; + snapshot!(res.status(), @"202 Accepted"); + + let bytes = test::read_body(res).await; + let response = serde_json::from_slice::(&bytes).expect("Expecting valid json"); + snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###" + { + "taskUid": 25, + "indexUid": "doggo", + "status": "enqueued", + "type": "indexCreation", + "enqueuedAt": "[date]" + } + "###); + + let body = serde_json::to_string(&json!({ + "uid": "girafo", + "primaryKey": None::<&str>, + })) + .unwrap(); + let req = test::TestRequest::post() + .uri("/indexes") + .insert_header(("TaskId", "12")) + .insert_header(ContentType::json()) + .set_payload(body) + .to_request(); + + let res = test::call_service(&app, req).await; + snapshot!(res.status(), @"400 Bad Request"); + + let bytes = test::read_body(res).await; + let response = serde_json::from_slice::(&bytes).expect("Expecting valid json"); + snapshot!(json_string!(response), @r###" + { + "message": "Received bad task id: 12 should be >= to 26.", + "code": "bad_request", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#bad_request" + } + "###); +}
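For reviewers, a minimal client-side sketch (not part of the patch) of how the new `TaskId` header behaves, mirroring the `send_task_id` integration test. It assumes a fresh Meilisearch instance listening on `http://localhost:7700` with no master key, plus the `reqwest` crate (with its `blocking` and `json` features) and `serde_json` as dependencies; the uids and index names are illustrative only.

```rust
use serde_json::{json, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::new();

    // Enqueue an index creation under an explicit task uid by setting the header.
    let accepted = client
        .post("http://localhost:7700/indexes")
        .header("TaskId", "25")
        .json(&json!({ "uid": "doggo", "primaryKey": null }))
        .send()?;
    assert_eq!(accepted.status(), 202);
    let task: Value = accepted.json()?;
    assert_eq!(task["taskUid"], 25); // the scheduler kept the uid we asked for

    // A uid lower than the next expected one (26 here) is rejected with `bad_request`,
    // matching the `BadTaskId` error introduced in the scheduler.
    let rejected = client
        .post("http://localhost:7700/indexes")
        .header("TaskId", "12")
        .json(&json!({ "uid": "girafo", "primaryKey": null }))
        .send()?;
    assert_eq!(rejected.status(), 400);

    Ok(())
}
```

The header only pins the uid of the enqueued task: as the `register` change shows, the value is checked against `next_task_id` and anything below it is refused, so clients that never send the header keep the auto-incremented behaviour.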