diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 4831ccbca..7353657f2 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -403,7 +403,6 @@ impl IndexScheduler { let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued; if let Some(task_id) = to_delete.min() { let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; - return Ok(Some(Batch::TaskDeletion(task))); } @@ -876,9 +875,9 @@ impl IndexScheduler { *lhs_tasks -= &index_lhs_task_ids; *lhs_tasks |= &index_rhs_task_ids; })?; - self.update_index(wtxn, rhs, |lhs_tasks| { - *lhs_tasks -= &index_rhs_task_ids; - *lhs_tasks |= &index_lhs_task_ids; + self.update_index(wtxn, rhs, |rhs_tasks| { + *rhs_tasks -= &index_rhs_task_ids; + *rhs_tasks |= &index_lhs_task_ids; })?; // 6. Swap in the index mapper diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 60ece740e..085fbf438 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -257,11 +257,6 @@ pub struct IndexScheduler { pub(crate) index_tasks: Database, /// Store the task ids of tasks which were enqueued at a specific date - /// - /// Note that since we store the date with nanosecond-level precision, it would be - /// reasonable to assume that there is only one task per key. However, it is not a - /// theoretical certainty, and we might want to make it possible to enqueue multiple - /// tasks at a time in the future. pub(crate) enqueued_at: Database, CboRoaringBitmapCodec>, /// Store the task ids of finished tasks which started being processed at a specific date @@ -299,14 +294,14 @@ pub struct IndexScheduler { #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, - #[cfg(test)] /// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler. /// /// The first field is the iteration index and the second field identifies a location in the code. + #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, - #[cfg(test)] /// A counter that is incremented before every call to [`tick`](IndexScheduler::tick) + #[cfg(test)] run_loop_iteration: Arc>, } @@ -422,9 +417,7 @@ impl IndexScheduler { | Error::HeedTransaction(_) | Error::CreateBatch(_) ) { - { - std::thread::sleep(Duration::from_secs(1)); - } + std::thread::sleep(Duration::from_secs(1)); } } } @@ -633,7 +626,7 @@ impl IndexScheduler { })?; self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| { - (bitmap.insert(task.uid)); + bitmap.insert(task.uid); })?; utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)?; @@ -914,6 +907,7 @@ impl IndexScheduler { } } } + self.processing_tasks.write().unwrap().stop_processing_at(finished_at); #[cfg(test)]