From 8287c2644fdf850957cd5f190705c966359e4bdd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Tue, 10 Sep 2024 21:10:28 +0100
Subject: [PATCH] Support CSV again

---
 meilisearch-types/src/document_formats.rs   | 177 ++++++++++++++++----
 meilisearch/src/routes/indexes/documents.rs |  16 +-
 milli/src/update/new/indexer/mod.rs         |   1 +
 milli/src/update/new/mod.rs                 |   1 +
 4 files changed, 147 insertions(+), 48 deletions(-)

diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs
index 942203b68..0b78e4a94 100644
--- a/meilisearch-types/src/document_formats.rs
+++ b/meilisearch-types/src/document_formats.rs
@@ -1,14 +1,18 @@
 use std::fmt::{self, Debug, Display};
 use std::fs::File;
-use std::io::{self, BufReader, BufWriter, Write};
+use std::io::{self, BufReader, BufWriter, Seek, Write};
 use std::marker::PhantomData;
 
+use csv::StringRecord;
+use memmap2::Mmap;
 use milli::documents::Error;
+use milli::update::new::TopLevelMap;
 use milli::Object;
 use serde::de::{SeqAccess, Visitor};
 use serde::{Deserialize, Deserializer};
 use serde_json::error::Category;
 
+use crate::error::deserr_codes::MalformedPayload;
 use crate::error::{Code, ErrorCode};
 
 type Result<T> = std::result::Result<T, DocumentFormatError>;
@@ -87,6 +91,16 @@ impl From<(PayloadType, Error)> for DocumentFormatError {
     }
 }
 
+impl From<(PayloadType, serde_json::Error)> for DocumentFormatError {
+    fn from((ty, error): (PayloadType, serde_json::Error)) -> Self {
+        if error.classify() == Category::Data {
+            Self::Io(error.into())
+        } else {
+            Self::MalformedPayload(Error::Json(error), ty)
+        }
+    }
+}
+
 impl From<io::Error> for DocumentFormatError {
     fn from(error: io::Error) -> Self {
         Self::Io(error)
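
The new `From<(PayloadType, serde_json::Error)>` conversion above centralizes the
error mapping used by the readers further down: errors that serde_json classifies
as `Category::Data` are turned back into `Io` errors (errors raised through
`serde::de::Error::custom`, e.g. a write failing inside a deserialization visitor,
carry that category), while everything else becomes a `MalformedPayload` of the
given payload type. A standalone illustration of the classification this relies
on (not part of the patch):

    use serde_json::error::Category;

    // Ill-formed JSON text classifies as a syntax error.
    let err = serde_json::from_str::<serde_json::Value>("{invalid}").unwrap_err();
    assert_eq!(err.classify(), Category::Syntax);

    // Well-formed JSON of an unexpected shape classifies as a data error.
    let err = serde_json::from_str::<u32>("\"not a number\"").unwrap_err();
    assert_eq!(err.classify(), Category::Data);
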
@@ -102,67 +116,156 @@ impl ErrorCode for DocumentFormatError {
     }
 }
 
+// TODO remove that from the place I've borrowed it
+#[derive(Debug)]
+enum AllowedType {
+    String,
+    Boolean,
+    Number,
+}
+
+fn parse_csv_header(header: &str) -> (&str, AllowedType) {
+    // if there are several separators we only split on the last one.
+    match header.rsplit_once(':') {
+        Some((field_name, field_type)) => match field_type {
+            "string" => (field_name, AllowedType::String),
+            "boolean" => (field_name, AllowedType::Boolean),
+            "number" => (field_name, AllowedType::Number),
+            // if the pattern isn't recognized, we keep the whole field.
+            _otherwise => (header, AllowedType::String),
+        },
+        None => (header, AllowedType::String),
+    }
+}
+
 /// Reads CSV from input and write an obkv batch to writer.
-pub fn read_csv(
-    _input: BufReader<File>,
-    _output: &mut BufWriter<File>,
-    _delimiter: u8,
-) -> Result<u64> {
-    todo!()
-    // let mut builder = DocumentsBatchBuilder::new(BufWriter::new(output));
-    // let mmap = unsafe { MmapOptions::new().map(input)? };
-    // let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
-    // builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
+pub fn read_csv(input: &File, output: impl io::Write, delimiter: u8) -> Result<u64> {
+    use serde_json::{Map, Value};
 
-    // let count = builder.documents_count();
-    // let _ = builder.into_inner().map_err(DocumentFormatError::Io)?;
+    let mut output = BufWriter::new(output);
+    let mut reader = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(input);
 
-    // Ok(count as u64)
+    // TODO manage error correctly
+    // Make sure that we insert the field ids in order as the obkv writer has this requirement.
+    let mut typed_fields: Vec<_> = reader
+        .headers()
+        .unwrap()
+        .into_iter()
+        .map(parse_csv_header)
+        .map(|(f, t)| (f.to_string(), t))
+        .collect();
+
+    let mut object: Map<_, _> =
+        reader.headers().unwrap().iter().map(|k| (k.to_string(), Value::Null)).collect();
+
+    let mut line: usize = 0;
+    let mut record = csv::StringRecord::new();
+    while reader.read_record(&mut record).unwrap() {
+        // We increment here and not at the end of the while loop to take
+        // the header offset into account.
+        line += 1;
+
+        // Reset the document to write
+        object.iter_mut().for_each(|(_, v)| *v = Value::Null);
+
+        for (i, (name, type_)) in typed_fields.iter().enumerate() {
+            let value = &record[i];
+            let trimmed_value = value.trim();
+            let value = match type_ {
+                AllowedType::Number if trimmed_value.is_empty() => Value::Null,
+                AllowedType::Number => match trimmed_value.parse::<i64>() {
+                    Ok(integer) => Value::from(integer),
+                    Err(_) => {
+                        match trimmed_value.parse::<f64>() {
+                            Ok(float) => Value::from(float),
+                            Err(error) => {
+                                panic!("bad float")
+                                // return Err(Error::ParseFloat {
+                                //     error,
+                                //     line,
+                                //     value: value.to_string(),
+                                // });
+                            }
+                        }
+                    }
+                },
+                AllowedType::Boolean if trimmed_value.is_empty() => Value::Null,
+                AllowedType::Boolean => match trimmed_value.parse::<bool>() {
+                    Ok(bool) => Value::from(bool),
+                    Err(error) => {
+                        panic!("bad bool")
+                        // return Err(Error::ParseBool {
+                        //     error,
+                        //     line,
+                        //     value: value.to_string(),
+                        // });
+                    }
+                },
+                AllowedType::String if value.is_empty() => Value::Null,
+                AllowedType::String => Value::from(value),
+            };
+
+            *object.get_mut(name).unwrap() = value;
+        }
+
+        serde_json::to_writer(&mut output, &object).unwrap();
+    }
+
+    Ok(line.saturating_sub(1) as u64)
 }
 
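
In `read_csv` above, the header suffix selects the column type: `price:number`
and `available:boolean` parse their cells, any other suffix (and any header
without one) keeps the column as a plain string, and only the text after the
last `:` is considered. Empty cells become `null`. As an illustration, assuming
the `,` delimiter and serde_json's preserve_order feature (which Meilisearch
enables; without it the keys come out sorted), the payload

    name,price:number,available:boolean
    chair,12.5,true
    table,,false

is written to the output as two concatenated JSON objects:

    {"name":"chair","price":12.5,"available":true}{"name":"table","price":null,"available":false}

Note that `12.5` only falls back to `f64` after the `i64` parse fails, so
integer-looking cells stay integers.
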
 /// Reads JSON from temporary file and write an obkv batch to writer.
-pub fn read_json(input: BufReader<File>, mut output: &mut BufWriter<File>) -> Result<u64> {
+pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
+    // We memory map to be able to deserialize into a TopLevelMap<'pl> that
+    // does not allocate when possible and only materializes the first/top level.
+    let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
+
+    let mut deserializer = serde_json::Deserializer::from_slice(&input);
+    let mut output = BufWriter::new(output);
     let mut count = 0;
-    let mut deserializer = serde_json::Deserializer::from_reader(input);
-    match array_each(&mut deserializer, |obj: Object| {
+
+    let count_and_write = |obj: TopLevelMap| {
         count += 1;
         serde_json::to_writer(&mut output, &obj)
-    }) {
+    };
+
+    match array_each(&mut deserializer, count_and_write) {
         // The json data has been deserialized and does not need to be processed again.
         // The data has been transferred to the writer during the deserialization process.
         Ok(Ok(_)) => (),
-        Ok(Err(e)) => return Err(DocumentFormatError::Io(e.into())),
+        Ok(Err(e)) => return Err(DocumentFormatError::from((PayloadType::Json, e))),
         Err(e) => {
             // Attempt to deserialize a single json string when the cause of the exception is not Category.data
             // Other types of deserialisation exceptions are returned directly to the front-end
-            if e.classify() != serde_json::error::Category::Data {
-                return Err(DocumentFormatError::MalformedPayload(
-                    Error::Json(e),
-                    PayloadType::Json,
-                ));
+            if e.classify() != Category::Data {
+                return Err(DocumentFormatError::from((PayloadType::Json, e)));
             }
-            todo!("single document/object update")
-
-            // let content: Object = serde_json::from_slice(&mmap)
-            //     .map_err(Error::Json)
-            //     .map_err(|e| (PayloadType::Json, e))?;
-            // serde_json::to_writer(&mut output, &content).unwrap()
+            let content: Object = serde_json::from_slice(&input)
+                .map_err(Error::Json)
+                .map_err(|e| (PayloadType::Json, e))?;
+            serde_json::to_writer(&mut output, &content).unwrap()
         }
     }
 
-    Ok(count)
+    match output.into_inner() {
+        Ok(_) => Ok(count),
+        Err(ie) => Err(DocumentFormatError::Io(ie.into_error())),
+    }
 }
 
 /// Reads JSON from temporary file and write it into the writer.
-pub fn read_ndjson(input: BufReader<File>, mut output: &mut BufWriter<File>) -> Result<u64> {
+pub fn read_ndjson(input: &File, mut output: impl io::Write) -> Result<u64> {
+    // We memory map to be able to deserialize into a TopLevelMap<'pl> that
+    // does not allocate when possible and only materializes the first/top level.
+    let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
+
     let mut count = 0;
-    for result in serde_json::Deserializer::from_reader(input).into_iter() {
+    for result in serde_json::Deserializer::from_slice(&input).into_iter() {
         count += 1;
-        // TODO Correctly manage the errors
-        // Avoid copying the content: use CowStr from milli (move it elsewhere)
-        let map: Object = result.unwrap();
-        serde_json::to_writer(&mut output, &map).unwrap();
+        result
+            .and_then(|map: TopLevelMap| serde_json::to_writer(&mut output, &map))
+            .map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?;
     }
 
     Ok(count)
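
All three readers now write documents back-to-back with `serde_json::to_writer`,
producing a stream of concatenated JSON objects rather than a JSON array or
newline-delimited records. JSON objects are self-delimiting, so the update file
can be re-read with serde_json's `StreamDeserializer`; a minimal consumer sketch
(illustrative, not part of the patch):

    use serde_json::{Deserializer, Value};

    let payload = br#"{"id":1}{"id":2}"#;
    let documents: Vec<Value> = Deserializer::from_slice(payload)
        .into_iter::<Value>()
        .collect::<Result<_, _>>()
        .unwrap();
    assert_eq!(documents.len(), 2);

This is also what lets `read_json` and `read_ndjson` deserialize into the
borrowed `TopLevelMap<'_>`: the memory-mapped input outlives the write loop, so
top-level values can be copied out without materializing every nested field.
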
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index 87b448051..055685151 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -423,7 +423,7 @@ async fn document_addition(
         }
     };
 
-    let (uuid, update_file) = index_scheduler.create_update_file(dry_run)?;
+    let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
 
     let temp_file = match tempfile() {
         Ok(file) => file,
@@ -459,20 +459,14 @@ async fn document_addition(
         return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
     }
 
-    let read_file = BufReader::new(buffer.into_inner().into_std().await);
+    let read_file = buffer.into_inner().into_std().await;
     let documents_count = tokio::task::spawn_blocking(move || {
-        let mut update_file = std::io::BufWriter::new(update_file);
         let documents_count = match format {
-            PayloadType::Json => read_json(read_file, &mut update_file)?,
-            PayloadType::Csv { delimiter } => read_csv(read_file, &mut update_file, delimiter)?,
-            PayloadType::Ndjson => read_ndjson(read_file, &mut update_file)?,
+            PayloadType::Json => read_json(&read_file, &mut update_file)?,
+            PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
+            PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
         };
         // we NEED to persist the file here because we moved the `update_file` in another task.
-        // TODO better support of errors
-        let update_file = match update_file.into_inner() {
-            Ok(update_file) => update_file,
-            Err(_) => todo!("handle errors"),
-        };
         update_file.persist()?;
         Ok(documents_count)
     })
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index 4d7e2aa47..e80b07671 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -9,6 +9,7 @@ use heed::{RoTxn, RwTxn};
 pub use partial_dump::PartialDump;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::ThreadPool;
+pub use top_level_map::{CowStr, TopLevelMap};
 pub use update_by_function::UpdateByFunction;
 
 use super::channel::*;
diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs
index 31a017c12..3f5c4b3c9 100644
--- a/milli/src/update/new/mod.rs
+++ b/milli/src/update/new/mod.rs
@@ -1,4 +1,5 @@
 pub use document_change::{Deletion, DocumentChange, Insertion, Update};
+pub use indexer::{CowStr, TopLevelMap};
 pub use items_pool::ItemsPool;
 
 use super::del_add::DelAdd;
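
With the route now handing the readers the raw `File` and the update file
directly, a caller only needs any `io::Write` for the output side. A hedged
end-to-end sketch of the new `read_csv` signature (standalone; the temp-file
setup and `demo` function are illustrative, and error handling is elided):

    use std::io::{Seek, SeekFrom, Write};

    fn demo() -> std::io::Result<()> {
        // Write a small typed-header CSV payload, then rewind for the reader.
        let mut csv_file = tempfile::tempfile()?;
        csv_file.write_all(b"name,price:number\nchair,12.5\nlamp,30\n")?;
        csv_file.seek(SeekFrom::Start(0))?;

        // Any io::Write works as the output now, not just a BufWriter<File>.
        let mut update_file = Vec::new();
        let count = read_csv(&csv_file, &mut update_file, b',').unwrap();
        println!("count={count}, payload={}", String::from_utf8_lossy(&update_file));
        Ok(())
    }
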