diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 9541e9736..4c7d9e363 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -67,7 +67,7 @@ use roaring::RoaringBitmap; use synchronoise::SignalEvent; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; +use utils::{filter_out_references_to_newer_tasks, keep_ids_within_datetimes, map_bound}; use uuid::Uuid; use crate::index_mapper::IndexMapper; @@ -904,7 +904,7 @@ impl IndexScheduler { ), }; - keep_tasks_within_datetimes( + keep_ids_within_datetimes( rtxn, &mut filtered_non_processing_tasks, self.started_at, @@ -914,7 +914,7 @@ impl IndexScheduler { filtered_non_processing_tasks | filtered_processing_tasks }; - keep_tasks_within_datetimes( + keep_ids_within_datetimes( rtxn, &mut tasks, self.enqueued_at, @@ -922,7 +922,7 @@ impl IndexScheduler { *before_enqueued_at, )?; - keep_tasks_within_datetimes( + keep_ids_within_datetimes( rtxn, &mut tasks, self.finished_at, @@ -1092,7 +1092,7 @@ impl IndexScheduler { ), }; - keep_tasks_within_datetimes( + keep_ids_within_datetimes( rtxn, &mut filtered_non_processing_batches, self.batch_started_at, @@ -1102,7 +1102,7 @@ impl IndexScheduler { filtered_non_processing_batches | filtered_processing_batches }; - keep_tasks_within_datetimes( + keep_ids_within_datetimes( rtxn, &mut batches, self.batch_enqueued_at, @@ -1110,7 +1110,7 @@ impl IndexScheduler { query.before_enqueued_at, )?; - keep_tasks_within_datetimes( + keep_ids_within_datetimes( rtxn, &mut batches, self.batch_finished_at, diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 3362d802f..827cdf95e 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -270,16 +270,10 @@ impl IndexScheduler { pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?; - dbg!(&task); - + debug_assert!(old_task != *task); debug_assert_eq!(old_task.uid, task.uid); debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some()); - // TODO: This shouldn't ever happen, we should assert it - if old_task == *task { - return Ok(()); - } - if old_task.status != task.status { self.update_status(wtxn, old_task.status, |bitmap| { bitmap.remove(task.uid); @@ -505,10 +499,9 @@ pub(crate) fn remove_task_datetime( Ok(()) } -// TODO: Rename the function since it also applies to batches -pub(crate) fn keep_tasks_within_datetimes( +pub(crate) fn keep_ids_within_datetimes( rtxn: &RoTxn, - tasks: &mut RoaringBitmap, + ids: &mut RoaringBitmap, database: Database, after: Option, before: Option, @@ -519,15 +512,15 @@ pub(crate) fn keep_tasks_within_datetimes( (Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded), (Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)), }; - let mut collected_task_ids = RoaringBitmap::new(); + let mut collected_ids = RoaringBitmap::new(); let start = map_bound(start, |b| b.unix_timestamp_nanos()); let end = map_bound(end, |b| b.unix_timestamp_nanos()); let iter = database.range(rtxn, &(start, end))?; for r in iter { - let (_timestamp, task_ids) = r?; - collected_task_ids |= task_ids; + let (_timestamp, ids) = r?; + collected_ids |= ids; } - *tasks &= collected_task_ids; + *ids &= collected_ids; Ok(()) }