mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-09 22:48:54 +01:00
Support JSON payloads again (not perfectly though)
This commit is contained in:
parent
f69688e8f7
commit
8d97b7b28c
@ -1,10 +1,9 @@
|
|||||||
use std::fmt::{self, Debug, Display};
|
use std::fmt::{self, Debug, Display};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufWriter, Write};
|
use std::io::{self, BufReader, BufWriter, Write};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use memmap2::MmapOptions;
|
use milli::documents::Error;
|
||||||
use milli::documents::{DocumentsBatchBuilder, Error};
|
|
||||||
use milli::Object;
|
use milli::Object;
|
||||||
use serde::de::{SeqAccess, Visitor};
|
use serde::de::{SeqAccess, Visitor};
|
||||||
use serde::{Deserialize, Deserializer};
|
use serde::{Deserialize, Deserializer};
|
||||||
@ -104,29 +103,35 @@ impl ErrorCode for DocumentFormatError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reads CSV from input and write an obkv batch to writer.
|
/// Reads CSV from input and write an obkv batch to writer.
|
||||||
pub fn read_csv(file: &File, writer: impl Write, delimiter: u8) -> Result<u64> {
|
pub fn read_csv<F: Write>(
|
||||||
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
|
_input: BufReader<File>,
|
||||||
let mmap = unsafe { MmapOptions::new().map(file)? };
|
_output: &mut BufWriter<F>,
|
||||||
let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
|
_delimiter: u8,
|
||||||
builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
|
) -> Result<u64> {
|
||||||
|
todo!()
|
||||||
|
// let mut builder = DocumentsBatchBuilder::new(BufWriter::new(output));
|
||||||
|
// let mmap = unsafe { MmapOptions::new().map(input)? };
|
||||||
|
// let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
|
||||||
|
// builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
|
||||||
|
|
||||||
let count = builder.documents_count();
|
// let count = builder.documents_count();
|
||||||
let _ = builder.into_inner().map_err(DocumentFormatError::Io)?;
|
// let _ = builder.into_inner().map_err(DocumentFormatError::Io)?;
|
||||||
|
|
||||||
Ok(count as u64)
|
// Ok(count as u64)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reads JSON from temporary file and write an obkv batch to writer.
|
/// Reads JSON from temporary file and write an obkv batch to writer.
|
||||||
pub fn read_json(file: &File, writer: impl Write) -> Result<u64> {
|
pub fn read_json<F: Write>(input: BufReader<File>, mut output: &mut BufWriter<F>) -> Result<u64> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
|
let mut count = 0;
|
||||||
let mmap = unsafe { MmapOptions::new().map(file)? };
|
let mut deserializer = serde_json::Deserializer::from_reader(input);
|
||||||
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
|
match array_each(&mut deserializer, |obj: Object| {
|
||||||
|
count += 1;
|
||||||
match array_each(&mut deserializer, |obj| builder.append_json_object(&obj)) {
|
serde_json::to_writer(&mut output, &obj)
|
||||||
|
}) {
|
||||||
// The json data has been deserialized and does not need to be processed again.
|
// The json data has been deserialized and does not need to be processed again.
|
||||||
// The data has been transferred to the writer during the deserialization process.
|
// The data has been transferred to the writer during the deserialization process.
|
||||||
Ok(Ok(_)) => (),
|
Ok(Ok(_)) => (),
|
||||||
Ok(Err(e)) => return Err(DocumentFormatError::Io(e)),
|
Ok(Err(e)) => return Err(DocumentFormatError::Io(e.into())),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Attempt to deserialize a single json string when the cause of the exception is not Category.data
|
// Attempt to deserialize a single json string when the cause of the exception is not Category.data
|
||||||
// Other types of deserialisation exceptions are returned directly to the front-end
|
// Other types of deserialisation exceptions are returned directly to the front-end
|
||||||
@ -137,33 +142,30 @@ pub fn read_json(file: &File, writer: impl Write) -> Result<u64> {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let content: Object = serde_json::from_slice(&mmap)
|
todo!("single document/object update")
|
||||||
.map_err(Error::Json)
|
|
||||||
.map_err(|e| (PayloadType::Json, e))?;
|
// let content: Object = serde_json::from_slice(&mmap)
|
||||||
builder.append_json_object(&content).map_err(DocumentFormatError::Io)?;
|
// .map_err(Error::Json)
|
||||||
|
// .map_err(|e| (PayloadType::Json, e))?;
|
||||||
|
// serde_json::to_writer(&mut output, &content).unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let count = builder.documents_count();
|
Ok(count)
|
||||||
let _ = builder.into_inner().map_err(DocumentFormatError::Io)?;
|
|
||||||
|
|
||||||
Ok(count as u64)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reads JSON from temporary file and write an obkv batch to writer.
|
/// Reads JSON from temporary file and write it into the writer.
|
||||||
pub fn read_ndjson(file: &File, writer: impl Write) -> Result<u64> {
|
pub fn read_ndjson<F: Write>(input: BufReader<File>, mut output: &mut BufWriter<F>) -> Result<u64> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
|
let mut count = 0;
|
||||||
let mmap = unsafe { MmapOptions::new().map(file)? };
|
for result in serde_json::Deserializer::from_reader(input).into_iter() {
|
||||||
|
count += 1;
|
||||||
for result in serde_json::Deserializer::from_slice(&mmap).into_iter() {
|
// TODO Correctly manage the errors
|
||||||
let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?;
|
// Avoid copying the content: use CowStr from milli (move it elsewhere)
|
||||||
builder.append_json_object(&object).map_err(Into::into).map_err(DocumentFormatError::Io)?;
|
let map: Object = result.unwrap();
|
||||||
|
serde_json::to_writer(&mut output, &map).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let count = builder.documents_count();
|
Ok(count)
|
||||||
let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Io)?;
|
|
||||||
|
|
||||||
Ok(count as u64)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The actual handling of the deserialization process in serde
|
/// The actual handling of the deserialization process in serde
|
||||||
@ -172,20 +174,23 @@ pub fn read_ndjson(file: &File, writer: impl Write) -> Result<u64> {
|
|||||||
/// ## References
|
/// ## References
|
||||||
/// <https://serde.rs/stream-array.html>
|
/// <https://serde.rs/stream-array.html>
|
||||||
/// <https://github.com/serde-rs/json/issues/160>
|
/// <https://github.com/serde-rs/json/issues/160>
|
||||||
fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result<io::Result<u64>, D::Error>
|
fn array_each<'de, D, T, F>(
|
||||||
|
deserializer: D,
|
||||||
|
f: F,
|
||||||
|
) -> std::result::Result<serde_json::Result<u64>, D::Error>
|
||||||
where
|
where
|
||||||
D: Deserializer<'de>,
|
D: Deserializer<'de>,
|
||||||
T: Deserialize<'de>,
|
T: Deserialize<'de>,
|
||||||
F: FnMut(T) -> io::Result<()>,
|
F: FnMut(T) -> serde_json::Result<()>,
|
||||||
{
|
{
|
||||||
struct SeqVisitor<T, F>(F, PhantomData<T>);
|
struct SeqVisitor<T, F>(F, PhantomData<T>);
|
||||||
|
|
||||||
impl<'de, T, F> Visitor<'de> for SeqVisitor<T, F>
|
impl<'de, T, F> Visitor<'de> for SeqVisitor<T, F>
|
||||||
where
|
where
|
||||||
T: Deserialize<'de>,
|
T: Deserialize<'de>,
|
||||||
F: FnMut(T) -> io::Result<()>,
|
F: FnMut(T) -> serde_json::Result<()>,
|
||||||
{
|
{
|
||||||
type Value = io::Result<u64>;
|
type Value = serde_json::Result<u64>;
|
||||||
|
|
||||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||||
formatter.write_str("a nonempty sequence")
|
formatter.write_str("a nonempty sequence")
|
||||||
@ -194,7 +199,7 @@ where
|
|||||||
fn visit_seq<A>(
|
fn visit_seq<A>(
|
||||||
mut self,
|
mut self,
|
||||||
mut seq: A,
|
mut seq: A,
|
||||||
) -> std::result::Result<io::Result<u64>, <A as SeqAccess<'de>>::Error>
|
) -> std::result::Result<serde_json::Result<u64>, <A as SeqAccess<'de>>::Error>
|
||||||
where
|
where
|
||||||
A: SeqAccess<'de>,
|
A: SeqAccess<'de>,
|
||||||
{
|
{
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use std::io::ErrorKind;
|
use std::io::{BufReader, ErrorKind};
|
||||||
|
|
||||||
use actix_web::http::header::CONTENT_TYPE;
|
use actix_web::http::header::CONTENT_TYPE;
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
@ -423,7 +423,7 @@ async fn document_addition(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
|
let (uuid, update_file) = index_scheduler.create_update_file(dry_run)?;
|
||||||
|
|
||||||
let temp_file = match tempfile() {
|
let temp_file = match tempfile() {
|
||||||
Ok(file) => file,
|
Ok(file) => file,
|
||||||
@ -459,15 +459,20 @@ async fn document_addition(
|
|||||||
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut read_file = buffer.into_inner().into_std().await;
|
let read_file = BufReader::new(buffer.into_inner().into_std().await);
|
||||||
let documents_count = tokio::task::spawn_blocking(move || {
|
let documents_count = tokio::task::spawn_blocking(move || {
|
||||||
|
let mut update_file = std::io::BufWriter::new(update_file);
|
||||||
let documents_count = match format {
|
let documents_count = match format {
|
||||||
PayloadType::Json => read_json(&read_file, &mut update_file)?,
|
PayloadType::Json => read_json(read_file, &mut update_file)?,
|
||||||
PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
|
PayloadType::Csv { delimiter } => read_csv(read_file, &mut update_file, delimiter)?,
|
||||||
/// TODO do not copy all the content
|
PayloadType::Ndjson => read_ndjson(read_file, &mut update_file)?,
|
||||||
PayloadType::Ndjson => std::io::copy(&mut read_file, &mut update_file).unwrap(),
|
|
||||||
};
|
};
|
||||||
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||||
|
// TODO better support of errors
|
||||||
|
let update_file = match update_file.into_inner() {
|
||||||
|
Ok(update_file) => update_file,
|
||||||
|
Err(_) => todo!("handle errors"),
|
||||||
|
};
|
||||||
update_file.persist()?;
|
update_file.persist()?;
|
||||||
Ok(documents_count)
|
Ok(documents_count)
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user