Update the canceledBy and finishedAt fields

This commit is contained in:
Kerollmops 2022-10-18 13:57:58 +02:00 committed by Clément Renault
parent 725158b454
commit 290945e258
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 29 additions and 8 deletions

View File

@ -62,6 +62,9 @@ pub struct TaskDump {
#[serde(rename = "type")] #[serde(rename = "type")]
pub kind: KindDump, pub kind: KindDump,
#[serde(skip_serializing_if = "Option::is_none")]
pub canceled_by: Option<TaskId>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<Details>, pub details: Option<Details>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
@ -136,6 +139,7 @@ impl From<Task> for TaskDump {
index_uid: task.index_uid().map(|uid| uid.to_string()), index_uid: task.index_uid().map(|uid| uid.to_string()),
status: task.status, status: task.status,
kind: task.kind.into(), kind: task.kind.into(),
canceled_by: task.canceled_by,
details: task.details, details: task.details,
error: task.error, error: task.error,
enqueued_at: task.enqueued_at, enqueued_at: task.enqueued_at,
@ -289,6 +293,7 @@ pub(crate) mod test {
primary_key: Some(S("bone")), primary_key: Some(S("bone")),
documents_count: 12, documents_count: 12,
}, },
canceled_by: None,
details: Some(Details::DocumentAddition { details: Some(Details::DocumentAddition {
received_documents: 12, received_documents: 12,
indexed_documents: Some(10), indexed_documents: Some(10),
@ -311,6 +316,7 @@ pub(crate) mod test {
primary_key: None, primary_key: None,
documents_count: 2, documents_count: 2,
}, },
canceled_by: None,
details: Some(Details::DocumentAddition { details: Some(Details::DocumentAddition {
received_documents: 2, received_documents: 2,
indexed_documents: None, indexed_documents: None,
@ -337,6 +343,7 @@ pub(crate) mod test {
index_uid: Some(S("catto")), index_uid: Some(S("catto")),
status: Status::Enqueued, status: Status::Enqueued,
kind: KindDump::IndexDeletion, kind: KindDump::IndexDeletion,
canceled_by: None,
details: None, details: None,
error: None, error: None,
enqueued_at: datetime!(2022-11-15 0:00 UTC), enqueued_at: datetime!(2022-11-15 0:00 UTC),

View File

@ -125,6 +125,7 @@ impl CompatV5ToV6 {
instance_uid: instance_uid.clone(), instance_uid: instance_uid.clone(),
}, },
}, },
canceled_by: None,
details: task_view.details.map(|details| match details { details: task_view.details.map(|details| match details {
v5::Details::DocumentAddition { v5::Details::DocumentAddition {
received_documents, received_documents,

View File

@ -468,7 +468,8 @@ impl IndexScheduler {
}; };
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let nbr_canceled_tasks = self.cancel_matched_tasks(&mut wtxn, matched_tasks)?; let canceled_tasks_count =
self.cancel_matched_tasks(&mut wtxn, task.uid, matched_tasks)?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
match &mut task.details { match &mut task.details {
@ -477,7 +478,7 @@ impl IndexScheduler {
canceled_tasks, canceled_tasks,
original_query: _, original_query: _,
}) => { }) => {
*canceled_tasks = Some(nbr_canceled_tasks); *canceled_tasks = Some(canceled_tasks_count);
} }
_ => unreachable!(), _ => unreachable!(),
} }
@ -1029,20 +1030,25 @@ impl IndexScheduler {
fn cancel_matched_tasks( fn cancel_matched_tasks(
&self, &self,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
cancel_task_id: TaskId,
matched_tasks: &RoaringBitmap, matched_tasks: &RoaringBitmap,
) -> Result<usize> { ) -> Result<usize> {
let now = OffsetDateTime::now_utc();
// 1. Remove from this list the tasks that we are not allowed to cancel // 1. Remove from this list the tasks that we are not allowed to cancel
// Notice that only the _enqueued_ ones are cancelable and we should // Notice that only the _enqueued_ ones are cancelable and we should
// have already aborted the indexation of the _processing_ ones // have already aborted the indexation of the _processing_ ones
let cancelable_tasks = self.get_status(&wtxn, Status::Enqueued)?; let cancelable_tasks = self.get_status(wtxn, Status::Enqueued)?;
let tasks_to_cancel = cancelable_tasks & matched_tasks; let tasks_to_cancel = cancelable_tasks & matched_tasks;
// 2. We now have a list of tasks to cancel, cancel them // 2. We now have a list of tasks to cancel, cancel them
self.update_status(wtxn, Status::Enqueued, |bitmap| *bitmap -= &tasks_to_cancel)?; for mut task in self.get_existing_tasks(wtxn, tasks_to_cancel.iter())? {
self.update_status(wtxn, Status::Canceled, |bitmap| *bitmap |= &tasks_to_cancel)?; // TODO delete the content uuid of the task
task.status = Status::Canceled;
// TODO update the content of the tasks i.e. canceled_by and finished_at task.canceled_by = Some(cancel_task_id);
// TODO delete the content uuid of the tasks task.finished_at = Some(now);
self.update_task(wtxn, &task)?;
}
Ok(tasks_to_cancel.len() as usize) Ok(tasks_to_cancel.len() as usize)
} }

View File

@ -397,6 +397,7 @@ impl IndexScheduler {
started_at: None, started_at: None,
finished_at: None, finished_at: None,
error: None, error: None,
canceled_by: None,
details: kind.default_details(), details: kind.default_details(),
status: Status::Enqueued, status: Status::Enqueued,
kind: kind.clone(), kind: kind.clone(),
@ -478,6 +479,7 @@ impl IndexScheduler {
started_at: task.started_at, started_at: task.started_at,
finished_at: task.finished_at, finished_at: task.finished_at,
error: task.error, error: task.error,
canceled_by: task.canceled_by,
details: task.details, details: task.details,
status: task.status, status: task.status,
kind: match task.kind { kind: match task.kind {

View File

@ -38,6 +38,9 @@ pub struct TaskView {
#[serde(rename = "type")] #[serde(rename = "type")]
pub kind: Kind, pub kind: Kind,
#[serde(skip_serializing_if = "Option::is_none")]
pub canceled_by: Option<TaskId>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<DetailsView>, pub details: Option<DetailsView>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
@ -74,6 +77,7 @@ impl TaskView {
.and_then(|vec| vec.first().map(|i| i.to_string())), .and_then(|vec| vec.first().map(|i| i.to_string())),
status: task.status, status: task.status,
kind: task.kind.as_kind(), kind: task.kind.as_kind(),
canceled_by: task.canceled_by,
details: task.details.clone().map(DetailsView::from), details: task.details.clone().map(DetailsView::from),
error: task.error.clone(), error: task.error.clone(),
duration: task duration: task

View File

@ -30,6 +30,7 @@ pub struct Task {
pub finished_at: Option<OffsetDateTime>, pub finished_at: Option<OffsetDateTime>,
pub error: Option<ResponseError>, pub error: Option<ResponseError>,
pub canceled_by: Option<TaskId>,
pub details: Option<Details>, pub details: Option<Details>,
pub status: Status, pub status: Status,