// meilisearch-http/src/routes/indexes/documents.rs
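
//! Routes for the documents of an index: adding, replacing, updating,
//! browsing, and deleting documents, individually or in batches.
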
use std::io::Cursor;

use actix_web::http::header::CONTENT_TYPE;
use actix_web::web::Data;
use actix_web::HttpMessage;
use actix_web::{web, HttpRequest, HttpResponse};
use bstr::ByteSlice;
use futures::StreamExt;
use index_scheduler::IndexScheduler;
use log::debug;
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::RoTxn;
use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::star_or::StarOr;
use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::{milli, Document, Index};
use mime::Mime;
use once_cell::sync::Lazy;
use serde::Deserialize;
use serde_cs::vec::CS;
use serde_json::Value;

use crate::analytics::Analytics;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView};
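
/// The Content-Type values accepted for document payloads.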
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
    vec![
        "application/json".to_string(),
        "application/x-ndjson".to_string(),
        "text/csv".to_string(),
    ]
});

/// Extracts the mime type from the `Content-Type` header and returns
/// a Meilisearch error if anything goes wrong.
fn extract_mime_type(req: &HttpRequest) -> Result<Option<Mime>, MeilisearchHttpError> {
    match req.mime_type() {
        Ok(Some(mime)) => Ok(Some(mime)),
        Ok(None) => Ok(None),
        Err(_) => match req.headers().get(CONTENT_TYPE) {
            Some(content_type) => Err(MeilisearchHttpError::InvalidContentType(
                content_type.as_bytes().as_bstr().to_string(),
                ACCEPTED_CONTENT_TYPE.clone(),
            )),
            None => Err(MeilisearchHttpError::MissingContentType(
                ACCEPTED_CONTENT_TYPE.clone(),
            )),
        },
    }
}
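
/// Path parameters identifying a single document within an index.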
#[derive(Deserialize)]
pub struct DocumentParam {
    index_uid: String,
    document_id: String,
}
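
/// Registers all the document routes on the given service config.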
pub fn configure(cfg: &mut web::ServiceConfig) {
    cfg.service(
        web::resource("")
            .route(web::get().to(SeqHandler(get_all_documents)))
            .route(web::post().to(SeqHandler(add_documents)))
            .route(web::put().to(SeqHandler(update_documents)))
            .route(web::delete().to(SeqHandler(clear_all_documents))),
    )
    // this route needs to be registered before `/documents/{document_id}` to match properly
    .service(web::resource("/delete-batch").route(web::post().to(SeqHandler(delete_documents))))
    .service(
        web::resource("/{document_id}")
            .route(web::get().to(SeqHandler(get_document)))
            .route(web::delete().to(SeqHandler(delete_document))),
    );
}
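
/// Query parameters of the get document route.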
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct GetDocument {
    fields: Option<CS<StarOr<String>>>,
}
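
/// Fetches one document by its external id, keeping only the fields listed
/// in the `fields` query parameter when it is provided.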
pub async fn get_document(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
    path: web::Path<DocumentParam>,
    params: web::Query<GetDocument>,
) -> Result<HttpResponse, ResponseError> {
    let GetDocument { fields } = params.into_inner();
    let attributes_to_retrieve = fields.and_then(fold_star_or);

    let index = index_scheduler.index(&path.index_uid)?;
    let document = retrieve_document(&index, &path.document_id, attributes_to_retrieve)?;
    debug!("returns: {:?}", document);
    Ok(HttpResponse::Ok().json(document))
}
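
/// Enqueues a task deleting a single document by id and returns it with
/// `202 Accepted`.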
pub async fn delete_document(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
    path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> {
    let DocumentParam {
        document_id,
        index_uid,
    } = path.into_inner();
    let task = KindWithContent::DocumentDeletion {
        index_uid,
        documents_ids: vec![document_id],
    };
    let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
    debug!("returns: {:?}", task);
    Ok(HttpResponse::Accepted().json(task))
}
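
/// Query parameters of the browse documents route.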
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct BrowseQuery {
    #[serde(default)]
    offset: usize,
    #[serde(default = "crate::routes::PAGINATION_DEFAULT_LIMIT")]
    limit: usize,
    fields: Option<CS<StarOr<String>>>,
}
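
/// Browses the documents of an index with `offset`/`limit` pagination,
/// keeping only the fields listed in `fields` when it is provided.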
pub async fn get_all_documents(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
    index_uid: web::Path<String>,
    params: web::Query<BrowseQuery>,
) -> Result<HttpResponse, ResponseError> {
    debug!("called with params: {:?}", params);
    let BrowseQuery {
        limit,
        offset,
        fields,
    } = params.into_inner();
    let attributes_to_retrieve = fields.and_then(fold_star_or);

    let index = index_scheduler.index(&index_uid)?;
    let (total, documents) = retrieve_documents(&index, offset, limit, attributes_to_retrieve)?;
    let ret = PaginationView::new(offset, limit, total as usize, documents);
    debug!("returns: {:?}", ret);
    Ok(HttpResponse::Ok().json(ret))
}
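
/// Query parameters shared by the document addition routes.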
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct UpdateDocumentsQuery {
    pub primary_key: Option<String>,
}
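
/// Adds or replaces documents from a JSON, NDJSON, or CSV payload by
/// registering a `DocumentImport` task using
/// `IndexDocumentsMethod::ReplaceDocuments`.
///
/// Illustrative request (the host, port, `movies` index, and `/indexes`
/// prefix are assumptions: they depend on where this scope is mounted):
///
/// ```text
/// curl -X POST 'http://localhost:7700/indexes/movies/documents?primaryKey=id' \
///      -H 'Content-Type: application/json' \
///      --data '[{ "id": 1, "title": "Carol" }]'
/// ```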
pub async fn add_documents(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
    index_uid: web::Path<String>,
    params: web::Query<UpdateDocumentsQuery>,
    body: Payload,
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
    debug!("called with params: {:?}", params);
    let params = params.into_inner();
    analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);

    let allow_index_creation = index_scheduler.filters().allow_index_creation;
    let task = document_addition(
        extract_mime_type(&req)?,
        index_scheduler,
        index_uid.into_inner(),
        params.primary_key,
        body,
        IndexDocumentsMethod::ReplaceDocuments,
        allow_index_creation,
    )
    .await?;
    Ok(HttpResponse::Accepted().json(task))
}
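
/// Adds or updates documents from a JSON, NDJSON, or CSV payload by
/// registering a `DocumentImport` task using
/// `IndexDocumentsMethod::UpdateDocuments`: existing documents are patched
/// field by field instead of being replaced.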
pub async fn update_documents(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
    path: web::Path<String>,
    params: web::Query<UpdateDocumentsQuery>,
    body: Payload,
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
    debug!("called with params: {:?}", params);
    let index_uid = path.into_inner();

    analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);

    let allow_index_creation = index_scheduler.filters().allow_index_creation;
    let task = document_addition(
        extract_mime_type(&req)?,
        index_scheduler,
        index_uid,
        params.into_inner().primary_key,
        body,
        IndexDocumentsMethod::UpdateDocuments,
        allow_index_creation,
    )
    .await?;
    Ok(HttpResponse::Accepted().json(task))
}
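
/// Shared body of `add_documents` and `update_documents`: checks the payload
/// format against the Content-Type, writes the payload to an update file,
/// and registers a `DocumentImport` task, deleting the update file again if
/// anything fails along the way.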
async fn document_addition(
    mime_type: Option<Mime>,
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
    index_uid: String,
    primary_key: Option<String>,
    mut body: Payload,
    method: IndexDocumentsMethod,
    allow_index_creation: bool,
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
    let format = match mime_type
        .as_ref()
        .map(|m| (m.type_().as_str(), m.subtype().as_str()))
    {
        Some(("application", "json")) => PayloadType::Json,
        Some(("application", "x-ndjson")) => PayloadType::Ndjson,
        Some(("text", "csv")) => PayloadType::Csv,
        Some((type_, subtype)) => {
            return Err(MeilisearchHttpError::InvalidContentType(
                format!("{}/{}", type_, subtype),
                ACCEPTED_CONTENT_TYPE.clone(),
            ))
        }
        None => {
            return Err(MeilisearchHttpError::MissingContentType(
                ACCEPTED_CONTENT_TYPE.clone(),
            ))
        }
    };

    let (uuid, mut update_file) = index_scheduler.create_update_file()?;

    // push the entire stream into a `Vec`.
    // TODO: Maybe we should write it to a file to reduce the RAM consumption
    // and then reread it to convert it to obkv?
    let mut buffer = Vec::new();
    while let Some(bytes) = body.next().await {
        buffer.extend_from_slice(&bytes?);
    }
    let reader = Cursor::new(buffer);
    let documents_count =
        tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {
            let documents_count = match format {
                PayloadType::Json => read_json(reader, update_file.as_file_mut())?,
                PayloadType::Csv => read_csv(reader, update_file.as_file_mut())?,
                PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?,
            };
            // we NEED to persist the file here because we moved the `update_file` into another task.
            update_file.persist()?;
            Ok(documents_count)
        })
        .await;

    let documents_count = match documents_count {
        Ok(Ok(documents_count)) => documents_count as u64,
        Ok(Err(e)) => {
            index_scheduler.delete_update_file(uuid)?;
            return Err(e);
        }
        Err(e) => {
            index_scheduler.delete_update_file(uuid)?;
            return Err(e.into());
        }
    };

    let task = KindWithContent::DocumentImport {
        method,
        content_file: uuid,
        documents_count,
        primary_key,
        allow_index_creation,
        index_uid,
    };

    let scheduler = index_scheduler.clone();
    let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
        Ok(task) => task,
        Err(e) => {
            index_scheduler.delete_update_file(uuid)?;
            return Err(e.into());
        }
    };

    debug!("returns: {:?}", task);
    Ok(task.into())
}
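
/// Enqueues a task deleting the batch of document ids given in the JSON
/// body; ids that are not strings are serialized to their JSON
/// representation.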
pub async fn delete_documents(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
    path: web::Path<String>,
    body: web::Json<Vec<Value>>,
) -> Result<HttpResponse, ResponseError> {
    debug!("called with params: {:?}", body);
    let ids = body
        .iter()
        .map(|v| {
            v.as_str()
                .map(String::from)
                .unwrap_or_else(|| v.to_string())
        })
        .collect();

    let task = KindWithContent::DocumentDeletion {
        index_uid: path.into_inner(),
        documents_ids: ids,
    };
    let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
    debug!("returns: {:?}", task);
    Ok(HttpResponse::Accepted().json(task))
}
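
/// Enqueues a task deleting all the documents of the index.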
pub async fn clear_all_documents(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
    path: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
    let task = KindWithContent::DocumentClear {
        index_uid: path.into_inner(),
    };
    let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
    debug!("returns: {:?}", task);
    Ok(HttpResponse::Accepted().json(task))
}
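
/// Returns an iterator over all the documents of the index, each one
/// converted from its internal obkv representation to JSON.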
fn all_documents<'a>(
    index: &Index,
    rtxn: &'a RoTxn,
) -> Result<impl Iterator<Item = Result<Document, ResponseError>> + 'a, ResponseError> {
    let fields_ids_map = index.fields_ids_map(rtxn)?;
    let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();

    Ok(index.all_documents(rtxn)?.map(move |ret| {
        ret.map_err(ResponseError::from)
            .and_then(|(_key, document)| -> Result<_, ResponseError> {
                Ok(milli::obkv_to_json(&all_fields, &fields_ids_map, document)?)
            })
    }))
}
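
/// Collects `limit` documents starting at `offset`, keeping only the
/// `attributes_to_retrieve` when provided, and returns them along with the
/// total number of documents in the index.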
fn retrieve_documents<S: AsRef<str>>(
    index: &Index,
    offset: usize,
    limit: usize,
    attributes_to_retrieve: Option<Vec<S>>,
) -> Result<(u64, Vec<Document>), ResponseError> {
    let rtxn = index.read_txn()?;

    let mut documents = Vec::new();
    for document in all_documents(index, &rtxn)?.skip(offset).take(limit) {
        let document = match &attributes_to_retrieve {
            Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
                &document?,
                attributes_to_retrieve.iter().map(|s| s.as_ref()),
            ),
            None => document?,
        };
        documents.push(document);
    }

    let number_of_documents = index.number_of_documents(&rtxn)?;
    Ok((number_of_documents, documents))
}
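
/// Looks up one document by its external id, converts it to JSON, and keeps
/// only the `attributes_to_retrieve` when provided.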
fn retrieve_document<S: AsRef<str>>(
    index: &Index,
    doc_id: &str,
    attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Document, ResponseError> {
    let txn = index.read_txn()?;

    let fields_ids_map = index.fields_ids_map(&txn)?;
    let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();

    let internal_id = index
        .external_documents_ids(&txn)?
        .get(doc_id.as_bytes())
        .ok_or_else(|| MeilisearchHttpError::DocumentNotFound(doc_id.to_string()))?;

    let document = index
        .documents(&txn, std::iter::once(internal_id))?
        .into_iter()
        .next()
        .map(|(_, d)| d)
        .ok_or_else(|| MeilisearchHttpError::DocumentNotFound(doc_id.to_string()))?;

    let document = meilisearch_types::milli::obkv_to_json(&all_fields, &fields_ids_map, document)?;
    let document = match &attributes_to_retrieve {
        Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
            &document,
            attributes_to_retrieve.iter().map(|s| s.as_ref()),
        ),
        None => document,
    };

    Ok(document)
}