fix issue 3037

This commit is contained in:
jiangbo212 2022-11-30 00:03:22 +08:00
parent 914f8b118c
commit 38982d13fe
8 changed files with 182 additions and 79 deletions

View file

@ -1,13 +1,16 @@
use std::borrow::Borrow;
use std::fmt::{self, Debug, Display};
use std::io::{self, BufReader, Read, Seek, Write};
use std::fs::File;
use std::io::{self, Seek, Write};
use std::marker::PhantomData;
use either::Either;
use log::debug;
use memmap::MmapOptions;
use milli::documents::{DocumentsBatchBuilder, Error};
use milli::Object;
use serde::Deserialize;
use serde::de::{Visitor, SeqAccess};
use serde::{Deserialize, Deserializer};
use serde_json::error::Category;
use crate::error::{Code, ErrorCode};
use crate::internal_error;
@ -99,10 +102,10 @@ impl ErrorCode for DocumentFormatError {
internal_error!(DocumentFormatError: io::Error);
/// Reads CSV from input and write an obkv batch to writer.
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let csv = csv::Reader::from_reader(input);
let mmap = unsafe { MmapOptions::new().map(file).unwrap()};
let csv = csv::Reader::from_reader(mmap.as_ref());
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
let count = builder.documents_count();
@ -111,17 +114,47 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
Ok(count as usize)
}
/// Reads JSON Lines from input and write an obkv batch to writer.
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let reader = BufReader::new(input);
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<usize> {
read_json_inner(file, writer, PayloadType::Json)
}
for result in serde_json::Deserializer::from_reader(reader).into_iter() {
let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?;
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<usize> {
read_json_inner(file, writer, PayloadType::Ndjson)
}
/// Reads JSON from temporary file and write an obkv batch to writer.
fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: PayloadType) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let mmap = unsafe { MmapOptions::new().map(file).unwrap()};
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
match array_each(&mut deserializer, |obj: Object | {
builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
.append_json_object(&obj)
}) {
Ok(Ok(count)) => debug!("serde json array size: {}", count),
Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))),
Err(_e) => {
debug!("deserialize single json");
#[derive(Deserialize, Debug)]
#[serde(transparent)]
struct ArrayOrSingleObject {
#[serde(with = "either::serde_untagged")]
inner: Either<Vec<Object>, Object>,
}
let content: ArrayOrSingleObject =
serde_json::from_reader(file).map_err(Error::Json).map_err(|e| (payload_type, e))?;
for object in content.inner.map_right(|o| vec![o]).into_inner() {
builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
}
}
}
let count = builder.documents_count();
@ -130,30 +163,43 @@ pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize>
Ok(count as usize)
}
/// Reads JSON from input and write an obkv batch to writer.
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let reader = BufReader::new(input);
/**
* https://serde.rs/stream-array.html
* https://github.com/serde-rs/json/issues/160
*/
fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result<io::Result<u64>, D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de>,
F: FnMut(T) -> io::Result<()>,
{
struct SeqVisitor<T, F>(F, PhantomData<T>);
#[derive(Deserialize, Debug)]
#[serde(transparent)]
struct ArrayOrSingleObject {
#[serde(with = "either::serde_untagged")]
inner: Either<Vec<Object>, Object>,
impl<'de, T, F> Visitor<'de> for SeqVisitor<T, F>
where
T: Deserialize<'de>,
F: FnMut(T) -> io::Result<()>,
{
type Value = io::Result<u64>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a nonempty sequence")
}
fn visit_seq<A>(mut self, mut seq: A) -> std::result::Result<io::Result<u64>, <A as SeqAccess<'de>>::Error>
where
A: SeqAccess<'de>,
{
let mut max: u64 = 0;
while let Some(value) = seq.next_element::<T>()? {
match self.0(value) {
Ok(()) => max = max + 1,
Err(e) => return Ok(Err(e)),
};
}
Ok(Ok(max))
}
}
let content: ArrayOrSingleObject =
serde_json::from_reader(reader).map_err(Error::Json).map_err(|e| (PayloadType::Json, e))?;
for object in content.inner.map_right(|o| vec![o]).into_inner() {
builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
}
let count = builder.documents_count();
let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?;
Ok(count as usize)
}
let visitor = SeqVisitor(f, PhantomData);
deserializer.deserialize_seq(visitor)
}

View file

@ -164,6 +164,7 @@ pub enum Code {
MissingContentType,
MalformedPayload,
MissingPayload,
ReceivePayloadErr,
ApiKeyNotFound,
MissingParameter,
@ -303,6 +304,9 @@ impl Code {
DuplicateIndexFound => {
ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST)
}
ReceivePayloadErr => {
ErrCode::internal("receive_payload_internal_exceptions", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}