MeiliSearch/meilisearch-http/src/routes/indexes/documents.rs

332 lines
10 KiB
Rust
Raw Normal View History

2021-09-14 18:39:02 +02:00
use actix_web::error::PayloadError;
use actix_web::web::Bytes;
2021-09-28 22:22:59 +02:00
use actix_web::{web, HttpResponse};
2021-09-14 18:39:02 +02:00
use futures::{Stream, StreamExt};
2021-06-24 15:02:35 +02:00
use log::debug;
use meilisearch_lib::index_controller::{DocumentAdditionFormat, Update};
2021-09-28 22:08:03 +02:00
use meilisearch_lib::milli::update::IndexDocumentsMethod;
2021-09-28 22:22:59 +02:00
use meilisearch_lib::MeiliSearch;
2020-12-12 13:32:06 +01:00
use serde::Deserialize;
2021-09-24 15:21:07 +02:00
use serde_json::Value;
2021-09-14 18:39:02 +02:00
use tokio::sync::mpsc;
2020-12-12 13:32:06 +01:00
2021-09-30 10:26:30 +02:00
use crate::error::{MeilisearchHttpError, ResponseError};
2021-06-24 15:02:35 +02:00
use crate::extractors::authentication::{policies::*, GuardedData};
2021-06-23 14:56:02 +02:00
use crate::extractors::payload::Payload;
2020-12-22 14:02:41 +01:00
use crate::routes::IndexParam;
2020-12-12 13:32:06 +01:00
2021-02-10 17:08:37 +01:00
/// Fallback for the `limit` query parameter of the document listing route.
const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20;
/// Fallback for the `offset` query parameter of the document listing route.
const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0;
2020-12-23 16:12:37 +01:00
/// Returns `true` when the media type carried by a `Content-Type` header value is `expected`.
///
/// Only the part before the first `;` is compared, so parameters such as
/// `charset=utf-8` are ignored. The comparison is ASCII case-insensitive because
/// media types are case-insensitive (RFC 7231, section 3.1.1.1). Unlike a substring
/// search, this does not accept unrelated types that merely embed `expected`
/// (for example `application/json-seq` when `application/json` is expected).
fn content_type_matches(header_value: &str, expected: &str) -> bool {
    header_value
        .split(';')
        .next()
        .map_or(false, |essence| essence.trim().eq_ignore_ascii_case(expected))
}

