diff --git a/dump/src/lib.rs b/dump/src/lib.rs index e526171bf..ecd34b111 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -113,7 +113,8 @@ pub enum KindDump { lhs: String, rhs: String, }, - CancelTask { + TaskCancelation { + query: String, tasks: Vec, }, DeleteTasks { @@ -181,7 +182,9 @@ impl From for KindDump { KindDump::IndexUpdate { primary_key } } KindWithContent::IndexSwap { lhs, rhs } => KindDump::IndexSwap { lhs, rhs }, - KindWithContent::CancelTask { tasks } => KindDump::CancelTask { tasks }, + KindWithContent::TaskCancelation { query, tasks } => { + KindDump::TaskCancelation { query, tasks } + } KindWithContent::TaskDeletion { query, tasks } => { KindDump::DeleteTasks { query, tasks } } diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index f67573b31..dfb7cab97 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -22,7 +22,7 @@ enum AutobatchKind { IndexDeletion, IndexUpdate, IndexSwap, - CancelTask, + TaskCancelation, TaskDeletion, DumpExport, Snapshot, @@ -62,7 +62,7 @@ impl From for AutobatchKind { KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation, KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate, KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap, - KindWithContent::CancelTask { .. } => AutobatchKind::CancelTask, + KindWithContent::TaskCancelation { .. } => AutobatchKind::TaskCancelation, KindWithContent::TaskDeletion { .. } => AutobatchKind::TaskDeletion, KindWithContent::DumpExport { .. } => AutobatchKind::DumpExport, KindWithContent::Snapshot => AutobatchKind::Snapshot, @@ -153,7 +153,7 @@ impl BatchKind { allow_index_creation, settings_ids: vec![task_id], }), - K::DumpExport | K::Snapshot | K::CancelTask | K::TaskDeletion => { + K::DumpExport | K::Snapshot | K::TaskCancelation | K::TaskDeletion => { unreachable!() } } @@ -378,7 +378,7 @@ impl BatchKind { import_ids, }) } - (_, K::CancelTask | K::TaskDeletion | K::DumpExport | K::Snapshot) => { + (_, K::TaskCancelation | K::TaskDeletion | K::DumpExport | K::Snapshot) => { unreachable!() } ( diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 4e81f1661..144b696b5 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::Ordering::Relaxed; use std::collections::HashSet; use std::fs::File; use std::io::BufWriter; @@ -5,10 +6,8 @@ use std::io::BufWriter; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use dump::IndexMetadata; -use meilisearch_types::milli::documents::obkv_to_object; -use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task}; - use log::{debug, info}; + use meilisearch_types::milli::update::IndexDocumentsConfig; use meilisearch_types::milli::update::{ DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, @@ -16,7 +15,9 @@ use meilisearch_types::milli::update::{ use meilisearch_types::milli::{ self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32, }; +use meilisearch_types::milli::documents::obkv_to_object; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; +use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task}; use meilisearch_types::{ heed::{RoTxn, RwTxn}, Index, @@ -27,7 +28,7 @@ use uuid::Uuid; #[derive(Debug)] pub(crate) enum Batch { - Cancel(Task), + TaskCancelation(Task), TaskDeletion(Task), Snapshot(Vec), Dump(Task), @@ -103,7 +104,7 @@ pub(crate) enum IndexOperation { impl Batch { pub fn ids(&self) -> Vec { match self { - Batch::Cancel(task) + Batch::TaskCancelation(task) | Batch::TaskDeletion(task) | Batch::Dump(task) | Batch::IndexCreation { task, .. } @@ -378,11 +379,11 @@ impl IndexScheduler { /// 5. We get the *next* tasks to process for a specific index. pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result> { let enqueued = &self.get_status(rtxn, Status::Enqueued)?; - let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued; + let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; // 1. we get the last task to cancel. if let Some(task_id) = to_cancel.max() { - return Ok(Some(Batch::Cancel( + return Ok(Some(Batch::TaskCancelation( self.get_task(rtxn, task_id)? .ok_or(Error::CorruptedTaskQueue)?, ))); @@ -457,7 +458,33 @@ impl IndexScheduler { pub(crate) fn process_batch(&self, batch: Batch) -> Result> { match batch { - Batch::Cancel(_) => todo!(), + Batch::TaskCancelation(mut task) => { + // 1. Retrieve the tasks that matched the query at enqueue-time. + let matched_tasks = + if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind { + tasks + } else { + unreachable!() + }; + + let mut wtxn = self.env.write_txn()?; + let nbr_canceled_tasks = self.cancel_matched_tasks(&mut wtxn, matched_tasks)?; + + task.status = Status::Succeeded; + match &mut task.details { + Some(Details::TaskCancelation { + matched_tasks: _, + canceled_tasks, + original_query: _, + }) => { + *canceled_tasks = Some(nbr_canceled_tasks); + } + _ => unreachable!(), + } + + wtxn.commit()?; + Ok(vec![task]) + } Batch::TaskDeletion(mut task) => { // 1. Retrieve the tasks that matched the query at enqueue-time. let matched_tasks = @@ -652,7 +679,11 @@ impl IndexScheduler { self.index_mapper.indexer_config(), ); builder.set_primary_key(primary_key); - builder.execute(|_| ())?; + let must_stop = self.processing_tasks.read().unwrap().must_stop.clone(); + builder.execute( + |indexing_step| debug!("update: {:?}", indexing_step), + || must_stop.load(Relaxed), + )?; index_wtxn.commit()?; } @@ -730,6 +761,7 @@ impl IndexScheduler { content_files, mut tasks, } => { + let must_stop = self.processing_tasks.read().unwrap().must_stop.clone(); let indexer_config = self.index_mapper.indexer_config(); // TODO use the code from the IndexCreate operation if let Some(primary_key) = primary_key { @@ -737,7 +769,10 @@ impl IndexScheduler { let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); builder.set_primary_key(primary_key); - builder.execute(|_| ())?; + builder.execute( + |indexing_step| debug!("update: {:?}", indexing_step), + || must_stop.clone().load(Relaxed), + )?; } } @@ -752,6 +787,7 @@ impl IndexScheduler { indexer_config, config, |indexing_step| debug!("update: {:?}", indexing_step), + || must_stop.load(Relaxed), )?; let mut results = Vec::new(); @@ -845,9 +881,11 @@ impl IndexScheduler { let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); apply_settings_to_builder(&checked_settings, &mut builder); - builder.execute(|indexing_step| { - debug!("update: {:?}", indexing_step); - })?; + let must_stop = self.processing_tasks.read().unwrap().must_stop.clone(); + builder.execute( + |indexing_step| debug!("update: {:?}", indexing_step), + || must_stop.load(Relaxed), + )?; task.status = Status::Succeeded; } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index b829ce237..5be805377 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, RwLock}; use file_store::FileStore; use meilisearch_types::error::ResponseError; +use meilisearch_types::milli; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use synchronoise::SignalEvent; @@ -129,23 +130,26 @@ struct ProcessingTasks { } impl ProcessingTasks { + /// Stores the currently processing tasks, the date time at which it started + /// and resets the _must stop_ flag. fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) { self.started_at = started_at; self.processing = processing; + self.must_stop.store(false, Relaxed); } + /// Resets the processing tasks to an empty list. fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) { self.started_at = stopped_at; self.processing = RoaringBitmap::new(); } - fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { + /// Forces the currently processing tasks to stop running if necessary. + fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) { // If there, at least, is one task that is currently processing we must stop. - let must_stop = !self.processing.is_disjoint(canceled_tasks); - if must_stop { + if !self.processing.is_disjoint(canceled_tasks) { self.must_stop.store(true, Relaxed); } - must_stop } } @@ -171,6 +175,7 @@ pub struct IndexScheduler { pub(crate) all_tasks: Database, SerdeJson>, /// All the tasks ids grouped by their status. + // TODO we should not be able to serialize a `Status::Processing` in this database. pub(crate) status: Database, RoaringBitmapCodec>, /// All the tasks ids grouped by their kind. pub(crate) kind: Database, RoaringBitmapCodec>, @@ -354,7 +359,11 @@ impl IndexScheduler { .take(query.limit.unwrap_or(u32::MAX) as usize), )?; - let ProcessingTasks { started_at, processing, .. } = self + let ProcessingTasks { + started_at, + processing, + .. + } = self .processing_tasks .read() .map_err(|_| Error::CorruptedTaskQueue)? @@ -379,7 +388,7 @@ impl IndexScheduler { /// Register a new task in the scheduler. If it fails and data was associated with the task /// it tries to delete the file. - pub fn register(&self, task: KindWithContent) -> Result { + pub fn register(&self, kind: KindWithContent) -> Result { let mut wtxn = self.env.write_txn()?; let task = Task { @@ -388,9 +397,9 @@ impl IndexScheduler { started_at: None, finished_at: None, error: None, - details: (&task).into(), + details: kind.default_details(), status: Status::Enqueued, - kind: task, + kind: kind.clone(), }; self.all_tasks .append(&mut wtxn, &BEU32::new(task.uid), &task)?; @@ -419,6 +428,16 @@ impl IndexScheduler { } } + // If the registered task is a task cancelation + // we inform the processing tasks to stop (if necessary). + if let KindWithContent::TaskCancelation { tasks, .. } = kind { + let tasks_to_cancel = RoaringBitmap::from_iter(tasks); + self.processing_tasks + .read() + .unwrap() + .cancel_processing_tasks(&tasks_to_cancel); + } + // notify the scheduler loop to execute a new tick self.wake_up.signal(); @@ -504,7 +523,9 @@ impl IndexScheduler { primary_key, }, KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs }, - KindDump::CancelTask { tasks } => KindWithContent::CancelTask { tasks }, + KindDump::TaskCancelation { query, tasks } => { + KindWithContent::TaskCancelation { query, tasks } + } KindDump::DeleteTasks { query, tasks } => { KindWithContent::TaskDeletion { query, tasks } } @@ -618,6 +639,14 @@ impl IndexScheduler { } log::info!("A batch of tasks was successfully completed."); } + // If we have an abortion error we must stop the tick here and re-schedule tasks. + Err(Error::Milli(milli::Error::InternalError( + milli::InternalError::AbortedIndexation, + ))) => { + // TODO should we add a breakpoint here? + wtxn.abort()?; + return Ok(0); + } // In case of a failure we must get back and patch all the tasks with the error. Err(err) => { let error: ResponseError = err.into(); @@ -796,7 +825,10 @@ mod tests { let kinds = [ index_creation_task("catto", "mouse"), replace_document_import_task("catto", None, 0, 12), - KindWithContent::CancelTask { tasks: vec![0, 1] }, + KindWithContent::TaskCancelation { + query: format!("uid=0,1"), + tasks: vec![0, 1], + }, replace_document_import_task("catto", None, 1, 50), replace_document_import_task("doggo", Some("bone"), 2, 5000), ]; diff --git a/index-scheduler/src/snapshot.rs b/index-scheduler/src/snapshot.rs index c4e4d24ba..767ab6509 100644 --- a/index-scheduler/src/snapshot.rs +++ b/index-scheduler/src/snapshot.rs @@ -31,7 +31,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { let mut snap = String::new(); - let processing_tasks = processing_tasks.read().unwrap().processing; + let processing_tasks = processing_tasks.read().unwrap().processing.clone(); snap.push_str(&format!( "### Autobatching Enabled = {autobatching_enabled}\n" )); @@ -143,6 +143,13 @@ fn snaphsot_details(d: &Details) -> String { Details::ClearAll { deleted_documents } => { format!("{{ deleted_documents: {deleted_documents:?} }}") }, + Details::TaskCancelation { + matched_tasks, + canceled_tasks, + original_query, + } => { + format!("{{ matched_tasks: {matched_tasks:?}, canceled_tasks: {canceled_tasks:?}, original_query: {original_query:?} }}") + } Details::TaskDeletion { matched_tasks, deleted_tasks, diff --git a/meilisearch-http/tests/dumps/mod.rs b/meilisearch-http/tests/dumps/mod.rs index 389f6b480..f093cf574 100644 --- a/meilisearch-http/tests/dumps/mod.rs +++ b/meilisearch-http/tests/dumps/mod.rs @@ -23,7 +23,7 @@ async fn import_dump_v1() { }; let error = Server::new_with_options(options) .await - .map(|_| ()) + .map(drop) .unwrap_err(); assert_eq!(error.to_string(), "The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards."); diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index d7b0f5fb7..3d32f385e 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -44,7 +44,7 @@ impl Task { match &self.kind { DumpExport { .. } | Snapshot - | CancelTask { .. } + | TaskCancelation { .. } | TaskDeletion { .. } | IndexSwap { .. } => None, DocumentImport { index_uid, .. } @@ -62,7 +62,7 @@ impl Task { use KindWithContent::*; match &self.kind { - DumpExport { .. } | Snapshot | CancelTask { .. } | TaskDeletion { .. } => None, + DumpExport { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None, DocumentImport { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentClear { index_uid } @@ -87,7 +87,7 @@ impl Task { | KindWithContent::IndexCreation { .. } | KindWithContent::IndexUpdate { .. } | KindWithContent::IndexSwap { .. } - | KindWithContent::CancelTask { .. } + | KindWithContent::TaskCancelation { .. } | KindWithContent::TaskDeletion { .. } | KindWithContent::DumpExport { .. } | KindWithContent::Snapshot => None, @@ -95,7 +95,7 @@ impl Task { } } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum KindWithContent { DocumentImport { @@ -134,7 +134,8 @@ pub enum KindWithContent { lhs: String, rhs: String, }, - CancelTask { + TaskCancelation { + query: String, tasks: Vec, }, TaskDeletion { @@ -160,7 +161,7 @@ impl KindWithContent { KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion, KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate, KindWithContent::IndexSwap { .. } => Kind::IndexSwap, - KindWithContent::CancelTask { .. } => Kind::CancelTask, + KindWithContent::TaskCancelation { .. } => Kind::TaskCancelation, KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion, KindWithContent::DumpExport { .. } => Kind::DumpExport, KindWithContent::Snapshot => Kind::Snapshot, @@ -171,7 +172,7 @@ impl KindWithContent { use KindWithContent::*; match self { - DumpExport { .. } | Snapshot | CancelTask { .. } | TaskDeletion { .. } => None, + DumpExport { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None, DocumentImport { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentClear { index_uid } @@ -214,7 +215,7 @@ impl KindWithContent { KindWithContent::IndexSwap { .. } => { todo!() } - KindWithContent::CancelTask { .. } => { + KindWithContent::TaskCancelation { .. } => { None // TODO: check correctness of this return value } KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion { @@ -250,7 +251,7 @@ impl From<&KindWithContent> for Option
{ primary_key: primary_key.clone(), }), KindWithContent::IndexSwap { .. } => None, - KindWithContent::CancelTask { .. } => None, + KindWithContent::TaskCancelation { .. } => None, KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion { matched_tasks: tasks.len(), deleted_tasks: None, @@ -327,7 +328,7 @@ pub enum Kind { IndexDeletion, IndexUpdate, IndexSwap, - CancelTask, + TaskCancelation, TaskDeletion, DumpExport, Snapshot, @@ -349,6 +350,10 @@ impl FromStr for Kind { Ok(Kind::DocumentDeletion) } else if kind.eq_ignore_ascii_case("settingsUpdate") { Ok(Kind::Settings) + } else if kind.eq_ignore_ascii_case("TaskCancelation") { + Ok(Kind::TaskCancelation) + } else if kind.eq_ignore_ascii_case("TaskDeletion") { + Ok(Kind::TaskDeletion) } else if kind.eq_ignore_ascii_case("dumpCreation") { Ok(Kind::DumpExport) } else { @@ -392,6 +397,11 @@ pub enum Details { ClearAll { deleted_documents: Option, }, + TaskCancelation { + matched_tasks: usize, + canceled_tasks: Option, + original_query: String, + }, TaskDeletion { matched_tasks: u64, deleted_tasks: Option,