mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 06:44:27 +01:00
implements the get_tasks
This commit is contained in:
parent
19154e48fe
commit
cb4feabca2
@ -31,6 +31,7 @@ pub struct Query {
|
|||||||
#[serde(rename = "type")]
|
#[serde(rename = "type")]
|
||||||
pub kind: Option<Vec<Kind>>,
|
pub kind: Option<Vec<Kind>>,
|
||||||
pub index_uid: Option<Vec<String>>,
|
pub index_uid: Option<Vec<String>>,
|
||||||
|
pub uid: Option<Vec<TaskId>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Query {
|
impl Default for Query {
|
||||||
@ -41,6 +42,7 @@ impl Default for Query {
|
|||||||
status: None,
|
status: None,
|
||||||
kind: None,
|
kind: None,
|
||||||
index_uid: None,
|
index_uid: None,
|
||||||
|
uid: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -72,6 +74,15 @@ impl Query {
|
|||||||
..self
|
..self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn with_uid(self, uid: TaskId) -> Self {
|
||||||
|
let mut task_vec = self.uid.unwrap_or_default();
|
||||||
|
task_vec.push(uid);
|
||||||
|
Self {
|
||||||
|
uid: Some(task_vec),
|
||||||
|
..self
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod db_name {
|
pub mod db_name {
|
||||||
@ -172,7 +183,11 @@ impl IndexScheduler {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// This is the list of all the tasks.
|
// This is the list of all the tasks.
|
||||||
let mut tasks = RoaringBitmap::from_iter(0..last_task_id);
|
let mut tasks = RoaringBitmap::from_sorted_iter(0..last_task_id).unwrap();
|
||||||
|
|
||||||
|
if let Some(uids) = query.uid {
|
||||||
|
tasks &= RoaringBitmap::from_iter(uids);
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(status) = query.status {
|
if let Some(status) = query.status {
|
||||||
let mut status_tasks = RoaringBitmap::new();
|
let mut status_tasks = RoaringBitmap::new();
|
||||||
@ -256,6 +271,10 @@ impl IndexScheduler {
|
|||||||
Ok(self.file_store.new_update()?)
|
Ok(self.file_store.new_update()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> {
|
||||||
|
Ok(self.file_store.delete(uuid)?)
|
||||||
|
}
|
||||||
|
|
||||||
/// This worker function must be run in a different thread and must be run only once.
|
/// This worker function must be run in a different thread and must be run only once.
|
||||||
pub fn run(&self) -> ! {
|
pub fn run(&self) -> ! {
|
||||||
loop {
|
loop {
|
||||||
|
@ -18,16 +18,27 @@ pub struct TaskView {
|
|||||||
#[serde(rename = "type")]
|
#[serde(rename = "type")]
|
||||||
pub kind: Kind,
|
pub kind: Kind,
|
||||||
|
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub details: Option<Details>,
|
pub details: Option<Details>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub error: Option<ResponseError>,
|
pub error: Option<ResponseError>,
|
||||||
|
|
||||||
#[serde(serialize_with = "serialize_duration")]
|
#[serde(
|
||||||
|
serialize_with = "serialize_duration",
|
||||||
|
skip_serializing_if = "Option::is_none"
|
||||||
|
)]
|
||||||
pub duration: Option<Duration>,
|
pub duration: Option<Duration>,
|
||||||
#[serde(with = "time::serde::rfc3339")]
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
pub enqueued_at: OffsetDateTime,
|
pub enqueued_at: OffsetDateTime,
|
||||||
#[serde(with = "time::serde::rfc3339::option")]
|
#[serde(
|
||||||
|
with = "time::serde::rfc3339::option",
|
||||||
|
skip_serializing_if = "Option::is_none"
|
||||||
|
)]
|
||||||
pub started_at: Option<OffsetDateTime>,
|
pub started_at: Option<OffsetDateTime>,
|
||||||
#[serde(with = "time::serde::rfc3339::option")]
|
#[serde(
|
||||||
|
with = "time::serde::rfc3339::option",
|
||||||
|
skip_serializing_if = "Option::is_none"
|
||||||
|
)]
|
||||||
pub finished_at: Option<OffsetDateTime>,
|
pub finished_at: Option<OffsetDateTime>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
use actix_web as aweb;
|
use actix_web as aweb;
|
||||||
use aweb::error::{JsonPayloadError, QueryPayloadError};
|
use aweb::error::{JsonPayloadError, QueryPayloadError};
|
||||||
|
use document_formats::DocumentFormatError;
|
||||||
|
use meilisearch_lib::IndexControllerError;
|
||||||
use meilisearch_types::error::{Code, ErrorCode, ResponseError};
|
use meilisearch_types::error::{Code, ErrorCode, ResponseError};
|
||||||
|
use tokio::task::JoinError;
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum MeilisearchHttpError {
|
pub enum MeilisearchHttpError {
|
||||||
@ -12,6 +15,16 @@ pub enum MeilisearchHttpError {
|
|||||||
.1.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
|
.1.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
|
||||||
)]
|
)]
|
||||||
InvalidContentType(String, Vec<String>),
|
InvalidContentType(String, Vec<String>),
|
||||||
|
#[error(transparent)]
|
||||||
|
IndexScheduler(#[from] index_scheduler::Error),
|
||||||
|
#[error(transparent)]
|
||||||
|
Payload(#[from] PayloadError),
|
||||||
|
#[error(transparent)]
|
||||||
|
DocumentFormat(#[from] DocumentFormatError),
|
||||||
|
#[error(transparent)]
|
||||||
|
IndexController(#[from] IndexControllerError),
|
||||||
|
#[error(transparent)]
|
||||||
|
Join(#[from] JoinError),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ErrorCode for MeilisearchHttpError {
|
impl ErrorCode for MeilisearchHttpError {
|
||||||
@ -19,6 +32,11 @@ impl ErrorCode for MeilisearchHttpError {
|
|||||||
match self {
|
match self {
|
||||||
MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
|
MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
|
||||||
MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
|
MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
|
||||||
|
MeilisearchHttpError::IndexScheduler(e) => e.error_code(),
|
||||||
|
MeilisearchHttpError::Payload(e) => e.error_code(),
|
||||||
|
MeilisearchHttpError::DocumentFormat(e) => e.error_code(),
|
||||||
|
MeilisearchHttpError::IndexController(e) => e.error_code(),
|
||||||
|
MeilisearchHttpError::Join(_) => Code::Internal,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -29,11 +47,19 @@ impl From<MeilisearchHttpError> for aweb::Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<aweb::error::PayloadError> for MeilisearchHttpError {
|
||||||
|
fn from(error: aweb::error::PayloadError) -> Self {
|
||||||
|
MeilisearchHttpError::Payload(PayloadError::Payload(error))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum PayloadError {
|
pub enum PayloadError {
|
||||||
#[error("{0}")]
|
#[error(transparent)]
|
||||||
|
Payload(aweb::error::PayloadError),
|
||||||
|
#[error(transparent)]
|
||||||
Json(JsonPayloadError),
|
Json(JsonPayloadError),
|
||||||
#[error("{0}")]
|
#[error(transparent)]
|
||||||
Query(QueryPayloadError),
|
Query(QueryPayloadError),
|
||||||
#[error("The json payload provided is malformed. `{0}`.")]
|
#[error("The json payload provided is malformed. `{0}`.")]
|
||||||
MalformedPayload(serde_json::error::Error),
|
MalformedPayload(serde_json::error::Error),
|
||||||
@ -44,6 +70,15 @@ pub enum PayloadError {
|
|||||||
impl ErrorCode for PayloadError {
|
impl ErrorCode for PayloadError {
|
||||||
fn error_code(&self) -> Code {
|
fn error_code(&self) -> Code {
|
||||||
match self {
|
match self {
|
||||||
|
PayloadError::Payload(e) => match e {
|
||||||
|
aweb::error::PayloadError::Incomplete(_) => todo!(),
|
||||||
|
aweb::error::PayloadError::EncodingCorrupted => todo!(),
|
||||||
|
aweb::error::PayloadError::Overflow => todo!(),
|
||||||
|
aweb::error::PayloadError::UnknownLength => todo!(),
|
||||||
|
aweb::error::PayloadError::Http2Payload(_) => todo!(),
|
||||||
|
aweb::error::PayloadError::Io(_) => todo!(),
|
||||||
|
_ => todo!(),
|
||||||
|
},
|
||||||
PayloadError::Json(err) => match err {
|
PayloadError::Json(err) => match err {
|
||||||
JsonPayloadError::Overflow { .. } => Code::PayloadTooLarge,
|
JsonPayloadError::Overflow { .. } => Code::PayloadTooLarge,
|
||||||
JsonPayloadError::ContentType => Code::UnsupportedMediaType,
|
JsonPayloadError::ContentType => Code::UnsupportedMediaType,
|
||||||
|
@ -1,10 +1,12 @@
|
|||||||
|
use std::io::Cursor;
|
||||||
|
|
||||||
use actix_web::error::PayloadError;
|
use actix_web::error::PayloadError;
|
||||||
use actix_web::http::header::CONTENT_TYPE;
|
use actix_web::http::header::CONTENT_TYPE;
|
||||||
use actix_web::web::Bytes;
|
use actix_web::web::Bytes;
|
||||||
use actix_web::HttpMessage;
|
use actix_web::HttpMessage;
|
||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
use bstr::ByteSlice;
|
use bstr::ByteSlice;
|
||||||
use document_formats::PayloadType;
|
use document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use index_scheduler::{KindWithContent, TaskView};
|
use index_scheduler::{KindWithContent, TaskView};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
@ -235,10 +237,10 @@ async fn document_addition(
|
|||||||
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, MeiliSearch>,
|
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, MeiliSearch>,
|
||||||
index_uid: String,
|
index_uid: String,
|
||||||
primary_key: Option<String>,
|
primary_key: Option<String>,
|
||||||
body: Payload,
|
mut body: Payload,
|
||||||
method: IndexDocumentsMethod,
|
method: IndexDocumentsMethod,
|
||||||
allow_index_creation: bool,
|
allow_index_creation: bool,
|
||||||
) -> Result<TaskView, ResponseError> {
|
) -> Result<TaskView, MeilisearchHttpError> {
|
||||||
let format = match mime_type
|
let format = match mime_type
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|m| (m.type_().as_str(), m.subtype().as_str()))
|
.map(|m| (m.type_().as_str(), m.subtype().as_str()))
|
||||||
@ -260,14 +262,46 @@ async fn document_addition(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: TAMO: do something with the update file
|
let (uuid, mut update_file) = meilisearch.create_update_file()?;
|
||||||
// Box::new(payload_to_stream(body))
|
|
||||||
let (uuid, file) = meilisearch.create_update_file()?;
|
// push the entire stream into a `Vec`.
|
||||||
|
// TODO: Maybe we should write it to a file to reduce the RAM consumption
|
||||||
|
// and then reread it to convert it to obkv?
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
while let Some(bytes) = body.next().await {
|
||||||
|
buffer.extend_from_slice(&bytes?);
|
||||||
|
}
|
||||||
|
let reader = Cursor::new(buffer);
|
||||||
|
|
||||||
|
let documents_count =
|
||||||
|
tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {
|
||||||
|
let documents_count = match format {
|
||||||
|
PayloadType::Json => read_json(reader, update_file.as_file_mut())?,
|
||||||
|
PayloadType::Csv => read_csv(reader, update_file.as_file_mut())?,
|
||||||
|
PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?,
|
||||||
|
};
|
||||||
|
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||||
|
update_file.persist();
|
||||||
|
Ok(documents_count)
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let documents_count = match documents_count {
|
||||||
|
Ok(Ok(documents_count)) => documents_count,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
meilisearch.delete_update_file(uuid)?;
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
meilisearch.delete_update_file(uuid)?;
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let task = match method {
|
let task = match method {
|
||||||
IndexDocumentsMethod::ReplaceDocuments => KindWithContent::DocumentAddition {
|
IndexDocumentsMethod::ReplaceDocuments => KindWithContent::DocumentAddition {
|
||||||
content_file: uuid,
|
content_file: uuid,
|
||||||
documents_count: 0, // TODO: TAMO: get the document count
|
documents_count,
|
||||||
primary_key,
|
primary_key,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
index_uid,
|
index_uid,
|
||||||
@ -275,7 +309,7 @@ async fn document_addition(
|
|||||||
|
|
||||||
IndexDocumentsMethod::UpdateDocuments => KindWithContent::DocumentUpdate {
|
IndexDocumentsMethod::UpdateDocuments => KindWithContent::DocumentUpdate {
|
||||||
content_file: uuid,
|
content_file: uuid,
|
||||||
documents_count: 0, // TODO: TAMO: get the document count
|
documents_count,
|
||||||
primary_key,
|
primary_key,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
index_uid,
|
index_uid,
|
||||||
@ -284,7 +318,13 @@ async fn document_addition(
|
|||||||
_ => todo!(),
|
_ => todo!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let task = meilisearch.register_task(task).await?;
|
let task = match meilisearch.register_task(task).await {
|
||||||
|
Ok(task) => task,
|
||||||
|
Err(e) => {
|
||||||
|
meilisearch.delete_update_file(uuid)?;
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(task)
|
Ok(task)
|
||||||
|
@ -156,6 +156,8 @@ async fn get_task(
|
|||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
|
let task_id = task_id.into_inner();
|
||||||
|
|
||||||
analytics.publish(
|
analytics.publish(
|
||||||
"Tasks Seen".to_string(),
|
"Tasks Seen".to_string(),
|
||||||
json!({ "per_task_uid": true }),
|
json!({ "per_task_uid": true }),
|
||||||
@ -170,10 +172,11 @@ async fn get_task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
filters.limit = 1;
|
filters.uid = Some(vec![task_id]);
|
||||||
filters.from = Some(*task_id);
|
|
||||||
|
|
||||||
let task = meilisearch.list_tasks(filters).await?;
|
if let Some(task) = meilisearch.list_tasks(filters).await?.first() {
|
||||||
|
Ok(HttpResponse::Ok().json(task))
|
||||||
Ok(HttpResponse::Ok().json(task))
|
} else {
|
||||||
|
Err(index_scheduler::Error::TaskNotFound(task_id).into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -51,8 +51,8 @@ impl ErrorCode for IndexControllerError {
|
|||||||
IndexControllerError::DocumentFormatError(e) => e.error_code(),
|
IndexControllerError::DocumentFormatError(e) => e.error_code(),
|
||||||
IndexControllerError::MissingPayload(_) => Code::MissingPayload,
|
IndexControllerError::MissingPayload(_) => Code::MissingPayload,
|
||||||
IndexControllerError::PayloadTooLarge => Code::PayloadTooLarge,
|
IndexControllerError::PayloadTooLarge => Code::PayloadTooLarge,
|
||||||
IndexControllerError::IndexResolver(_) => todo!(),
|
IndexControllerError::IndexResolver(e) => e.error_code(),
|
||||||
IndexControllerError::IndexError(_) => todo!(),
|
IndexControllerError::IndexError(e) => e.error_code(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,7 @@ use std::ffi::OsStr;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
// TODO: TAMO: rename the MeiliSearch in Meilisearch
|
// TODO: TAMO: rename the MeiliSearch in Meilisearch
|
||||||
|
pub use index_controller::error::IndexControllerError;
|
||||||
pub use index_controller::Meilisearch as MeiliSearch;
|
pub use index_controller::Meilisearch as MeiliSearch;
|
||||||
pub use milli;
|
pub use milli;
|
||||||
pub use milli::heed;
|
pub use milli::heed;
|
||||||
|
Loading…
Reference in New Issue
Block a user