diff --git a/Cargo.lock b/Cargo.lock index de7dabc36..91c83fb13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2632,6 +2632,7 @@ dependencies = [ "bincode", "bumpalo", "bumparaw-collections", + "convert_case 0.6.0", "crossbeam-channel", "csv", "derive_builder 0.20.0", diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index 5d7eb1913..ec2f17f84 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -15,6 +15,7 @@ anyhow = "1.0.86" bincode = "1.3.3" bumpalo = "3.16.0" bumparaw-collections = "0.1.2" +convert_case = "0.6.0" csv = "1.3.0" derive_builder = "0.20.0" dump = { path = "../dump" } diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 1bfa7f53b..fe055b185 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -22,6 +22,7 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, File}; use std::io::BufWriter; +use std::sync::atomic::Ordering; use bumpalo::collections::CollectIn; use bumpalo::Bump; @@ -48,6 +49,9 @@ use time::OffsetDateTime; use uuid::Uuid; use crate::autobatcher::{self, BatchKind}; +use crate::processing::{ + AtomicBatchStep, AtomicTaskStep, TaskCancelationProgress, TaskDeletionProgress, +}; use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; use crate::{Error, IndexScheduler, Result, TaskId}; @@ -583,8 +587,13 @@ impl IndexScheduler { }; let rtxn = self.env.read_txn()?; - let mut canceled_tasks = - self.cancel_matched_tasks(&rtxn, task.uid, current_batch, matched_tasks)?; + let mut canceled_tasks = self.cancel_matched_tasks( + &rtxn, + task.uid, + current_batch, + matched_tasks, + &progress, + )?; task.status = Status::Succeeded; match &mut task.details { @@ -615,7 +624,8 @@ impl IndexScheduler { } let mut wtxn = self.env.write_txn()?; - let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?; + let mut deleted_tasks = + self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?; wtxn.commit()?; for task in tasks.iter_mut() { @@ -1664,7 +1674,10 @@ impl IndexScheduler { &self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap, + progress: &Progress, ) -> Result { + progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); + // 1. Remove from this list the tasks that we are not allowed to delete let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); @@ -1683,6 +1696,8 @@ impl IndexScheduler { // The tasks that have been removed *per batches*. let mut affected_batches: HashMap = HashMap::new(); + let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); + progress.update_progress(task_progress); for task_id in to_delete_tasks.iter() { let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; @@ -1706,22 +1721,35 @@ impl IndexScheduler { if let Some(batch_uid) = task.batch_uid { affected_batches.entry(batch_uid).or_default().insert(task_id); } + atomic_progress.fetch_add(1, Ordering::Relaxed); } + progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); + let (atomic_progress, task_progress) = AtomicTaskStep::new( + (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, + ); + progress.update_progress(task_progress); for index in affected_indexes.iter() { self.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); } for status in affected_statuses.iter() { self.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); } for kind in affected_kinds.iter() { self.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); } + progress.update_progress(TaskDeletionProgress::DeletingTasks); + let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); + progress.update_progress(task_progress); for task in to_delete_tasks.iter() { self.all_tasks.delete(wtxn, &task)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); } for canceled_by in affected_canceled_by { if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? { @@ -1733,6 +1761,9 @@ impl IndexScheduler { } } } + progress.update_progress(TaskDeletionProgress::DeletingBatches); + let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32); + progress.update_progress(batch_progress); for (batch_id, to_delete_tasks) in affected_batches { if let Some(mut tasks) = self.batch_to_tasks_mapping.get(wtxn, &batch_id)? { tasks -= &to_delete_tasks; @@ -1774,6 +1805,7 @@ impl IndexScheduler { } } } + atomic_progress.fetch_add(1, Ordering::Relaxed); } Ok(to_delete_tasks) @@ -1788,21 +1820,36 @@ impl IndexScheduler { cancel_task_id: TaskId, current_batch: &mut ProcessingBatch, matched_tasks: &RoaringBitmap, + progress: &Progress, ) -> Result> { + progress.update_progress(TaskCancelationProgress::RetrievingTasks); + // 1. Remove from this list the tasks that we are not allowed to cancel // Notice that only the _enqueued_ ones are cancelable and we should // have already aborted the indexation of the _processing_ ones let cancelable_tasks = self.get_status(rtxn, Status::Enqueued)?; let tasks_to_cancel = cancelable_tasks & matched_tasks; - // 2. We now have a list of tasks to cancel, cancel them - let mut tasks = self.get_existing_tasks(rtxn, tasks_to_cancel.iter())?; + let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32); + progress.update_progress(progress_obj); + // 2. We now have a list of tasks to cancel, cancel them + let mut tasks = self.get_existing_tasks( + rtxn, + tasks_to_cancel.iter().inspect(|_| { + task_progress.fetch_add(1, Ordering::Relaxed); + }), + )?; + + progress.update_progress(TaskCancelationProgress::UpdatingTasks); + let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32); + progress.update_progress(progress_obj); for task in tasks.iter_mut() { task.status = Status::Canceled; task.canceled_by = Some(cancel_task_id); task.details = task.details.as_ref().map(|d| d.to_failed()); current_batch.processing(Some(task)); + task_progress.fetch_add(1, Ordering::Relaxed); } Ok(tasks) diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index e5e892927..f28fa0219 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -1,4 +1,5 @@ use crate::utils::ProcessingBatch; +use enum_iterator::Sequence; use meilisearch_types::milli::progress::{AtomicSubStep, NamedStep, Progress, ProgressView, Step}; use roaring::RoaringBitmap; use std::{borrow::Cow, sync::Arc}; @@ -54,39 +55,72 @@ impl ProcessingTasks { } } -#[repr(u8)] -#[derive(Copy, Clone)] -pub enum BatchProgress { - ProcessingTasks, - WritingTasksToDisk, -} - -impl Step for BatchProgress { - fn name(&self) -> Cow<'static, str> { - match self { - BatchProgress::ProcessingTasks => Cow::Borrowed("processing tasks"), - BatchProgress::WritingTasksToDisk => Cow::Borrowed("writing tasks to disk"), +macro_rules! make_enum_progress { + (enum $name:ident: $(- $variant:ident)+ ) => { + #[repr(u8)] + #[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)] + #[allow(clippy::enum_variant_names)] + pub enum $name { + $($variant),+ } - } - fn current(&self) -> u32 { - *self as u8 as u32 - } + impl Step for $name { + fn name(&self) -> Cow<'static, str> { + use convert_case::Casing; - fn total(&self) -> u32 { - 2 - } + match self { + $( + $name::$variant => stringify!($variant).from_case(convert_case::Case::Camel).to_case(convert_case::Case::Lower).into() + ),+ + } + } + + fn current(&self) -> u32 { + *self as u32 + } + + fn total(&self) -> u32 { + Self::CARDINALITY as u32 + } + } + }; } -#[derive(Default)] -pub struct Task {} - -impl NamedStep for Task { - fn name(&self) -> &'static str { - "task" - } +macro_rules! make_atomic_progress { + ($struct_name:ident alias $atomic_struct_name:ident => $step_name:literal) => { + #[derive(Default, Debug, Clone, Copy)] + pub struct $struct_name {} + impl NamedStep for $struct_name { + fn name(&self) -> &'static str { + $step_name + } + } + pub type $atomic_struct_name = AtomicSubStep<$struct_name>; + }; } -pub type AtomicTaskStep = AtomicSubStep; + +make_enum_progress! { + enum BatchProgress: + - ProcessingTasks + - WritingTasksToDisk +} + +make_enum_progress! { + enum TaskCancelationProgress: + - RetrievingTasks + - UpdatingTasks +} + +make_enum_progress! { + enum TaskDeletionProgress: + - DeletingTasksDateTime + - DeletingTasksMetadata + - DeletingTasks + - DeletingBatches +} + +make_atomic_progress!(Task alias AtomicTaskStep => "task" ); +make_atomic_progress!(Batch alias AtomicBatchStep => "batch" ); #[cfg(test)] mod test {