refactor the Task a little bit

Tamo 2022-10-12 03:21:25 +02:00 committed by Clément Renault
parent e6c033bd6b
commit e533e740d4
17 changed files with 1358 additions and 833 deletions


@@ -11,8 +11,7 @@ pub type Result<T> = std::result::Result<T, Error>;
 pub type TaskId = u32;
 pub use error::Error;
-use meilisearch_types::tasks::{Kind, Status, TaskView};
-pub use task::KindWithContent;
+use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
@@ -31,7 +30,6 @@ use meilisearch_types::milli::update::IndexerConfig;
 use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32};
 use crate::index_mapper::IndexMapper;
-use crate::task::Task;
 const DEFAULT_LIMIT: fn() -> u32 = || 20;
@@ -246,7 +244,7 @@ impl IndexScheduler {
     }
     /// Returns the tasks corresponding to the query.
-    pub fn get_tasks(&self, query: Query) -> Result<Vec<TaskView>> {
+    pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
         let rtxn = self.env.read_txn()?;
         let last_task_id = match self.last_task_id(&rtxn)? {
             Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid),
@@ -292,13 +290,13 @@ impl IndexScheduler {
             .map_err(|_| Error::CorruptedTaskQueue)?
             .clone();
-        let ret = tasks.into_iter().map(|task| task.as_task_view());
+        let ret = tasks.into_iter();
         if processing.is_empty() {
             Ok(ret.collect())
         } else {
             Ok(ret
                 .map(|task| match processing.contains(task.uid) {
-                    true => TaskView {
+                    true => Task {
                         status: Status::Processing,
                         started_at: Some(started_at),
                         ..task
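
The hunk above is where in-flight tasks are patched on the fly: `get_tasks` now returns plain `Task`s, and any task whose uid appears in the `processing` set gets its status overridden to `Status::Processing` via Rust's struct-update syntax (`..task`). A minimal standalone sketch of that pattern follows, with simplified stand-in types (a `HashSet` in place of the scheduler's processing set, a stripped-down `Task`) rather than the real meilisearch definitions.

use std::collections::HashSet;

// Simplified stand-in types for illustration only (not the real meilisearch types).
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Enqueued,
    Processing,
}

#[derive(Debug, Clone)]
struct Task {
    uid: u32,
    status: Status,
    started_at: Option<String>,
}

// Mirror the `..task` struct update used in `get_tasks`: tasks whose uid is in
// the `processing` set are reported as Processing with a start date; the rest
// are returned untouched.
fn patch_processing(tasks: Vec<Task>, processing: &HashSet<u32>, started_at: &str) -> Vec<Task> {
    tasks
        .into_iter()
        .map(|task| {
            if processing.contains(&task.uid) {
                Task {
                    status: Status::Processing,
                    started_at: Some(started_at.to_string()),
                    ..task
                }
            } else {
                task
            }
        })
        .collect()
}

fn main() {
    let tasks = vec![
        Task { uid: 0, status: Status::Enqueued, started_at: None },
        Task { uid: 1, status: Status::Enqueued, started_at: None },
    ];
    let processing: HashSet<u32> = [1u32].into_iter().collect();
    let patched = patch_processing(tasks, &processing, "2022-10-12T03:21:25Z");
    assert_eq!(patched[0].status, Status::Enqueued);
    assert_eq!(patched[1].status, Status::Processing);
}
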
@@ -311,7 +309,7 @@ impl IndexScheduler {
     /// Register a new task in the scheduler. If it fails and data was associated with the task
     /// it tries to delete the file.
-    pub fn register(&self, task: KindWithContent) -> Result<TaskView> {
+    pub fn register(&self, task: KindWithContent) -> Result<Task> {
         let mut wtxn = self.env.write_txn()?;
         let task = Task {
@@ -343,13 +341,10 @@ impl IndexScheduler {
             (bitmap.insert(task.uid));
         })?;
-        // we persist the file in last to be sure everything before was applied successfuly
-        task.persist()?;
         match wtxn.commit() {
             Ok(()) => (),
             e @ Err(_) => {
-                task.remove_data()?;
+                todo!("remove the data associated with the task");
                 e?;
             }
         }
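
The commit-failure arm above binds the whole `Err` as `e @ Err(_)` so that cleanup (now reduced to a `todo!`) can run before the error is propagated with `e?`. Here is a self-contained sketch of that idiom, with dummy `commit` and `cleanup_task_data` functions standing in for the real transaction commit and task-data removal:

// Dummy stand-ins for the real transaction commit and task-data cleanup.
fn commit() -> Result<(), String> {
    Err("commit failed".to_string())
}

fn cleanup_task_data() {
    println!("removing the data associated with the task");
}

fn register() -> Result<(), String> {
    match commit() {
        Ok(()) => (),
        // Bind the whole `Err` as `e`, run the cleanup, then propagate it with `?`.
        e @ Err(_) => {
            cleanup_task_data();
            e?;
        }
    }
    Ok(())
}

fn main() {
    assert!(register().is_err());
}
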
@@ -357,7 +352,7 @@ impl IndexScheduler {
         // notify the scheduler loop to execute a new tick
         self.wake_up.signal();
-        Ok(task.as_task_view())
+        Ok(task)
     }
     pub fn create_update_file(&self) -> Result<(Uuid, File)> {
@@ -416,7 +411,6 @@ impl IndexScheduler {
                     task.finished_at = Some(finished_at);
                     // TODO the info field should've been set by the process_batch function
                     self.update_task(&mut wtxn, &task)?;
-                    task.remove_data()?;
                 }
             }
             // In case of a failure we must get back and patch all the tasks with the error.
@@ -430,7 +424,6 @@ impl IndexScheduler {
                     task.error = Some(error.clone());
                     self.update_task(&mut wtxn, &task)?;
-                    task.remove_data()?;
                 }
             }
         }
@@ -585,7 +578,7 @@ mod tests {
             assert_eq!(task.uid, idx as u32);
             assert_eq!(task.status, Status::Enqueued);
-            assert_eq!(task.kind, k);
+            assert_eq!(task.kind.as_kind(), k);
         }
         assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
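
The updated assertion compares `task.kind.as_kind()` against a `Kind`, which suggests the task's `kind` field now stores a payload-carrying `KindWithContent` that can be collapsed into a plain `Kind` discriminant for filtering and display. A rough illustrative sketch of that split is below; the variants are made up, not the actual `meilisearch_types::tasks` definitions.

// Made-up variants for illustration; the real enums live in meilisearch_types::tasks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Kind {
    IndexCreation,
    DocumentAddition,
}

// The payload-carrying side: what the scheduler actually needs to run the task.
#[derive(Debug, Clone)]
enum KindWithContent {
    IndexCreation { index_uid: String },
    DocumentAddition { index_uid: String, content_file: String },
}

impl KindWithContent {
    // Collapse the payload-carrying variant into its lightweight discriminant.
    fn as_kind(&self) -> Kind {
        match self {
            KindWithContent::IndexCreation { .. } => Kind::IndexCreation,
            KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition,
        }
    }
}

fn main() {
    let k = KindWithContent::DocumentAddition {
        index_uid: "movies".to_string(),
        content_file: "update-0001.ndjson".to_string(),
    };
    assert_eq!(k.as_kind(), Kind::DocumentAddition);
}
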