diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index e50b790cf..67c3f71fc 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -12,8 +12,8 @@ use time::OffsetDateTime; use super::{Query, Queue}; use crate::processing::ProcessingTasks; use crate::utils::{ - insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime, - ProcessingBatch, + insert_task_datetime, keep_ids_within_datetimes, map_bound, + remove_n_tasks_datetime_earlier_than, remove_task_datetime, ProcessingBatch, }; use crate::{Error, Result, BEI128}; @@ -239,33 +239,21 @@ impl BatchQueue { remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, old_batch.uid)?; remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, old_batch.uid)?; } else { - let started_at = old_batch.started_at.unix_timestamp_nanos(); - - // We have either one or two enqueued at to remove - let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2); - let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?; - while let Some(entry) = iterator.next() { - let (key, mut value) = entry?; - if key > started_at { - continue; - } - if value.remove(old_batch.uid) { - exit = exit.saturating_sub(1); - // Safe because the key and value are owned - unsafe { - iterator.put_current(&key, &value)?; - } - if exit == 0 { - break; - } - } - } + // If we don't have the enqueued at in the batch it means the database comes from the v1.12 + // and we still need to find the date by scrolling the database + remove_n_tasks_datetime_earlier_than( + wtxn, + self.enqueued_at, + old_batch.started_at, + if old_batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + old_batch.uid, + )?; } } - if let Some(enqueued_at) = batch.enqueued_at.as_ref() { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?; - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?; - } + // A finished batch MUST contains at least one task and have an enqueued_at + let enqueued_at = batch.enqueued_at.as_ref().unwrap(); + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?; + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?; // Update the started at and finished at if let Some(ref old_batch) = old_batch {