2022-03-04 10:46:59 +08:00
|
|
|
use std::borrow::Borrow;
|
|
|
|
use std::fmt::{self, Debug, Display};
|
2022-06-16 12:06:20 +02:00
|
|
|
use std::io::{self, BufReader, Read, Seek, Write};
|
2021-09-28 11:59:55 +02:00
|
|
|
|
2022-08-18 11:55:14 +02:00
|
|
|
use either::Either;
|
2022-06-06 12:38:46 +02:00
|
|
|
use meilisearch_types::error::{Code, ErrorCode};
|
|
|
|
use meilisearch_types::internal_error;
|
2022-06-16 12:06:20 +02:00
|
|
|
use milli::documents::{DocumentsBatchBuilder, Error};
|
2022-08-18 11:55:14 +02:00
|
|
|
use milli::Object;
|
|
|
|
use serde::Deserialize;
|
2021-09-28 11:59:55 +02:00
|
|
|
|
|
|
|
/// Module-local result alias whose error type is always [`DocumentFormatError`].
type Result<T> = std::result::Result<T, DocumentFormatError>;
|
|
|
|
|
|
|
|
/// Format of a document payload handled by this module.
///
/// The variant is carried inside [`DocumentFormatError::MalformedPayload`] so that
/// error messages can name the format that failed to parse.
#[derive(Debug)]
pub enum PayloadType {
    /// JSON Lines: one JSON object after another.
    Ndjson,
    /// A JSON document.
    Json,
    /// CSV records.
    Csv,
}
|
|
|
|
|
|
|
|
impl fmt::Display for PayloadType {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
match self {
|
2022-06-16 12:06:20 +02:00
|
|
|
PayloadType::Ndjson => f.write_str("ndjson"),
|
|
|
|
PayloadType::Json => f.write_str("json"),
|
|
|
|
PayloadType::Csv => f.write_str("csv"),
|
2021-09-28 11:59:55 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-04 10:46:59 +08:00
|
|
|
#[derive(Debug)]
|
2021-09-28 11:59:55 +02:00
|
|
|
pub enum DocumentFormatError {
|
|
|
|
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
|
2022-06-16 12:06:20 +02:00
|
|
|
MalformedPayload(Error, PayloadType),
|
2022-03-04 10:46:59 +08:00
|
|
|
}
|
2022-03-28 14:57:51 +08:00
|
|
|
|
2022-03-04 10:46:59 +08:00
|
|
|
impl Display for DocumentFormatError {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
match self {
|
2022-03-08 12:03:59 +08:00
|
|
|
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
|
2022-03-04 10:46:59 +08:00
|
|
|
Self::MalformedPayload(me, b) => match me.borrow() {
|
2022-06-16 12:06:20 +02:00
|
|
|
Error::Json(se) => {
|
2022-03-25 20:53:28 +08:00
|
|
|
// https://github.com/meilisearch/meilisearch/issues/2107
|
|
|
|
// The user input maybe insanely long. We need to truncate it.
|
2022-03-25 21:36:11 +08:00
|
|
|
let mut serde_msg = se.to_string();
|
2022-03-31 10:14:13 +08:00
|
|
|
let ellipsis = "...";
|
|
|
|
if serde_msg.len() > 100 + ellipsis.len() {
|
2022-03-31 10:39:21 +08:00
|
|
|
serde_msg.replace_range(50..serde_msg.len() - 85, ellipsis);
|
2022-03-25 20:53:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
write!(
|
2022-03-31 10:14:13 +08:00
|
|
|
f,
|
|
|
|
"The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
|
|
|
|
b, serde_msg
|
2022-03-25 20:53:28 +08:00
|
|
|
)
|
|
|
|
}
|
2022-03-08 12:03:59 +08:00
|
|
|
_ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),
|
2022-03-04 10:46:59 +08:00
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
2021-09-28 11:59:55 +02:00
|
|
|
}
|
2022-03-28 14:58:00 +08:00
|
|
|
|
2022-03-04 10:46:59 +08:00
|
|
|
// Empty impl: every `std::error::Error` method keeps its default implementation.
impl std::error::Error for DocumentFormatError {}
|
2021-09-28 11:59:55 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
impl From<(PayloadType, Error)> for DocumentFormatError {
|
|
|
|
fn from((ty, error): (PayloadType, Error)) -> Self {
|
2021-10-20 21:20:28 +02:00
|
|
|
match error {
|
2022-06-16 12:06:20 +02:00
|
|
|
Error::Io(e) => Self::Internal(Box::new(e)),
|
|
|
|
e => Self::MalformedPayload(e, ty),
|
2021-10-20 21:20:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-30 10:35:24 +02:00
|
|
|
impl ErrorCode for DocumentFormatError {
|
|
|
|
fn error_code(&self) -> Code {
|
|
|
|
match self {
|
|
|
|
DocumentFormatError::Internal(_) => Code::Internal,
|
|
|
|
DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-20 21:20:28 +02:00
|
|
|
// NOTE(review): presumably generates a `From<io::Error>` conversion into
// `DocumentFormatError::Internal` — confirm against the `internal_error!` definition.
internal_error!(DocumentFormatError: io::Error);
|
2021-09-28 11:59:55 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
/// Reads CSV from input and write an obkv batch to writer.
|
2021-12-02 16:03:26 +01:00
|
|
|
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
2022-06-16 12:06:20 +02:00
|
|
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
2021-10-28 12:13:51 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
let csv = csv::Reader::from_reader(input);
|
|
|
|
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
|
2021-09-28 22:58:48 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
let count = builder.documents_count();
|
|
|
|
let _ = builder
|
|
|
|
.into_inner()
|
|
|
|
.map_err(Into::into)
|
|
|
|
.map_err(DocumentFormatError::Internal)?;
|
|
|
|
|
|
|
|
Ok(count as usize)
|
2021-09-28 22:58:48 +02:00
|
|
|
}
|
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
/// Reads JSON Lines from input and write an obkv batch to writer.
|
2021-12-02 16:03:26 +01:00
|
|
|
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
2022-06-16 12:06:20 +02:00
|
|
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
2022-06-16 15:58:39 +02:00
|
|
|
let reader = BufReader::new(input);
|
2021-09-28 11:59:55 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
for result in serde_json::Deserializer::from_reader(reader).into_iter() {
|
|
|
|
let object = result
|
|
|
|
.map_err(Error::Json)
|
2021-10-20 21:20:28 +02:00
|
|
|
.map_err(|e| (PayloadType::Ndjson, e))?;
|
2022-06-16 12:06:20 +02:00
|
|
|
builder
|
|
|
|
.append_json_object(&object)
|
|
|
|
.map_err(Into::into)
|
|
|
|
.map_err(DocumentFormatError::Internal)?;
|
2021-09-28 11:59:55 +02:00
|
|
|
}
|
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
let count = builder.documents_count();
|
|
|
|
let _ = builder
|
|
|
|
.into_inner()
|
|
|
|
.map_err(Into::into)
|
|
|
|
.map_err(DocumentFormatError::Internal)?;
|
2021-09-28 11:59:55 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
Ok(count as usize)
|
2021-09-28 11:59:55 +02:00
|
|
|
}
|
2021-09-28 20:13:26 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
/// Reads JSON from input and write an obkv batch to writer.
|
2021-12-02 16:03:26 +01:00
|
|
|
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
2022-06-16 12:06:20 +02:00
|
|
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
2022-06-16 15:58:39 +02:00
|
|
|
let reader = BufReader::new(input);
|
2022-06-16 12:06:20 +02:00
|
|
|
|
2022-08-18 11:55:14 +02:00
|
|
|
#[derive(Deserialize, Debug)]
|
|
|
|
#[serde(transparent)]
|
|
|
|
struct ArrayOrSingleObject {
|
|
|
|
#[serde(with = "either::serde_untagged")]
|
|
|
|
inner: Either<Vec<Object>, Object>,
|
|
|
|
}
|
|
|
|
|
|
|
|
let content: ArrayOrSingleObject = serde_json::from_reader(reader)
|
2022-06-16 12:06:20 +02:00
|
|
|
.map_err(Error::Json)
|
2021-10-20 21:20:28 +02:00
|
|
|
.map_err(|e| (PayloadType::Json, e))?;
|
2021-10-28 12:13:51 +02:00
|
|
|
|
2022-08-18 11:55:14 +02:00
|
|
|
for object in content.inner.map_right(|o| vec![o]).into_inner() {
|
2022-06-16 12:06:20 +02:00
|
|
|
builder
|
|
|
|
.append_json_object(&object)
|
|
|
|
.map_err(Into::into)
|
|
|
|
.map_err(DocumentFormatError::Internal)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
let count = builder.documents_count();
|
|
|
|
let _ = builder
|
|
|
|
.into_inner()
|
|
|
|
.map_err(Into::into)
|
|
|
|
.map_err(DocumentFormatError::Internal)?;
|
2021-09-28 20:13:26 +02:00
|
|
|
|
2022-06-16 12:06:20 +02:00
|
|
|
Ok(count as usize)
|
2021-09-28 20:13:26 +02:00
|
|
|
}
|