mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-11 05:46:29 +01:00
add progress for the task deletion and task cancelation
This commit is contained in:
parent
da32f4c37a
commit
1d80fb32cf
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -2632,6 +2632,7 @@ dependencies = [
|
||||
"bincode",
|
||||
"bumpalo",
|
||||
"bumparaw-collections",
|
||||
"convert_case 0.6.0",
|
||||
"crossbeam-channel",
|
||||
"csv",
|
||||
"derive_builder 0.20.0",
|
||||
|
@ -15,6 +15,7 @@ anyhow = "1.0.86"
|
||||
bincode = "1.3.3"
|
||||
bumpalo = "3.16.0"
|
||||
bumparaw-collections = "0.1.2"
|
||||
convert_case = "0.6.0"
|
||||
csv = "1.3.0"
|
||||
derive_builder = "0.20.0"
|
||||
dump = { path = "../dump" }
|
||||
|
@ -22,6 +22,7 @@ use std::ffi::OsStr;
|
||||
use std::fmt;
|
||||
use std::fs::{self, File};
|
||||
use std::io::BufWriter;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use bumpalo::collections::CollectIn;
|
||||
use bumpalo::Bump;
|
||||
@ -46,6 +47,9 @@ use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::autobatcher::{self, BatchKind};
|
||||
use crate::processing::{
|
||||
AtomicBatchStep, AtomicTaskStep, TaskCancelationProgress, TaskDeletionProgress,
|
||||
};
|
||||
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
|
||||
use crate::{Error, IndexScheduler, Result, TaskId};
|
||||
|
||||
@ -581,8 +585,13 @@ impl IndexScheduler {
|
||||
};
|
||||
|
||||
let rtxn = self.env.read_txn()?;
|
||||
let mut canceled_tasks =
|
||||
self.cancel_matched_tasks(&rtxn, task.uid, current_batch, matched_tasks)?;
|
||||
let mut canceled_tasks = self.cancel_matched_tasks(
|
||||
&rtxn,
|
||||
task.uid,
|
||||
current_batch,
|
||||
matched_tasks,
|
||||
&progress,
|
||||
)?;
|
||||
|
||||
task.status = Status::Succeeded;
|
||||
match &mut task.details {
|
||||
@ -613,7 +622,8 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?;
|
||||
let mut deleted_tasks =
|
||||
self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?;
|
||||
wtxn.commit()?;
|
||||
|
||||
for task in tasks.iter_mut() {
|
||||
@ -1639,7 +1649,10 @@ impl IndexScheduler {
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
matched_tasks: &RoaringBitmap,
|
||||
progress: &Progress,
|
||||
) -> Result<RoaringBitmap> {
|
||||
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
|
||||
|
||||
// 1. Remove from this list the tasks that we are not allowed to delete
|
||||
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
|
||||
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
|
||||
@ -1658,6 +1671,8 @@ impl IndexScheduler {
|
||||
// The tasks that have been removed *per batches*.
|
||||
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
|
||||
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
||||
progress.update_progress(task_progress);
|
||||
for task_id in to_delete_tasks.iter() {
|
||||
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
|
||||
@ -1681,22 +1696,35 @@ impl IndexScheduler {
|
||||
if let Some(batch_uid) = task.batch_uid {
|
||||
affected_batches.entry(batch_uid).or_default().insert(task_id);
|
||||
}
|
||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(
|
||||
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
|
||||
);
|
||||
progress.update_progress(task_progress);
|
||||
for index in affected_indexes.iter() {
|
||||
self.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
|
||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
for status in affected_statuses.iter() {
|
||||
self.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?;
|
||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
for kind in affected_kinds.iter() {
|
||||
self.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?;
|
||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
||||
progress.update_progress(task_progress);
|
||||
for task in to_delete_tasks.iter() {
|
||||
self.all_tasks.delete(wtxn, &task)?;
|
||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
for canceled_by in affected_canceled_by {
|
||||
if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? {
|
||||
@ -1708,6 +1736,9 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
}
|
||||
progress.update_progress(TaskDeletionProgress::DeletingBatches);
|
||||
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
|
||||
progress.update_progress(batch_progress);
|
||||
for (batch_id, to_delete_tasks) in affected_batches {
|
||||
if let Some(mut tasks) = self.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
|
||||
tasks -= &to_delete_tasks;
|
||||
@ -1749,6 +1780,7 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
}
|
||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
Ok(to_delete_tasks)
|
||||
@ -1763,21 +1795,36 @@ impl IndexScheduler {
|
||||
cancel_task_id: TaskId,
|
||||
current_batch: &mut ProcessingBatch,
|
||||
matched_tasks: &RoaringBitmap,
|
||||
progress: &Progress,
|
||||
) -> Result<Vec<Task>> {
|
||||
progress.update_progress(TaskCancelationProgress::RetrievingTasks);
|
||||
|
||||
// 1. Remove from this list the tasks that we are not allowed to cancel
|
||||
// Notice that only the _enqueued_ ones are cancelable and we should
|
||||
// have already aborted the indexation of the _processing_ ones
|
||||
let cancelable_tasks = self.get_status(rtxn, Status::Enqueued)?;
|
||||
let tasks_to_cancel = cancelable_tasks & matched_tasks;
|
||||
|
||||
// 2. We now have a list of tasks to cancel, cancel them
|
||||
let mut tasks = self.get_existing_tasks(rtxn, tasks_to_cancel.iter())?;
|
||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
|
||||
progress.update_progress(progress_obj);
|
||||
|
||||
// 2. We now have a list of tasks to cancel, cancel them
|
||||
let mut tasks = self.get_existing_tasks(
|
||||
rtxn,
|
||||
tasks_to_cancel.iter().inspect(|_| {
|
||||
task_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}),
|
||||
)?;
|
||||
|
||||
progress.update_progress(TaskCancelationProgress::UpdatingTasks);
|
||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
|
||||
progress.update_progress(progress_obj);
|
||||
for task in tasks.iter_mut() {
|
||||
task.status = Status::Canceled;
|
||||
task.canceled_by = Some(cancel_task_id);
|
||||
task.details = task.details.as_ref().map(|d| d.to_failed());
|
||||
current_batch.processing(Some(task));
|
||||
task_progress.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
|
@ -1,4 +1,5 @@
|
||||
use crate::utils::ProcessingBatch;
|
||||
use enum_iterator::Sequence;
|
||||
use meilisearch_types::milli::progress::{AtomicSubStep, NamedStep, Progress, ProgressView, Step};
|
||||
use roaring::RoaringBitmap;
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
@ -54,39 +55,72 @@ impl ProcessingTasks {
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(u8)]
|
||||
#[derive(Copy, Clone)]
|
||||
pub enum BatchProgress {
|
||||
ProcessingTasks,
|
||||
WritingTasksToDisk,
|
||||
}
|
||||
|
||||
impl Step for BatchProgress {
|
||||
fn name(&self) -> Cow<'static, str> {
|
||||
match self {
|
||||
BatchProgress::ProcessingTasks => Cow::Borrowed("processing tasks"),
|
||||
BatchProgress::WritingTasksToDisk => Cow::Borrowed("writing tasks to disk"),
|
||||
macro_rules! make_enum_progress {
|
||||
(enum $name:ident: $(- $variant:ident)+ ) => {
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum $name {
|
||||
$($variant),+
|
||||
}
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
*self as u8 as u32
|
||||
}
|
||||
impl Step for $name {
|
||||
fn name(&self) -> Cow<'static, str> {
|
||||
use convert_case::Casing;
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
2
|
||||
}
|
||||
match self {
|
||||
$(
|
||||
$name::$variant => stringify!($variant).from_case(convert_case::Case::Camel).to_case(convert_case::Case::Lower).into()
|
||||
),+
|
||||
}
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
*self as u32
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
Self::CARDINALITY as u32
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Task {}
|
||||
|
||||
impl NamedStep for Task {
|
||||
fn name(&self) -> &'static str {
|
||||
"task"
|
||||
}
|
||||
macro_rules! make_atomic_progress {
|
||||
($struct_name:ident alias $atomic_struct_name:ident => $step_name:literal) => {
|
||||
#[derive(Default, Debug, Clone, Copy)]
|
||||
pub struct $struct_name {}
|
||||
impl NamedStep for $struct_name {
|
||||
fn name(&self) -> &'static str {
|
||||
$step_name
|
||||
}
|
||||
}
|
||||
pub type $atomic_struct_name = AtomicSubStep<$struct_name>;
|
||||
};
|
||||
}
|
||||
pub type AtomicTaskStep = AtomicSubStep<Task>;
|
||||
|
||||
make_enum_progress! {
|
||||
enum BatchProgress:
|
||||
- ProcessingTasks
|
||||
- WritingTasksToDisk
|
||||
}
|
||||
|
||||
make_enum_progress! {
|
||||
enum TaskCancelationProgress:
|
||||
- RetrievingTasks
|
||||
- UpdatingTasks
|
||||
}
|
||||
|
||||
make_enum_progress! {
|
||||
enum TaskDeletionProgress:
|
||||
- DeletingTasksDateTime
|
||||
- DeletingTasksMetadata
|
||||
- DeletingTasks
|
||||
- DeletingBatches
|
||||
}
|
||||
|
||||
make_atomic_progress!(Task alias AtomicTaskStep => "task" );
|
||||
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
Loading…
Reference in New Issue
Block a user