Introduce the core algorithm of task cancelation

This commit is contained in:
Kerollmops 2022-10-18 13:47:22 +02:00 committed by Clément Renault
parent b2c5bc67b7
commit 725158b454
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 57 additions and 20 deletions

View File

@ -115,7 +115,7 @@ pub enum KindDump {
}, },
TaskCancelation { TaskCancelation {
query: String, query: String,
tasks: Vec<TaskId>, tasks: RoaringBitmap,
}, },
TasksDeletion { TasksDeletion {
query: String, query: String,

View File

@ -127,6 +127,7 @@ impl BatchKind {
impl BatchKind { impl BatchKind {
/// Returns a `ControlFlow::Break` if you must stop right now. /// Returns a `ControlFlow::Break` if you must stop right now.
// TODO use an AutoBatchKind as input
pub fn new(task_id: TaskId, kind: KindWithContent) -> ControlFlow<BatchKind, BatchKind> { pub fn new(task_id: TaskId, kind: KindWithContent) -> ControlFlow<BatchKind, BatchKind> {
use AutobatchKind as K; use AutobatchKind as K;

View File

@ -1,13 +1,14 @@
use std::sync::atomic::Ordering::Relaxed;
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::File; use std::fs::File;
use std::io::BufWriter; use std::io::BufWriter;
use std::sync::atomic::Ordering::Relaxed;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata; use dump::IndexMetadata;
use log::{debug, info}; use log::{debug, info};
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::milli::update::IndexDocumentsConfig; use meilisearch_types::milli::update::IndexDocumentsConfig;
use meilisearch_types::milli::update::{ use meilisearch_types::milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod,
@ -15,7 +16,6 @@ use meilisearch_types::milli::update::{
use meilisearch_types::milli::{ use meilisearch_types::milli::{
self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32, self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32,
}; };
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use meilisearch_types::{ use meilisearch_types::{
@ -976,7 +976,6 @@ impl IndexScheduler {
) -> Result<usize> { ) -> Result<usize> {
// 1. Remove from this list the tasks that we are not allowed to delete // 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
let all_task_ids = self.all_task_ids(&wtxn)?; let all_task_ids = self.all_task_ids(&wtxn)?;
@ -1004,24 +1003,47 @@ impl IndexScheduler {
// In each of those cases, the persisted data is supposed to // In each of those cases, the persisted data is supposed to
// have been deleted already. // have been deleted already.
} }
for index in affected_indexes { for index in affected_indexes {
self.update_index(wtxn, &index, |bitmap| { self.update_index(wtxn, &index, |bitmap| *bitmap -= &to_delete_tasks)?;
*bitmap -= &to_delete_tasks;
})?;
} }
for status in affected_statuses { for status in affected_statuses {
self.update_status(wtxn, status, |bitmap| { self.update_status(wtxn, status, |bitmap| *bitmap -= &to_delete_tasks)?;
*bitmap -= &to_delete_tasks;
})?;
} }
for kind in affected_kinds { for kind in affected_kinds {
self.update_kind(wtxn, kind, |bitmap| { self.update_kind(wtxn, kind, |bitmap| *bitmap -= &to_delete_tasks)?;
*bitmap -= &to_delete_tasks;
})?;
} }
for task in to_delete_tasks.iter() { for task in to_delete_tasks.iter() {
self.all_tasks.delete(wtxn, &BEU32::new(task))?; self.all_tasks.delete(wtxn, &BEU32::new(task))?;
} }
Ok(to_delete_tasks.len() as usize) Ok(to_delete_tasks.len() as usize)
} }
/// Cancel each given task from all the databases (if it is cancelable).
///
/// Return the number of tasks that were actually canceled.
fn cancel_matched_tasks(
&self,
wtxn: &mut RwTxn,
matched_tasks: &RoaringBitmap,
) -> Result<usize> {
// 1. Remove from this list the tasks that we are not allowed to cancel
// Notice that only the _enqueued_ ones are cancelable and we should
// have already aborted the indexation of the _processing_ ones
let cancelable_tasks = self.get_status(&wtxn, Status::Enqueued)?;
let tasks_to_cancel = cancelable_tasks & matched_tasks;
// 2. We now have a list of tasks to cancel, cancel them
self.update_status(wtxn, Status::Enqueued, |bitmap| *bitmap -= &tasks_to_cancel)?;
self.update_status(wtxn, Status::Canceled, |bitmap| *bitmap |= &tasks_to_cancel)?;
// TODO update the content of the tasks i.e. canceled_by and finished_at
// TODO delete the content uuid of the tasks
Ok(tasks_to_cancel.len() as usize)
}
} }

View File

@ -271,9 +271,10 @@ fn import_dump(
log::info!("Importing the settings."); log::info!("Importing the settings.");
let settings = index_reader.settings()?; let settings = index_reader.settings()?;
apply_settings_to_builder(&settings, &mut builder); apply_settings_to_builder(&settings, &mut builder);
builder.execute(|indexing_step| { builder.execute(
log::debug!("update: {:?}", indexing_step); |indexing_step| log::debug!("update: {:?}", indexing_step),
})?; || false,
)?;
// 3.3 Import the documents. // 3.3 Import the documents.
// 3.3.1 We need to recreate the grenad+obkv format accepted by the index. // 3.3.1 We need to recreate the grenad+obkv format accepted by the index.
@ -300,6 +301,7 @@ fn import_dump(
..Default::default() ..Default::default()
}, },
|indexing_step| log::debug!("update: {:?}", indexing_step), |indexing_step| log::debug!("update: {:?}", indexing_step),
|| false,
)?; )?;
let (builder, user_result) = builder.add_documents(reader)?; let (builder, user_result) = builder.add_documents(reader)?;

View File

@ -103,6 +103,8 @@ pub struct DetailsView {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub matched_tasks: Option<u64>, pub matched_tasks: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub canceled_tasks: Option<Option<usize>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_tasks: Option<Option<usize>>, pub deleted_tasks: Option<Option<usize>>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub original_query: Option<String>, pub original_query: Option<String>,
@ -144,6 +146,16 @@ impl From<Details> for DetailsView {
deleted_documents: Some(deleted_documents), deleted_documents: Some(deleted_documents),
..DetailsView::default() ..DetailsView::default()
}, },
Details::TaskCancelation {
matched_tasks,
canceled_tasks,
original_query,
} => DetailsView {
matched_tasks: Some(matched_tasks),
canceled_tasks: Some(canceled_tasks),
original_query: Some(original_query),
..DetailsView::default()
},
Details::TaskDeletion { Details::TaskDeletion {
matched_tasks, matched_tasks,
deleted_tasks, deleted_tasks,

View File

@ -135,7 +135,7 @@ pub enum KindWithContent {
}, },
TaskCancelation { TaskCancelation {
query: String, query: String,
tasks: Vec<TaskId>, tasks: RoaringBitmap,
}, },
TaskDeletion { TaskDeletion {
query: String, query: String,
@ -349,9 +349,9 @@ impl FromStr for Kind {
Ok(Kind::DocumentDeletion) Ok(Kind::DocumentDeletion)
} else if kind.eq_ignore_ascii_case("settingsUpdate") { } else if kind.eq_ignore_ascii_case("settingsUpdate") {
Ok(Kind::Settings) Ok(Kind::Settings)
} else if kind.eq_ignore_ascii_case("TaskCancelation") { } else if kind.eq_ignore_ascii_case("taskCancelation") {
Ok(Kind::TaskCancelation) Ok(Kind::TaskCancelation)
} else if kind.eq_ignore_ascii_case("TaskDeletion") { } else if kind.eq_ignore_ascii_case("taskDeletion") {
Ok(Kind::TaskDeletion) Ok(Kind::TaskDeletion)
} else if kind.eq_ignore_ascii_case("dumpCreation") { } else if kind.eq_ignore_ascii_case("dumpCreation") {
Ok(Kind::DumpExport) Ok(Kind::DumpExport)
@ -397,7 +397,7 @@ pub enum Details {
deleted_documents: Option<u64>, deleted_documents: Option<u64>,
}, },
TaskCancelation { TaskCancelation {
matched_tasks: usize, matched_tasks: u64,
canceled_tasks: Option<usize>, canceled_tasks: Option<usize>,
original_query: String, original_query: String,
}, },