Make sure that the index-scheduler tick loop is rerun after processing

This commit is contained in:
Kerollmops 2022-10-10 16:19:23 +02:00 committed by Clément Renault
parent b311eb3bed
commit 19c6f8303f
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -217,7 +217,8 @@ impl IndexScheduler {
run.wake_up.wait(); run.wake_up.wait();
match run.tick() { match run.tick() {
Ok(()) => (), Ok(0) => (),
Ok(_) => run.wake_up.signal(),
Err(e) => log::error!("{}", e), Err(e) => log::error!("{}", e),
} }
}); });
@ -361,14 +362,16 @@ impl IndexScheduler {
} }
/// Create and execute and store the result of one batch of registered tasks. /// Create and execute and store the result of one batch of registered tasks.
fn tick(&self) -> Result<()> { ///
/// Returns the number of processed tasks.
fn tick(&self) -> Result<usize> {
#[cfg(test)] #[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap(); self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap();
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let batch = match self.create_next_batch(&rtxn)? { let batch = match self.create_next_batch(&rtxn)? {
Some(batch) => batch, Some(batch) => batch,
None => return Ok(()), None => return Ok(0),
}; };
// we don't need this transaction any longer. // we don't need this transaction any longer.
drop(rtxn); drop(rtxn);
@ -376,6 +379,7 @@ impl IndexScheduler {
// 1. store the starting date with the bitmap of processing tasks. // 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids(); let mut ids = batch.ids();
ids.sort_unstable(); ids.sort_unstable();
let processed_tasks = ids.len();
let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap(); let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
let started_at = OffsetDateTime::now_utc(); let started_at = OffsetDateTime::now_utc();
*self.processing_tasks.write().unwrap() = (started_at, processing_tasks); *self.processing_tasks.write().unwrap() = (started_at, processing_tasks);
@ -401,8 +405,7 @@ impl IndexScheduler {
for mut task in tasks { for mut task in tasks {
task.started_at = Some(started_at); task.started_at = Some(started_at);
task.finished_at = Some(finished_at); task.finished_at = Some(finished_at);
// the info field should've been set by the process_batch function // TODO the info field should've been set by the process_batch function
self.update_task(&mut wtxn, &task)?; self.update_task(&mut wtxn, &task)?;
task.remove_data()?; task.remove_data()?;
} }
@ -433,7 +436,7 @@ impl IndexScheduler {
.send(Breakpoint::AfterProcessing) .send(Breakpoint::AfterProcessing)
.unwrap(); .unwrap();
Ok(()) Ok(processed_tasks)
} }
/// Notify the scheduler there is or may be work to do. /// Notify the scheduler there is or may be work to do.