implements the two last TODOs

This commit is contained in:
Tamo 2024-11-20 10:23:45 +01:00
parent d9a4e69990
commit e145d71a62
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
2 changed files with 14 additions and 21 deletions

View File

@ -67,7 +67,7 @@ use roaring::RoaringBitmap;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use utils::{filter_out_references_to_newer_tasks, keep_ids_within_datetimes, map_bound};
use uuid::Uuid; use uuid::Uuid;
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
@ -904,7 +904,7 @@ impl IndexScheduler {
), ),
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut filtered_non_processing_tasks, &mut filtered_non_processing_tasks,
self.started_at, self.started_at,
@ -914,7 +914,7 @@ impl IndexScheduler {
filtered_non_processing_tasks | filtered_processing_tasks filtered_non_processing_tasks | filtered_processing_tasks
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut tasks, &mut tasks,
self.enqueued_at, self.enqueued_at,
@ -922,7 +922,7 @@ impl IndexScheduler {
*before_enqueued_at, *before_enqueued_at,
)?; )?;
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut tasks, &mut tasks,
self.finished_at, self.finished_at,
@ -1092,7 +1092,7 @@ impl IndexScheduler {
), ),
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut filtered_non_processing_batches, &mut filtered_non_processing_batches,
self.batch_started_at, self.batch_started_at,
@ -1102,7 +1102,7 @@ impl IndexScheduler {
filtered_non_processing_batches | filtered_processing_batches filtered_non_processing_batches | filtered_processing_batches
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut batches, &mut batches,
self.batch_enqueued_at, self.batch_enqueued_at,
@ -1110,7 +1110,7 @@ impl IndexScheduler {
query.before_enqueued_at, query.before_enqueued_at,
)?; )?;
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut batches, &mut batches,
self.batch_finished_at, self.batch_finished_at,

View File

@ -270,16 +270,10 @@ impl IndexScheduler {
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?; let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
dbg!(&task); debug_assert!(old_task != *task);
debug_assert_eq!(old_task.uid, task.uid); debug_assert_eq!(old_task.uid, task.uid);
debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some()); debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
// TODO: This shouldn't ever happen, we should assert it
if old_task == *task {
return Ok(());
}
if old_task.status != task.status { if old_task.status != task.status {
self.update_status(wtxn, old_task.status, |bitmap| { self.update_status(wtxn, old_task.status, |bitmap| {
bitmap.remove(task.uid); bitmap.remove(task.uid);
@ -505,10 +499,9 @@ pub(crate) fn remove_task_datetime(
Ok(()) Ok(())
} }
// TODO: Rename the function since it also applies to batches pub(crate) fn keep_ids_within_datetimes(
pub(crate) fn keep_tasks_within_datetimes(
rtxn: &RoTxn, rtxn: &RoTxn,
tasks: &mut RoaringBitmap, ids: &mut RoaringBitmap,
database: Database<BEI128, CboRoaringBitmapCodec>, database: Database<BEI128, CboRoaringBitmapCodec>,
after: Option<OffsetDateTime>, after: Option<OffsetDateTime>,
before: Option<OffsetDateTime>, before: Option<OffsetDateTime>,
@ -519,15 +512,15 @@ pub(crate) fn keep_tasks_within_datetimes(
(Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded), (Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded),
(Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)), (Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)),
}; };
let mut collected_task_ids = RoaringBitmap::new(); let mut collected_ids = RoaringBitmap::new();
let start = map_bound(start, |b| b.unix_timestamp_nanos()); let start = map_bound(start, |b| b.unix_timestamp_nanos());
let end = map_bound(end, |b| b.unix_timestamp_nanos()); let end = map_bound(end, |b| b.unix_timestamp_nanos());
let iter = database.range(rtxn, &(start, end))?; let iter = database.range(rtxn, &(start, end))?;
for r in iter { for r in iter {
let (_timestamp, task_ids) = r?; let (_timestamp, ids) = r?;
collected_task_ids |= task_ids; collected_ids |= ids;
} }
*tasks &= collected_task_ids; *ids &= collected_ids;
Ok(()) Ok(())
} }