From 4d25c159e625b7e1388d2a0039a1b4093a38bb82 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?=
Date: Tue, 25 Oct 2022 09:48:51 +0200
Subject: [PATCH] Apply code review suggestions

---
 index-scheduler/src/error.rs   |  2 +-
 index-scheduler/src/lib.rs     | 83 +++++++++++--------
 .../index_creation_failed.snap |  2 +-
 index-scheduler/src/utils.rs   | 21 ++---
 4 files changed, 61 insertions(+), 47 deletions(-)

diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs
index c57cafac0..b34bcb2d8 100644
--- a/index-scheduler/src/error.rs
+++ b/index-scheduler/src/error.rs
@@ -26,7 +26,7 @@ pub enum Error {
     Heed(#[from] heed::Error),
     #[error(transparent)]
     Milli(#[from] milli::Error),
-    #[error("An unexpected crash occurred when processing the task")]
+    #[error("An unexpected crash occurred when processing the task.")]
     ProcessBatchPanicked,
     #[error(transparent)]
     FileStore(#[from] file_store::Error),
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 905888436..811f08e48 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -245,8 +245,10 @@ pub struct IndexScheduler {
     pub(crate) dumps_path: PathBuf,

     // ================= test
-    /// The next entry is dedicated to the tests.
-    /// It provide a way to break in multiple part of the scheduler.
+    // The next entry is dedicated to the tests.
+    /// Provides a way to set a breakpoint in multiple parts of the scheduler.
+    ///
+    /// See [self.breakpoint()](`IndexScheduler::breakpoint`) for an explanation.
     #[cfg(test)]
     test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,

@@ -384,14 +386,16 @@ impl IndexScheduler {
                 Err(e) => {
                     log::error!("{}", e);
                     // Wait one second when an irrecoverable error occurs.
-                    match e {
+                    if matches!(
+                        e,
                         Error::CorruptedTaskQueue
-                        | Error::TaskDatabaseUpdate(_)
-                        | Error::HeedTransaction(_)
-                        | Error::CreateBatch(_) => {
+                            | Error::TaskDatabaseUpdate(_)
+                            | Error::HeedTransaction(_)
+                            | Error::CreateBatch(_)
+                    ) {
+                        {
                             std::thread::sleep(Duration::from_secs(1));
                         }
-                        _ => {}
                     }
                 }
             }
@@ -421,26 +425,24 @@ impl IndexScheduler {
     pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
         let rtxn = self.env.read_txn()?;

-        let ProcessingTasks { started_at: started_at_processing, processing: tasks_processing } =
+        let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } =
             self.processing_tasks.read().unwrap().clone();

         let mut tasks = self.all_task_ids(&rtxn)?;

         if let Some(status) = &query.status {
-            let mut include_processing_tasks = false;
             let mut status_tasks = RoaringBitmap::new();
             for status in status {
                 match status {
                     // special case for Processing tasks
                     Status::Processing => {
-                        include_processing_tasks = true;
-                        status_tasks |= &tasks_processing;
+                        status_tasks |= &processing_tasks;
                     }
                     status => status_tasks |= &self.get_status(&rtxn, *status)?,
                 };
             }
-            if !include_processing_tasks {
-                tasks -= &tasks_processing;
+            if !status.contains(&Status::Processing) {
+                tasks -= &processing_tasks;
             }
             tasks &= status_tasks;
         }
@@ -472,31 +474,34 @@
         // Once we have filtered the two subsets, we put them back together and assign it back to `tasks`.
         tasks = {
             let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) =
-                (&tasks - &tasks_processing, &tasks & &tasks_processing);
+                (&tasks - &processing_tasks, &tasks & &processing_tasks);

             // special case for Processing tasks
-            // in a loop because I want to break early if both query.after_started_at and query.before_started_at are None
-            // it doesn't actually loop
-            'block: loop {
-                let bounds = match (query.after_started_at, query.before_started_at) {
-                    (None, None) => break 'block,
-                    (None, Some(before)) => (Bound::Unbounded, Bound::Excluded(before)),
-                    (Some(after), None) => (Bound::Excluded(after), Bound::Unbounded),
-                    (Some(after), Some(before)) => {
-                        (Bound::Excluded(after), Bound::Excluded(before))
+            // A closure that clears the filtered_processing_tasks if their started_at date falls outside the given bounds
+            let mut clear_filtered_processing_tasks =
+                |start: Bound<OffsetDateTime>, end: Bound<OffsetDateTime>| {
+                    let start = map_bound(start, |b| b.unix_timestamp_nanos());
+                    let end = map_bound(end, |b| b.unix_timestamp_nanos());
+                    let is_within_dates = RangeBounds::contains(
+                        &(start, end),
+                        &started_at_processing.unix_timestamp_nanos(),
+                    );
+                    if !is_within_dates {
+                        filtered_processing_tasks.clear();
                     }
                 };
-                let start = map_bound(bounds.0, |b| b.unix_timestamp_nanos());
-                let end = map_bound(bounds.1, |b| b.unix_timestamp_nanos());
-                let is_within_dates = RangeBounds::contains(
-                    &(start, end),
-                    &started_at_processing.unix_timestamp_nanos(),
-                );
-                if !is_within_dates {
-                    filtered_processing_tasks.clear();
+            match (query.after_started_at, query.before_started_at) {
+                (None, None) => (),
+                (None, Some(before)) => {
+                    clear_filtered_processing_tasks(Bound::Unbounded, Bound::Excluded(before))
                 }
-                break 'block;
-            }
+                (Some(after), None) => {
+                    clear_filtered_processing_tasks(Bound::Excluded(after), Bound::Unbounded)
+                }
+                (Some(after), Some(before)) => {
+                    clear_filtered_processing_tasks(Bound::Excluded(after), Bound::Excluded(before))
+                }
+            };

             keep_tasks_within_datetimes(
                 &rtxn,
@@ -891,6 +896,18 @@ impl IndexScheduler {
         }
     }

+    /// Blocks the thread until the test handle asks to progress to/through this breakpoint.
+    ///
+    /// Two messages are sent through the channel for each breakpoint.
+    /// The first message is `(b, false)` and the second message is `(b, true)`.
+    ///
+    /// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other.
+    /// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks
+    /// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough.
+    /// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`.
+    /// As soon as we find it, the index scheduler is unblocked, but then waits again on the call to
+    /// `test_breakpoint_sdr.send(b, true)`. This message can only be sent once the
+    /// test asks to progress to the next `(b2, false)`.
     #[cfg(test)]
     fn breakpoint(&self, b: Breakpoint) {
         // We send two messages. The first one will sync with the call
diff --git a/index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap b/index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap
index d9a406c26..211c67326 100644
--- a/index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap
+++ b/index-scheduler/src/snapshots/lib.rs/panic_in_process_batch_for_index_creation/index_creation_failed.snap
@@ -6,7 +6,7 @@ source: index-scheduler/src/lib.rs
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
+0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 ----------------------------------------------------------------------
 ### Status:
 enqueued []
diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs
index ca46e7ae6..32ff82d37 100644
--- a/index-scheduler/src/utils.rs
+++ b/index-scheduler/src/utils.rs
@@ -259,20 +259,17 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
         | K::Snapshot => {}
     };
     match &mut task.details {
-        Some(details) => match details {
-            Details::IndexSwap { swaps } => {
-                for (lhs, rhs) in swaps.iter_mut() {
-                    if lhs == swap.0 || lhs == swap.1 {
-                        index_uids.push(lhs);
-                    }
-                    if rhs == swap.0 || rhs == swap.1 {
-                        index_uids.push(rhs);
-                    }
+        Some(Details::IndexSwap { swaps }) => {
+            for (lhs, rhs) in swaps.iter_mut() {
+                if lhs == swap.0 || lhs == swap.1 {
+                    index_uids.push(lhs);
+                }
+                if rhs == swap.0 || rhs == swap.1 {
+                    index_uids.push(rhs);
                 }
             }
-            _ => {}
-        },
-        None => {}
+        }
+        _ => (),
     }
     for index_uid in index_uids {
         if index_uid == swap.0 {
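
Two details in this patch are worth illustrating outside the diff. First, the new `clear_filtered_processing_tasks` closure leans on a standard-library subtlety that is easy to miss: a `(Bound<T>, Bound<T>)` tuple implements `RangeBounds<T>`, which is what allows `RangeBounds::contains(&(start, end), ...)` to test a timestamp against two independently chosen, possibly unbounded endpoints. Below is a minimal sketch of just that idiom, using only `std`; the timestamp values are made up and stand in for the `unix_timestamp_nanos()` results:

use std::ops::{Bound, RangeBounds};

fn main() {
    // Stands in for `started_at_processing.unix_timestamp_nanos()`.
    let started_at: i128 = 1_666_684_131_000_000_000;

    // Endpoints as they would be built from `query.after_started_at` and
    // `query.before_started_at`; either side may be `Bound::Unbounded`.
    let bounds = (Bound::Excluded(1_666_000_000_000_000_000_i128), Bound::Unbounded);

    // A tuple of two `Bound`s is itself a `RangeBounds`, so `contains` works on it.
    assert!(RangeBounds::contains(&bounds, &started_at));
    assert!(!(Bound::Unbounded, Bound::Excluded(0_i128)).contains(&started_at));
}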
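
Second, the doc comment added to `breakpoint()` describes a rendezvous protocol over a zero-capacity channel. The following standalone sketch models that protocol with the `crossbeam` crate; the `Breakpoint` variants and the two thread roles here are simplified stand-ins for the real `IndexScheduler` and its test handle, not the actual harness:

use std::thread;

// Stand-in for the real `Breakpoint` enum in index-scheduler/src/lib.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Breakpoint {
    Start,
    BatchCreated,
}

fn main() {
    // Capacity 0: every `send` blocks until the matching `recv`, and vice versa.
    let (test_breakpoint_sdr, test_breakpoint_rcv) =
        crossbeam::channel::bounded::<(Breakpoint, bool)>(0);

    // This thread plays the role of the scheduler's run loop.
    let scheduler = thread::spawn(move || {
        for b in [Breakpoint::Start, Breakpoint::BatchCreated] {
            // `(b, false)`: the scheduler has reached breakpoint `b`.
            test_breakpoint_sdr.send((b, false)).unwrap();
            // `(b, true)`: the scheduler has moved past breakpoint `b`.
            test_breakpoint_sdr.send((b, true)).unwrap();
        }
    });

    // This side plays the role of the test: pump messages until the
    // scheduler reports that it has reached `BatchCreated`.
    while test_breakpoint_rcv.recv().unwrap() != (Breakpoint::BatchCreated, false) {}

    // The scheduler is now blocked on `send((BatchCreated, true))` until we
    // drain the channel; once the sender is dropped, `recv` errors and we stop.
    while test_breakpoint_rcv.recv().is_ok() {}
    scheduler.join().unwrap();
}

Because the channel has no buffer, every `send` doubles as a synchronization point: the scheduler cannot run past a breakpoint until the test has observed it, which is what lets the tests take snapshots at deterministic points.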