serde ndjson fix

This commit is contained in:
jiangbo212 2022-12-21 11:27:15 +08:00
parent 9925309492
commit bf2a401a05
5 changed files with 148 additions and 34 deletions

View file

@ -4,7 +4,6 @@ use std::fs::File;
use std::io::{self, Seek, Write};
use std::marker::PhantomData;
use either::Either;
use memmap2::MmapOptions;
use milli::documents::{DocumentsBatchBuilder, Error};
use milli::Object;
@ -120,20 +119,6 @@ pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<u64> {
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
read_json_inner(file, writer, PayloadType::Json)
}
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> {
read_json_inner(file, writer, PayloadType::Ndjson)
}
/// Reads JSON from temporary file and write an obkv batch to writer.
fn read_json_inner(
file: &File,
writer: impl Write + Seek,
payload_type: PayloadType,
) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer);
let mmap = unsafe { MmapOptions::new().map(file)? };
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
@ -143,23 +128,20 @@ fn read_json_inner(
// The data has been transferred to the writer during the deserialization process.
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err(DocumentFormatError::Io(e)),
Err(_e) => {
// If we cannot deserialize the content as an array of object then we try
// to deserialize it with the original method to keep correct error messages.
#[derive(Deserialize, Debug)]
#[serde(transparent)]
struct ArrayOrSingleObject {
#[serde(with = "either::serde_untagged")]
inner: Either<Vec<Object>, Object>,
Err(e) => {
// Attempt to deserialize a single json string when the cause of the exception is not Category.data
// Other types of deserialisation exceptions are returned directly to the front-end
if e.classify() != serde_json::error::Category::Data {
return Err(DocumentFormatError::MalformedPayload(
Error::Json(e),
PayloadType::Json,
));
}
let content: ArrayOrSingleObject = serde_json::from_reader(file)
let content: Object = serde_json::from_slice(&mmap)
.map_err(Error::Json)
.map_err(|e| (payload_type, e))?;
for object in content.inner.map_right(|o| vec![o]).into_inner() {
builder.append_json_object(&object).map_err(DocumentFormatError::Io)?;
}
.map_err(|e| (PayloadType::Json, e))?;
builder.append_json_object(&content).map_err(DocumentFormatError::Io)?;
}
}
@ -169,6 +151,22 @@ fn read_json_inner(
Ok(count as u64)
}
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer);
let mmap = unsafe { MmapOptions::new().map(file)? };
for result in serde_json::Deserializer::from_reader(mmap.as_ref()).into_iter() {
let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?;
builder.append_json_object(&object).map_err(Into::into).map_err(DocumentFormatError::Io)?;
}
let count = builder.documents_count();
let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Io)?;
Ok(count as u64)
}
/// The actual handling of the deserialization process in serde
/// avoids storing the deserialized object in memory.
///