Implement the dry run HA parameter

Tamo 2024-02-21 11:21:26 +01:00
parent 1eb1c043b5
commit 36c27a18a1
11 changed files with 317 additions and 79 deletions


@@ -56,7 +56,7 @@ impl FileStore {
        let file = NamedTempFile::new_in(&self.path)?;
        let uuid = Uuid::new_v4();
        let path = self.path.join(uuid.to_string());
-        let update_file = File { file, path };
+        let update_file = File { dry: false, file, path };
        Ok((uuid, update_file))
    }
@@ -67,7 +67,7 @@ impl FileStore {
        let file = NamedTempFile::new_in(&self.path)?;
        let uuid = Uuid::from_u128(uuid);
        let path = self.path.join(uuid.to_string());
-        let update_file = File { file, path };
+        let update_file = File { dry: false, file, path };
        Ok((uuid, update_file))
    }
@@ -135,13 +135,29 @@ impl FileStore {
}

pub struct File {
+    dry: bool,
    path: PathBuf,
    file: NamedTempFile,
}

impl File {
+    pub fn dry_file() -> Result<Self> {
+        #[cfg(target_family = "unix")]
+        let path = PathBuf::from_str("/dev/null").unwrap();
+        #[cfg(target_family = "windows")]
+        let path = PathBuf::from_str("\\Device\\Null").unwrap();
+        Ok(Self {
+            dry: true,
+            path: path.clone(),
+            file: tempfile::Builder::new().make(|_| std::fs::File::create(path.clone()))?,
+        })
+    }
+
    pub fn persist(self) -> Result<()> {
+        if !self.dry {
            self.file.persist(&self.path)?;
+        }
        Ok(())
    }
}
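Note (illustration, not part of the diff): the null-device trick that `dry_file` relies on can be seen in isolation with plain `std::fs` — writes to the OS null device always succeed and are simply discarded, which is why `persist` can become a no-op for dry files. This is only a sketch; it uses the Win32 device name `NUL` rather than the `\Device\Null` path shown above.

use std::fs::File;
use std::io::Write;

fn main() -> std::io::Result<()> {
    // Open the platform's null device: everything written to it is discarded.
    #[cfg(target_family = "unix")]
    let mut sink = File::create("/dev/null")?;
    #[cfg(target_family = "windows")]
    let mut sink = File::create("NUL")?;

    sink.write_all(b"this payload is accepted but never stored")?;
    Ok(())
}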


@@ -1001,7 +1001,12 @@ impl IndexScheduler {
    /// Register a new task in the scheduler.
    ///
    /// If it fails and data was associated with the task, it tries to delete the associated data.
-    pub fn register(&self, kind: KindWithContent, task_id: Option<TaskId>) -> Result<Task> {
+    pub fn register(
+        &self,
+        kind: KindWithContent,
+        task_id: Option<TaskId>,
+        dry_run: bool,
+    ) -> Result<Task> {
        let mut wtxn = self.env.write_txn()?;
        // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
@@ -1037,6 +1042,11 @@ impl IndexScheduler {
        // (that it does not contain duplicate indexes).
        check_index_swap_validity(&task)?;
+        // At this point the task is going to be registered and no further checks will be done
+        if dry_run {
+            return Ok(task);
+        }
        // Get rid of the mutability.
        let task = task;
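Note (illustration, not part of the commit): the dry-run branch keeps the id allocation and every validation step that happens inside the write transaction and only skips persistence — returning before the commit drops the transaction and discards any pending change. A tiny self-contained sketch of that "validate, then bail before committing" shape, with toy types that are not the scheduler's real ones:

struct Queue {
    tasks: Vec<String>,
}

impl Queue {
    fn register(&mut self, task: &str, dry_run: bool) -> Result<u32, String> {
        if task.is_empty() {
            return Err("invalid task".into()); // validation always runs
        }
        let uid = self.tasks.len() as u32; // the id the task would get
        if dry_run {
            return Ok(uid); // nothing is enqueued
        }
        self.tasks.push(task.to_string());
        Ok(uid)
    }
}

fn main() {
    let mut queue = Queue { tasks: Vec::new() };
    assert_eq!(queue.register("index creation", true).unwrap(), 0);
    assert!(queue.tasks.is_empty()); // the dry run left the queue untouched
    assert_eq!(queue.register("index creation", false).unwrap(), 0);
}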
@@ -1101,9 +1111,13 @@ impl IndexScheduler {
    /// The returned file and uuid can be used to associate
    /// some data to a task. The file will be kept until
    /// the task has been fully processed.
-    pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
+    pub fn create_update_file(&self, dry_run: bool) -> Result<(Uuid, file_store::File)> {
+        if dry_run {
+            Ok((Uuid::nil(), file_store::File::dry_file()?))
+        } else {
            Ok(self.file_store.new_update()?)
        }
+    }

    #[cfg(test)]
    pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
@@ -1413,6 +1427,7 @@ impl IndexScheduler {
                tasks: to_delete,
            },
            None,
+            false,
        )?;
        Ok(())
@@ -1534,7 +1549,7 @@ impl<'a> Dump<'a> {
    ) -> Result<Task> {
        let content_uuid = match content_file {
            Some(content_file) if task.status == Status::Enqueued => {
-                let (uuid, mut file) = self.index_scheduler.create_update_file()?;
+                let (uuid, mut file) = self.index_scheduler.create_update_file(false)?;
                let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
                for doc in content_file {
                    builder.append_json_object(&doc?)?;
@@ -2038,7 +2053,7 @@ mod tests {
        for (idx, kind) in kinds.into_iter().enumerate() {
            let k = kind.as_kind();
-            let task = index_scheduler.register(kind, None).unwrap();
+            let task = index_scheduler.register(kind, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
            assert_eq!(task.uid, idx as u32);
@@ -2053,18 +2068,18 @@ mod tests {
    fn insert_task_while_another_task_is_processing() {
        let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
-        index_scheduler.register(index_creation_task("index_a", "id"), None).unwrap();
+        index_scheduler.register(index_creation_task("index_a", "id"), None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
        handle.advance_till([Start, BatchCreated]);
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_batch_creation");
        // while the task is processing can we register another task?
-        index_scheduler.register(index_creation_task("index_b", "id"), None).unwrap();
+        index_scheduler.register(index_creation_task("index_b", "id"), None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
        index_scheduler
-            .register(KindWithContent::IndexDeletion { index_uid: S("index_a") }, None)
+            .register(KindWithContent::IndexDeletion { index_uid: S("index_a") }, None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task");
    }
@@ -2073,7 +2088,7 @@ mod tests {
    fn test_task_is_processing() {
        let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
-        index_scheduler.register(index_creation_task("index_a", "id"), None).unwrap();
+        index_scheduler.register(index_creation_task("index_a", "id"), None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_a_task");
        handle.advance_till([Start, BatchCreated]);
@@ -2090,6 +2105,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -2098,12 +2114,13 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("cattos"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
        index_scheduler
-            .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None)
+            .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task");
@@ -2125,22 +2142,23 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
        index_scheduler
-            .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None)
+            .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
        index_scheduler
-            .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None)
+            .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task");
        index_scheduler
-            .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None)
+            .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_fourth_task");
@@ -2173,7 +2191,7 @@ mod tests {
        ];
        for task in to_enqueue {
-            let _ = index_scheduler.register(task, None).unwrap();
+            let _ = index_scheduler.register(task, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
        }
@@ -2188,6 +2206,7 @@ mod tests {
                    tasks: RoaringBitmap::from_iter([0, 1]),
                },
                None,
+                false,
            )
            .unwrap();
        // again, no progress made at all, but one more task is registered
@@ -2222,7 +2241,7 @@ mod tests {
        ];
        for task in to_enqueue {
-            let _ = index_scheduler.register(task, None).unwrap();
+            let _ = index_scheduler.register(task, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
        }
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");
@@ -2239,6 +2258,7 @@ mod tests {
                    tasks: RoaringBitmap::from_iter([0]),
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_task_deletion");
@@ -2262,7 +2282,7 @@ mod tests {
        ];
        for task in to_enqueue {
-            let _ = index_scheduler.register(task, None).unwrap();
+            let _ = index_scheduler.register(task, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
        }
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");
@@ -2280,6 +2300,7 @@ mod tests {
                    tasks: RoaringBitmap::from_iter([0]),
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -2313,6 +2334,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_register");
@@ -2338,6 +2360,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -2356,12 +2379,13 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
        index_scheduler
-            .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None)
+            .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task");
@@ -2395,6 +2419,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -2405,6 +2430,7 @@ mod tests {
                    documents_ids: vec![S("1"), S("2")],
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
@@ -2434,6 +2460,7 @@ mod tests {
                    documents_ids: vec![S("1"), S("2")],
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -2458,6 +2485,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
@@ -2495,6 +2523,7 @@ mod tests {
                    primary_key: None,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -2502,7 +2531,11 @@ mod tests {
        for name in index_names {
            index_scheduler
-                .register(KindWithContent::DocumentClear { index_uid: name.to_string() }, None)
+                .register(
+                    KindWithContent::DocumentClear { index_uid: name.to_string() },
+                    None,
+                    false,
+                )
                .unwrap();
            index_scheduler.assert_internally_consistent();
        }
@@ -2527,7 +2560,7 @@ mod tests {
        ];
        for task in to_enqueue {
-            let _ = index_scheduler.register(task, None).unwrap();
+            let _ = index_scheduler.register(task, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
        }
@@ -2549,6 +2582,7 @@ mod tests {
                    ],
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_registered");
@@ -2558,6 +2592,7 @@ mod tests {
                    swaps: vec![IndexSwap { indexes: ("a".to_owned(), "c".to_owned()) }],
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "two_swaps_registered");
@@ -2568,7 +2603,9 @@ mod tests {
        handle.advance_one_successful_batch();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed");
-        index_scheduler.register(KindWithContent::IndexSwap { swaps: vec![] }, None).unwrap();
+        index_scheduler
+            .register(KindWithContent::IndexSwap { swaps: vec![] }, None, false)
+            .unwrap();
        handle.advance_one_successful_batch();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_empty_swap_processed");
    }
@@ -2585,7 +2622,7 @@ mod tests {
        ];
        for task in to_enqueue {
-            let _ = index_scheduler.register(task, None).unwrap();
+            let _ = index_scheduler.register(task, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
        }
        handle.advance_n_successful_batches(4);
@@ -2603,6 +2640,7 @@ mod tests {
                    ],
                },
                None,
+                false,
            )
            .unwrap_err();
        snapshot!(format!("{err}"), @"Indexes must be declared only once during a swap. `a`, `b` were specified several times.");
@@ -2621,6 +2659,7 @@ mod tests {
                    ],
                },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_failed_batch();
@@ -2652,10 +2691,11 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler
-            .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None)
+            .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler));
@@ -2680,7 +2720,7 @@ mod tests {
            },
        ];
        for task in to_enqueue {
-            let _ = index_scheduler.register(task, None).unwrap();
+            let _ = index_scheduler.register(task, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
        }
@@ -2697,7 +2737,7 @@ mod tests {
        file0.persist().unwrap();
        let _ = index_scheduler
-            .register(replace_document_import_task("catto", None, 0, documents_count0), None)
+            .register(replace_document_import_task("catto", None, 0, documents_count0), None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -2711,6 +2751,7 @@ mod tests {
                    tasks: RoaringBitmap::from_iter([0]),
                },
                None,
+                false,
            )
            .unwrap();
@@ -2726,7 +2767,7 @@ mod tests {
        file0.persist().unwrap();
        let _ = index_scheduler
-            .register(replace_document_import_task("catto", None, 0, documents_count0), None)
+            .register(replace_document_import_task("catto", None, 0, documents_count0), None, false)
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -2740,6 +2781,7 @@ mod tests {
                    tasks: RoaringBitmap::from_iter([0]),
                },
                None,
+                false,
            )
            .unwrap();
@@ -2770,7 +2812,7 @@ mod tests {
            replace_document_import_task("wolfo", None, 2, documents_count2),
        ];
        for task in to_enqueue {
-            let _ = index_scheduler.register(task, None).unwrap();
+            let _ = index_scheduler.register(task, None, false).unwrap();
            index_scheduler.assert_internally_consistent();
        }
        handle.advance_one_successful_batch();
@@ -2784,6 +2826,7 @@ mod tests {
                    tasks: RoaringBitmap::from_iter([0, 1, 2]),
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processing_second_task_cancel_enqueued");
@@ -2822,6 +2865,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -2872,6 +2916,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -2924,6 +2969,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -2977,6 +3023,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3031,6 +3078,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3076,13 +3124,13 @@ mod tests {
        let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
        let kind = index_creation_task("doggo", "bone");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
        let kind = index_creation_task("whalo", "plankton");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
        let kind = index_creation_task("catto", "his_own_vomit");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task");
        handle.advance_n_successful_batches(3);
@@ -3140,11 +3188,11 @@ mod tests {
            IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
        let kind = index_creation_task("catto", "mouse");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        let kind = index_creation_task("doggo", "sheep");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        let kind = index_creation_task("whalo", "fish");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
@@ -3363,17 +3411,17 @@ mod tests {
            IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
        let kind = index_creation_task("catto", "mouse");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        let kind = index_creation_task("doggo", "sheep");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        let kind = KindWithContent::IndexSwap {
            swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }],
        };
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        let kind = KindWithContent::IndexSwap {
            swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "whalo".to_owned()) }],
        };
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
@@ -3449,20 +3497,20 @@ mod tests {
            IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
        let kind = index_creation_task("catto", "mouse");
-        let _ = index_scheduler.register(kind, None).unwrap();
+        let _ = index_scheduler.register(kind, None, false).unwrap();
        let kind = index_creation_task("doggo", "sheep");
-        let _ = index_scheduler.register(kind, None).unwrap();
+        let _ = index_scheduler.register(kind, None, false).unwrap();
        let kind = KindWithContent::IndexSwap {
            swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }],
        };
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        handle.advance_n_successful_batches(1);
        let kind = KindWithContent::TaskCancelation {
            query: "test_query".to_string(),
            tasks: [0, 1, 2, 3].into_iter().collect(),
        };
-        let task_cancelation = index_scheduler.register(kind, None).unwrap();
+        let task_cancelation = index_scheduler.register(kind, None, false).unwrap();
        handle.advance_n_successful_batches(1);
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
@@ -3497,7 +3545,7 @@ mod tests {
        let kind = index_creation_task("catto", "mouse");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_register");
        handle.advance_one_failed_batch();
@@ -3532,6 +3580,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -3573,6 +3622,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -3632,6 +3682,7 @@ mod tests {
                    allow_index_creation: false,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3683,6 +3734,7 @@ mod tests {
                    allow_index_creation: false,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3714,6 +3766,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -3743,6 +3796,7 @@ mod tests {
                    allow_index_creation: false,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3779,6 +3833,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -3808,6 +3863,7 @@ mod tests {
                    allow_index_creation: false,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3848,6 +3904,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
@@ -3878,6 +3935,7 @@ mod tests {
                    allow_index_creation,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3934,6 +3992,7 @@ mod tests {
                    allow_index_creation,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -3989,6 +4048,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -4053,6 +4113,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -4113,6 +4174,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -4197,6 +4259,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -4283,6 +4346,7 @@ mod tests {
                    allow_index_creation: true,
                },
                None,
+                false,
            )
            .unwrap();
        index_scheduler.assert_internally_consistent();
@@ -4337,7 +4401,7 @@ mod tests {
        let kind = index_creation_task("catto", "mouse");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
        handle.advance_till([Start, BatchCreated, ProcessBatchFailed, AfterProcessing]);
@@ -4360,6 +4424,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_successful_batch();
@@ -4368,6 +4433,7 @@ mod tests {
            let result = index_scheduler.register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            );
            if result.is_err() {
                break;
@@ -4381,6 +4447,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap_err();
        snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
@@ -4392,6 +4459,7 @@ mod tests {
            .register(
                KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() },
                None,
+                false,
            )
            .unwrap_err();
        snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
@@ -4403,6 +4471,7 @@ mod tests {
            .register(
                KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_successful_batch();
@@ -4412,6 +4481,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_failed_batch();
@@ -4428,6 +4498,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_successful_batch();
@@ -4436,6 +4507,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_failed_batch();
@@ -4446,12 +4518,14 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        index_scheduler
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
@@ -4507,6 +4581,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_successful_batch();
@@ -4515,6 +4590,7 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        handle.advance_one_failed_batch();
@@ -4525,12 +4601,14 @@ mod tests {
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
        index_scheduler
            .register(
                KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
                None,
+                false,
            )
            .unwrap();
@@ -4555,11 +4633,11 @@ mod tests {
        let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
        let kind = index_creation_task("catto", "mouse");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        let kind = index_creation_task("doggo", "sheep");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        let kind = index_creation_task("whalo", "fish");
-        let _task = index_scheduler.register(kind, None).unwrap();
+        let _task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
        {
@@ -4709,11 +4787,11 @@ mod tests {
            query: "cancel dump".to_owned(),
            tasks: RoaringBitmap::from_iter([0]),
        };
-        let _ = index_scheduler.register(dump_creation, None).unwrap();
+        let _ = index_scheduler.register(dump_creation, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_dump_register");
        handle.advance_till([Start, BatchCreated, InsideProcessBatch]);
-        let _ = index_scheduler.register(dump_cancellation, None).unwrap();
+        let _ = index_scheduler.register(dump_cancellation, None, false).unwrap();
        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_registered");
        snapshot!(format!("{:?}", handle.advance()), @"AbortedIndexation");
@@ -4727,15 +4805,86 @@ mod tests {
        let (index_scheduler, _handle) = IndexScheduler::test(true, vec![]);
        let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None };
-        let task = index_scheduler.register(kind, None).unwrap();
+        let task = index_scheduler.register(kind, None, false).unwrap();
        snapshot!(task.uid, @"0");
        let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None };
-        let task = index_scheduler.register(kind, Some(12)).unwrap();
+        let task = index_scheduler.register(kind, Some(12), false).unwrap();
        snapshot!(task.uid, @"12");
        let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None };
-        let error = index_scheduler.register(kind, Some(5)).unwrap_err();
+        let error = index_scheduler.register(kind, Some(5), false).unwrap_err();
        snapshot!(error, @"Received bad task id: 5 should be >= to 13.");
    }
+    #[test]
+    fn dry_run() {
+        let (index_scheduler, _handle) = IndexScheduler::test(true, vec![]);
+        let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None };
+        let task = index_scheduler.register(kind, None, true).unwrap();
+        snapshot!(task.uid, @"0");
+        snapshot!(snapshot_index_scheduler(&index_scheduler), @r###"
+        ### Autobatching Enabled = true
+        ### Processing Tasks:
+        []
+        ----------------------------------------------------------------------
+        ### All Tasks:
+        ----------------------------------------------------------------------
+        ### Status:
+        ----------------------------------------------------------------------
+        ### Kind:
+        ----------------------------------------------------------------------
+        ### Index Tasks:
+        ----------------------------------------------------------------------
+        ### Index Mapper:
+        ----------------------------------------------------------------------
+        ### Canceled By:
+        ----------------------------------------------------------------------
+        ### Enqueued At:
+        ----------------------------------------------------------------------
+        ### Started At:
+        ----------------------------------------------------------------------
+        ### Finished At:
+        ----------------------------------------------------------------------
+        ### File Store:
+        ----------------------------------------------------------------------
+        "###);
+        let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None };
+        let task = index_scheduler.register(kind, Some(12), true).unwrap();
+        snapshot!(task.uid, @"12");
+        snapshot!(snapshot_index_scheduler(&index_scheduler), @r###"
+        ### Autobatching Enabled = true
+        ### Processing Tasks:
+        []
+        ----------------------------------------------------------------------
+        ### All Tasks:
+        ----------------------------------------------------------------------
+        ### Status:
+        ----------------------------------------------------------------------
+        ### Kind:
+        ----------------------------------------------------------------------
+        ### Index Tasks:
+        ----------------------------------------------------------------------
+        ### Index Mapper:
+        ----------------------------------------------------------------------
+        ### Canceled By:
+        ----------------------------------------------------------------------
+        ### Enqueued At:
+        ----------------------------------------------------------------------
+        ### Started At:
+        ----------------------------------------------------------------------
+        ### Finished At:
+        ----------------------------------------------------------------------
+        ### File Store:
+        ----------------------------------------------------------------------
+        "###);
+    }
}


@@ -265,7 +265,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
            .name(String::from("register-snapshot-tasks"))
            .spawn(move || loop {
                thread::sleep(snapshot_delay);
-                if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation, None) {
+                if let Err(e) =
+                    index_scheduler.register(KindWithContent::SnapshotCreation, None, false)
+                {
                    error!("Error while registering snapshot: {}", e);
                }
            })


@@ -11,7 +11,7 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
-use crate::routes::{get_task_id, SummarizedTaskView};
+use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::Opt;

pub fn configure(cfg: &mut web::ServiceConfig) {
@@ -32,8 +32,11 @@ pub async fn create_dump(
        instance_uid: analytics.instance_uid().cloned(),
    };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Create dump");
    Ok(HttpResponse::Accepted().json(task))


@@ -36,7 +36,9 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
-use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
+use crate::routes::{
+    get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
+};
use crate::search::parse_filter;
use crate::Opt;
@@ -133,8 +135,11 @@ pub async fn delete_document(
        documents_ids: vec![document_id],
    };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!("returns: {:?}", task);
    Ok(HttpResponse::Accepted().json(task))
}
@@ -282,6 +287,7 @@ pub async fn replace_documents(
    let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task = document_addition(
        extract_mime_type(&req)?,
        index_scheduler,
@@ -291,6 +297,7 @@ pub async fn replace_documents(
        body,
        IndexDocumentsMethod::ReplaceDocuments,
        uid,
+        dry_run,
        allow_index_creation,
    )
    .await?;
@@ -317,6 +324,7 @@ pub async fn update_documents(
    let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task = document_addition(
        extract_mime_type(&req)?,
        index_scheduler,
@@ -326,6 +334,7 @@ pub async fn update_documents(
        body,
        IndexDocumentsMethod::UpdateDocuments,
        uid,
+        dry_run,
        allow_index_creation,
    )
    .await?;
@@ -344,6 +353,7 @@ async fn document_addition(
    mut body: Payload,
    method: IndexDocumentsMethod,
    task_id: Option<TaskId>,
+    dry_run: bool,
    allow_index_creation: bool,
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
    let format = match (
@@ -376,7 +386,7 @@ async fn document_addition(
        }
    };
-    let (uuid, mut update_file) = index_scheduler.create_update_file()?;
+    let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
    let temp_file = match tempfile() {
        Ok(file) => file,
@@ -460,7 +470,9 @@ async fn document_addition(
    };
    let scheduler = index_scheduler.clone();
-    let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await? {
+    let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
+        .await?
+    {
        Ok(task) => task,
        Err(e) => {
            index_scheduler.delete_update_file(uuid)?;
@@ -492,8 +504,11 @@ pub async fn delete_documents_batch(
    let task =
        KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Delete documents by batch");
    Ok(HttpResponse::Accepted().json(task))
@@ -530,8 +545,11 @@ pub async fn delete_documents_by_filter(
    let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Delete documents by filter");
    Ok(HttpResponse::Accepted().json(task))
@@ -549,8 +567,11 @@ pub async fn clear_all_documents(
    let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Delete all documents");
    Ok(HttpResponse::Accepted().json(task))


@@ -22,6 +22,7 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
+use crate::routes::is_dry_run;
use crate::Opt;

pub mod documents;
@@ -140,8 +141,11 @@ pub async fn create_index(
        let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
        let uid = get_task_id(&req, &opt)?;
+        let dry_run = is_dry_run(&req, &opt)?;
        let task: SummarizedTaskView =
-            tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+            tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+                .await??
+                .into();
        debug!(returns = ?task, "Create index");
        Ok(HttpResponse::Accepted().json(task))
@@ -211,8 +215,11 @@ pub async fn update_index(
    };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Update index");
    Ok(HttpResponse::Accepted().json(task))
@@ -227,8 +234,11 @@ pub async fn delete_index(
    let index_uid = IndexUid::try_from(index_uid.into_inner())?;
    let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Delete index");
    Ok(HttpResponse::Accepted().json(task))


@@ -15,7 +15,7 @@ use tracing::debug;
use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
-use crate::routes::{get_task_id, SummarizedTaskView};
+use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::Opt;

#[macro_export]
@@ -36,7 +36,7 @@ macro_rules! make_setting_route {
            use $crate::extractors::authentication::GuardedData;
            use $crate::extractors::sequential_extractor::SeqHandler;
            use $crate::Opt;
-            use $crate::routes::{get_task_id, SummarizedTaskView};
+            use $crate::routes::{is_dry_run, get_task_id, SummarizedTaskView};

            pub async fn delete(
                index_scheduler: GuardedData<
@@ -61,8 +61,9 @@ macro_rules! make_setting_route {
                    allow_index_creation,
                };
                let uid = get_task_id(&req, &opt)?;
+                let dry_run = is_dry_run(&req, &opt)?;
                let task: SummarizedTaskView =
-                    tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
+                    tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
                        .await??
                        .into();
@@ -112,8 +113,9 @@ macro_rules! make_setting_route {
                    allow_index_creation,
                };
                let uid = get_task_id(&req, &opt)?;
+                let dry_run = is_dry_run(&req, &opt)?;
                let task: SummarizedTaskView =
-                    tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
+                    tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
                        .await??
                        .into();
@@ -776,8 +778,11 @@ pub async fn update_all(
        allow_index_creation,
    };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Update all settings");
    Ok(HttpResponse::Accepted().json(task))
@@ -815,8 +820,11 @@ pub async fn delete_all(
        allow_index_creation,
    };
    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
    debug!(returns = ?task, "Delete all settings");
    Ok(HttpResponse::Accepted().json(task))

View File

@@ -77,6 +77,25 @@ pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result<Option<TaskId>, ResponseError>
     Ok(task_id)
 }
+pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> {
+    if !opt.experimental_ha_parameters {
+        return Ok(false);
+    }
+    Ok(req
+        .headers()
+        .get("DryRun")
+        .map(|header| {
+            header.to_str().map_err(|e| {
+                ResponseError::from_msg(
+                    format!("DryRun is not a valid utf-8 string: {e}"),
+                    Code::BadRequest,
+                )
+            })
+        })
+        .transpose()?
+        .map_or(false, |s| s.to_lowercase() == "true"))
+}
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SummarizedTaskView {
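The header handling added above boils down to: a missing header means no dry run, any value other than a case-insensitive "true" means no dry run, and a non-UTF-8 value is rejected. A dependency-free restatement of that logic, assuming the experimental flag is enabled (the helper name below is illustrative, not part of the commit):

// Standalone sketch mirroring the DryRun header semantics of is_dry_run above.
fn dry_run_from_header(raw: Option<&[u8]>) -> Result<bool, String> {
    let value = raw
        .map(|bytes| {
            std::str::from_utf8(bytes)
                .map_err(|e| format!("DryRun is not a valid utf-8 string: {e}"))
        })
        .transpose()?;
    Ok(value.map_or(false, |s| s.to_lowercase() == "true"))
}

fn main() {
    assert_eq!(dry_run_from_header(None), Ok(false)); // header absent
    assert_eq!(dry_run_from_header(Some(&b"true"[..])), Ok(true));
    assert_eq!(dry_run_from_header(Some(&b"TRUE"[..])), Ok(true)); // case-insensitive
    assert_eq!(dry_run_from_header(Some(&b"1"[..])), Ok(false)); // anything but "true" is false
    assert!(dry_run_from_header(Some(&b"\xff"[..])).is_err()); // invalid UTF-8 is an error
}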

View File

@@ -10,7 +10,7 @@ use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
 use crate::extractors::sequential_extractor::SeqHandler;
-use crate::routes::{get_task_id, SummarizedTaskView};
+use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
 use crate::Opt;
 pub fn configure(cfg: &mut web::ServiceConfig) {
@@ -27,8 +27,11 @@ pub async fn create_snapshot(
     let task = KindWithContent::SnapshotCreation;
     let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
     let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
     debug!(returns = ?task, "Create snapshot");
     Ok(HttpResponse::Accepted().json(task))

View File

@@ -10,7 +10,7 @@ use meilisearch_types::index_uid::IndexUid;
 use meilisearch_types::tasks::{IndexSwap, KindWithContent};
 use serde_json::json;
-use super::{get_task_id, SummarizedTaskView};
+use super::{get_task_id, is_dry_run, SummarizedTaskView};
 use crate::analytics::Analytics;
 use crate::error::MeilisearchHttpError;
 use crate::extractors::authentication::policies::*;
@@ -63,7 +63,10 @@ pub async fn swap_indexes(
     let task = KindWithContent::IndexSwap { swaps };
     let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
     let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
     Ok(HttpResponse::Accepted().json(task))
 }
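The same header applies to the swap route above, so a caller can check that a swap payload is accepted without anything being scheduled. A sketch under the same assumptions as the earlier client example; the endpoint path and payload shape follow the public swap-indexes API, the rest is illustrative:

// Hypothetical dry-run swap request; nothing is enqueued when DryRun is honored.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let response = reqwest::Client::new()
        .post("http://localhost:7700/swap-indexes")
        .header("DryRun", "true")
        .json(&serde_json::json!([{ "indexes": ["movies", "movies_new"] }]))
        .send()
        .await?;
    println!("{}", response.text().await?);
    Ok(())
}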

View File

@@ -18,7 +18,7 @@ use time::macros::format_description;
 use time::{Date, Duration, OffsetDateTime, Time};
 use tokio::task;
-use super::{get_task_id, SummarizedTaskView};
+use super::{get_task_id, is_dry_run, SummarizedTaskView};
 use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
@@ -200,8 +200,10 @@ async fn cancel_tasks(
         KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
     let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
     let task =
-        task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??;
+        task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid, dry_run))
+            .await??;
     let task: SummarizedTaskView = task.into();
     Ok(HttpResponse::Ok().json(task))
@@ -248,7 +250,9 @@ async fn delete_tasks(
         KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
     let uid = get_task_id(&req, &opt)?;
-    let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??;
+    let dry_run = is_dry_run(&req, &opt)?;
+    let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid, dry_run))
+        .await??;
     let task: SummarizedTaskView = task.into();
     Ok(HttpResponse::Ok().json(task))