From 966cbdab6968788020bba9e5fb144c6d253ee039 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 4 Sep 2023 15:39:54 +0200 Subject: [PATCH] make the tests compile again --- index-scheduler/src/index_mapper/index_map.rs | 3 +- index-scheduler/src/insta_snapshot.rs | 10 +- index-scheduler/src/lib.rs | 129 ++++++++---------- index-scheduler/src/utils.rs | 31 +++-- 4 files changed, 82 insertions(+), 91 deletions(-) diff --git a/index-scheduler/src/index_mapper/index_map.rs b/index-scheduler/src/index_mapper/index_map.rs index 5cdadcd3b..0a19fc23d 100644 --- a/index-scheduler/src/index_mapper/index_map.rs +++ b/index-scheduler/src/index_mapper/index_map.rs @@ -340,7 +340,8 @@ mod tests { impl IndexMapper { fn test() -> (Self, Env, IndexSchedulerHandle) { let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); - (index_scheduler.index_mapper, index_scheduler.env, handle) + let index_scheduler = index_scheduler.inner(); + (index_scheduler.index_mapper.clone(), index_scheduler.env.clone(), handle) } } diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 5943b2319..2b87bc762 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -1,5 +1,6 @@ use std::collections::BTreeSet; use std::fmt::Write; +use std::ops::Deref; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{Database, RoTxn}; @@ -8,12 +9,13 @@ use meilisearch_types::tasks::{Details, Task}; use roaring::RoaringBitmap; use crate::index_mapper::IndexMapper; -use crate::{IndexScheduler, Kind, Status, BEI128}; +use crate::{IndexScheduler, IndexSchedulerInner, Kind, Status, BEI128}; pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { scheduler.assert_internally_consistent(); - let IndexScheduler { + let inner = scheduler.inner(); + let IndexSchedulerInner { autobatching_enabled, must_stop_processing: _, processing_tasks, @@ -39,13 +41,13 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { planned_failures: _, run_loop_iteration: _, zookeeper: _, - } = scheduler; + } = inner.deref(); let rtxn = env.read_txn().unwrap(); let mut snap = String::new(); - let processing_tasks = processing_tasks.read().unwrap().processing.clone(); + let processing_tasks = processing_tasks.read().processing.clone(); snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n")); snap.push_str("### Processing Tasks:\n"); snap.push_str(&snapshot_bitmap(&processing_tasks)); diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index c5734dcfb..21dd3683c 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -355,7 +355,7 @@ impl IndexScheduler { /// only once per index scheduler. fn run(&self) { #[cfg(test)] - self.breakpoint(Breakpoint::Init); + self.inner().breakpoint(Breakpoint::Init); let latch = match self.zookeeper { Some(ref zookeeper) => { @@ -891,30 +891,6 @@ impl IndexScheduler { pub fn features(&self) -> Result { self.inner().features() } - - /// Blocks the thread until the test handle asks to progress to/through this breakpoint. - /// - /// Two messages are sent through the channel for each breakpoint. - /// The first message is `(b, false)` and the second message is `(b, true)`. - /// - /// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other. - /// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks - /// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough. - /// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`. - /// As soon as we find it, the index scheduler is unblocked but then wait again on the call to - /// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the - /// test asks to progress to the next `(b2, false)`. - #[cfg(test)] - fn breakpoint(&self, b: Breakpoint) { - // We send two messages. The first one will sync with the call - // to `handle.wait_until(b)`. The second one will block until the - // the next call to `handle.wait_until(..)`. - self.test_breakpoint_sdr.send((b, false)).unwrap(); - // This one will only be able to be sent if the test handle stays alive. - // If it fails, then it means that we have exited the test. - // By crashing with `unwrap`, we kill the run loop. - self.test_breakpoint_sdr.send((b, true)).unwrap(); - } } /// This is the internal structure that keeps the indexes alive. @@ -1187,7 +1163,7 @@ impl IndexSchedulerInner { fn tick(&self) -> Result { #[cfg(test)] { - *self.run_loop_iteration.write().unwrap() += 1; + *self.run_loop_iteration.write() += 1; self.breakpoint(Breakpoint::Start); } @@ -1728,6 +1704,30 @@ impl IndexSchedulerInner { let index = self.index_mapper.create_index(wtxn, name, date)?; Ok(index) } + + /// Blocks the thread until the test handle asks to progress to/through this breakpoint. + /// + /// Two messages are sent through the channel for each breakpoint. + /// The first message is `(b, false)` and the second message is `(b, true)`. + /// + /// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other. + /// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks + /// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough. + /// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`. + /// As soon as we find it, the index scheduler is unblocked but then wait again on the call to + /// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the + /// test asks to progress to the next `(b2, false)`. + #[cfg(test)] + fn breakpoint(&self, b: Breakpoint) { + // We send two messages. The first one will sync with the call + // to `handle.wait_until(b)`. The second one will block until the + // the next call to `handle.wait_until(..)`. + self.test_breakpoint_sdr.send((b, false)).unwrap(); + // This one will only be able to be sent if the test handle stays alive. + // If it fails, then it means that we have exited the test. + // By crashing with `unwrap`, we kill the run loop. + self.test_breakpoint_sdr.send((b, true)).unwrap(); + } } pub struct Dump<'a> { @@ -2043,12 +2043,13 @@ mod tests { (index_scheduler, index_scheduler_handle) } + } + impl IndexSchedulerInner { /// Return a [`PlannedFailure`](Error::PlannedFailure) error if a failure is planned /// for the given location and current run loop iteration. pub fn maybe_fail(&self, location: FailureLocation) -> Result<()> { - if self.planned_failures.contains(&(*self.run_loop_iteration.read().unwrap(), location)) - { + if self.planned_failures.contains(&(*self.run_loop_iteration.read(), location)) { match location { FailureLocation::PanicInsideProcessBatch => { panic!("simulated panic") @@ -3235,46 +3236,45 @@ mod tests { handle.advance_n_successful_batches(3); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processed_all_tasks"); - let rtxn = index_scheduler.env.read_txn().unwrap(); let query = Query { limit: Some(0), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[]"); let query = Query { limit: Some(1), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[2,]"); let query = Query { limit: Some(2), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); let query = Query { from: Some(1), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,]"); let query = Query { from: Some(2), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); let query = Query { from: Some(1), limit: Some(1), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[1,]"); let query = Query { from: Some(1), limit: Some(2), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,]"); } @@ -3297,17 +3297,15 @@ mod tests { handle.advance_till([Start, BatchCreated]); - let rtxn = index_scheduler.env.read_txn().unwrap(); - let query = Query { statuses: Some(vec![Status::Processing]), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick @@ -3316,7 +3314,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick @@ -3326,7 +3324,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // both enqueued and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the test, which should excludes the enqueued tasks @@ -3338,7 +3336,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // both enqueued and processing tasks in the first tick, but limited to those with a started_at // that comes before the start of the test, which should excludes all of them @@ -3351,7 +3349,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // both enqueued and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the test and before one minute after the start of the test, @@ -3367,8 +3365,6 @@ mod tests { BatchCreated, ]); - let rtxn = index_scheduler.env.read_txn().unwrap(); - let second_start_time = OffsetDateTime::now_utc(); let query = Query { @@ -3378,7 +3374,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // both succeeded and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the test and before one minute after the start of the test, @@ -3391,7 +3387,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // both succeeded and processing tasks in the first tick, but limited to those with a started_at // that comes before the start of the test, which should exclude all tasks @@ -3404,7 +3400,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // both succeeded and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the second part of the test and before one minute after the @@ -3421,10 +3417,8 @@ mod tests { BatchCreated, ]); - let rtxn = index_scheduler.env.read_txn().unwrap(); - let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // we run the same query to verify that, and indeed find that the last task is matched snapshot!(snapshot_bitmap(&tasks), @"[2,]"); @@ -3436,19 +3430,18 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // enqueued, succeeded, or processing tasks started after the second part of the test, should // again only return the last task snapshot!(snapshot_bitmap(&tasks), @"[2,]"); handle.advance_till([ProcessBatchFailed, AfterProcessing]); - let rtxn = index_scheduler.read_txn().unwrap(); // now the last task should have failed snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end"); let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // so running the last query should return nothing snapshot!(snapshot_bitmap(&tasks), @"[]"); @@ -3460,7 +3453,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // but the same query on failed tasks should return the last task snapshot!(snapshot_bitmap(&tasks), @"[2,]"); @@ -3472,7 +3465,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // but the same query on failed tasks should return the last task snapshot!(snapshot_bitmap(&tasks), @"[2,]"); @@ -3485,7 +3478,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // same query but with an invalid uid snapshot!(snapshot_bitmap(&tasks), @"[]"); @@ -3498,7 +3491,7 @@ mod tests { ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // same query but with a valid uid snapshot!(snapshot_bitmap(&tasks), @"[2,]"); @@ -3526,11 +3519,9 @@ mod tests { handle.advance_till([Start, BatchCreated]); - let rtxn = index_scheduler.env.read_txn().unwrap(); - let query = Query { index_uids: Some(vec!["catto".to_owned()]), ..Default::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // only the first task associated with catto is returned, the indexSwap tasks are excluded! snapshot!(snapshot_bitmap(&tasks), @"[0,]"); @@ -3538,7 +3529,6 @@ mod tests { let query = Query { index_uids: Some(vec!["catto".to_owned()]), ..Default::default() }; let (tasks, _) = index_scheduler .get_task_ids_from_authorized_indexes( - &rtxn, &query, &AuthFilter::with_allowed_indexes( vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(), @@ -3552,7 +3542,6 @@ mod tests { let query = Query::default(); let (tasks, _) = index_scheduler .get_task_ids_from_authorized_indexes( - &rtxn, &query, &AuthFilter::with_allowed_indexes( vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(), @@ -3566,7 +3555,6 @@ mod tests { let query = Query::default(); let (tasks, _) = index_scheduler .get_task_ids_from_authorized_indexes( - &rtxn, &query, &AuthFilter::with_allowed_indexes( vec![ @@ -3584,7 +3572,7 @@ mod tests { let query = Query::default(); let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // we asked for all the tasks with all index authorized -> all tasks returned snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,3,]"); @@ -3614,10 +3602,9 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); - let rtxn = index_scheduler.read_txn().unwrap(); let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() }; let (tasks, _) = index_scheduler - .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) + .get_task_ids_from_authorized_indexes(&query, &AuthFilter::default()) .unwrap(); // 0 is not returned because it was not canceled, 3 is not returned because it is the uid of the // taskCancelation itself @@ -3626,7 +3613,6 @@ mod tests { let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() }; let (tasks, _) = index_scheduler .get_task_ids_from_authorized_indexes( - &rtxn, &query, &AuthFilter::with_allowed_indexes( vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(), @@ -3650,7 +3636,7 @@ mod tests { handle.advance_one_failed_batch(); // Still in the first iteration - assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1); + assert_eq!(*index_scheduler.inner().run_loop_iteration.read(), 1); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); } @@ -4442,7 +4428,7 @@ mod tests { handle.advance_till([Start, BatchCreated, ProcessBatchFailed, AfterProcessing]); // Still in the first iteration - assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1); + assert_eq!(*index_scheduler.inner().run_loop_iteration.read(), 1); // No matter what happens in process_batch, the index_scheduler should be internally consistent snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); } @@ -4530,6 +4516,7 @@ mod tests { .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) .unwrap(); + let index_scheduler = index_scheduler.inner(); let rtxn = index_scheduler.env.read_txn().unwrap(); let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 894c2b3de..c224b04b8 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -331,11 +331,12 @@ pub fn clamp_to_page_size(size: usize) -> usize { } #[cfg(test)] -impl IndexScheduler { +impl crate::IndexScheduler { /// Asserts that the index scheduler's content is internally consistent. pub fn assert_internally_consistent(&self) { - let rtxn = self.env.read_txn().unwrap(); - for task in self.all_tasks.iter(&rtxn).unwrap() { + let this = self.inner(); + let rtxn = this.env.read_txn().unwrap(); + for task in this.all_tasks.iter(&rtxn).unwrap() { let (task_id, task) = task.unwrap(); let task_id = task_id.get(); @@ -354,21 +355,21 @@ impl IndexScheduler { } = task; assert_eq!(uid, task.uid); if let Some(task_index_uid) = &task_index_uid { - assert!(self + assert!(this .index_tasks .get(&rtxn, task_index_uid.as_str()) .unwrap() .unwrap() .contains(task.uid)); } - let db_enqueued_at = self + let db_enqueued_at = this .enqueued_at .get(&rtxn, &BEI128::new(enqueued_at.unix_timestamp_nanos())) .unwrap() .unwrap(); assert!(db_enqueued_at.contains(task_id)); if let Some(started_at) = started_at { - let db_started_at = self + let db_started_at = this .started_at .get(&rtxn, &BEI128::new(started_at.unix_timestamp_nanos())) .unwrap() @@ -376,7 +377,7 @@ impl IndexScheduler { assert!(db_started_at.contains(task_id)); } if let Some(finished_at) = finished_at { - let db_finished_at = self + let db_finished_at = this .finished_at .get(&rtxn, &BEI128::new(finished_at.unix_timestamp_nanos())) .unwrap() @@ -384,9 +385,9 @@ impl IndexScheduler { assert!(db_finished_at.contains(task_id)); } if let Some(canceled_by) = canceled_by { - let db_canceled_tasks = self.get_status(&rtxn, Status::Canceled).unwrap(); + let db_canceled_tasks = this.get_status(&rtxn, Status::Canceled).unwrap(); assert!(db_canceled_tasks.contains(uid)); - let db_canceling_task = self.get_task(&rtxn, canceled_by).unwrap().unwrap(); + let db_canceling_task = this.get_task(&rtxn, canceled_by).unwrap().unwrap(); assert_eq!(db_canceling_task.status, Status::Succeeded); match db_canceling_task.kind { KindWithContent::TaskCancelation { query: _, tasks } => { @@ -427,7 +428,7 @@ impl IndexScheduler { Details::IndexInfo { primary_key: pk1 } => match &kind { KindWithContent::IndexCreation { index_uid, primary_key: pk2 } | KindWithContent::IndexUpdate { index_uid, primary_key: pk2 } => { - self.index_tasks + this.index_tasks .get(&rtxn, index_uid.as_str()) .unwrap() .unwrap() @@ -535,23 +536,23 @@ impl IndexScheduler { } } - assert!(self.get_status(&rtxn, status).unwrap().contains(uid)); - assert!(self.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid)); + assert!(this.get_status(&rtxn, status).unwrap().contains(uid)); + assert!(this.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid)); if let KindWithContent::DocumentAdditionOrUpdate { content_file, .. } = kind { match status { Status::Enqueued | Status::Processing => { - assert!(self + assert!(this .file_store .all_uuids() .unwrap() .any(|uuid| uuid.as_ref().unwrap() == &content_file), "Could not find uuid `{content_file}` in the file_store. Available uuids are {:?}.", - self.file_store.all_uuids().unwrap().collect::, file_store::Error>>().unwrap(), + this.file_store.all_uuids().unwrap().collect::, file_store::Error>>().unwrap(), ); } Status::Succeeded | Status::Failed | Status::Canceled => { - assert!(self + assert!(this .file_store .all_uuids() .unwrap()