use the remove_n_tasks_datetime_earlier_than function when updating batches

This commit is contained in:
Tamo 2025-01-28 11:29:05 +01:00
parent 58f90b70c7
commit 485e3127c7
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69

View File

@ -12,8 +12,8 @@ use time::OffsetDateTime;
use super::{Query, Queue};
use crate::processing::ProcessingTasks;
use crate::utils::{
insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime,
ProcessingBatch,
insert_task_datetime, keep_ids_within_datetimes, map_bound,
remove_n_tasks_datetime_earlier_than, remove_task_datetime, ProcessingBatch,
};
use crate::{Error, Result, BEI128};
@ -239,33 +239,21 @@ impl BatchQueue {
remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, old_batch.uid)?;
remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, old_batch.uid)?;
} else {
let started_at = old_batch.started_at.unix_timestamp_nanos();
// We have either one or two enqueued at to remove
let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2);
let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?;
while let Some(entry) = iterator.next() {
let (key, mut value) = entry?;
if key > started_at {
continue;
}
if value.remove(old_batch.uid) {
exit = exit.saturating_sub(1);
// Safe because the key and value are owned
unsafe {
iterator.put_current(&key, &value)?;
}
if exit == 0 {
break;
}
}
}
// If we don't have the enqueued at in the batch it means the database comes from the v1.12
// and we still need to find the date by scrolling the database
remove_n_tasks_datetime_earlier_than(
wtxn,
self.enqueued_at,
old_batch.started_at,
if old_batch.stats.total_nb_tasks >= 2 { 2 } else { 1 },
old_batch.uid,
)?;
}
}
if let Some(enqueued_at) = batch.enqueued_at.as_ref() {
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?;
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?;
}
// A finished batch MUST contains at least one task and have an enqueued_at
let enqueued_at = batch.enqueued_at.as_ref().unwrap();
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?;
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?;
// Update the started at and finished at
if let Some(ref old_batch) = old_batch {