From f21ae1f5d1ce71067650e828236bad34f5ef9f67 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 22 Jan 2025 17:58:58 +0100 Subject: [PATCH] Remove the batch id from the date time databases --- .../src/scheduler/process_batch.rs | 35 ++++++++++++++++--- crates/index-scheduler/src/utils.rs | 27 ++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 7eda1d56f..5531ced9f 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -16,7 +16,10 @@ use crate::processing::{ InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, UpdateIndexProgress, }; -use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; +use crate::utils::{ + self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task, + ProcessingBatch, +}; use crate::{Error, IndexScheduler, Result, TaskId}; impl IndexScheduler { @@ -418,7 +421,6 @@ impl IndexScheduler { to_delete_tasks -= &enqueued_tasks; // 2. We now have a list of tasks to delete, delete them - let mut affected_indexes = HashSet::new(); let mut affected_statuses = HashSet::new(); let mut affected_kinds = HashSet::new(); @@ -515,9 +517,34 @@ impl IndexScheduler { tasks -= &to_delete_tasks; // We must remove the batch entirely if tasks.is_empty() { - self.queue.batches.all_batches.delete(wtxn, &batch_id)?; - self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? 
{ + remove_n_tasks_datetime_earlier_than( + wtxn, + self.queue.batches.started_at, + batch.started_at, + if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + batch_id, + )?; + remove_task_datetime( + wtxn, + self.queue.batches.started_at, + batch.started_at, + batch_id, + )?; + if let Some(finished_at) = batch.finished_at { + remove_task_datetime( + wtxn, + self.queue.batches.finished_at, + finished_at, + batch_id, + )?; + } + + self.queue.batches.all_batches.delete(wtxn, &batch_id)?; + self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + } } + // Anyway, we must remove the batch from all its reverse indexes. // The only way to do that is to check diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 80a0bb5ff..028d193e9 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -174,6 +174,33 @@ pub(crate) fn remove_task_datetime( Ok(()) } +pub(crate) fn remove_n_tasks_datetime_earlier_than( + wtxn: &mut RwTxn, + database: Database, + earlier_than: OffsetDateTime, + mut count: usize, + task_id: TaskId, +) -> Result<()> { + let earlier_than = earlier_than.unix_timestamp_nanos(); + let mut iter = database.rev_range_mut(wtxn, &(..earlier_than))?; + while let Some((current, mut existing)) = iter.next().transpose()? { + count -= existing.remove(task_id) as usize; + + if existing.is_empty() { + // safety: We don't keep references to the database + unsafe { iter.del_current()? }; + } else { + // safety: We don't keep references to the database + unsafe { iter.put_current(&current, &existing)? }; + } + if count == 0 { + break; + } + } + + Ok(()) +} + pub(crate) fn keep_ids_within_datetimes( rtxn: &RoTxn, ids: &mut RoaringBitmap,