let you specify your task id

This commit is contained in:
Tamo 2023-09-07 11:16:51 +02:00
parent 15dafde21d
commit 9ee4f55e6c
12 changed files with 655 additions and 346 deletions

View file

@ -7,7 +7,7 @@ use bstr::ByteSlice as _;
use deserr::actix_web::{AwebJson, AwebQueryParameter};
use deserr::Deserr;
use futures::StreamExt;
use index_scheduler::IndexScheduler;
use index_scheduler::{IndexScheduler, TaskId};
use meilisearch_types::deserr::query_params::Param;
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
@ -36,7 +36,7 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::search::parse_filter;
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
@ -130,9 +130,10 @@ pub async fn delete_document(
index_uid: index_uid.to_string(),
documents_ids: vec![document_id],
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
debug!(returns = ?task, "Delete document");
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
@ -277,6 +278,7 @@ pub async fn replace_documents(
analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
@ -285,6 +287,7 @@ pub async fn replace_documents(
params.csv_delimiter,
body,
IndexDocumentsMethod::ReplaceDocuments,
uid,
allow_index_creation,
)
.await?;
@ -309,6 +312,7 @@ pub async fn update_documents(
analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
@ -317,6 +321,7 @@ pub async fn update_documents(
params.csv_delimiter,
body,
IndexDocumentsMethod::UpdateDocuments,
uid,
allow_index_creation,
)
.await?;
@ -334,6 +339,7 @@ async fn document_addition(
csv_delimiter: Option<u8>,
mut body: Payload,
method: IndexDocumentsMethod,
task_id: Option<TaskId>,
allow_index_creation: bool,
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
let format = match (
@ -450,7 +456,7 @@ async fn document_addition(
};
let scheduler = index_scheduler.clone();
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await? {
Ok(task) => task,
Err(e) => {
index_scheduler.delete_update_file(uuid)?;
@ -480,8 +486,9 @@ pub async fn delete_documents_batch(
let task =
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!(returns = ?task, "Delete documents by batch");
Ok(HttpResponse::Accepted().json(task))
@ -516,8 +523,9 @@ pub async fn delete_documents_by_filter(
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!(returns = ?task, "Delete documents by filter");
Ok(HttpResponse::Accepted().json(task))
@ -533,8 +541,9 @@ pub async fn clear_all_documents(
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!(returns = ?task, "Delete all documents");
Ok(HttpResponse::Accepted().json(task))