From 7b08d700f77b0876d767211936d9fcd538c09f00 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Sat, 3 Dec 2022 18:52:20 +0800 Subject: [PATCH] requested changes fix --- Cargo.lock | 13 +----------- meilisearch-http/src/error.rs | 2 +- .../src/routes/indexes/documents.rs | 21 +++++++++---------- meilisearch-types/Cargo.toml | 3 +-- meilisearch-types/src/document_formats.rs | 20 ++++++++++++------ meilisearch-types/src/error.rs | 5 ----- 6 files changed, 27 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ba0d69d4..ecfc94666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2376,9 +2376,8 @@ dependencies = [ "flate2", "fst", "insta", - "log", "meili-snap", - "memmap", + "memmap2", "milli", "proptest", "proptest-derive", @@ -2398,16 +2397,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "memmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "memmap2" version = "0.5.7" diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index fd18203fc..13e01ac0a 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -128,7 +128,7 @@ impl ErrorCode for PayloadError { }, PayloadError::MissingPayload => Code::MissingPayload, PayloadError::MalformedPayload(_) => Code::MalformedPayload, - PayloadError::ReceivePayloadErr => Code::ReceivePayloadErr, + PayloadError::ReceivePayloadErr => Code::Internal, } } } diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 8051689d6..821361814 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -26,8 +26,10 @@ use once_cell::sync::Lazy; use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; -use std::io::{BufWriter, ErrorKind, Write}; +use std::io::ErrorKind; use tempfile::NamedTempFile; +use tokio::fs::File; +use tokio::io::{AsyncWriteExt, BufWriter}; static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] @@ -227,14 +229,11 @@ async fn document_addition( let (uuid, mut update_file) = index_scheduler.create_update_file()?; - let err: Result = - Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); - let temp_file = match NamedTempFile::new() { Ok(temp_file) => temp_file, Err(e) => { error!("create a temporary file error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } }; debug!("temp file path: {:?}", temp_file.as_ref()); @@ -242,24 +241,24 @@ async fn document_addition( Ok(buffer_file) => buffer_file, Err(e) => { error!("reopen payload temporary file error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } }; - let mut buffer = BufWriter::new(buffer_file); + let mut buffer = BufWriter::new(File::from_std(buffer_file)); let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { - match buffer.write(&bytes?) { + match buffer.write(&bytes?).await { Ok(size) => buffer_write_size += size, Err(e) => { error!("bufWriter write error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } } } - if let Err(e) = buffer.flush() { + if let Err(e) = buffer.flush().await { error!("bufWriter flush error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); }; if buffer_write_size == 0 { diff --git a/meilisearch-types/Cargo.toml b/meilisearch-types/Cargo.toml index 92f32a4b5..7bc66a37e 100644 --- a/meilisearch-types/Cargo.toml +++ b/meilisearch-types/Cargo.toml @@ -12,6 +12,7 @@ either = { version = "1.6.1", features = ["serde"] } enum-iterator = "1.1.3" flate2 = "1.0.24" fst = "0.4.7" +memmap2 = "0.5.7" milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.37.0", default-features = false } proptest = { version = "1.0.0", optional = true } proptest-derive = { version = "0.3.0", optional = true } @@ -23,8 +24,6 @@ thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } tokio = "1.0" uuid = { version = "1.1.2", features = ["serde", "v4"] } -memmap = "0.7.0" -log = "0.4.17" [dev-dependencies] insta = "1.19.1" diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 3ad8e20e0..9180c0ea8 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,8 +1,7 @@ use crate::error::{Code, ErrorCode}; use crate::internal_error; use either::Either; -use log::debug; -use memmap::MmapOptions; +use memmap2::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; use milli::Object; use serde::de::{SeqAccess, Visitor}; @@ -104,7 +103,7 @@ internal_error!(DocumentFormatError: io::Error); /// Reads CSV from input and write an obkv batch to writer. pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; + let mmap = unsafe { MmapOptions::new().map(file)? }; let csv = csv::Reader::from_reader(mmap.as_ref()); builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; @@ -131,14 +130,21 @@ fn read_json_inner( payload_type: PayloadType, ) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; + let mmap = unsafe { MmapOptions::new().map(file)? }; let mut deserializer = serde_json::Deserializer::from_slice(&mmap); match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) { - Ok(Ok(count)) => debug!("serde json array size: {}", count), + // The json data has been successfully deserialised and does not need to be processed again. + // the data has been successfully transferred to the "update_file" during the deserialisation process. + // count ==0 means an empty array + Ok(Ok(count)) => { + if count == 0 { + return Ok(count as usize); + } + } Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), + // Prefer deserialization as a json array. Failure to do deserialisation using the traditional method. Err(_e) => { - debug!("deserialize single json"); #[derive(Deserialize, Debug)] #[serde(transparent)] struct ArrayOrSingleObject { @@ -166,6 +172,8 @@ fn read_json_inner( } /** + * The actual handling of the deserialization process in the serde avoids storing the deserialized object in memory. + * Reference: * https://serde.rs/stream-array.html * https://github.com/serde-rs/json/issues/160 */ diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 5062fc4a0..5c0e1d9b8 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -169,7 +169,6 @@ pub enum Code { MissingContentType, MalformedPayload, MissingPayload, - ReceivePayloadErr, ApiKeyNotFound, MissingParameter, @@ -324,10 +323,6 @@ impl Code { DuplicateIndexFound => { ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST) } - ReceivePayloadErr => ErrCode::internal( - "receive_payload_internal_exceptions", - StatusCode::INTERNAL_SERVER_ERROR, - ), } }