3236: Improves clarity of the code that receives payloads r=Kerollmops a=Kerollmops

This PR makes small changes on top of #3164. It improves the clarity and simplicity of some parts of the code.

Co-authored-by: Kerollmops <clement@meilisearch.com>
bors[bot] committed 2022-12-13 18:20:24 +00:00 (via GitHub)
commit 660be071b5
4 changed files with 45 additions and 55 deletions


@@ -1190,7 +1190,7 @@ mod tests {
     pub fn read_json(
         bytes: &[u8],
         write: impl Write + Seek,
-    ) -> std::result::Result<usize, DocumentFormatError> {
+    ) -> std::result::Result<u64, DocumentFormatError> {
        let temp_file = NamedTempFile::new().unwrap();
        let mut buffer = BufWriter::new(temp_file.reopen().unwrap());
        buffer.write_all(bytes).unwrap();


@@ -103,7 +103,7 @@ impl ErrorCode for DocumentFormatError {
 internal_error!(DocumentFormatError: io::Error);

 /// Reads CSV from input and write an obkv batch to writer.
-pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
+pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<u64> {
     let mut builder = DocumentsBatchBuilder::new(writer);
     let mmap = unsafe { MmapOptions::new().map(file)? };
     let csv = csv::Reader::from_reader(mmap.as_ref());
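The reader memory-maps the payload file and hands the mapping straight to the CSV parser, so the file is never copied into an intermediate buffer first. A self-contained sketch of that pattern, assuming the same `memmap2` and `csv` crates used in the hunk above (`count_csv_records` is a hypothetical helper, not code from this PR):

```rust
use std::fs::File;
use std::io;

use memmap2::MmapOptions;

fn count_csv_records(file: &File) -> io::Result<u64> {
    // Safety: the file must not be truncated while the mapping is alive.
    let mmap = unsafe { MmapOptions::new().map(file)? };
    // &[u8] implements Read, so the mapping feeds the parser directly.
    let mut reader = csv::Reader::from_reader(mmap.as_ref());
    let mut count = 0u64;
    for record in reader.records() {
        record.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        count += 1;
    }
    Ok(count)
}
```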
@@ -112,16 +112,16 @@ pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
     let count = builder.documents_count();
     let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?;

-    Ok(count as usize)
+    Ok(count as u64)
 }

 /// Reads JSON from temporary file and write an obkv batch to writer.
-pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<usize> {
+pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
     read_json_inner(file, writer, PayloadType::Json)
 }

 /// Reads JSON from temporary file and write an obkv batch to writer.
-pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<usize> {
+pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> {
     read_json_inner(file, writer, PayloadType::Ndjson)
 }
@@ -130,23 +130,19 @@ fn read_json_inner(
     file: &File,
     writer: impl Write + Seek,
     payload_type: PayloadType,
-) -> Result<usize> {
+) -> Result<u64> {
     let mut builder = DocumentsBatchBuilder::new(writer);
     let mmap = unsafe { MmapOptions::new().map(file)? };
     let mut deserializer = serde_json::Deserializer::from_slice(&mmap);

-    match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) {
-        // The json data has been successfully deserialised and does not need to be processed again.
-        // the data has been successfully transferred to the "update_file" during the deserialisation process.
-        // count ==0 means an empty array
-        Ok(Ok(count)) => {
-            if count == 0 {
-                return Ok(count as usize);
-            }
-        }
+    match array_each(&mut deserializer, |obj| builder.append_json_object(&obj)) {
+        // The json data has been deserialized and does not need to be processed again.
+        // The data has been transferred to the writer during the deserialization process.
+        Ok(Ok(_)) => (),
         Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))),
-        // Prefer deserialization as a json array. Failure to do deserialisation using the traditional method.
         Err(_e) => {
+            // If we cannot deserialize the content as an array of object then we try
+            // to deserialize it with the original method to keep correct error messages.
             #[derive(Deserialize, Debug)]
             #[serde(transparent)]
             struct ArrayOrSingleObject {
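The hunk above ends inside the fallback type that accepts either a lone object or an array of objects. For illustration, here is a self-contained way to express the same "single or array" shape with serde's `untagged` representation rather than the `#[serde(transparent)]` wrapper from the real code:

```rust
use serde::Deserialize;
use serde_json::{Map, Value};

type Object = Map<String, Value>;

// Accepts both `{"id": 1}` and `[{"id": 1}, {"id": 2}]` from the same payload.
#[derive(Deserialize, Debug)]
#[serde(untagged)]
enum ArrayOrSingleObject {
    Array(Vec<Object>),
    Single(Object),
}

fn main() -> serde_json::Result<()> {
    let single: ArrayOrSingleObject = serde_json::from_str(r#"{"id": 1}"#)?;
    let array: ArrayOrSingleObject = serde_json::from_str(r#"[{"id": 1}, {"id": 2}]"#)?;
    println!("{single:?} {array:?}");
    Ok(())
}
```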
@@ -170,15 +166,15 @@ fn read_json_inner(
     let count = builder.documents_count();
     let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?;

-    Ok(count as usize)
+    Ok(count as u64)
 }

-/**
- * The actual handling of the deserialization process in the serde avoids storing the deserialized object in memory.
- * Reference:
- * https://serde.rs/stream-array.html
- * https://github.com/serde-rs/json/issues/160
- */
+/// The actual handling of the deserialization process in serde
+/// avoids storing the deserialized object in memory.
+///
+/// ## References
+/// <https://serde.rs/stream-array.html>
+/// <https://github.com/serde-rs/json/issues/160>
 fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result<io::Result<u64>, D::Error>
 where
     D: Deserializer<'de>,
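For context, the streaming pattern the doc comment links to drives a serde `Visitor` over the JSON array and hands each element to a callback instead of collecting the whole array in memory. A hedged sketch of such a helper with the same signature as above (the exact body in the repository may differ):

```rust
use std::fmt;
use std::io;
use std::marker::PhantomData;

use serde::de::{Deserializer, SeqAccess, Visitor};
use serde::Deserialize;

fn array_each<'de, D, T, F>(deserializer: D, f: F) -> Result<io::Result<u64>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
    F: FnMut(T) -> io::Result<()>,
{
    struct SeqVisitor<T, F>(F, PhantomData<T>);

    impl<'de, T, F> Visitor<'de> for SeqVisitor<T, F>
    where
        T: Deserialize<'de>,
        F: FnMut(T) -> io::Result<()>,
    {
        type Value = io::Result<u64>;

        fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.write_str("a sequence of objects")
        }

        fn visit_seq<A>(mut self, mut seq: A) -> Result<io::Result<u64>, A::Error>
        where
            A: SeqAccess<'de>,
        {
            let mut count = 0u64;
            // next_element deserializes one array item at a time.
            while let Some(value) = seq.next_element::<T>()? {
                // A callback failure is an I/O error, not a deserialization
                // error, so it is reported through the inner Result.
                if let Err(e) = (self.0)(value) {
                    return Ok(Err(e));
                }
                count += 1;
            }
            Ok(Ok(count))
        }
    }

    deserializer.deserialize_seq(SeqVisitor(f, PhantomData))
}
```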


@@ -95,8 +95,8 @@ pub enum PayloadError {
     MalformedPayload(serde_json::error::Error),
     #[error("A json payload is missing.")]
     MissingPayload,
-    #[error("Error while writing the playload to disk: `{0}`.")]
-    ReceivePayloadErr(Box<dyn std::error::Error + Send + Sync + 'static>),
+    #[error("Error while receiving the playload. `{0}`.")]
+    ReceivePayload(Box<dyn std::error::Error + Send + Sync + 'static>),
 }

 impl ErrorCode for PayloadError {
@@ -128,7 +128,7 @@ impl ErrorCode for PayloadError {
             },
             PayloadError::MissingPayload => Code::MissingPayload,
             PayloadError::MalformedPayload(_) => Code::MalformedPayload,
-            PayloadError::ReceivePayloadErr(_) => Code::Internal,
+            PayloadError::ReceivePayload(_) => Code::Internal,
         }
     }
 }
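The variant is renamed from `ReceivePayloadErr` to `ReceivePayload` (the `Err` suffix is redundant on an error enum), and its message now describes receiving the payload rather than writing it to disk. A minimal sketch of the pattern, assuming the `thiserror` crate this enum is built on:

```rust
use thiserror::Error;

// One variant that boxes any Send + Sync error, so tempfile creation, write,
// flush, and seek failures can all be wrapped the same way at call sites.
#[derive(Debug, Error)]
enum PayloadError {
    #[error("Error while receiving the payload. `{0}`.")]
    ReceivePayload(Box<dyn std::error::Error + Send + Sync + 'static>),
}

fn wrap(e: std::io::Error) -> PayloadError {
    PayloadError::ReceivePayload(Box::new(e))
}
```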


@@ -26,7 +26,7 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
 use crate::analytics::{Analytics, DocumentDeletionKind};
 use crate::error::MeilisearchHttpError;
-use crate::error::PayloadError::ReceivePayloadErr;
+use crate::error::PayloadError::ReceivePayload;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
 use crate::extractors::payload::Payload;
@@ -232,33 +232,29 @@ async fn document_addition(
     let (uuid, mut update_file) = index_scheduler.create_update_file()?;

     let temp_file = match tempfile() {
-        Ok(temp_file) => temp_file,
-        Err(e) => {
-            return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
-        }
+        Ok(file) => file,
+        Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
     };

     let async_file = File::from_std(temp_file);
     let mut buffer = BufWriter::new(async_file);
     let mut buffer_write_size: usize = 0;

-    while let Some(bytes) = body.next().await {
-        let byte = &bytes?;
+    while let Some(result) = body.next().await {
+        let byte = result?;
         if byte.is_empty() && buffer_write_size == 0 {
             return Err(MeilisearchHttpError::MissingPayload(format));
         }
-        match buffer.write_all(byte).await {
+        match buffer.write_all(&byte).await {
             Ok(()) => buffer_write_size += 1,
-            Err(e) => {
-                return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
-            }
-        };
+            Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
+        }
     }

     if let Err(e) = buffer.flush().await {
-        return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+        return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
     }

     if buffer_write_size == 0 {
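The receive loop streams the request body chunk by chunk into a buffered temporary file. A self-contained sketch of the same shape (the `write_body_to_file` helper is hypothetical, and it assumes chunks arrive as `io::Result<Bytes>`, whereas the real handler maps the actix payload error into `MeilisearchHttpError` via `?`):

```rust
use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};

// Stream body chunks into a buffered temp file, counting written chunks so
// the caller can detect an entirely empty body.
async fn write_body_to_file<S>(mut body: S, temp_file: std::fs::File) -> std::io::Result<u64>
where
    S: Stream<Item = std::io::Result<Bytes>> + Unpin,
{
    let mut buffer = BufWriter::new(File::from_std(temp_file));
    let mut chunks: u64 = 0;
    while let Some(result) = body.next().await {
        let bytes = result?;
        buffer.write_all(&bytes).await?;
        chunks += 1;
    }
    // BufWriter holds data in memory until flushed; flush before the file
    // is handed off to anything that reads it back.
    buffer.flush().await?;
    Ok(chunks)
}
```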
@@ -266,26 +262,24 @@ async fn document_addition(
     }

     if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
-        return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
-    };
+        return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
+    }

     let read_file = buffer.into_inner().into_std().await;
-
-    let documents_count =
-        tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {
-            let documents_count = match format {
-                PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
-                PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?,
-                PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
-            };
-            // we NEED to persist the file here because we moved the `udpate_file` in another task.
-            update_file.persist()?;
-            Ok(documents_count)
-        })
-        .await;
+    let documents_count = tokio::task::spawn_blocking(move || {
+        let documents_count = match format {
+            PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
+            PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?,
+            PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
+        };
+        // we NEED to persist the file here because we moved the `udpate_file` in another task.
+        update_file.persist()?;
+        Ok(documents_count)
+    })
+    .await;

     let documents_count = match documents_count {
-        Ok(Ok(documents_count)) => documents_count as u64,
+        Ok(Ok(documents_count)) => documents_count,
         // in this case the file has not possibly be persisted.
         Ok(Err(e)) => return Err(e),
         Err(e) => {
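The nested match exists because `spawn_blocking(...).await` yields `Result<T, JoinError>` (the outer layer: did the blocking task run to completion?) while the closure itself returns a `Result` (the inner layer: did reading the payload succeed?). A minimal standalone illustration of that double unwrap (`blocking_count` and the `String` error type are purely for demonstration):

```rust
use tokio::task;

async fn blocking_count() -> Result<u64, String> {
    let joined: Result<Result<u64, String>, task::JoinError> =
        task::spawn_blocking(|| Ok::<u64, String>(42)).await;
    match joined {
        Ok(Ok(count)) => Ok(count),                 // closure succeeded
        Ok(Err(e)) => Err(e),                       // closure returned an error
        Err(e) => Err(format!("join error: {e}")),  // task panicked or was cancelled
    }
}
```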