From 8509243e682b0317630f491cdd2605f424d3eaa3 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 18 May 2022 12:07:06 +0200 Subject: [PATCH] Implement the status and type filtering on the tasks route --- meilisearch-http/src/routes/tasks.rs | 85 ++++++++++++++++++++++++-- meilisearch-http/src/task.rs | 21 +++++-- meilisearch-http/tests/common/index.rs | 11 ++++ meilisearch-http/tests/tasks/mod.rs | 79 ++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 11 deletions(-) diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index 64929d5e0..02f700ccd 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -1,6 +1,7 @@ use actix_web::{web, HttpRequest, HttpResponse}; use meilisearch_error::ResponseError; -use meilisearch_lib::tasks::task::TaskId; +use meilisearch_lib::milli::update::IndexDocumentsMethod; +use meilisearch_lib::tasks::task::{DocumentDeletion, TaskContent, TaskEvent, TaskId}; use meilisearch_lib::tasks::TaskFilter; use meilisearch_lib::{IndexUid, MeiliSearch}; use serde::Deserialize; @@ -19,16 +20,51 @@ pub fn configure(cfg: &mut web::ServiceConfig) { #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] -pub struct TasksFilter { +pub struct TaskFilterQuery { #[serde(rename = "type")] type_: Option>, status: Option>, index_uid: Option>, } +#[rustfmt::skip] +fn task_type_matches_content(type_: &TaskType, content: &TaskContent) -> bool { + matches!((type_, content), + (TaskType::IndexCreation, TaskContent::IndexCreation { .. }) + | (TaskType::IndexUpdate, TaskContent::IndexUpdate { .. }) + | (TaskType::IndexDeletion, TaskContent::IndexDeletion) + | (TaskType::DocumentAddition, TaskContent::DocumentAddition { + merge_strategy: IndexDocumentsMethod::ReplaceDocuments, + .. + }) + | (TaskType::DocumentPartial, TaskContent::DocumentAddition { + merge_strategy: IndexDocumentsMethod::UpdateDocuments, + .. + }) + | (TaskType::DocumentDeletion, TaskContent::DocumentDeletion(DocumentDeletion::Ids(_))) + | (TaskType::SettingsUpdate, TaskContent::SettingsUpdate { .. }) + | (TaskType::ClearAll, TaskContent::DocumentDeletion(DocumentDeletion::Clear)) + ) +} + +fn task_status_matches_events(status: &TaskStatus, events: &[TaskEvent]) -> bool { + events.last().map_or(false, |event| { + matches!( + (status, event), + (TaskStatus::Enqueued, TaskEvent::Created(_)) + | ( + TaskStatus::Processing, + TaskEvent::Processing(_) | TaskEvent::Batched { .. } + ) + | (TaskStatus::Succeeded, TaskEvent::Succeded { .. }) + | (TaskStatus::Failed, TaskEvent::Failed { .. }), + ) + }) +} + async fn get_tasks( meilisearch: GuardedData, MeiliSearch>, - params: web::Query, + params: web::Query, req: HttpRequest, analytics: web::Data, ) -> Result { @@ -38,14 +74,17 @@ async fn get_tasks( Some(&req), ); - let TasksFilter { + let TaskFilterQuery { type_, status, index_uid, } = params.into_inner(); let search_rules = &meilisearch.filters().search_rules; - let filters = match index_uid { + + // We first filter on potential indexes and make sure + // that the search filter restrictions are also applied. + let indexes_filters = match index_uid { Some(indexes) => { let mut filters = TaskFilter::default(); for name in indexes.into_inner() { @@ -68,6 +107,42 @@ async fn get_tasks( } }; + // Then we complete the task filter with other potential status and types filters. + let filters = match (type_, status) { + (Some(CS(types)), Some(CS(statuses))) => { + let mut filters = indexes_filters.unwrap_or_default(); + filters.filter_fn(move |task| { + let matches_type = types + .iter() + .any(|t| task_type_matches_content(&t, &task.content)); + let matches_status = statuses + .iter() + .any(|s| task_status_matches_events(&s, &task.events)); + matches_type && matches_status + }); + Some(filters) + } + (Some(CS(types)), None) => { + let mut filters = indexes_filters.unwrap_or_default(); + filters.filter_fn(move |task| { + types + .iter() + .any(|t| task_type_matches_content(&t, &task.content)) + }); + Some(filters) + } + (None, Some(CS(statuses))) => { + let mut filters = indexes_filters.unwrap_or_default(); + filters.filter_fn(move |task| { + statuses + .iter() + .any(|s| task_status_matches_events(&s, &task.events)) + }); + Some(filters) + } + (None, None) => indexes_filters, + }; + let tasks: TaskListView = meilisearch .list_tasks(filters, None, None) .await? diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index 0c22b8ed6..4ecb6cead 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -52,9 +52,9 @@ impl From for TaskType { } impl FromStr for TaskType { - type Err = &'static str; + type Err = String; - fn from_str(status: &str) -> Result { + fn from_str(status: &str) -> Result { match status { "indexCreation" => Ok(TaskType::IndexCreation), "indexUpdate" => Ok(TaskType::IndexUpdate), @@ -64,7 +64,12 @@ impl FromStr for TaskType { "documentDeletion" => Ok(TaskType::DocumentDeletion), "settingsUpdate" => Ok(TaskType::SettingsUpdate), "clearAll" => Ok(TaskType::ClearAll), - _ => Err("invalid task type value"), + unknown => Err(format!( + "invalid task type `{}` value, expecting one of: \ + indexCreation, indexUpdate, indexDeletion, documentAddition, \ + documentPartial, documentDeletion, settingsUpdate, or clearAll", + unknown + )), } } } @@ -79,15 +84,19 @@ pub enum TaskStatus { } impl FromStr for TaskStatus { - type Err = &'static str; + type Err = String; - fn from_str(status: &str) -> Result { + fn from_str(status: &str) -> Result { match status { "enqueued" => Ok(TaskStatus::Enqueued), "processing" => Ok(TaskStatus::Processing), "succeeded" => Ok(TaskStatus::Succeeded), "failed" => Ok(TaskStatus::Failed), - _ => Err("invalid task status value"), + unknown => Err(format!( + "invalid task status `{}` value, expecting one of: \ + enqueued, processing, succeeded, or failed", + unknown + )), } } } diff --git a/meilisearch-http/tests/common/index.rs b/meilisearch-http/tests/common/index.rs index 9e86ac27e..bdce22db2 100644 --- a/meilisearch-http/tests/common/index.rs +++ b/meilisearch-http/tests/common/index.rs @@ -131,6 +131,17 @@ impl Index<'_> { self.service.get(url).await } + pub async fn filtered_tasks(&self, type_: &[&str], status: &[&str]) -> (Value, StatusCode) { + let mut url = format!("/tasks?indexUid={}", self.uid); + if !type_.is_empty() { + url += &format!("&type={}", type_.join(",")); + } + if !status.is_empty() { + url += &format!("&status={}", status.join(",")); + } + self.service.get(url).await + } + pub async fn get_document( &self, id: u64, diff --git a/meilisearch-http/tests/tasks/mod.rs b/meilisearch-http/tests/tasks/mod.rs index ce0f56eb5..300cddde7 100644 --- a/meilisearch-http/tests/tasks/mod.rs +++ b/meilisearch-http/tests/tasks/mod.rs @@ -59,6 +59,85 @@ async fn list_tasks() { assert_eq!(response["results"].as_array().unwrap().len(), 2); } +#[actix_rt::test] +async fn list_tasks_status_filtered() { + let server = Server::new().await; + let index = server.index("test"); + index.create(None).await; + index.wait_task(0).await; + index + .add_documents( + serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), + None, + ) + .await; + + let (response, code) = index.filtered_tasks(&[], &["succeeded"]).await; + assert_eq!(code, 200, "{}", response); + assert_eq!(response["results"].as_array().unwrap().len(), 1); + + let (response, code) = index.filtered_tasks(&[], &["processing"]).await; + assert_eq!(code, 200, "{}", response); + assert_eq!(response["results"].as_array().unwrap().len(), 1); + + index.wait_task(1).await; + + let (response, code) = index.filtered_tasks(&[], &["succeeded"]).await; + assert_eq!(code, 200, "{}", response); + assert_eq!(response["results"].as_array().unwrap().len(), 2); +} + +#[actix_rt::test] +async fn list_tasks_type_filtered() { + let server = Server::new().await; + let index = server.index("test"); + index.create(None).await; + index.wait_task(0).await; + index + .add_documents( + serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), + None, + ) + .await; + + let (response, code) = index.filtered_tasks(&["indexCreation"], &[]).await; + assert_eq!(code, 200, "{}", response); + assert_eq!(response["results"].as_array().unwrap().len(), 1); + + let (response, code) = index + .filtered_tasks(&["indexCreation", "documentAddition"], &[]) + .await; + assert_eq!(code, 200, "{}", response); + assert_eq!(response["results"].as_array().unwrap().len(), 2); +} + +#[actix_rt::test] +async fn list_tasks_status_and_type_filtered() { + let server = Server::new().await; + let index = server.index("test"); + index.create(None).await; + index.wait_task(0).await; + index + .add_documents( + serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), + None, + ) + .await; + + let (response, code) = index.filtered_tasks(&["indexCreation"], &["failed"]).await; + assert_eq!(code, 200, "{}", response); + assert_eq!(response["results"].as_array().unwrap().len(), 0); + + let (response, code) = index + .filtered_tasks( + &["indexCreation", "documentAddition"], + &["succeeded", "processing"], + ) + .await; + assert_eq!(code, 200, "{}", response); + assert_eq!(response["results"].as_array().unwrap().len(), 2); +} + macro_rules! assert_valid_summarized_task { ($response:expr, $task_type:literal, $index:literal) => {{ assert_eq!($response.as_object().unwrap().len(), 5);