stop leaking the update files of the canceled tasks

This commit is contained in:
Tamo 2024-11-20 13:17:54 +01:00
parent 7e379b3d14
commit 8ad68dd708
No known key found for this signature in database
GPG key ID: 20CD8020AFA88D69
7 changed files with 14 additions and 15 deletions

View file

@ -1556,7 +1556,7 @@ impl IndexScheduler {
drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks.
let ids = batch.ids();
let mut ids = batch.ids();
let processed_tasks = ids.len();
// We reset the must_stop flag to be sure that we don't stop processing tasks
@ -1593,6 +1593,7 @@ impl IndexScheduler {
processing_batch.finished();
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new();
match res {
Ok(tasks) => {
@ -1602,7 +1603,6 @@ impl IndexScheduler {
let mut success = 0;
let mut failure = 0;
let mut canceled_by = None;
let mut canceled = RoaringBitmap::new();
#[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() {
@ -1693,9 +1693,12 @@ impl IndexScheduler {
}
}
let processed = self.processing_tasks.write().unwrap().stop_processing();
self.processing_tasks.write().unwrap().stop_processing();
// We must re-add the canceled task so they're part of the same batch.
// processed.processing |= canceled;
ids |= canceled;
self.write_batch(&mut wtxn, processing_batch, &processed.processing)?;
self.write_batch(&mut wtxn, processing_batch, &ids)?;
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
@ -1725,7 +1728,7 @@ impl IndexScheduler {
})?;
// We shouldn't crash the tick function if we can't send data to the webhook.
let _ = self.notify_webhook(&processed.processing);
let _ = self.notify_webhook(&ids);
#[cfg(test)]
self.breakpoint(Breakpoint::AfterProcessing);