From df40533741b11362854cea033068b07f83a5e42f Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 10 Feb 2025 14:05:32 +0100 Subject: [PATCH] Expose a route to get the update file content of a task --- crates/index-scheduler/src/error.rs | 4 ++ crates/index-scheduler/src/queue/mod.rs | 6 ++ crates/meilisearch-types/src/error.rs | 1 + crates/meilisearch/src/routes/tasks.rs | 88 ++++++++++++++++++++++++- 4 files changed, 98 insertions(+), 1 deletion(-) diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index e749a1bcb..b5072276c 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -109,6 +109,8 @@ pub enum Error { InvalidIndexUid { index_uid: String }, #[error("Task `{0}` not found.")] TaskNotFound(TaskId), + #[error("Task `{0}` does not provide any content file.")] + TaskFileNotFound(TaskId), #[error("Batch `{0}` not found.")] BatchNotFound(BatchId), #[error("Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")] @@ -189,6 +191,7 @@ impl Error { | Error::InvalidTaskCanceledBy { .. } | Error::InvalidIndexUid { .. } | Error::TaskNotFound(_) + | Error::TaskFileNotFound(_) | Error::BatchNotFound(_) | Error::TaskDeletionWithEmptyQuery | Error::TaskCancelationWithEmptyQuery @@ -250,6 +253,7 @@ impl ErrorCode for Error { Error::InvalidTaskCanceledBy { .. } => Code::InvalidTaskCanceledBy, Error::InvalidIndexUid { .. } => Code::InvalidIndexUid, Error::TaskNotFound(_) => Code::TaskNotFound, + Error::TaskFileNotFound(_) => Code::TaskFileNotFound, Error::BatchNotFound(_) => Code::BatchNotFound, Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, diff --git a/crates/index-scheduler/src/queue/mod.rs b/crates/index-scheduler/src/queue/mod.rs index c6a79fbb2..8850eb8fa 100644 --- a/crates/index-scheduler/src/queue/mod.rs +++ b/crates/index-scheduler/src/queue/mod.rs @@ -8,6 +8,7 @@ mod tasks_test; mod test; use std::collections::BTreeMap; +use std::fs::File as StdFile; use std::time::Duration; use file_store::FileStore; @@ -216,6 +217,11 @@ impl Queue { } } + /// Open and returns the task's content File. + pub fn update_file(&self, uuid: Uuid) -> file_store::Result { + self.file_store.get_update(uuid) + } + /// Delete a file from the index scheduler. /// /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method. diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 5acc8aa27..f64301b8c 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -372,6 +372,7 @@ RemoteRemoteError , System , BAD_GATEWAY ; RemoteTimeout , System , BAD_GATEWAY ; TooManySearchRequests , System , SERVICE_UNAVAILABLE ; TaskNotFound , InvalidRequest , NOT_FOUND ; +TaskFileNotFound , InvalidRequest , NOT_FOUND ; BatchNotFound , InvalidRequest , NOT_FOUND ; TooManyOpenFiles , System , UNPROCESSABLE_ENTITY ; TooManyVectors , InvalidRequest , BAD_REQUEST ; diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index 90fdc9c16..b0fd0f002 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -16,6 +16,7 @@ use serde::Serialize; use time::format_description::well_known::Rfc3339; use time::macros::format_description; use time::{Date, Duration, OffsetDateTime, Time}; +use tokio::io::AsyncReadExt; use tokio::task; use utoipa::{IntoParams, OpenApi, ToSchema}; @@ -44,7 +45,10 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .route(web::delete().to(SeqHandler(delete_tasks))), ) .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks)))) - .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))); + .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))) + .service( + web::resource("/{task_id}/file").route(web::get().to(SeqHandler(get_task_update_file))), + ); } #[derive(Debug, Deserr, IntoParams)] @@ -639,6 +643,88 @@ async fn get_task( } } +/// Get a task's update file. +/// +/// Get a [task's file](https://www.meilisearch.com/docs/learn/async/asynchronous_operations). +#[utoipa::path( + get, + path = "/{taskUid}/file", + tag = "Tasks", + security(("Bearer" = ["tasks.get", "tasks.*", "*"])), + params(("taskUid", format = UInt32, example = 0, description = "The task identifier", nullable = false)), + responses( + (status = 200, description = "Task successfully retrieved", body = TaskView, content_type = "application/x-ndjson", example = json!( + { + "uid": 1, + "indexUid": "movies", + "status": "succeeded", + "type": "documentAdditionOrUpdate", + "canceledBy": null, + "details": { + "receivedDocuments": 79000, + "indexedDocuments": 79000 + }, + "error": null, + "duration": "PT1S", + "enqueuedAt": "2021-01-01T09:39:00.000000Z", + "startedAt": "2021-01-01T09:39:01.000000Z", + "finishedAt": "2021-01-01T09:39:02.000000Z" + } + )), + (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( + { + "message": "The Authorization header is missing. It must use the bearer authorization method.", + "code": "missing_authorization_header", + "type": "auth", + "link": "https://docs.meilisearch.com/errors#missing_authorization_header" + } + )), + (status = 404, description = "The task uid does not exists", body = ResponseError, content_type = "application/json", example = json!( + { + "message": "Task :taskUid not found.", + "code": "task_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors/#task_not_found" + } + )) + ) +)] +async fn get_task_update_file( + index_scheduler: GuardedData, Data>, + task_uid: web::Path, +) -> Result { + /// TODO change the example + let task_uid_string = task_uid.into_inner(); + + let task_uid: TaskId = match task_uid_string.parse() { + Ok(id) => id, + Err(_e) => { + return Err(index_scheduler::Error::InvalidTaskUid { task_uid: task_uid_string }.into()) + } + }; + + let query = index_scheduler::Query { uids: Some(vec![task_uid]), ..Query::default() }; + let filters = index_scheduler.filters(); + let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(&query, filters)?; + + if let Some(task) = tasks.first() { + match task.content_uuid() { + Some(uuid) => { + // Yes, that's awful to put everything in memory when we could have streamed it from + // disk but it's really (really) complex to do with the current state of async Rust. + let file = index_scheduler.queue.update_file(uuid)?; + let mut tfile = tokio::fs::File::from_std(file); + let mut content = String::new(); + tfile.read_to_string(&mut content).await?; + Ok(HttpResponse::Ok().content_type("application/x-ndjson").body(content)) + } + None => Err(index_scheduler::Error::TaskFileNotFound(task_uid).into()), + } + } else { + Err(index_scheduler::Error::TaskNotFound(task_uid).into()) + } +} + pub enum DeserializeDateOption { Before, After,