mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-29 16:24:26 +01:00
Make sure that we don't delete or cancel future tasks
This should already have been the case before, but there is no harm in adding another check.
This commit is contained in:
parent
e0821ad4b0
commit
6db90ba6cc
@ -50,7 +50,7 @@ use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
|||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use synchronoise::SignalEvent;
|
use synchronoise::SignalEvent;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use utils::{keep_tasks_within_datetimes, map_bound};
|
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::index_mapper::IndexMapper;
|
use crate::index_mapper::IndexMapper;
|
||||||
@ -565,7 +565,7 @@ impl IndexScheduler {
|
|||||||
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
|
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
|
||||||
let mut wtxn = self.env.write_txn()?;
|
let mut wtxn = self.env.write_txn()?;
|
||||||
|
|
||||||
let task = Task {
|
let mut task = Task {
|
||||||
uid: self.next_task_id(&wtxn)?,
|
uid: self.next_task_id(&wtxn)?,
|
||||||
enqueued_at: time::OffsetDateTime::now_utc(),
|
enqueued_at: time::OffsetDateTime::now_utc(),
|
||||||
started_at: None,
|
started_at: None,
|
||||||
@ -576,6 +576,12 @@ impl IndexScheduler {
|
|||||||
status: Status::Enqueued,
|
status: Status::Enqueued,
|
||||||
kind: kind.clone(),
|
kind: kind.clone(),
|
||||||
};
|
};
|
||||||
|
// For deletion and cancelation tasks, we want to make extra sure that they
|
||||||
|
// don't attempt to delete/cancel tasks that are newer than themselves.
|
||||||
|
filter_out_references_to_newer_tasks(&mut task);
|
||||||
|
// Get rid of the mutability.
|
||||||
|
let task = task;
|
||||||
|
|
||||||
self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
|
self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
|
||||||
|
|
||||||
for index in task.indexes() {
|
for index in task.indexes() {
|
||||||
|
@ -276,6 +276,28 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Remove references to task ids that are greater than the id of the given task.
|
||||||
|
pub(crate) fn filter_out_references_to_newer_tasks(task: &mut Task) {
|
||||||
|
let new_nbr_of_matched_tasks = match &mut task.kind {
|
||||||
|
KindWithContent::TaskCancelation { tasks, .. }
|
||||||
|
| KindWithContent::TaskDeletion { tasks, .. } => {
|
||||||
|
tasks.remove_range(task.uid..);
|
||||||
|
tasks.len()
|
||||||
|
}
|
||||||
|
_ => return,
|
||||||
|
};
|
||||||
|
match &mut task.details {
|
||||||
|
Some(
|
||||||
|
Details::TaskCancelation { matched_tasks, .. }
|
||||||
|
| Details::TaskDeletion { matched_tasks, .. },
|
||||||
|
) => {
|
||||||
|
*matched_tasks = new_nbr_of_matched_tasks;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl IndexScheduler {
|
impl IndexScheduler {
|
||||||
/// Asserts that the index scheduler's content is internally consistent.
|
/// Asserts that the index scheduler's content is internally consistent.
|
||||||
|
Loading…
Reference in New Issue
Block a user