From 703ba7a1fbf411e82e6c97916b81464bb41c58ce Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 17 Oct 2022 13:54:35 +0200 Subject: [PATCH] Introduce the ProcessingTasks struct --- index-scheduler/src/batch.rs | 2 +- index-scheduler/src/lib.rs | 65 ++++++++++++++++++++++++++------- index-scheduler/src/snapshot.rs | 2 +- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 7a0458d57..4e81f1661 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -939,7 +939,7 @@ impl IndexScheduler { // 1. Remove from this list the tasks that we are not allowed to delete let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; - let processing_tasks = &self.processing_tasks.read().unwrap().1; + let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let all_task_ids = self.all_task_ids(&wtxn)?; let mut to_delete_tasks = all_task_ids & matched_tasks; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 043e4c0c3..b829ce237 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -11,10 +11,9 @@ pub type TaskId = u32; use dump::{KindDump, TaskDump, UpdateFile}; pub use error::Error; -use meilisearch_types::milli::documents::DocumentsBatchBuilder; -use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::{Arc, RwLock}; use file_store::FileStore; @@ -27,8 +26,10 @@ use uuid::Uuid; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{self, Database, Env}; +use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32}; +use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use crate::index_mapper::IndexMapper; @@ 
-117,6 +118,45 @@ impl Query {
     }
 }
 
+/// The tasks that are currently being processed and the flag used to interrupt them.
+#[derive(Debug, Clone)]
+struct ProcessingTasks {
+    /// The date and time at which the indexation started.
+    started_at: OffsetDateTime,
+    /// The list of task ids that are currently running.
+    processing: RoaringBitmap,
+    /// A boolean that can be set to true to stop the currently processing tasks.
+    must_stop: Arc<AtomicBool>,
+}
+
+impl ProcessingTasks {
+    /// Registers `processing` as the running tasks and `started_at` as their starting date.
+    fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
+        // A stop request is only raised for tasks of the batch running at that time,
+        // clear it so that it does not leak into the batch starting now.
+        self.must_stop.store(false, Relaxed);
+        self.started_at = started_at;
+        self.processing = processing;
+    }
+
+    /// Empties the list of running tasks and stores `stopped_at` in `started_at`.
+    fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
+        self.started_at = stopped_at;
+        self.processing = RoaringBitmap::new();
+    }
+
+    /// Raises the stop flag if at least one of the `canceled_tasks` is currently processing.
+    ///
+    /// Returns `true` when the flag has been raised, `false` otherwise.
+    fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
+        let must_stop = !self.processing.is_disjoint(canceled_tasks);
+        if must_stop {
+            self.must_stop.store(true, Relaxed);
+        }
+        must_stop
+    }
+}
+
 /// Database const names for the `IndexScheduler`.
 mod db_name {
     pub const ALL_TASKS: &str = "all-tasks";
@@ -129,14 +169,12 @@ mod db_name {
 /// 1. Resolve the name of the indexes.
 /// 2. Schedule the tasks.
 pub struct IndexScheduler {
-    /// The list of tasks currently processing and their starting date.
-    pub(crate) processing_tasks: Arc>,
-
-    pub(crate) file_store: FileStore,
-
     /// The LMDB environment which the DBs are associated with.
     pub(crate) env: Env,
 
+    pub(crate) processing_tasks: Arc>,
+    pub(crate) file_store: FileStore,
+
     // The main database, it contains all the tasks accessible by their Id.
     pub(crate) all_tasks: Database, SerdeJson>,
 
@@ -153,7 +191,7 @@ pub struct IndexScheduler {
     /// Get a signal when a batch needs to be processed.
     pub(crate) wake_up: Arc,
 
-    /// Weither autobatching is enabled or not.
+    /// Whether auto-batching is enabled or not.
     pub(crate) autobatching_enabled: bool,
 
     /// The path used to create the dumps.
@@ -195,12 +225,15 @@ impl IndexScheduler { options.max_dbs(6); let env = options.open(tasks_path)?; - let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); + let processing_tasks = ProcessingTasks { + started_at: OffsetDateTime::now_utc(), + processing: RoaringBitmap::new(), + must_stop: Arc::new(AtomicBool::new(false)), + }; let file_store = FileStore::new(&update_file_path)?; // allow unreachable_code to get rids of the warning in the case of a test build. let this = Self { - // by default there is no processing tasks processing_tasks: Arc::new(RwLock::new(processing_tasks)), file_store, all_tasks: env.create_database(Some(db_name::ALL_TASKS))?, @@ -321,7 +354,7 @@ impl IndexScheduler { .take(query.limit.unwrap_or(u32::MAX) as usize), )?; - let (started_at, processing) = self + let ProcessingTasks { started_at, processing, .. } = self .processing_tasks .read() .map_err(|_| Error::CorruptedTaskQueue)? @@ -556,7 +589,10 @@ impl IndexScheduler { let processed_tasks = ids.len(); let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap(); let started_at = OffsetDateTime::now_utc(); - *self.processing_tasks.write().unwrap() = (started_at, processing_tasks); + self.processing_tasks + .write() + .unwrap() + .start_processing_at(started_at, processing_tasks); #[cfg(test)] { @@ -596,7 +632,10 @@ impl IndexScheduler { } } } - *self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new()); + self.processing_tasks + .write() + .unwrap() + .stop_processing_at(finished_at); wtxn.commit()?; #[cfg(test)] diff --git a/index-scheduler/src/snapshot.rs b/index-scheduler/src/snapshot.rs index 52c8b30ea..c4e4d24ba 100644 --- a/index-scheduler/src/snapshot.rs +++ b/index-scheduler/src/snapshot.rs @@ -31,7 +31,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { let mut snap = String::new(); - let (_time, processing_tasks) = processing_tasks.read().unwrap().clone(); + let processing_tasks = 
processing_tasks.read().unwrap().processing; snap.push_str(&format!( "### Autobatching Enabled = {autobatching_enabled}\n" ));