3266: Improve the way we receive the documents payload- serde multiple ndjson fix r=curquiza a=jiangbo212

# Pull Request

## Related issue
Fixes #3037 

## Related PR
#3164 

## What does this PR do?
Sorry, This PR is mainly to fix the problems caused by my previously provided PR #3164. It causes multiple ndjson data deserialization failures
- Fix serde multiple ndjson data failures and add test to it
- Fix serde jsonarray error and againest serde it use `from_slice`. only use `from_slice` when serde error category is `data`, it indicate json data is a single json.

## PR checklist
Please check if your PR fulfills the following requirements:
- [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)?
- [x] Have you read the contributing guidelines?
- [x] Have you made sure that the title is accurate and descriptive of the changes?

Thank you so much for contributing to Meilisearch!


Co-authored-by: jiangbo212 <peiyaoliukuan@126.com>
This commit is contained in:
bors[bot] 2023-01-05 11:30:29 +00:00 committed by GitHub
commit 0eaa8ca255
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 148 additions and 34 deletions

View file

@ -4,7 +4,6 @@ use std::fs::File;
use std::io::{self, Seek, Write};
use std::marker::PhantomData;
use either::Either;
use memmap2::MmapOptions;
use milli::documents::{DocumentsBatchBuilder, Error};
use milli::Object;
@ -120,20 +119,6 @@ pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<u64> {
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
read_json_inner(file, writer, PayloadType::Json)
}
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> {
read_json_inner(file, writer, PayloadType::Ndjson)
}
/// Reads JSON from temporary file and write an obkv batch to writer.
fn read_json_inner(
file: &File,
writer: impl Write + Seek,
payload_type: PayloadType,
) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer);
let mmap = unsafe { MmapOptions::new().map(file)? };
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
@ -143,23 +128,20 @@ fn read_json_inner(
// The data has been transferred to the writer during the deserialization process.
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err(DocumentFormatError::Io(e)),
Err(_e) => {
// If we cannot deserialize the content as an array of object then we try
// to deserialize it with the original method to keep correct error messages.
#[derive(Deserialize, Debug)]
#[serde(transparent)]
struct ArrayOrSingleObject {
#[serde(with = "either::serde_untagged")]
inner: Either<Vec<Object>, Object>,
Err(e) => {
// Attempt to deserialize a single json string when the cause of the exception is not Category.data
// Other types of deserialisation exceptions are returned directly to the front-end
if e.classify() != serde_json::error::Category::Data {
return Err(DocumentFormatError::MalformedPayload(
Error::Json(e),
PayloadType::Json,
));
}
let content: ArrayOrSingleObject = serde_json::from_reader(file)
let content: Object = serde_json::from_slice(&mmap)
.map_err(Error::Json)
.map_err(|e| (payload_type, e))?;
for object in content.inner.map_right(|o| vec![o]).into_inner() {
builder.append_json_object(&object).map_err(DocumentFormatError::Io)?;
}
.map_err(|e| (PayloadType::Json, e))?;
builder.append_json_object(&content).map_err(DocumentFormatError::Io)?;
}
}
@ -169,6 +151,22 @@ fn read_json_inner(
Ok(count as u64)
}
/// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer);
let mmap = unsafe { MmapOptions::new().map(file)? };
for result in serde_json::Deserializer::from_slice(&mmap).into_iter() {
let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?;
builder.append_json_object(&object).map_err(Into::into).map_err(DocumentFormatError::Io)?;
}
let count = builder.documents_count();
let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Io)?;
Ok(count as u64)
}
/// The actual handling of the deserialization process in serde
/// avoids storing the deserialized object in memory.
///