5326: Expose a route to get the file content associated with a task r=Kerollmops a=Kerollmops

This PR exposes a new `/tasks/{taskUid}/documents` route, exposing the update file associated with a task.

## To Do
- [x] (optional) Change the route to `/tasks/{taskUid}/documents` `@dureuill.`
- [x] Update Open API example.
- [x] Create [an Experimental Feature Discussion](https://github.com/orgs/meilisearch/discussions/808).
- [x] Make this route experimental and enable it via the experimental route.

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-02-10 16:50:13 +00:00 committed by GitHub
commit 6c9409edf8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 137 additions and 10 deletions

View File

@ -109,6 +109,8 @@ pub enum Error {
InvalidIndexUid { index_uid: String },
#[error("Task `{0}` not found.")]
TaskNotFound(TaskId),
#[error("Task `{0}` does not contain any documents. Only `documentAdditionOrUpdate` tasks with the statuses `enqueued` or `processing` contain documents")]
TaskFileNotFound(TaskId),
#[error("Batch `{0}` not found.")]
BatchNotFound(BatchId),
#[error("Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")]
@ -189,6 +191,7 @@ impl Error {
| Error::InvalidTaskCanceledBy { .. }
| Error::InvalidIndexUid { .. }
| Error::TaskNotFound(_)
| Error::TaskFileNotFound(_)
| Error::BatchNotFound(_)
| Error::TaskDeletionWithEmptyQuery
| Error::TaskCancelationWithEmptyQuery
@ -250,6 +253,7 @@ impl ErrorCode for Error {
Error::InvalidTaskCanceledBy { .. } => Code::InvalidTaskCanceledBy,
Error::InvalidIndexUid { .. } => Code::InvalidIndexUid,
Error::TaskNotFound(_) => Code::TaskNotFound,
Error::TaskFileNotFound(_) => Code::TaskFileNotFound,
Error::BatchNotFound(_) => Code::BatchNotFound,
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,

View File

@ -105,6 +105,19 @@ impl RoFeatures {
.into())
}
}
pub fn check_get_task_documents_route(&self) -> Result<()> {
if self.runtime.get_task_documents_route {
Ok(())
} else {
Err(FeatureNotEnabledError {
disabled_action: "Getting the documents of an enqueued task",
feature: "get task documents route",
issue_link: "https://github.com/orgs/meilisearch/discussions/808",
}
.into())
}
}
}
impl FeatureData {

View File

@ -8,6 +8,7 @@ mod tasks_test;
mod test;
use std::collections::BTreeMap;
use std::fs::File as StdFile;
use std::time::Duration;
use file_store::FileStore;
@ -216,6 +217,11 @@ impl Queue {
}
}
/// Open and returns the task's content File.
pub fn update_file(&self, uuid: Uuid) -> file_store::Result<StdFile> {
self.file_store.get_update(uuid)
}
/// Delete a file from the index scheduler.
///
/// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method.

View File

@ -372,6 +372,7 @@ RemoteRemoteError , System , BAD_GATEWAY ;
RemoteTimeout , System , BAD_GATEWAY ;
TooManySearchRequests , System , SERVICE_UNAVAILABLE ;
TaskNotFound , InvalidRequest , NOT_FOUND ;
TaskFileNotFound , InvalidRequest , NOT_FOUND ;
BatchNotFound , InvalidRequest , NOT_FOUND ;
TooManyOpenFiles , System , UNPROCESSABLE_ENTITY ;
TooManyVectors , InvalidRequest , BAD_REQUEST ;

View File

@ -10,6 +10,7 @@ pub struct RuntimeTogglableFeatures {
pub edit_documents_by_function: bool,
pub contains_filter: bool,
pub network: bool,
pub get_task_documents_route: bool,
}
#[derive(Default, Debug, Clone, Copy)]

View File

@ -197,6 +197,7 @@ struct Infos {
experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: u64,
experimental_network: bool,
experimental_get_task_documents_route: bool,
gpu_enabled: bool,
db_path: bool,
import_dump: bool,
@ -288,6 +289,7 @@ impl Infos {
edit_documents_by_function,
contains_filter,
network,
get_task_documents_route,
} = features;
// We're going to override every sensible information.
@ -306,6 +308,7 @@ impl Infos {
experimental_enable_logs_route: experimental_enable_logs_route | logs_route,
experimental_reduce_indexing_memory_usage,
experimental_network: network,
experimental_get_task_documents_route: get_task_documents_route,
gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
db_path: db_path != PathBuf::from("./data.ms"),
import_dump: import_dump.is_some(),

View File

@ -51,6 +51,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
edit_documents_by_function: Some(false),
contains_filter: Some(false),
network: Some(false),
get_task_documents_route: Some(false),
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
@ -91,6 +92,8 @@ pub struct RuntimeTogglableFeatures {
pub contains_filter: Option<bool>,
#[deserr(default)]
pub network: Option<bool>,
#[deserr(default)]
pub get_task_documents_route: Option<bool>,
}
impl From<meilisearch_types::features::RuntimeTogglableFeatures> for RuntimeTogglableFeatures {
@ -101,6 +104,7 @@ impl From<meilisearch_types::features::RuntimeTogglableFeatures> for RuntimeTogg
edit_documents_by_function,
contains_filter,
network,
get_task_documents_route,
} = value;
Self {
@ -109,6 +113,7 @@ impl From<meilisearch_types::features::RuntimeTogglableFeatures> for RuntimeTogg
edit_documents_by_function: Some(edit_documents_by_function),
contains_filter: Some(contains_filter),
network: Some(network),
get_task_documents_route: Some(get_task_documents_route),
}
}
}
@ -120,6 +125,7 @@ pub struct PatchExperimentalFeatureAnalytics {
edit_documents_by_function: bool,
contains_filter: bool,
network: bool,
get_task_documents_route: bool,
}
impl Aggregate for PatchExperimentalFeatureAnalytics {
@ -134,6 +140,7 @@ impl Aggregate for PatchExperimentalFeatureAnalytics {
edit_documents_by_function: new.edit_documents_by_function,
contains_filter: new.contains_filter,
network: new.network,
get_task_documents_route: new.get_task_documents_route,
})
}
@ -157,6 +164,7 @@ impl Aggregate for PatchExperimentalFeatureAnalytics {
edit_documents_by_function: Some(false),
contains_filter: Some(false),
network: Some(false),
get_task_documents_route: Some(false),
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
@ -190,6 +198,10 @@ async fn patch_features(
.unwrap_or(old_features.edit_documents_by_function),
contains_filter: new_features.0.contains_filter.unwrap_or(old_features.contains_filter),
network: new_features.0.network.unwrap_or(old_features.network),
get_task_documents_route: new_features
.0
.get_task_documents_route
.unwrap_or(old_features.get_task_documents_route),
};
// explicitly destructure for analytics rather than using the `Serialize` implementation, because
@ -201,6 +213,7 @@ async fn patch_features(
edit_documents_by_function,
contains_filter,
network,
get_task_documents_route,
} = new_features;
analytics.publish(
@ -210,6 +223,7 @@ async fn patch_features(
edit_documents_by_function,
contains_filter,
network,
get_task_documents_route,
},
&req,
);

View File

@ -1,3 +1,5 @@
use std::io::ErrorKind;
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use deserr::actix_web::AwebQueryParameter;
@ -16,6 +18,7 @@ use serde::Serialize;
use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use time::{Date, Duration, OffsetDateTime, Time};
use tokio::io::AsyncReadExt;
use tokio::task;
use utoipa::{IntoParams, OpenApi, ToSchema};
@ -44,7 +47,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.route(web::delete().to(SeqHandler(delete_tasks))),
)
.service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks))))
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))))
.service(
web::resource("/{task_id}/documents")
.route(web::get().to(SeqHandler(get_task_documents_file))),
);
}
#[derive(Debug, Deserr, IntoParams)]
@ -639,6 +646,76 @@ async fn get_task(
}
}
/// Get a task's documents.
///
/// Get a [task's documents file](https://www.meilisearch.com/docs/learn/async/asynchronous_operations).
#[utoipa::path(
get,
path = "/{taskUid}/documents",
tag = "Tasks",
security(("Bearer" = ["tasks.get", "tasks.*", "*"])),
params(("taskUid", format = UInt32, example = 0, description = "The task identifier", nullable = false)),
responses(
(status = 200, description = "The content of the task update", body = serde_json::Value, content_type = "application/x-ndjson"),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
"message": "The Authorization header is missing. It must use the bearer authorization method.",
"code": "missing_authorization_header",
"type": "auth",
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
}
)),
(status = 404, description = "The task uid does not exists", body = ResponseError, content_type = "application/json", example = json!(
{
"message": "Task :taskUid not found.",
"code": "task_not_found",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors/#task_not_found"
}
))
)
)]
async fn get_task_documents_file(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
task_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_get_task_documents_route()?;
let task_uid_string = task_uid.into_inner();
let task_uid: TaskId = match task_uid_string.parse() {
Ok(id) => id,
Err(_e) => {
return Err(index_scheduler::Error::InvalidTaskUid { task_uid: task_uid_string }.into())
}
};
let query = index_scheduler::Query { uids: Some(vec![task_uid]), ..Query::default() };
let filters = index_scheduler.filters();
let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(&query, filters)?;
if let Some(task) = tasks.first() {
match task.content_uuid() {
Some(uuid) => {
let mut tfile = match index_scheduler.queue.update_file(uuid) {
Ok(file) => tokio::fs::File::from_std(file),
Err(file_store::Error::IoError(e)) if e.kind() == ErrorKind::NotFound => {
return Err(index_scheduler::Error::TaskFileNotFound(task_uid).into())
}
Err(e) => return Err(e.into()),
};
// Yes, that's awful to put everything in memory when we could have streamed it from
// disk but it's really (really) complex to do with the current state of async Rust.
let mut content = String::new();
tfile.read_to_string(&mut content).await?;
Ok(HttpResponse::Ok().content_type("application/x-ndjson").body(content))
}
None => Err(index_scheduler::Error::TaskFileNotFound(task_uid).into()),
}
} else {
Err(index_scheduler::Error::TaskNotFound(task_uid).into())
}
}
pub enum DeserializeDateOption {
Before,
After,

View File

@ -1909,7 +1909,8 @@ async fn import_dump_v6_containing_experimental_features() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);
@ -2071,7 +2072,8 @@ async fn generate_and_import_dump_containing_vectors() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);

View File

@ -22,7 +22,8 @@ async fn experimental_features() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);
@ -35,7 +36,8 @@ async fn experimental_features() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);
@ -48,7 +50,8 @@ async fn experimental_features() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);
@ -62,7 +65,8 @@ async fn experimental_features() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);
@ -76,7 +80,8 @@ async fn experimental_features() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);
}
@ -97,7 +102,8 @@ async fn experimental_feature_metrics() {
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false
"network": false,
"getTaskDocumentsRoute": false
}
"###);
@ -152,7 +158,7 @@ async fn errors() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `network`",
"message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `network`, `getTaskDocumentsRoute`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"