add progress for the task deletion and task cancelation

This commit is contained in:
Tamo 2024-12-10 22:29:31 +01:00
parent ab75f53efd
commit 26733c705d
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
4 changed files with 115 additions and 32 deletions

1
Cargo.lock generated
View File

@ -2632,6 +2632,7 @@ dependencies = [
"bincode",
"bumpalo",
"bumparaw-collections",
"convert_case 0.6.0",
"crossbeam-channel",
"csv",
"derive_builder 0.20.0",

View File

@ -15,6 +15,7 @@ anyhow = "1.0.86"
bincode = "1.3.3"
bumpalo = "3.16.0"
bumparaw-collections = "0.1.2"
convert_case = "0.6.0"
csv = "1.3.0"
derive_builder = "0.20.0"
dump = { path = "../dump" }

View File

@ -22,6 +22,7 @@ use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
use std::io::BufWriter;
use std::sync::atomic::Ordering;
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
@ -48,6 +49,9 @@ use time::OffsetDateTime;
use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::processing::{
AtomicBatchStep, AtomicTaskStep, TaskCancelationProgress, TaskDeletionProgress,
};
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, Result, TaskId};
@ -583,8 +587,13 @@ impl IndexScheduler {
};
let rtxn = self.env.read_txn()?;
let mut canceled_tasks =
self.cancel_matched_tasks(&rtxn, task.uid, current_batch, matched_tasks)?;
let mut canceled_tasks = self.cancel_matched_tasks(
&rtxn,
task.uid,
current_batch,
matched_tasks,
&progress,
)?;
task.status = Status::Succeeded;
match &mut task.details {
@ -615,7 +624,8 @@ impl IndexScheduler {
}
let mut wtxn = self.env.write_txn()?;
let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?;
let mut deleted_tasks =
self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?;
wtxn.commit()?;
for task in tasks.iter_mut() {
@ -1664,7 +1674,10 @@ impl IndexScheduler {
&self,
wtxn: &mut RwTxn,
matched_tasks: &RoaringBitmap,
progress: &Progress,
) -> Result<RoaringBitmap> {
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
// 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
@ -1683,6 +1696,8 @@ impl IndexScheduler {
// The tasks that have been removed *per batches*.
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
progress.update_progress(task_progress);
for task_id in to_delete_tasks.iter() {
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
@ -1706,22 +1721,35 @@ impl IndexScheduler {
if let Some(batch_uid) = task.batch_uid {
affected_batches.entry(batch_uid).or_default().insert(task_id);
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
let (atomic_progress, task_progress) = AtomicTaskStep::new(
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
);
progress.update_progress(task_progress);
for index in affected_indexes.iter() {
self.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for status in affected_statuses.iter() {
self.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for kind in affected_kinds.iter() {
self.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
progress.update_progress(TaskDeletionProgress::DeletingTasks);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
progress.update_progress(task_progress);
for task in to_delete_tasks.iter() {
self.all_tasks.delete(wtxn, &task)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for canceled_by in affected_canceled_by {
if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? {
@ -1733,6 +1761,9 @@ impl IndexScheduler {
}
}
}
progress.update_progress(TaskDeletionProgress::DeletingBatches);
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
progress.update_progress(batch_progress);
for (batch_id, to_delete_tasks) in affected_batches {
if let Some(mut tasks) = self.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
tasks -= &to_delete_tasks;
@ -1774,6 +1805,7 @@ impl IndexScheduler {
}
}
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
Ok(to_delete_tasks)
@ -1788,21 +1820,36 @@ impl IndexScheduler {
cancel_task_id: TaskId,
current_batch: &mut ProcessingBatch,
matched_tasks: &RoaringBitmap,
progress: &Progress,
) -> Result<Vec<Task>> {
progress.update_progress(TaskCancelationProgress::RetrievingTasks);
// 1. Remove from this list the tasks that we are not allowed to cancel
// Notice that only the _enqueued_ ones are cancelable and we should
// have already aborted the indexation of the _processing_ ones
let cancelable_tasks = self.get_status(rtxn, Status::Enqueued)?;
let tasks_to_cancel = cancelable_tasks & matched_tasks;
// 2. We now have a list of tasks to cancel, cancel them
let mut tasks = self.get_existing_tasks(rtxn, tasks_to_cancel.iter())?;
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
progress.update_progress(progress_obj);
// 2. We now have a list of tasks to cancel, cancel them
let mut tasks = self.get_existing_tasks(
rtxn,
tasks_to_cancel.iter().inspect(|_| {
task_progress.fetch_add(1, Ordering::Relaxed);
}),
)?;
progress.update_progress(TaskCancelationProgress::UpdatingTasks);
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
progress.update_progress(progress_obj);
for task in tasks.iter_mut() {
task.status = Status::Canceled;
task.canceled_by = Some(cancel_task_id);
task.details = task.details.as_ref().map(|d| d.to_failed());
current_batch.processing(Some(task));
task_progress.fetch_add(1, Ordering::Relaxed);
}
Ok(tasks)

View File

@ -1,4 +1,5 @@
use crate::utils::ProcessingBatch;
use enum_iterator::Sequence;
use meilisearch_types::milli::progress::{AtomicSubStep, NamedStep, Progress, ProgressView, Step};
use roaring::RoaringBitmap;
use std::{borrow::Cow, sync::Arc};
@ -54,39 +55,72 @@ impl ProcessingTasks {
}
}
macro_rules! make_enum_progress {
(enum $name:ident: $(- $variant:ident)+ ) => {
#[repr(u8)]
#[derive(Copy, Clone)]
pub enum BatchProgress {
ProcessingTasks,
WritingTasksToDisk,
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)]
#[allow(clippy::enum_variant_names)]
pub enum $name {
$($variant),+
}
impl Step for BatchProgress {
impl Step for $name {
fn name(&self) -> Cow<'static, str> {
use convert_case::Casing;
match self {
BatchProgress::ProcessingTasks => Cow::Borrowed("processing tasks"),
BatchProgress::WritingTasksToDisk => Cow::Borrowed("writing tasks to disk"),
$(
$name::$variant => stringify!($variant).from_case(convert_case::Case::Camel).to_case(convert_case::Case::Lower).into()
),+
}
}
fn current(&self) -> u32 {
*self as u8 as u32
*self as u32
}
fn total(&self) -> u32 {
2
Self::CARDINALITY as u32
}
}
};
}
#[derive(Default)]
pub struct Task {}
impl NamedStep for Task {
macro_rules! make_atomic_progress {
($struct_name:ident alias $atomic_struct_name:ident => $step_name:literal) => {
#[derive(Default, Debug, Clone, Copy)]
pub struct $struct_name {}
impl NamedStep for $struct_name {
fn name(&self) -> &'static str {
"task"
$step_name
}
}
pub type AtomicTaskStep = AtomicSubStep<Task>;
pub type $atomic_struct_name = AtomicSubStep<$struct_name>;
};
}
make_enum_progress! {
enum BatchProgress:
- ProcessingTasks
- WritingTasksToDisk
}
make_enum_progress! {
enum TaskCancelationProgress:
- RetrievingTasks
- UpdatingTasks
}
make_enum_progress! {
enum TaskDeletionProgress:
- DeletingTasksDateTime
- DeletingTasksMetadata
- DeletingTasks
- DeletingBatches
}
make_atomic_progress!(Task alias AtomicTaskStep => "task" );
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
#[cfg(test)]
mod test {