2021-09-28 22:22:59 +02:00
|
|
|
use std::{
|
|
|
|
fmt,
|
|
|
|
io::{Read, Seek, Write},
|
|
|
|
};
|
2021-09-28 11:59:55 +02:00
|
|
|
|
|
|
|
use milli::documents::DocumentBatchBuilder;
|
|
|
|
use serde_json::{Deserializer, Map, Value};
|
|
|
|
|
|
|
|
/// Module-local shorthand for a `Result` whose error type is [`DocumentFormatError`].
type Result<T> = std::result::Result<T, DocumentFormatError>;
|
|
|
|
|
|
|
|
/// Format of a document payload handled by this module.
///
/// The value is carried by [`DocumentFormatError::MalformedPayload`] so the error
/// message can name the format that failed to parse.
///
/// The enum is fieldless, so it is cheap to copy and compare; the extra derives
/// let callers do both without matching by hand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayloadType {
    /// Newline-delimited JSON: a stream of JSON objects.
    Jsonl,
    /// A single JSON document.
    Json,
}
|
|
|
|
|
|
|
|
impl fmt::Display for PayloadType {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
match self {
|
|
|
|
PayloadType::Jsonl => write!(f, "ndjson"),
|
2021-09-28 20:13:26 +02:00
|
|
|
PayloadType::Json => write!(f, "json"),
|
2021-09-28 11:59:55 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
|
|
pub enum DocumentFormatError {
|
|
|
|
#[error("Internal error: {0}")]
|
|
|
|
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
|
|
|
|
#[error("{0}. The {1} payload provided is malformed.")]
|
2021-09-28 22:22:59 +02:00
|
|
|
MalformedPayload(
|
|
|
|
Box<dyn std::error::Error + Send + Sync + 'static>,
|
|
|
|
PayloadType,
|
|
|
|
),
|
2021-09-28 11:59:55 +02:00
|
|
|
}
|
|
|
|
|
2021-09-28 22:22:59 +02:00
|
|
|
// NOTE(review): `internal_error!` is defined outside this view. It presumably generates
// `From<milli::documents::Error> for DocumentFormatError` (wrapping into `Internal`),
// which the `?` on the batch-builder calls below would rely on — confirm against the macro.
internal_error!(DocumentFormatError: milli::documents::Error);
|
2021-09-28 11:59:55 +02:00
|
|
|
|
|
|
|
/// Converts the error of a `Result` into [`DocumentFormatError::MalformedPayload`].
///
/// The first argument is the path of the [`PayloadType`] variant to tag the error
/// with; the second is an expression evaluating to a `Result`. An `Ok` value passes
/// through unchanged, while an `Err` is boxed and paired with the payload type.
macro_rules! malformed {
    ($payload:path, $result:expr) => {
        $result.map_err(|source| {
            DocumentFormatError::MalformedPayload(Box::new(source), $payload)
        })
    };
}
|
|
|
|
|
|
|
|
/// read jsonl from input and write an obkv batch to writer.
|
|
|
|
pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> {
|
|
|
|
let mut builder = DocumentBatchBuilder::new(writer)?;
|
|
|
|
let stream = Deserializer::from_reader(input).into_iter::<Map<String, Value>>();
|
|
|
|
|
|
|
|
for value in stream {
|
|
|
|
let value = malformed!(PayloadType::Jsonl, value)?;
|
|
|
|
builder.add_documents(&value)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
builder.finish()?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2021-09-28 20:13:26 +02:00
|
|
|
|
|
|
|
/// read json from input and write an obkv batch to writer.
|
|
|
|
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> {
|
|
|
|
let mut builder = DocumentBatchBuilder::new(writer).unwrap();
|
|
|
|
|
2021-09-28 22:22:59 +02:00
|
|
|
let documents: Vec<Map<String, Value>> =
|
|
|
|
malformed!(PayloadType::Json, serde_json::from_reader(input))?;
|
2021-09-28 20:13:26 +02:00
|
|
|
builder.add_documents(documents).unwrap();
|
|
|
|
builder.finish().unwrap();
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|