From 271685cceb06f9203f1f7298e5c21ce0c6e4b11f Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 13 Dec 2022 14:52:10 +0100 Subject: [PATCH 1/5] Simplify the code when array_each failed --- meilisearch-types/src/document_formats.rs | 47 +++++++++-------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 8357690cd..d6ca578ff 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -4,10 +4,8 @@ use std::fs::File; use std::io::{self, Seek, Write}; use std::marker::PhantomData; -use either::Either; use memmap2::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; -use milli::Object; use serde::de::{SeqAccess, Visitor}; use serde::{Deserialize, Deserializer}; use serde_json::error::Category; @@ -125,7 +123,7 @@ pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { read_json_inner(file, writer, PayloadType::Ndjson) } -/// Reads JSON from temporary file and write an obkv batch to writer. +/// Reads JSON from temporary file and write an obkv batch to writer. fn read_json_inner( file: &File, writer: impl Write + Seek, @@ -135,35 +133,26 @@ fn read_json_inner( let mmap = unsafe { MmapOptions::new().map(file)? }; let mut deserializer = serde_json::Deserializer::from_slice(&mmap); - match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) { - // The json data has been successfully deserialised and does not need to be processed again. - // the data has been successfully transferred to the "update_file" during the deserialisation process. - // count ==0 means an empty array + match array_each(&mut deserializer, |obj| builder.append_json_object(&obj)) { + // The json data has been deserialized and does not need to be processed again. + // The data has been transferred to the writer during the deserialization process. 
Ok(Ok(count)) => { if count == 0 { return Ok(count as usize); } } Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), - // Prefer deserialization as a json array. Failure to do deserialisation using the traditional method. - Err(_e) => { - #[derive(Deserialize, Debug)] - #[serde(transparent)] - struct ArrayOrSingleObject { - #[serde(with = "either::serde_untagged")] - inner: Either, Object>, - } - - let content: ArrayOrSingleObject = serde_json::from_reader(file) + Err(_) => { + // If we cannot deserialize the content as an array of object then + // we try to deserialize it as a single JSON object. + let object = serde_json::from_reader(file) .map_err(Error::Json) .map_err(|e| (payload_type, e))?; - for object in content.inner.map_right(|o| vec![o]).into_inner() { - builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - } + builder + .append_json_object(&object) + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; } } @@ -173,12 +162,12 @@ fn read_json_inner( Ok(count as usize) } -/** - * The actual handling of the deserialization process in the serde avoids storing the deserialized object in memory. - * Reference: - * https://serde.rs/stream-array.html - * https://github.com/serde-rs/json/issues/160 - */ +/// The actual handling of the deserialization process in serde +/// avoids storing the deserialized object in memory. 
+/// +/// ## References +/// +/// fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result, D::Error> where D: Deserializer<'de>, From 526793b5b29b31325e078843fa3f34a336709804 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 13 Dec 2022 14:58:32 +0100 Subject: [PATCH 2/5] Handle empty arrays the same way we handle other arrays --- meilisearch-types/src/document_formats.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index d6ca578ff..f3b4d090f 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -136,11 +136,7 @@ fn read_json_inner( match array_each(&mut deserializer, |obj| builder.append_json_object(&obj)) { // The json data has been deserialized and does not need to be processed again. // The data has been transferred to the writer during the deserialization process. - Ok(Ok(count)) => { - if count == 0 { - return Ok(count as usize); - } - } + Ok(Ok(_)) => (), Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), Err(_) => { // If we cannot deserialize the content as an array of object then From 5d5615ef453001996b69ce1639df2d4154694741 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 13 Dec 2022 15:07:35 +0100 Subject: [PATCH 3/5] Rename the ReceivePayload error variant --- meilisearch/src/error.rs | 6 +-- meilisearch/src/routes/indexes/documents.rs | 50 +++++++++------------ 2 files changed, 25 insertions(+), 31 deletions(-) diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs index 53b16f9f5..ce3d383c3 100644 --- a/meilisearch/src/error.rs +++ b/meilisearch/src/error.rs @@ -95,8 +95,8 @@ pub enum PayloadError { MalformedPayload(serde_json::error::Error), #[error("A json payload is missing.")] MissingPayload, - #[error("Error while writing the playload to disk: `{0}`.")] - ReceivePayloadErr(Box), + #[error("Error while receiving the playload. 
`{0}`.")] + ReceivePayload(Box), } impl ErrorCode for PayloadError { @@ -128,7 +128,7 @@ impl ErrorCode for PayloadError { }, PayloadError::MissingPayload => Code::MissingPayload, PayloadError::MalformedPayload(_) => Code::MalformedPayload, - PayloadError::ReceivePayloadErr(_) => Code::Internal, + PayloadError::ReceivePayload(_) => Code::Internal, } } } diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 5353c1506..fa769a8c7 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -26,7 +26,7 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter}; use crate::analytics::{Analytics, DocumentDeletionKind}; use crate::error::MeilisearchHttpError; -use crate::error::PayloadError::ReceivePayloadErr; +use crate::error::PayloadError::ReceivePayload; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::payload::Payload; @@ -232,33 +232,29 @@ async fn document_addition( let (uuid, mut update_file) = index_scheduler.create_update_file()?; let temp_file = match tempfile() { - Ok(temp_file) => temp_file, - Err(e) => { - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); - } + Ok(file) => file, + Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))), }; let async_file = File::from_std(temp_file); let mut buffer = BufWriter::new(async_file); let mut buffer_write_size: usize = 0; - while let Some(bytes) = body.next().await { - let byte = &bytes?; + while let Some(result) = body.next().await { + let byte = result?; if byte.is_empty() && buffer_write_size == 0 { return Err(MeilisearchHttpError::MissingPayload(format)); } - match buffer.write_all(byte).await { + match buffer.write_all(&byte).await { Ok(()) => buffer_write_size += 1, - Err(e) => { - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); - } - }; + Err(e) => return 
Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))), + } } if let Err(e) = buffer.flush().await { - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); + return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); } if buffer_write_size == 0 { @@ -266,23 +262,21 @@ async fn document_addition( } if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await { - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); - }; + return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); + } let read_file = buffer.into_inner().into_std().await; - - let documents_count = - tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> { - let documents_count = match format { - PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, - PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?, - PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?, - }; - // we NEED to persist the file here because we moved the `udpate_file` in another task. - update_file.persist()?; - Ok(documents_count) - }) - .await; + let documents_count = tokio::task::spawn_blocking(move || { + let documents_count = match format { + PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, + PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?, + PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?, + }; + // we NEED to persist the file here because we moved the `udpate_file` in another task. 
+ update_file.persist()?; + Ok(documents_count) + }) + .await; let documents_count = match documents_count { Ok(Ok(documents_count)) => documents_count as u64, From 7b2f2a4f9c93c26b723fe2ecbaf5948f35c0a6ae Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 13 Dec 2022 15:10:51 +0100 Subject: [PATCH 4/5] Do only one conversion to u64 --- index-scheduler/src/lib.rs | 2 +- meilisearch-types/src/document_formats.rs | 12 ++++++------ meilisearch/src/routes/indexes/documents.rs | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 3ad546eb4..1e551f9f8 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1190,7 +1190,7 @@ mod tests { pub fn read_json( bytes: &[u8], write: impl Write + Seek, - ) -> std::result::Result { + ) -> std::result::Result { let temp_file = NamedTempFile::new().unwrap(); let mut buffer = BufWriter::new(temp_file.reopen().unwrap()); buffer.write_all(bytes).unwrap(); diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index f3b4d090f..dc9555fd9 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -101,7 +101,7 @@ impl ErrorCode for DocumentFormatError { internal_error!(DocumentFormatError: io::Error); /// Reads CSV from input and write an obkv batch to writer. -pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { +pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); let mmap = unsafe { MmapOptions::new().map(file)? 
}; let csv = csv::Reader::from_reader(mmap.as_ref()); @@ -110,16 +110,16 @@ pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { let count = builder.documents_count(); let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?; - Ok(count as usize) + Ok(count as u64) } /// Reads JSON from temporary file and write an obkv batch to writer. -pub fn read_json(file: &File, writer: impl Write + Seek) -> Result { +pub fn read_json(file: &File, writer: impl Write + Seek) -> Result { read_json_inner(file, writer, PayloadType::Json) } /// Reads JSON from temporary file and write an obkv batch to writer. -pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { +pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { read_json_inner(file, writer, PayloadType::Ndjson) } @@ -128,7 +128,7 @@ fn read_json_inner( file: &File, writer: impl Write + Seek, payload_type: PayloadType, -) -> Result { +) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); let mmap = unsafe { MmapOptions::new().map(file)? }; let mut deserializer = serde_json::Deserializer::from_slice(&mmap); @@ -155,7 +155,7 @@ fn read_json_inner( let count = builder.documents_count(); let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?; - Ok(count as usize) + Ok(count as u64) } /// The actual handling of the deserialization process in serde diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index fa769a8c7..ce2df00a0 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -279,7 +279,7 @@ async fn document_addition( .await; let documents_count = match documents_count { - Ok(Ok(documents_count)) => documents_count as u64, + Ok(Ok(documents_count)) => documents_count, // in this case the file has not possibly be persisted. 
Ok(Err(e)) => return Err(e), Err(e) => { From a08cc8298321366b06593a97dc1076163a256f25 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 13 Dec 2022 16:29:49 +0100 Subject: [PATCH 5/5] Revert "Simplify the code when array_each failed" This reverts commit 271685cceb06f9203f1f7298e5c21ce0c6e4b11f. --- meilisearch-types/src/document_formats.rs | 29 ++++++++++++++++------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index dc9555fd9..5eee63afc 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -4,8 +4,10 @@ use std::fs::File; use std::io::{self, Seek, Write}; use std::marker::PhantomData; +use either::Either; use memmap2::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; +use milli::Object; use serde::de::{SeqAccess, Visitor}; use serde::{Deserialize, Deserializer}; use serde_json::error::Category; @@ -123,7 +125,7 @@ pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { read_json_inner(file, writer, PayloadType::Ndjson) } -/// Reads JSON from temporary file and write an obkv batch to writer. +/// Reads JSON from temporary file and write an obkv batch to writer. fn read_json_inner( file: &File, writer: impl Write + Seek, @@ -138,17 +140,26 @@ fn read_json_inner( // The data has been transferred to the writer during the deserialization process. Ok(Ok(_)) => (), Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), - Err(_) => { - // If we cannot deserialize the content as an array of object then - // we try to deserialize it as a single JSON object. - let object = serde_json::from_reader(file) + Err(_e) => { + // If we cannot deserialize the content as an array of object then we try + // to deserialize it with the original method to keep correct error messages. 
+ #[derive(Deserialize, Debug)] + #[serde(transparent)] + struct ArrayOrSingleObject { + #[serde(with = "either::serde_untagged")] + inner: Either, Object>, + } + + let content: ArrayOrSingleObject = serde_json::from_reader(file) .map_err(Error::Json) .map_err(|e| (payload_type, e))?; - builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; + for object in content.inner.map_right(|o| vec![o]).into_inner() { + builder + .append_json_object(&object) + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; + } } }