diff --git a/Cargo.lock b/Cargo.lock index 9476506ec..349bed5db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3577,6 +3577,7 @@ dependencies = [ "memmap2", "milli", "roaring", + "rustc-hash 2.1.0", "serde", "serde-cs", "serde_json", @@ -3676,7 +3677,7 @@ dependencies = [ "rhai", "roaring", "rstar", - "rustc-hash 2.0.0", + "rustc-hash 2.1.0", "serde", "serde_json", "slice-group-by", @@ -4425,7 +4426,7 @@ dependencies = [ "bytes", "rand", "ring", - "rustc-hash 2.0.0", + "rustc-hash 2.1.0", "rustls", "slab", "thiserror", @@ -4798,9 +4799,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustc-hash" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" +checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" [[package]] name = "rustc_version" diff --git a/crates/file-store/src/lib.rs b/crates/file-store/src/lib.rs index c8b3849ab..39ed9482b 100644 --- a/crates/file-store/src/lib.rs +++ b/crates/file-store/src/lib.rs @@ -136,6 +136,14 @@ pub struct File { } impl File { + pub fn from_parts(path: PathBuf, file: Option) -> Self { + Self { path, file } + } + + pub fn into_parts(self) -> (PathBuf, Option) { + (self.path, self.file) + } + pub fn dry_file() -> Result { Ok(Self { path: PathBuf::new(), file: None }) } diff --git a/crates/meilisearch-types/Cargo.toml b/crates/meilisearch-types/Cargo.toml index e81e6dd35..76d8d11ca 100644 --- a/crates/meilisearch-types/Cargo.toml +++ b/crates/meilisearch-types/Cargo.toml @@ -26,6 +26,7 @@ memmap2 = "0.9.4" milli = { path = "../milli" } bumparaw-collections = "0.1.2" roaring = { version = "0.10.7", features = ["serde"] } +rustc-hash = "2.1.0" serde = { version = "1.0.204", features = ["derive"] } serde-cs = "0.2.4" serde_json = "1.0.120" diff --git a/crates/meilisearch-types/src/document_formats.rs b/crates/meilisearch-types/src/document_formats.rs index c6e8ad907..70a0e6204 100644 --- a/crates/meilisearch-types/src/document_formats.rs +++ b/crates/meilisearch-types/src/document_formats.rs @@ -8,6 +8,7 @@ use bumparaw_collections::RawMap; use memmap2::Mmap; use milli::documents::Error; use milli::Object; +use rustc_hash::FxBuildHasher; use serde::de::{SeqAccess, Visitor}; use serde::{Deserialize, Deserializer}; use serde_json::error::Category; @@ -220,7 +221,7 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result { let mut deserializer = serde_json::Deserializer::from_slice(&input); let res = array_each(&mut deserializer, |obj: &RawValue| { doc_alloc.reset(); - let map = RawMap::from_raw_value(obj, &doc_alloc)?; + let map = RawMap::from_raw_value_and_hasher(obj, FxBuildHasher, &doc_alloc)?; to_writer(&mut out, &map) }); let count = match res { @@ -250,26 +251,25 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result { } } -/// Reads NDJSON from file and write it in NDJSON in a file checking it along the way. -pub fn read_ndjson(input: &File, output: impl io::Write) -> Result { +/// Reads NDJSON from file and checks it. +pub fn read_ndjson(input: &File) -> Result { // We memory map to be able to deserialize into a RawMap that // does not allocate when possible and only materialize the first/top level. let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? }; - let mut output = BufWriter::new(output); - let mut bump = Bump::with_capacity(1024 * 1024); let mut count = 0; for result in serde_json::Deserializer::from_slice(&input).into_iter() { bump.reset(); - count += 1; - result - .and_then(|raw: &RawValue| { + match result { + Ok(raw) => { // try to deserialize as a map - let map = RawMap::from_raw_value(raw, &bump)?; - to_writer(&mut output, &map) - }) - .map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?; + RawMap::from_raw_value_and_hasher(raw, FxBuildHasher, &bump) + .map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?; + count += 1; + } + Err(e) => return Err(DocumentFormatError::from((PayloadType::Ndjson, e))), + } } Ok(count) diff --git a/crates/meilisearch/src/routes/indexes/documents.rs b/crates/meilisearch/src/routes/indexes/documents.rs index 47f73ef42..5f79000bd 100644 --- a/crates/meilisearch/src/routes/indexes/documents.rs +++ b/crates/meilisearch/src/routes/indexes/documents.rs @@ -1,5 +1,5 @@ use std::collections::HashSet; -use std::io::ErrorKind; +use std::io::{ErrorKind, Seek as _}; use std::marker::PhantomData; use actix_web::http::header::CONTENT_TYPE; @@ -572,7 +572,7 @@ async fn document_addition( index_uid: IndexUid, primary_key: Option, csv_delimiter: Option, - mut body: Payload, + body: Payload, method: IndexDocumentsMethod, task_id: Option, dry_run: bool, @@ -609,54 +609,60 @@ async fn document_addition( }; let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?; + let documents_count = match format { + PayloadType::Ndjson => { + let (path, file) = update_file.into_parts(); + let file = match file { + Some(file) => { + let (file, path) = file.into_parts(); + let mut file = copy_body_to_file(file, body, format).await?; + file.rewind().map_err(|e| { + index_scheduler::Error::FileStore(file_store::Error::IoError(e)) + })?; + Some(tempfile::NamedTempFile::from_parts(file, path)) + } + None => None, + }; - let temp_file = match tempfile() { - Ok(file) => file, - Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))), + let documents_count = tokio::task::spawn_blocking(move || { + let documents_count = file.as_ref().map_or(Ok(0), |ntf| { + read_ndjson(ntf.as_file()).map_err(MeilisearchHttpError::DocumentFormat) + })?; + + let update_file = file_store::File::from_parts(path, file); + update_file.persist()?; + + Ok(documents_count) + }) + .await?; + + Ok(documents_count) + } + PayloadType::Json | PayloadType::Csv { delimiter: _ } => { + let temp_file = match tempfile() { + Ok(file) => file, + Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))), + }; + + let read_file = copy_body_to_file(temp_file, body, format).await?; + tokio::task::spawn_blocking(move || { + let documents_count = match format { + PayloadType::Json => read_json(&read_file, &mut update_file)?, + PayloadType::Csv { delimiter } => { + read_csv(&read_file, &mut update_file, delimiter)? + } + PayloadType::Ndjson => { + unreachable!("We already wrote the user content into the update file") + } + }; + // we NEED to persist the file here because we moved the `udpate_file` in another task. + update_file.persist()?; + Ok(documents_count) + }) + .await + } }; - let async_file = File::from_std(temp_file); - let mut buffer = BufWriter::new(async_file); - - let mut buffer_write_size: usize = 0; - while let Some(result) = body.next().await { - let byte = result?; - - if byte.is_empty() && buffer_write_size == 0 { - return Err(MeilisearchHttpError::MissingPayload(format)); - } - - match buffer.write_all(&byte).await { - Ok(()) => buffer_write_size += 1, - Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))), - } - } - - if let Err(e) = buffer.flush().await { - return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); - } - - if buffer_write_size == 0 { - return Err(MeilisearchHttpError::MissingPayload(format)); - } - - if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await { - return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); - } - - let read_file = buffer.into_inner().into_std().await; - let documents_count = tokio::task::spawn_blocking(move || { - let documents_count = match format { - PayloadType::Json => read_json(&read_file, &mut update_file)?, - PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?, - PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?, - }; - // we NEED to persist the file here because we moved the `udpate_file` in another task. - update_file.persist()?; - Ok(documents_count) - }) - .await; - let documents_count = match documents_count { Ok(Ok(documents_count)) => documents_count, // in this case the file has not possibly be persisted. @@ -703,6 +709,39 @@ async fn document_addition( Ok(task.into()) } +async fn copy_body_to_file( + output: std::fs::File, + mut body: Payload, + format: PayloadType, +) -> Result { + let async_file = File::from_std(output); + let mut buffer = BufWriter::new(async_file); + let mut buffer_write_size: usize = 0; + while let Some(result) = body.next().await { + let byte = result?; + + if byte.is_empty() && buffer_write_size == 0 { + return Err(MeilisearchHttpError::MissingPayload(format)); + } + + match buffer.write_all(&byte).await { + Ok(()) => buffer_write_size += 1, + Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))), + } + } + if let Err(e) = buffer.flush().await { + return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); + } + if buffer_write_size == 0 { + return Err(MeilisearchHttpError::MissingPayload(format)); + } + if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await { + return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); + } + let read_file = buffer.into_inner().into_std().await; + Ok(read_file) +} + pub async fn delete_documents_batch( index_scheduler: GuardedData, Data>, index_uid: web::Path,