write most of the run loop

This commit is contained in:
Tamo 2022-09-16 01:58:08 +02:00 committed by Clément Renault
parent 4846a7c501
commit 1ea9c0b4c0
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 80 additions and 35 deletions

View File

@ -1,7 +1,7 @@
use crate::{ use crate::{
autobatcher::BatchKind, autobatcher::BatchKind,
task::{Kind, KindWithContent, Status, Task}, task::{Kind, KindWithContent, Status, Task},
Error, IndexScheduler, Result, Error, IndexScheduler, Result, TaskId,
}; };
use index::{Settings, Unchecked}; use index::{Settings, Unchecked};
use milli::{ use milli::{
@ -33,6 +33,26 @@ pub(crate) enum Batch {
}, },
} }
impl Batch {
pub fn ids(&self) -> Vec<TaskId> {
match self {
Batch::Cancel(task) => vec![task.uid],
Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::DocumentAddition { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
}
Batch::SettingsAndDocumentAddition {
document_addition_tasks,
settings_tasks,
..
} => document_addition_tasks
.iter()
.chain(settings_tasks)
.map(|task| task.uid)
.collect(),
}
}
}
impl IndexScheduler { impl IndexScheduler {
pub(crate) fn create_next_batch_index( pub(crate) fn create_next_batch_index(
&self, &self,

View File

@ -1,10 +1,11 @@
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
use crate::task::{Kind, KindWithContent, Status, Task, TaskView}; use crate::task::{Kind, KindWithContent, Status, Task, TaskView};
use crate::Result; use crate::{Error, Result};
use file_store::FileStore; use file_store::FileStore;
use index::Index; use index::Index;
use milli::update::IndexerConfig; use milli::update::IndexerConfig;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use time::OffsetDateTime;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
@ -45,8 +46,8 @@ pub mod db_name {
/// 2. Schedule the tasks. /// 2. Schedule the tasks.
#[derive(Clone)] #[derive(Clone)]
pub struct IndexScheduler { pub struct IndexScheduler {
/// The list of tasks currently processing. /// The list of tasks currently processing and their starting date.
pub(crate) processing_tasks: Arc<RwLock<RoaringBitmap>>, pub(crate) processing_tasks: Arc<RwLock<(OffsetDateTime, RoaringBitmap)>>,
pub(crate) file_store: FileStore, pub(crate) file_store: FileStore,
@ -89,9 +90,11 @@ impl IndexScheduler {
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
let wake_up = SignalEvent::auto(true); let wake_up = SignalEvent::auto(true);
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
Ok(Self { Ok(Self {
// by default there is no processing tasks // by default there is no processing tasks
processing_tasks: Arc::default(), processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store: FileStore::new(update_file_path)?, file_store: FileStore::new(update_file_path)?,
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?, all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
status: env.create_database(Some(db_name::STATUS))?, status: env.create_database(Some(db_name::STATUS))?,
@ -201,38 +204,63 @@ impl IndexScheduler {
} }
/// This worker function must be run in a different thread and must be run only once. /// This worker function must be run in a different thread and must be run only once.
fn run(&self) -> ! { pub fn run(&self) -> ! {
loop { loop {
self.wake_up.wait(); self.wake_up.wait();
self.tick() match self.tick() {
Ok(()) => (),
Err(e) => log::error!("{}", e),
}
} }
} }
/// Create and execute and store the result of one batch of registered tasks. /// Create and execute and store the result of one batch of registered tasks.
fn tick(&self) { fn tick(&self) -> Result<()> {
let mut wtxn = match self.env.write_txn() { let mut wtxn = self.env.write_txn()?;
Ok(wtxn) => wtxn, let batch = match self.create_next_batch(&wtxn)? {
Err(e) => { Some(batch) => batch,
log::error!("{}", e); None => return Ok(()),
return;
}
}; };
let batch = match self.create_next_batch(&wtxn) {
Ok(Some(batch)) => batch, // 1. store the starting date with the bitmap of processing tasks.
Ok(None) => return, let mut ids = batch.ids();
Err(e) => { ids.sort_unstable();
log::error!("{}", e); let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
return; let started_at = OffsetDateTime::now_utc();
*self.processing_tasks.write().unwrap() = (started_at, processing_tasks);
// 2. process the tasks
let res = self.process_batch(&mut wtxn, batch);
let finished_at = OffsetDateTime::now_utc();
match res {
Ok(tasks) => {
for mut task in tasks {
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
task.status = Status::Succeeded;
// the info field should've been set by the process_batch function
self.update_task(&mut wtxn, &task)?;
task.remove_data()?;
} }
}; }
// 1. store the starting date with the bitmap of processing tasks // In case of a failure we must get back and patch all the tasks with the error.
// 2. update the tasks with a starting date *but* do not write anything on disk Err(_err) => {
for id in ids {
let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
task.status = Status::Failed;
// TODO: TAMO: set the error correctly
// task.error = Some(err);
// 3. process the tasks self.update_task(&mut wtxn, &task)?;
let _res = self.process_batch(&mut wtxn, batch); task.remove_data()?;
}
// 4. store the updated tasks on disk }
}
// TODO: TAMO: do this later // TODO: TAMO: do this later
// must delete the file on disk // must delete the file on disk
@ -240,13 +268,10 @@ impl IndexScheduler {
// in case of « success » we must update all the task on disk // in case of « success » we must update all the task on disk
// self.handle_batch_result(res); // self.handle_batch_result(res);
match wtxn.commit() { wtxn.commit()?;
Ok(()) => log::info!("A batch of tasks was successfully completed."), log::info!("A batch of tasks was successfully completed.");
Err(e) => {
log::error!("{}", e); Ok(())
return;
}
}
} }
#[cfg(truc)] #[cfg(truc)]

View File

@ -44,7 +44,7 @@ impl IndexScheduler {
.collect::<Result<_>>() .collect::<Result<_>>()
} }
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: Task) -> Result<()> { pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
let old_task = self let old_task = self
.get_task(wtxn, task.uid)? .get_task(wtxn, task.uid)?
.ok_or(Error::CorruptedTaskQueue)?; .ok_or(Error::CorruptedTaskQueue)?;