/// Generates a route guard function named `$fn_name`.
///
/// The generated function returns `true` only when the request has a `Content-Type`
/// header that is valid visible ASCII and whose media type equals `$guard_value`.
/// A missing or unreadable header makes the guard return `false`.
macro_rules! guard_content_type {
    ($fn_name:ident, $guard_value:literal) => {
        fn $fn_name(head: &actix_web::dev::RequestHead) -> bool {
            head.headers
                .get("Content-Type")
                // `to_str` fails on non-ASCII header values; treat those as a non-match.
                .and_then(|content_type| content_type.to_str().ok())
                .map_or(false, |value| content_type_matches(value, $guard_value))
        }
    };
}
2021-09-29 00:19:08 +02:00
2021-09-29 19:04:47 +02:00
guard_content_type!(guard_ndjson, "application/x-ndjson");
guard_content_type!(guard_csv, "text/csv");
2020-12-23 16:12:37 +01:00
guard_content_type!(guard_json, "application/json");
2021-06-29 11:57:47 +02:00
2021-09-29 00:19:08 +02:00
/// Route guard that matches only requests carrying no `Content-Type` header at all.
fn empty_application_type(head: &actix_web::dev::RequestHead) -> bool {
    !head.headers.contains_key("Content-Type")
}
2021-09-14 18:39:02 +02:00
/// This is required because Payload is not Sync nor Send
2021-09-28 22:22:59 +02:00
fn payload_to_stream(mut payload: Payload) -> impl Stream<Item = Result<Bytes, PayloadError>> {
2021-09-14 18:39:02 +02:00
let (snd, recv) = mpsc::channel(1);
tokio::task::spawn_local(async move {
while let Some(data) = payload.next().await {
let _ = snd.send(data).await;
}
});
tokio_stream::wrappers::ReceiverStream::new(recv)
}
2020-12-12 13:32:06 +01:00
#[derive(Deserialize)]
2021-07-07 16:20:22 +02:00
pub struct DocumentParam {
2021-02-13 10:44:20 +01:00
index_uid: String,
document_id: String,
2020-12-12 13:32:06 +01:00
}
2021-07-05 14:29:20 +02:00
pub fn configure(cfg: &mut web::ServiceConfig) {
2021-06-24 15:02:35 +02:00
cfg.service(
2021-07-05 14:29:20 +02:00
web::resource("")
.route(web::get().to(get_all_documents))
2021-09-29 10:17:52 +02:00
// replace documents routes
.route(
web::post()
.guard(empty_application_type)
2021-09-30 10:26:30 +02:00
.to(missing_content_type_error),
2021-09-29 10:17:52 +02:00
)
2021-09-29 00:12:25 +02:00
.route(web::post().guard(guard_json).to(add_documents_json))
2021-09-29 10:17:52 +02:00
.route(web::post().guard(guard_ndjson).to(add_documents_ndjson))
2021-09-29 00:12:25 +02:00
.route(web::post().guard(guard_csv).to(add_documents_csv))
2021-09-29 10:17:52 +02:00
.route(web::post().to(HttpResponse::UnsupportedMediaType))
// update documents routes
.route(
web::put()
.guard(empty_application_type)
2021-09-30 10:26:30 +02:00
.to(missing_content_type_error),
2021-09-29 10:17:52 +02:00
)
2021-09-29 00:19:08 +02:00
.route(web::put().guard(guard_json).to(update_documents_json))
2021-09-29 10:17:52 +02:00
.route(web::put().guard(guard_ndjson).to(update_documents_ndjson))
2021-09-29 00:19:08 +02:00
.route(web::put().guard(guard_csv).to(update_documents_csv))
2021-09-29 10:17:52 +02:00
.route(web::put().to(HttpResponse::UnsupportedMediaType))
2021-09-24 15:21:07 +02:00
.route(web::delete().to(clear_all_documents)),
2021-07-05 14:29:20 +02:00
)
// this route needs to be before the /documents/{document_id} to match properly
2021-09-24 15:21:07 +02:00
.service(web::resource("/delete-batch").route(web::post().to(delete_documents)))
2021-07-05 14:29:20 +02:00
.service(
web::resource("/{document_id}")
.route(web::get().to(get_document))
2021-09-24 15:21:07 +02:00
.route(web::delete().to(delete_document)),
2021-06-24 15:02:35 +02:00
);
2020-12-12 13:32:06 +01:00
}
2021-09-30 10:26:30 +02:00
async fn missing_content_type_error() -> Result<HttpResponse, ResponseError> {
Err(MeilisearchHttpError::MissingContentType.into())
}
2021-07-07 16:20:22 +02:00
pub async fn get_document(
2021-09-24 12:03:16 +02:00
meilisearch: GuardedData<Public, MeiliSearch>,
2021-02-11 10:59:23 +01:00
path: web::Path<DocumentParam>,
2020-12-12 13:32:06 +01:00
) -> Result<HttpResponse, ResponseError> {
2021-03-04 15:09:00 +01:00
let index = path.index_uid.clone();
let id = path.document_id.clone();
2021-09-24 12:03:16 +02:00
let document = meilisearch
.document(index, id, None as Option<Vec<String>>)
.await?;
2021-06-23 12:18:34 +02:00
debug!("returns: {:?}", document);
Ok(HttpResponse::Ok().json(document))
2020-12-12 13:32:06 +01:00
}
2021-09-24 15:21:07 +02:00
pub async fn delete_document(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> {
2021-09-28 22:22:59 +02:00
let DocumentParam {
document_id,
index_uid,
} = path.into_inner();
2021-09-24 15:21:07 +02:00
let update = Update::DeleteDocuments(vec![document_id]);
2021-09-28 22:22:59 +02:00
let update_status = meilisearch
.register_update(index_uid, update, false)
.await?;
2021-09-24 15:21:07 +02:00
debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
}
2020-12-12 13:32:06 +01:00
2021-06-23 12:18:34 +02:00
#[derive(Deserialize, Debug)]
2020-12-12 13:32:06 +01:00
#[serde(rename_all = "camelCase", deny_unknown_fields)]
2021-07-07 16:20:22 +02:00
pub struct BrowseQuery {
2021-02-10 17:08:37 +01:00
offset: Option<usize>,
limit: Option<usize>,
attributes_to_retrieve: Option<String>,
2020-12-12 13:32:06 +01:00
}
2021-07-07 16:20:22 +02:00
pub async fn get_all_documents(
2021-09-24 12:03:16 +02:00
meilisearch: GuardedData<Public, MeiliSearch>,
2021-02-10 17:08:37 +01:00
path: web::Path<IndexParam>,
params: web::Query<BrowseQuery>,
2020-12-12 13:32:06 +01:00
) -> Result<HttpResponse, ResponseError> {
2021-06-23 12:18:34 +02:00
debug!("called with params: {:?}", params);
2021-04-22 10:14:29 +02:00
let attributes_to_retrieve = params.attributes_to_retrieve.as_ref().and_then(|attrs| {
2021-04-17 17:33:36 +02:00
let mut names = Vec::new();
for name in attrs.split(',').map(String::from) {
if name == "*" {
2021-04-22 10:14:29 +02:00
return None;
2021-04-17 17:33:36 +02:00
}
names.push(name);
}
Some(names)
});
2021-02-10 17:08:37 +01:00
2021-09-24 12:03:16 +02:00
let documents = meilisearch
.documents(
2021-03-15 18:11:10 +01:00
path.index_uid.clone(),
params.offset.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_OFFSET),
params.limit.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_LIMIT),
attributes_to_retrieve,
)
.await?;
2021-06-23 12:18:34 +02:00
debug!("returns: {:?}", documents);
Ok(HttpResponse::Ok().json(documents))
2020-12-12 13:32:06 +01:00
}
2021-06-23 12:18:34 +02:00
#[derive(Deserialize, Debug)]
2020-12-12 13:32:06 +01:00
#[serde(rename_all = "camelCase", deny_unknown_fields)]
2021-07-07 16:20:22 +02:00
pub struct UpdateDocumentsQuery {
2021-02-13 12:22:59 +01:00
primary_key: Option<String>,
2020-12-12 13:32:06 +01:00
}
2021-09-29 00:12:25 +02:00
pub async fn add_documents_json(
2021-09-24 12:03:16 +02:00
meilisearch: GuardedData<Private, MeiliSearch>,
2020-12-23 16:12:37 +01:00
path: web::Path<IndexParam>,
2021-02-13 12:22:59 +01:00
params: web::Query<UpdateDocumentsQuery>,
2021-06-23 13:55:16 +02:00
body: Payload,
2020-12-23 16:12:37 +01:00
) -> Result<HttpResponse, ResponseError> {
2021-09-29 10:17:52 +02:00
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Json,
IndexDocumentsMethod::ReplaceDocuments,
)
.await
}
pub async fn add_documents_ndjson(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Ndjson,
IndexDocumentsMethod::ReplaceDocuments,
)
.await
2021-09-29 00:12:25 +02:00
}
2021-02-26 17:14:11 +01:00
2021-09-29 00:12:25 +02:00
pub async fn add_documents_csv(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
2021-09-29 10:17:52 +02:00
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Csv,
IndexDocumentsMethod::ReplaceDocuments,
)
.await
2020-12-23 16:12:37 +01:00
}
2021-09-29 00:12:25 +02:00
pub async fn update_documents_json(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
2021-09-29 10:17:52 +02:00
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Json,
IndexDocumentsMethod::UpdateDocuments,
)
.await
}
pub async fn update_documents_ndjson(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Ndjson,
IndexDocumentsMethod::UpdateDocuments,
)
.await
2021-09-29 00:12:25 +02:00
}
pub async fn update_documents_csv(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
2021-09-29 10:17:52 +02:00
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Csv,
IndexDocumentsMethod::UpdateDocuments,
)
.await
2021-09-29 00:12:25 +02:00
}
2021-06-29 11:57:47 +02:00
/// Route used when the payload type is "application/json"
/// Used to add or replace documents
2021-09-29 00:12:25 +02:00
async fn document_addition(
2021-09-24 12:03:16 +02:00
meilisearch: GuardedData<Private, MeiliSearch>,
2020-12-12 13:32:06 +01:00
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
2021-06-23 13:55:16 +02:00
body: Payload,
2021-09-29 00:12:25 +02:00
format: DocumentAdditionFormat,
method: IndexDocumentsMethod,
2020-12-12 13:32:06 +01:00
) -> Result<HttpResponse, ResponseError> {
2021-06-23 12:18:34 +02:00
debug!("called with params: {:?}", params);
2021-09-14 18:39:02 +02:00
let update = Update::DocumentAddition {
payload: Box::new(payload_to_stream(body)),
primary_key: params.primary_key.clone(),
2021-09-29 00:12:25 +02:00
method,
format,
2021-09-14 18:39:02 +02:00
};
2021-09-24 12:03:16 +02:00
let update_status = meilisearch
2021-09-28 18:10:09 +02:00
.register_update(path.into_inner().index_uid, update, true)
.await?;
2021-03-04 15:10:58 +01:00
2021-06-23 12:18:34 +02:00
debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
2020-12-12 13:32:06 +01:00
}
2021-09-24 15:21:07 +02:00
pub async fn delete_documents(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
body: web::Json<Vec<Value>>,
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", body);
let ids = body
.iter()
.map(|v| {
v.as_str()
.map(String::from)
.unwrap_or_else(|| v.to_string())
})
.collect();
let update = Update::DeleteDocuments(ids);
2021-09-28 22:22:59 +02:00
let update_status = meilisearch
.register_update(path.into_inner().index_uid, update, false)
.await?;
2021-09-24 15:21:07 +02:00
debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
}
pub async fn clear_all_documents(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
let update = Update::ClearDocuments;
2021-09-28 22:22:59 +02:00
let update_status = meilisearch
.register_update(path.into_inner().index_uid, update, false)
.await?;
2021-09-24 15:21:07 +02:00
debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
}