implements the get_tasks

This commit is contained in:
Tamo 2022-09-22 20:02:55 +02:00 committed by Clément Renault
parent 5b282acb7b
commit f84cbee170
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
7 changed files with 131 additions and 22 deletions

View file

@ -1,10 +1,12 @@
use std::io::Cursor;
use actix_web::error::PayloadError;
use actix_web::http::header::CONTENT_TYPE;
use actix_web::web::Bytes;
use actix_web::HttpMessage;
use actix_web::{web, HttpRequest, HttpResponse};
use bstr::ByteSlice;
use document_formats::PayloadType;
use document_formats::{read_csv, read_json, read_ndjson, PayloadType};
use futures::{Stream, StreamExt};
use index_scheduler::{KindWithContent, TaskView};
use log::debug;
@ -235,10 +237,10 @@ async fn document_addition(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, MeiliSearch>,
index_uid: String,
primary_key: Option<String>,
body: Payload,
mut body: Payload,
method: IndexDocumentsMethod,
allow_index_creation: bool,
) -> Result<TaskView, ResponseError> {
) -> Result<TaskView, MeilisearchHttpError> {
let format = match mime_type
.as_ref()
.map(|m| (m.type_().as_str(), m.subtype().as_str()))
@ -260,14 +262,46 @@ async fn document_addition(
}
};
// TODO: TAMO: do something with the update file
// Box::new(payload_to_stream(body))
let (uuid, file) = meilisearch.create_update_file()?;
let (uuid, mut update_file) = meilisearch.create_update_file()?;
// push the entire stream into a `Vec`.
// TODO: Maybe we should write it to a file to reduce the RAM consumption
// and then reread it to convert it to obkv?
let mut buffer = Vec::new();
while let Some(bytes) = body.next().await {
buffer.extend_from_slice(&bytes?);
}
let reader = Cursor::new(buffer);
let documents_count =
tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {
let documents_count = match format {
PayloadType::Json => read_json(reader, update_file.as_file_mut())?,
PayloadType::Csv => read_csv(reader, update_file.as_file_mut())?,
PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?,
};
// we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist();
Ok(documents_count)
})
.await;
let documents_count = match documents_count {
Ok(Ok(documents_count)) => documents_count,
Ok(Err(e)) => {
meilisearch.delete_update_file(uuid)?;
return Err(e.into());
}
Err(e) => {
meilisearch.delete_update_file(uuid)?;
return Err(e.into());
}
};
let task = match method {
IndexDocumentsMethod::ReplaceDocuments => KindWithContent::DocumentAddition {
content_file: uuid,
documents_count: 0, // TODO: TAMO: get the document count
documents_count,
primary_key,
allow_index_creation,
index_uid,
@ -275,7 +309,7 @@ async fn document_addition(
IndexDocumentsMethod::UpdateDocuments => KindWithContent::DocumentUpdate {
content_file: uuid,
documents_count: 0, // TODO: TAMO: get the document count
documents_count,
primary_key,
allow_index_creation,
index_uid,
@ -284,7 +318,13 @@ async fn document_addition(
_ => todo!(),
};
let task = meilisearch.register_task(task).await?;
let task = match meilisearch.register_task(task).await {
Ok(task) => task,
Err(e) => {
meilisearch.delete_update_file(uuid)?;
return Err(e.into());
}
};
debug!("returns: {:?}", task);
Ok(task)