diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs index de3d7f5d5..5b224cf49 100644 --- a/meilisearch-lib/src/document_formats.rs +++ b/meilisearch-lib/src/document_formats.rs @@ -1,10 +1,10 @@ use std::borrow::Borrow; use std::fmt::{self, Debug, Display}; -use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Seek, Write}; +use std::io::{self, BufReader, Read, Seek, Write}; use meilisearch_types::error::{Code, ErrorCode}; use meilisearch_types::internal_error; -use milli::documents::DocumentBatchBuilder; +use milli::documents::{DocumentsBatchBuilder, Error}; type Result = std::result::Result; @@ -18,9 +18,9 @@ pub enum PayloadType { impl fmt::Display for PayloadType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - PayloadType::Ndjson => write!(f, "ndjson"), - PayloadType::Json => write!(f, "json"), - PayloadType::Csv => write!(f, "csv"), + PayloadType::Ndjson => f.write_str("ndjson"), + PayloadType::Json => f.write_str("json"), + PayloadType::Csv => f.write_str("csv"), } } } @@ -28,7 +28,7 @@ impl fmt::Display for PayloadType { #[derive(Debug)] pub enum DocumentFormatError { Internal(Box), - MalformedPayload(Box, PayloadType), + MalformedPayload(Error, PayloadType), } impl Display for DocumentFormatError { @@ -36,7 +36,7 @@ impl Display for DocumentFormatError { match self { Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e), Self::MalformedPayload(me, b) => match me.borrow() { - milli::documents::Error::JsonError(se) => { + Error::Json(se) => { // https://github.com/meilisearch/meilisearch/issues/2107 // The user input maybe insanely long. We need to truncate it. let mut serde_msg = se.to_string(); @@ -59,11 +59,11 @@ impl Display for DocumentFormatError { impl std::error::Error for DocumentFormatError {} -impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError { - fn from((ty, error): (PayloadType, milli::documents::Error)) -> Self { +impl From<(PayloadType, Error)> for DocumentFormatError { + fn from((ty, error): (PayloadType, Error)) -> Self { match error { - milli::documents::Error::Io(e) => Self::Internal(Box::new(e)), - e => Self::MalformedPayload(Box::new(e), ty), + Error::Io(e) => Self::Internal(Box::new(e)), + e => Self::MalformedPayload(e, ty), } } } @@ -79,51 +79,67 @@ impl ErrorCode for DocumentFormatError { internal_error!(DocumentFormatError: io::Error); -/// reads csv from input and write an obkv batch to writer. +/// Reads CSV from input and write an obkv batch to writer. pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result { - let writer = BufWriter::new(writer); - let builder = - DocumentBatchBuilder::from_csv(input, writer).map_err(|e| (PayloadType::Csv, e))?; + let mut builder = DocumentsBatchBuilder::new(writer); - let count = builder.finish().map_err(|e| (PayloadType::Csv, e))?; + let csv = csv::Reader::from_reader(input); + builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; - Ok(count) + let count = builder.documents_count(); + let _ = builder + .into_inner() + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; + + Ok(count as usize) } -/// reads jsonl from input and write an obkv batch to writer. +/// Reads JSON Lines from input and write an obkv batch to writer. pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result { + let mut builder = DocumentsBatchBuilder::new(writer); let mut reader = BufReader::new(input); - let writer = BufWriter::new(writer); - let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Ndjson, e))?; - let mut buf = String::new(); - - while reader.read_line(&mut buf)? > 0 { - // skip empty lines - if buf == "\n" { - buf.clear(); - continue; - } - builder - .extend_from_json(Cursor::new(&buf.as_bytes())) + for result in serde_json::Deserializer::from_reader(reader).into_iter() { + let object = result + .map_err(Error::Json) .map_err(|e| (PayloadType::Ndjson, e))?; - buf.clear(); + builder + .append_json_object(&object) + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; } - let count = builder.finish().map_err(|e| (PayloadType::Ndjson, e))?; + let count = builder.documents_count(); + let _ = builder + .into_inner() + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; - Ok(count) + Ok(count as usize) } -/// reads json from input and write an obkv batch to writer. +/// Reads JSON from input and write an obkv batch to writer. pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result { - let writer = BufWriter::new(writer); - let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Json, e))?; - builder - .extend_from_json(input) + let mut builder = DocumentsBatchBuilder::new(writer); + let mut reader = BufReader::new(input); + + let objects: Vec<_> = serde_json::from_reader(reader) + .map_err(Error::Json) .map_err(|e| (PayloadType::Json, e))?; - let count = builder.finish().map_err(|e| (PayloadType::Json, e))?; + for object in objects { + builder + .append_json_object(&object) + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; + } - Ok(count) + let count = builder.documents_count(); + let _ = builder + .into_inner() + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; + + Ok(count as usize) } diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs index e201e738b..8c7daba1f 100644 --- a/meilisearch-lib/src/index/dump.rs +++ b/meilisearch-lib/src/index/dump.rs @@ -4,7 +4,7 @@ use std::path::Path; use anyhow::Context; use indexmap::IndexMap; -use milli::documents::DocumentBatchReader; +use milli::documents::DocumentsBatchReader; use milli::heed::{EnvOpenOptions, RoTxn}; use milli::update::{IndexDocumentsConfig, IndexerConfig}; use serde::{Deserialize, Serialize}; @@ -135,19 +135,20 @@ impl Index { if !empty { tmp_doc_file.seek(SeekFrom::Start(0))?; - let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?; + let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?; //If the document file is empty, we don't perform the document addition, to prevent //a primary key error to be thrown. let config = IndexDocumentsConfig::default(); - let mut builder = milli::update::IndexDocuments::new( + let builder = milli::update::IndexDocuments::new( &mut txn, &index, indexer_config, config, |_| (), )?; - builder.add_documents(documents_reader)?; + let (builder, user_error) = builder.add_documents(documents_reader)?; + user_error?; builder.execute()?; } diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs index 95edbbf9d..6316f8812 100644 --- a/meilisearch-lib/src/index/updates.rs +++ b/meilisearch-lib/src/index/updates.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::num::NonZeroUsize; use log::{debug, info, trace}; -use milli::documents::DocumentBatchReader; +use milli::documents::DocumentsBatchReader; use milli::update::{ DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, Setting, @@ -315,7 +315,7 @@ impl Index { }; let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step); - let mut builder = milli::update::IndexDocuments::new( + let builder = milli::update::IndexDocuments::new( &mut txn, self, self.indexer_config.as_ref(), @@ -325,8 +325,9 @@ impl Index { for content_uuid in contents.into_iter() { let content_file = file_store.get_update(content_uuid)?; - let reader = DocumentBatchReader::from_reader(content_file)?; - builder.add_documents(reader)?; + let reader = DocumentsBatchReader::from_reader(content_file)?; + let (builder, user_error) = builder.add_documents(reader)?; + todo!("use the user_error here"); } let addition = builder.execute()?; diff --git a/meilisearch-lib/src/update_file_store.rs b/meilisearch-lib/src/update_file_store.rs index 3a60dfe26..e1be0dbd4 100644 --- a/meilisearch-lib/src/update_file_store.rs +++ b/meilisearch-lib/src/update_file_store.rs @@ -3,7 +3,7 @@ use std::io::{self, BufReader, BufWriter, Write}; use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; -use milli::documents::DocumentBatchReader; +use milli::documents::DocumentsBatchReader; use serde_json::Map; use tempfile::{NamedTempFile, PersistError}; use uuid::Uuid; @@ -44,7 +44,8 @@ into_update_store_error!( PersistError, io::Error, serde_json::Error, - milli::documents::Error + milli::documents::Error, + milli::documents::DocumentsBatchCursorError ); impl UpdateFile { @@ -149,12 +150,13 @@ mod store { let update_file = File::open(update_file_path)?; let mut dst_file = NamedTempFile::new_in(&dump_path)?; - let mut document_reader = DocumentBatchReader::from_reader(update_file)?; + let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor(); + let index = document_cursor.documents_batch_index(); let mut document_buffer = Map::new(); // TODO: we need to find a way to do this more efficiently. (create a custom serializer // for jsonl for example...) - while let Some((index, document)) = document_reader.next_document_with_index()? { + while let Some(document) = document_cursor.next_document()? { for (field_id, content) in document.iter() { if let Some(field_name) = index.name(field_id) { let content = serde_json::from_slice(content)?;