diff --git a/dump/src/lib.rs b/dump/src/lib.rs index da98f9d2a..c7aed6280 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -115,7 +115,7 @@ pub enum KindDump { }, TaskCancelation { query: String, - tasks: Vec, + tasks: RoaringBitmap, }, TasksDeletion { query: String, diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index dfb7cab97..a6adfd549 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -127,6 +127,7 @@ impl BatchKind { impl BatchKind { /// Returns a `ControlFlow::Break` if you must stop right now. + // TODO use an AutoBatchKind as input pub fn new(task_id: TaskId, kind: KindWithContent) -> ControlFlow { use AutobatchKind as K; diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 144b696b5..03fecea9b 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,13 +1,14 @@ -use std::sync::atomic::Ordering::Relaxed; use std::collections::HashSet; use std::fs::File; use std::io::BufWriter; +use std::sync::atomic::Ordering::Relaxed; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use dump::IndexMetadata; use log::{debug, info}; +use meilisearch_types::milli::documents::obkv_to_object; use meilisearch_types::milli::update::IndexDocumentsConfig; use meilisearch_types::milli::update::{ DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, @@ -15,7 +16,6 @@ use meilisearch_types::milli::update::{ use meilisearch_types::milli::{ self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32, }; -use meilisearch_types::milli::documents::obkv_to_object; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task}; use meilisearch_types::{ @@ -976,7 +976,6 @@ impl IndexScheduler { ) -> Result { // 1. Remove from this list the tasks that we are not allowed to delete let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; - let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let all_task_ids = self.all_task_ids(&wtxn)?; @@ -1004,24 +1003,47 @@ impl IndexScheduler { // In each of those cases, the persisted data is supposed to // have been deleted already. } + for index in affected_indexes { - self.update_index(wtxn, &index, |bitmap| { - *bitmap -= &to_delete_tasks; - })?; + self.update_index(wtxn, &index, |bitmap| *bitmap -= &to_delete_tasks)?; } + for status in affected_statuses { - self.update_status(wtxn, status, |bitmap| { - *bitmap -= &to_delete_tasks; - })?; + self.update_status(wtxn, status, |bitmap| *bitmap -= &to_delete_tasks)?; } + for kind in affected_kinds { - self.update_kind(wtxn, kind, |bitmap| { - *bitmap -= &to_delete_tasks; - })?; + self.update_kind(wtxn, kind, |bitmap| *bitmap -= &to_delete_tasks)?; } + for task in to_delete_tasks.iter() { self.all_tasks.delete(wtxn, &BEU32::new(task))?; } + Ok(to_delete_tasks.len() as usize) } + + /// Cancel each given task from all the databases (if it is cancelable). + /// + /// Return the number of tasks that were actually canceled. + fn cancel_matched_tasks( + &self, + wtxn: &mut RwTxn, + matched_tasks: &RoaringBitmap, + ) -> Result { + // 1. Remove from this list the tasks that we are not allowed to cancel + // Notice that only the _enqueued_ ones are cancelable and we should + // have already aborted the indexation of the _processing_ ones + let cancelable_tasks = self.get_status(&wtxn, Status::Enqueued)?; + let tasks_to_cancel = cancelable_tasks & matched_tasks; + + // 2. We now have a list of tasks to cancel, cancel them + self.update_status(wtxn, Status::Enqueued, |bitmap| *bitmap -= &tasks_to_cancel)?; + self.update_status(wtxn, Status::Canceled, |bitmap| *bitmap |= &tasks_to_cancel)?; + + // TODO update the content of the tasks i.e. canceled_by and finished_at + // TODO delete the content uuid of the tasks + + Ok(tasks_to_cancel.len() as usize) + } } diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index f8dc8f9bb..210e0cab0 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -271,9 +271,10 @@ fn import_dump( log::info!("Importing the settings."); let settings = index_reader.settings()?; apply_settings_to_builder(&settings, &mut builder); - builder.execute(|indexing_step| { - log::debug!("update: {:?}", indexing_step); - })?; + builder.execute( + |indexing_step| log::debug!("update: {:?}", indexing_step), + || false, + )?; // 3.3 Import the documents. // 3.3.1 We need to recreate the grenad+obkv format accepted by the index. @@ -300,6 +301,7 @@ fn import_dump( ..Default::default() }, |indexing_step| log::debug!("update: {:?}", indexing_step), + || false, )?; let (builder, user_result) = builder.add_documents(reader)?; diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index e5dfe31e1..7a3289e24 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -103,6 +103,8 @@ pub struct DetailsView { #[serde(skip_serializing_if = "Option::is_none")] pub matched_tasks: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub canceled_tasks: Option>, + #[serde(skip_serializing_if = "Option::is_none")] pub deleted_tasks: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub original_query: Option, @@ -144,6 +146,16 @@ impl From
for DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() }, + Details::TaskCancelation { + matched_tasks, + canceled_tasks, + original_query, + } => DetailsView { + matched_tasks: Some(matched_tasks), + canceled_tasks: Some(canceled_tasks), + original_query: Some(original_query), + ..DetailsView::default() + }, Details::TaskDeletion { matched_tasks, deleted_tasks, diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 901250dd2..db334df65 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -135,7 +135,7 @@ pub enum KindWithContent { }, TaskCancelation { query: String, - tasks: Vec, + tasks: RoaringBitmap, }, TaskDeletion { query: String, @@ -349,9 +349,9 @@ impl FromStr for Kind { Ok(Kind::DocumentDeletion) } else if kind.eq_ignore_ascii_case("settingsUpdate") { Ok(Kind::Settings) - } else if kind.eq_ignore_ascii_case("TaskCancelation") { + } else if kind.eq_ignore_ascii_case("taskCancelation") { Ok(Kind::TaskCancelation) - } else if kind.eq_ignore_ascii_case("TaskDeletion") { + } else if kind.eq_ignore_ascii_case("taskDeletion") { Ok(Kind::TaskDeletion) } else if kind.eq_ignore_ascii_case("dumpCreation") { Ok(Kind::DumpExport) @@ -397,7 +397,7 @@ pub enum Details { deleted_documents: Option, }, TaskCancelation { - matched_tasks: usize, + matched_tasks: u64, canceled_tasks: Option, original_query: String, },