diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index b1d727c2f..be24e8467 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -20,6 +20,7 @@ use meilisearch_types::{
 use roaring::RoaringBitmap;
 use uuid::Uuid;
 
+#[derive(Debug)]
 pub(crate) enum Batch {
     Cancel(Task),
     TaskDeletion(Task),
@@ -42,6 +43,7 @@ pub(crate) enum Batch {
     },
 }
 
+#[derive(Debug)]
 pub(crate) enum IndexOperation {
     DocumentImport {
         index_uid: String,
@@ -381,7 +383,7 @@ impl IndexScheduler {
         }
 
         // 2. we get the next task to delete
-        let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)?;
+        let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
        if let Some(task_id) = to_delete.min() {
             let task = self
                 .get_task(rtxn, task_id)?
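Aside from the two `#[derive(Debug)]` additions, the functional change in `batch.rs` is the `& enqueued` intersection: `get_kind` returns every task of the `taskDeletion` kind regardless of status, so without the intersection a deletion task that already ran would be picked up again on every scheduler tick. A minimal sketch of the bitmap logic with the `roaring` crate, where standalone bitmaps stand in for the scheduler's `get_kind` result and its `enqueued` set:

```rust
use roaring::RoaringBitmap;

fn main() {
    // Tasks 1 and 5 are `taskDeletion` tasks; task 1 has already run.
    let task_deletion_tasks = RoaringBitmap::from_iter([1u32, 5]);
    // Only tasks 3, 4 and 5 are still enqueued.
    let enqueued = RoaringBitmap::from_iter([3u32, 4, 5]);

    // Without the intersection, `.min()` would keep returning task 1 forever;
    // restricting to enqueued tasks yields the next deletion task to run.
    let to_delete = &task_deletion_tasks & &enqueued;
    assert_eq!(to_delete.min(), Some(5));
}
```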
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 7c3d81b0e..174cc847f 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -30,13 +30,10 @@ use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32};
 
 use crate::index_mapper::IndexMapper;
 
-const DEFAULT_LIMIT: fn() -> u32 = || 20;
-
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "camelCase")]
 pub struct Query {
-    #[serde(default = "DEFAULT_LIMIT")]
-    pub limit: u32,
+    pub limit: Option<u32>,
     pub from: Option<u32>,
     pub status: Option<Vec<Status>>,
     #[serde(rename = "type")]
@@ -48,7 +45,7 @@ pub struct Query {
 impl Default for Query {
     fn default() -> Self {
         Self {
-            limit: DEFAULT_LIMIT(),
+            limit: None,
             from: None,
             status: None,
             kind: None,
@@ -96,7 +93,10 @@ impl Query {
     }
 
     pub fn with_limit(self, limit: u32) -> Self {
-        Self { limit, ..self }
+        Self {
+            limit: Some(limit),
+            ..self
+        }
     }
 }
 
@@ -245,13 +245,20 @@ impl IndexScheduler {
     /// Return the task ids corresponding to the query
     pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
         let rtxn = self.env.read_txn()?;
-        let last_task_id = match self.last_task_id(&rtxn)? {
-            Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid),
-            None => return Ok(RoaringBitmap::new()),
-        };
 
         // This is the list of all the tasks.
-        let mut tasks = RoaringBitmap::from_sorted_iter(0..last_task_id).unwrap();
+        let mut tasks = {
+            let mut all_tasks = RoaringBitmap::new();
+            for status in [
+                Status::Enqueued,
+                Status::Processing,
+                Status::Succeeded,
+                Status::Failed,
+            ] {
+                all_tasks |= self.get_status(&rtxn, status)?;
+            }
+            all_tasks
+        };
 
         if let Some(uids) = &query.uid {
             tasks &= RoaringBitmap::from_iter(uids);
@@ -289,8 +296,14 @@ impl IndexScheduler {
         let rtxn = self.env.read_txn()?;
         let tasks = self.get_task_ids(&query)?;
 
-        let tasks =
-            self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?;
+        let tasks = self.get_existing_tasks(
+            &rtxn,
+            tasks
+                .into_iter()
+                .rev()
+                .take(query.limit.unwrap_or(u32::MAX) as usize),
+        )?;
+
         let (started_at, processing) = self
             .processing_tasks
             .read()
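The `limit` change is the behavioral pivot of the patch: the scheduler-level `Query` no longer defaults to 20 results, and `None` now means "no limit at all", which the task-deletion path relies on to match every task while HTTP routes opt back into pagination. A minimal sketch of the new semantics, using a trimmed-down stand-in for the real `Query` (which has more fields):

```rust
/// Trimmed-down stand-in for the scheduler's `Query`; only the `limit`
/// semantics are reproduced here.
#[derive(Debug, Default, Clone)]
struct Query {
    limit: Option<u32>,
}

impl Query {
    fn with_limit(self, limit: u32) -> Self {
        Self { limit: Some(limit) }
    }
}

fn main() {
    let task_ids = 0u32..100;

    // `Query::default()` no longer silently caps results at 20: `None`
    // disables the cap, which the DELETE /tasks path relies on to target
    // every matching task.
    let unlimited = Query::default();
    let all: Vec<u32> = task_ids
        .clone()
        .rev()
        .take(unlimited.limit.unwrap_or(u32::MAX) as usize)
        .collect();
    assert_eq!(all.len(), 100);

    // Routes that paginate opt back in with an explicit limit.
    let page = Query::default().with_limit(20);
    let latest: Vec<u32> = task_ids
        .rev()
        .take(page.limit.unwrap_or(u32::MAX) as usize)
        .collect();
    assert_eq!(latest.len(), 20);
}
```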
diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs
index 021ba1e68..a222604a5 100644
--- a/meilisearch-http/src/routes/tasks.rs
+++ b/meilisearch-http/src/routes/tasks.rs
@@ -21,9 +21,12 @@ use super::fold_star_or;
 const DEFAULT_LIMIT: fn() -> u32 = || 20;
 
 pub fn configure(cfg: &mut web::ServiceConfig) {
-    cfg.service(web::resource("").route(web::get().to(SeqHandler(get_tasks))))
-        .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))))
-        .service(web::resource("").route(web::delete().to(SeqHandler(delete_tasks))));
+    cfg.service(
+        web::resource("")
+            .route(web::get().to(SeqHandler(get_tasks)))
+            .route(web::delete().to(SeqHandler(delete_tasks))),
+    )
+    .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize)]
@@ -63,8 +66,8 @@ pub struct TaskView {
     pub finished_at: Option<OffsetDateTime>,
 }
 
-impl From<Task> for TaskView {
-    fn from(task: Task) -> Self {
+impl TaskView {
+    fn from_task(task: &Task) -> TaskView {
         TaskView {
             uid: task.uid,
             index_uid: task
@@ -72,7 +75,7 @@
                 .and_then(|vec| vec.first().map(|i| i.to_string())),
             status: task.status,
             kind: task.kind.as_kind(),
-            details: task.details.map(DetailsView::from),
+            details: task.details.clone().map(DetailsView::from),
             error: task.error.clone(),
             duration: task
                 .started_at
@@ -172,38 +175,44 @@ pub struct TasksFilterQuery {
     from: Option<TaskId>,
 }
 
-#[rustfmt::skip]
-fn task_type_matches_content(type_: &TaskType, content: &TaskContent) -> bool {
-    matches!((type_, content),
-          (TaskType::IndexCreation, TaskContent::IndexCreation { .. })
-        | (TaskType::IndexUpdate, TaskContent::IndexUpdate { .. })
-        | (TaskType::IndexDeletion, TaskContent::IndexDeletion { .. })
-        | (TaskType::DocumentAdditionOrUpdate, TaskContent::DocumentAddition { .. })
-        | (TaskType::DocumentDeletion, TaskContent::DocumentDeletion{ .. })
-        | (TaskType::SettingsUpdate, TaskContent::SettingsUpdate { .. })
-        | (TaskType::DumpCreation, TaskContent::Dump { .. })
-    )
-}
-
-#[rustfmt::skip]
-fn task_status_matches_events(status: &TaskStatus, events: &[TaskEvent]) -> bool {
-    events.last().map_or(false, |event| {
-        matches!((status, event),
-              (TaskStatus::Enqueued, TaskEvent::Created(_))
-            | (TaskStatus::Processing, TaskEvent::Processing(_) | TaskEvent::Batched { .. })
-            | (TaskStatus::Succeeded, TaskEvent::Succeeded { .. })
-            | (TaskStatus::Failed, TaskEvent::Failed { .. }),
-        )
-    })
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", deny_unknown_fields)]
+pub struct TaskDeletionQuery {
+    #[serde(rename = "type")]
+    type_: Option<CS<Kind>>,
+    uid: Option<CS<u32>>,
+    status: Option<CS<Status>>,
+    index_uid: Option<CS<IndexUid>>,
 }
 
 async fn delete_tasks(
     index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
-    params: web::Query<TasksFilterQuery>,
+    params: web::Query<TaskDeletionQuery>,
     _req: HttpRequest,
     _analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
-    let query = params.into_inner();
+    let TaskDeletionQuery {
+        type_,
+        uid,
+        status,
+        index_uid,
+    } = params.into_inner();
+
+    let kind: Option<Vec<Kind>> = type_.map(|x| x.into_iter().collect());
+    let uid: Option<Vec<u32>> = uid.map(|x| x.into_iter().collect());
+    let status: Option<Vec<Status>> = status.map(|x| x.into_iter().collect());
+    let index_uid: Option<Vec<String>> =
+        index_uid.map(|x| x.into_iter().map(|x| x.to_string()).collect());
+
+    let query = Query {
+        limit: None,
+        from: None,
+        status,
+        kind,
+        index_uid,
+        uid,
+    };
+
     let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
 
     let tasks = index_scheduler.get_task_ids(&filtered_query)?;
@@ -215,7 +224,7 @@ async fn delete_tasks(
     // TODO: Lo: analytics
     let task = index_scheduler.register(task_deletion)?;
 
-    let task_view: TaskView = task.into();
+    let task_view = TaskView::from_task(&task);
 
     Ok(HttpResponse::Ok().json(task_view))
 }
@@ -288,9 +297,13 @@ async fn get_tasks(
     filters.from = from;
     // We +1 just to know if there is more after this "page" or not.
     let limit = limit.saturating_add(1);
-    filters.limit = limit;
+    filters.limit = Some(limit);
 
-    let mut tasks_results: Vec<_> = index_scheduler.get_tasks(filters)?.into_iter().collect();
+    let mut tasks_results: Vec<TaskView> = index_scheduler
+        .get_tasks(filters)?
+        .into_iter()
+        .map(|t| TaskView::from_task(&t))
+        .collect();
 
     // If we were able to fetch the number +1 tasks we asked
     // it means that there is more to come.
@@ -338,7 +351,8 @@ async fn get_task(
     filters.uid = Some(vec![task_id]);
     if let Some(task) = index_scheduler.get_tasks(filters)?.first() {
-        Ok(HttpResponse::Ok().json(task))
+        let task_view = TaskView::from_task(&task);
+        Ok(HttpResponse::Ok().json(task_view))
     } else {
         Err(index_scheduler::Error::TaskNotFound(task_id).into())
     }
 }
@@ -355,8 +369,6 @@ fn filter_out_inaccessible_indexes_from_query(
 
     let search_rules = &index_scheduler.filters().search_rules;
 
-    let mut query = index_scheduler::Query::default();
-
     // We filter on potential indexes and make sure that the search filter
     // restrictions are also applied.
     match indexes {
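The new `TaskDeletionQuery` mirrors `TasksFilterQuery` but deliberately drops `limit` and `from` (and `deny_unknown_fields` rejects them outright), since a deletion should apply to every matching task rather than a page of them. A sketch of the `Option<CS<T>> -> Option<Vec<T>>` lowering the handler performs; this `CS` is a hypothetical stand-in for the comma-separated deserializer the real route uses:

```rust
/// Hypothetical stand-in for the `CS` (comma-separated) wrapper the route
/// deserializes its query parameters into; only `IntoIterator` matters for
/// the conversion the handler performs.
struct CS<T>(Vec<T>);

impl<T> IntoIterator for CS<T> {
    type Item = T;
    type IntoIter = std::vec::IntoIter<T>;
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

fn main() {
    // `DELETE /tasks?uid=1,5` would arrive as `Some(CS(vec![1, 5]))`,
    // while an absent parameter stays `None`.
    let uid: Option<CS<u32>> = Some(CS(vec![1, 5]));
    let status: Option<CS<String>> = None;

    // The handler lowers each filter into the scheduler's `Option<Vec<_>>`
    // representation; `None` means "don't filter on this dimension".
    let uid: Option<Vec<u32>> = uid.map(|x| x.into_iter().collect());
    let status: Option<Vec<String>> = status.map(|x| x.into_iter().collect());

    assert_eq!(uid, Some(vec![1, 5]));
    assert_eq!(status, None);
}
```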
diff --git a/meilisearch-http/tests/auth/authorization.rs b/meilisearch-http/tests/auth/authorization.rs
index 5b23749c5..51df6fb79 100644
--- a/meilisearch-http/tests/auth/authorization.rs
+++ b/meilisearch-http/tests/auth/authorization.rs
@@ -16,6 +16,7 @@ pub static AUTHORIZATIONS: Lazy<HashMap<(&'static str, &'static str), HashSet<&'static str>>> =
     ("GET", "/indexes/products/documents/0") => hashset!{"documents.get", "documents.*", "*"},
     ("DELETE", "/indexes/products/documents/0") => hashset!{"documents.delete", "documents.*", "*"},
     ("GET", "/tasks") => hashset!{"tasks.get", "tasks.*", "*"},
+    ("DELETE", "/tasks") => hashset!{"tasks.delete", "tasks.*", "*"},
     ("GET", "/tasks?indexUid=products") => hashset!{"tasks.get", "tasks.*", "*"},
     ("GET", "/tasks/0") => hashset!{"tasks.get", "tasks.*", "*"},
     ("PATCH", "/indexes/products/") => hashset!{"indexes.update", "indexes.*", "*"},
diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs
index 1828cdbcd..d7b59717a 100644
--- a/meilisearch-types/src/tasks.rs
+++ b/meilisearch-types/src/tasks.rs
@@ -16,7 +16,7 @@ use crate::{
 
 pub type TaskId = u32;
 
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Task {
     pub uid: TaskId,
@@ -73,7 +73,7 @@ impl Task {
     }
 }
 
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub enum KindWithContent {
     DocumentImport {
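The new `Clone` derives on `Task` and `KindWithContent` support the `TaskView::from_task(&Task)` constructor introduced above: the view now borrows the task and clones only the owned fields it keeps (`details`, `error`), instead of consuming the whole `Task` through `From<Task>`. A minimal sketch of that borrow-and-clone pattern, with simplified stand-in types:

```rust
/// Simplified stand-ins for `Task` and `TaskView`; the real types carry
/// many more fields, but the ownership story is the same.
#[derive(Clone, Debug, PartialEq)]
struct Task {
    uid: u32,
    error: Option<String>,
}

#[derive(Debug)]
struct TaskView {
    uid: u32,
    error: Option<String>,
}

impl TaskView {
    // Borrowing (`&Task`) instead of the old `From<Task>` means callers like
    // `get_tasks`, which iterate over tasks they still own, don't give them up.
    fn from_task(task: &Task) -> TaskView {
        TaskView {
            uid: task.uid,
            error: task.error.clone(),
        }
    }
}

fn main() {
    let task = Task { uid: 0, error: None };
    let view = TaskView::from_task(&task);
    // `task` is still usable after building the view.
    assert_eq!(task.uid, view.uid);
    println!("{view:?}");
}
```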