From 8d97b7b28cbab9444e023b5e3cf822151a8b0127 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 10 Sep 2024 17:09:49 +0100 Subject: [PATCH] Support JSON payloads again (not perfectly though) --- meilisearch-types/src/document_formats.rs | 91 +++++++++++---------- meilisearch/src/routes/indexes/documents.rs | 19 +++-- 2 files changed, 60 insertions(+), 50 deletions(-) diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 50dc5bad4..942203b68 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,10 +1,9 @@ use std::fmt::{self, Debug, Display}; use std::fs::File; -use std::io::{self, BufWriter, Write}; +use std::io::{self, BufReader, BufWriter, Write}; use std::marker::PhantomData; -use memmap2::MmapOptions; -use milli::documents::{DocumentsBatchBuilder, Error}; +use milli::documents::Error; use milli::Object; use serde::de::{SeqAccess, Visitor}; use serde::{Deserialize, Deserializer}; @@ -104,29 +103,35 @@ impl ErrorCode for DocumentFormatError { } /// Reads CSV from input and write an obkv batch to writer. -pub fn read_csv(file: &File, writer: impl Write, delimiter: u8) -> Result { - let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer)); - let mmap = unsafe { MmapOptions::new().map(file)? }; - let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref()); - builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?; +pub fn read_csv( + _input: BufReader, + _output: &mut BufWriter, + _delimiter: u8, +) -> Result { + todo!() + // let mut builder = DocumentsBatchBuilder::new(BufWriter::new(output)); + // let mmap = unsafe { MmapOptions::new().map(input)? }; + // let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref()); + // builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?; - let count = builder.documents_count(); - let _ = builder.into_inner().map_err(DocumentFormatError::Io)?; + // let count = builder.documents_count(); + // let _ = builder.into_inner().map_err(DocumentFormatError::Io)?; - Ok(count as u64) + // Ok(count as u64) } /// Reads JSON from temporary file and write an obkv batch to writer. -pub fn read_json(file: &File, writer: impl Write) -> Result { - let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer)); - let mmap = unsafe { MmapOptions::new().map(file)? }; - let mut deserializer = serde_json::Deserializer::from_slice(&mmap); - - match array_each(&mut deserializer, |obj| builder.append_json_object(&obj)) { +pub fn read_json(input: BufReader, mut output: &mut BufWriter) -> Result { + let mut count = 0; + let mut deserializer = serde_json::Deserializer::from_reader(input); + match array_each(&mut deserializer, |obj: Object| { + count += 1; + serde_json::to_writer(&mut output, &obj) + }) { // The json data has been deserialized and does not need to be processed again. // The data has been transferred to the writer during the deserialization process. Ok(Ok(_)) => (), - Ok(Err(e)) => return Err(DocumentFormatError::Io(e)), + Ok(Err(e)) => return Err(DocumentFormatError::Io(e.into())), Err(e) => { // Attempt to deserialize a single json string when the cause of the exception is not Category.data // Other types of deserialisation exceptions are returned directly to the front-end @@ -137,33 +142,30 @@ pub fn read_json(file: &File, writer: impl Write) -> Result { )); } - let content: Object = serde_json::from_slice(&mmap) - .map_err(Error::Json) - .map_err(|e| (PayloadType::Json, e))?; - builder.append_json_object(&content).map_err(DocumentFormatError::Io)?; + todo!("single document/object update") + + // let content: Object = serde_json::from_slice(&mmap) + // .map_err(Error::Json) + // .map_err(|e| (PayloadType::Json, e))?; + // serde_json::to_writer(&mut output, &content).unwrap() } } - let count = builder.documents_count(); - let _ = builder.into_inner().map_err(DocumentFormatError::Io)?; - - Ok(count as u64) + Ok(count) } -/// Reads JSON from temporary file and write an obkv batch to writer. -pub fn read_ndjson(file: &File, writer: impl Write) -> Result { - let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer)); - let mmap = unsafe { MmapOptions::new().map(file)? }; - - for result in serde_json::Deserializer::from_slice(&mmap).into_iter() { - let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?; - builder.append_json_object(&object).map_err(Into::into).map_err(DocumentFormatError::Io)?; +/// Reads JSON from temporary file and write it into the writer. +pub fn read_ndjson(input: BufReader, mut output: &mut BufWriter) -> Result { + let mut count = 0; + for result in serde_json::Deserializer::from_reader(input).into_iter() { + count += 1; + // TODO Correctly manage the errors + // Avoid copying the content: use CowStr from milli (move it elsewhere) + let map: Object = result.unwrap(); + serde_json::to_writer(&mut output, &map).unwrap(); } - let count = builder.documents_count(); - let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Io)?; - - Ok(count as u64) + Ok(count) } /// The actual handling of the deserialization process in serde @@ -172,20 +174,23 @@ pub fn read_ndjson(file: &File, writer: impl Write) -> Result { /// ## References /// /// -fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result, D::Error> +fn array_each<'de, D, T, F>( + deserializer: D, + f: F, +) -> std::result::Result, D::Error> where D: Deserializer<'de>, T: Deserialize<'de>, - F: FnMut(T) -> io::Result<()>, + F: FnMut(T) -> serde_json::Result<()>, { struct SeqVisitor(F, PhantomData); impl<'de, T, F> Visitor<'de> for SeqVisitor where T: Deserialize<'de>, - F: FnMut(T) -> io::Result<()>, + F: FnMut(T) -> serde_json::Result<()>, { - type Value = io::Result; + type Value = serde_json::Result; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { formatter.write_str("a nonempty sequence") @@ -194,7 +199,7 @@ where fn visit_seq( mut self, mut seq: A, - ) -> std::result::Result, >::Error> + ) -> std::result::Result, >::Error> where A: SeqAccess<'de>, { diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 029a125d0..87b448051 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -1,4 +1,4 @@ -use std::io::ErrorKind; +use std::io::{BufReader, ErrorKind}; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; @@ -423,7 +423,7 @@ async fn document_addition( } }; - let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?; + let (uuid, update_file) = index_scheduler.create_update_file(dry_run)?; let temp_file = match tempfile() { Ok(file) => file, @@ -459,15 +459,20 @@ async fn document_addition( return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); } - let mut read_file = buffer.into_inner().into_std().await; + let read_file = BufReader::new(buffer.into_inner().into_std().await); let documents_count = tokio::task::spawn_blocking(move || { + let mut update_file = std::io::BufWriter::new(update_file); let documents_count = match format { - PayloadType::Json => read_json(&read_file, &mut update_file)?, - PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?, - /// TODO do not copy all the content - PayloadType::Ndjson => std::io::copy(&mut read_file, &mut update_file).unwrap(), + PayloadType::Json => read_json(read_file, &mut update_file)?, + PayloadType::Csv { delimiter } => read_csv(read_file, &mut update_file, delimiter)?, + PayloadType::Ndjson => read_ndjson(read_file, &mut update_file)?, }; // we NEED to persist the file here because we moved the `udpate_file` in another task. + // TODO better support of errors + let update_file = match update_file.into_inner() { + Ok(update_file) => update_file, + Err(_) => todo!("handle errors"), + }; update_file.persist()?; Ok(documents_count) })