Apply code review suggestions

This commit is contained in:
Loïc Lecrenier 2022-10-25 09:48:51 +02:00 committed by Clément Renault
parent e9cd6cbbee
commit 4d25c159e6
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 61 additions and 47 deletions

View File

@ -26,7 +26,7 @@ pub enum Error {
Heed(#[from] heed::Error),
#[error(transparent)]
Milli(#[from] milli::Error),
#[error("An unexpected crash occurred when processing the task")]
#[error("An unexpected crash occurred when processing the task.")]
ProcessBatchPanicked,
#[error(transparent)]
FileStore(#[from] file_store::Error),

View File

@ -245,8 +245,10 @@ pub struct IndexScheduler {
pub(crate) dumps_path: PathBuf,
// ================= test
/// The next entry is dedicated to the tests.
/// It provide a way to break in multiple part of the scheduler.
// The next entry is dedicated to the tests.
/// Provide a way to set a breakpoint in multiple part of the scheduler.
///
/// See [self.breakpoint()](`IndexScheduler::breakpoint`) for an explanation.
#[cfg(test)]
test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
@ -384,14 +386,16 @@ impl IndexScheduler {
Err(e) => {
log::error!("{}", e);
// Wait one second when an irrecoverable error occurs.
match e {
if matches!(
e,
Error::CorruptedTaskQueue
| Error::TaskDatabaseUpdate(_)
| Error::HeedTransaction(_)
| Error::CreateBatch(_) => {
| Error::CreateBatch(_)
) {
{
std::thread::sleep(Duration::from_secs(1));
}
_ => {}
}
}
}
@ -421,26 +425,24 @@ impl IndexScheduler {
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
let rtxn = self.env.read_txn()?;
let ProcessingTasks { started_at: started_at_processing, processing: tasks_processing } =
let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } =
self.processing_tasks.read().unwrap().clone();
let mut tasks = self.all_task_ids(&rtxn)?;
if let Some(status) = &query.status {
let mut include_processing_tasks = false;
let mut status_tasks = RoaringBitmap::new();
for status in status {
match status {
// special case for Processing tasks
Status::Processing => {
include_processing_tasks = true;
status_tasks |= &tasks_processing;
status_tasks |= &processing_tasks;
}
status => status_tasks |= &self.get_status(&rtxn, *status)?,
};
}
if !include_processing_tasks {
tasks -= &tasks_processing;
if !status.contains(&Status::Processing) {
tasks -= &processing_tasks;
}
tasks &= status_tasks;
}
@ -472,22 +474,14 @@ impl IndexScheduler {
// Once we have filtered the two subsets, we put them back together and assign it back to `tasks`.
tasks = {
let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) =
(&tasks - &tasks_processing, &tasks & &tasks_processing);
(&tasks - &processing_tasks, &tasks & &processing_tasks);
// special case for Processing tasks
// in a loop because I want to break early if both query.after_started_at and query.before_started_at are None
// it doesn't actually loop
'block: loop {
let bounds = match (query.after_started_at, query.before_started_at) {
(None, None) => break 'block,
(None, Some(before)) => (Bound::Unbounded, Bound::Excluded(before)),
(Some(after), None) => (Bound::Excluded(after), Bound::Unbounded),
(Some(after), Some(before)) => {
(Bound::Excluded(after), Bound::Excluded(before))
}
};
let start = map_bound(bounds.0, |b| b.unix_timestamp_nanos());
let end = map_bound(bounds.1, |b| b.unix_timestamp_nanos());
// A closure that clears the filtered_processing_tasks if their started_at date falls outside the given bounds
let mut clear_filtered_processing_tasks =
|start: Bound<OffsetDateTime>, end: Bound<OffsetDateTime>| {
let start = map_bound(start, |b| b.unix_timestamp_nanos());
let end = map_bound(end, |b| b.unix_timestamp_nanos());
let is_within_dates = RangeBounds::contains(
&(start, end),
&started_at_processing.unix_timestamp_nanos(),
@ -495,8 +489,19 @@ impl IndexScheduler {
if !is_within_dates {
filtered_processing_tasks.clear();
}
break 'block;
};
match (query.after_started_at, query.before_started_at) {
(None, None) => (),
(None, Some(before)) => {
clear_filtered_processing_tasks(Bound::Unbounded, Bound::Excluded(before))
}
(Some(after), None) => {
clear_filtered_processing_tasks(Bound::Excluded(after), Bound::Unbounded)
}
(Some(after), Some(before)) => {
clear_filtered_processing_tasks(Bound::Excluded(after), Bound::Excluded(before))
}
};
keep_tasks_within_datetimes(
&rtxn,
@ -891,6 +896,18 @@ impl IndexScheduler {
}
}
/// Blocks the thread until the test handle asks to progress to/through this breakpoint.
///
/// Two messages are sent through the channel for each breakpoint.
/// The first message is `(b, false)` and the second message is `(b, true)`.
///
/// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other.
/// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks
/// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough.
/// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`.
/// As soon as we find it, the index scheduler is unblocked but then wait again on the call to
/// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the
/// test asks to progress to the next `(b2, false)`.
#[cfg(test)]
fn breakpoint(&self, b: Breakpoint) {
// We send two messages. The first one will sync with the call

View File

@ -6,7 +6,7 @@ source: index-scheduler/src/lib.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
enqueued []

View File

@ -259,8 +259,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
| K::Snapshot => {}
};
match &mut task.details {
Some(details) => match details {
Details::IndexSwap { swaps } => {
Some(Details::IndexSwap { swaps }) => {
for (lhs, rhs) in swaps.iter_mut() {
if lhs == swap.0 || lhs == swap.1 {
index_uids.push(lhs);
@ -270,9 +269,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
}
}
}
_ => {}
},
None => {}
_ => (),
}
for index_uid in index_uids {
if index_uid == swap.0 {