Make the changes to plug the new DocumentsBatch system

This commit is contained in:
Kerollmops 2022-06-16 12:06:20 +02:00 committed by ManyTheFish
parent fe32097964
commit 73d4869e5e
4 changed files with 72 additions and 52 deletions

View File

@ -1,10 +1,10 @@
use std::borrow::Borrow; use std::borrow::Borrow;
use std::fmt::{self, Debug, Display}; use std::fmt::{self, Debug, Display};
use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Seek, Write}; use std::io::{self, BufReader, Read, Seek, Write};
use meilisearch_types::error::{Code, ErrorCode}; use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::internal_error; use meilisearch_types::internal_error;
use milli::documents::DocumentBatchBuilder; use milli::documents::{DocumentsBatchBuilder, Error};
type Result<T> = std::result::Result<T, DocumentFormatError>; type Result<T> = std::result::Result<T, DocumentFormatError>;
@ -18,9 +18,9 @@ pub enum PayloadType {
impl fmt::Display for PayloadType { impl fmt::Display for PayloadType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
PayloadType::Ndjson => write!(f, "ndjson"), PayloadType::Ndjson => f.write_str("ndjson"),
PayloadType::Json => write!(f, "json"), PayloadType::Json => f.write_str("json"),
PayloadType::Csv => write!(f, "csv"), PayloadType::Csv => f.write_str("csv"),
} }
} }
} }
@ -28,7 +28,7 @@ impl fmt::Display for PayloadType {
#[derive(Debug)] #[derive(Debug)]
pub enum DocumentFormatError { pub enum DocumentFormatError {
Internal(Box<dyn std::error::Error + Send + Sync + 'static>), Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
MalformedPayload(Box<milli::documents::Error>, PayloadType), MalformedPayload(Error, PayloadType),
} }
impl Display for DocumentFormatError { impl Display for DocumentFormatError {
@ -36,7 +36,7 @@ impl Display for DocumentFormatError {
match self { match self {
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e), Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
Self::MalformedPayload(me, b) => match me.borrow() { Self::MalformedPayload(me, b) => match me.borrow() {
milli::documents::Error::JsonError(se) => { Error::Json(se) => {
// https://github.com/meilisearch/meilisearch/issues/2107 // https://github.com/meilisearch/meilisearch/issues/2107
// The user input maybe insanely long. We need to truncate it. // The user input maybe insanely long. We need to truncate it.
let mut serde_msg = se.to_string(); let mut serde_msg = se.to_string();
@ -59,11 +59,11 @@ impl Display for DocumentFormatError {
impl std::error::Error for DocumentFormatError {} impl std::error::Error for DocumentFormatError {}
impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError { impl From<(PayloadType, Error)> for DocumentFormatError {
fn from((ty, error): (PayloadType, milli::documents::Error)) -> Self { fn from((ty, error): (PayloadType, Error)) -> Self {
match error { match error {
milli::documents::Error::Io(e) => Self::Internal(Box::new(e)), Error::Io(e) => Self::Internal(Box::new(e)),
e => Self::MalformedPayload(Box::new(e), ty), e => Self::MalformedPayload(e, ty),
} }
} }
} }
@ -79,51 +79,67 @@ impl ErrorCode for DocumentFormatError {
internal_error!(DocumentFormatError: io::Error); internal_error!(DocumentFormatError: io::Error);
/// reads csv from input and write an obkv batch to writer. /// Reads CSV from input and write an obkv batch to writer.
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> { pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let writer = BufWriter::new(writer); let mut builder = DocumentsBatchBuilder::new(writer);
let builder =
DocumentBatchBuilder::from_csv(input, writer).map_err(|e| (PayloadType::Csv, e))?;
let count = builder.finish().map_err(|e| (PayloadType::Csv, e))?; let csv = csv::Reader::from_reader(input);
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
Ok(count) let count = builder.documents_count();
let _ = builder
.into_inner()
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
Ok(count as usize)
} }
/// reads jsonl from input and write an obkv batch to writer. /// Reads JSON Lines from input and write an obkv batch to writer.
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> { pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let mut reader = BufReader::new(input); let mut reader = BufReader::new(input);
let writer = BufWriter::new(writer);
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Ndjson, e))?; for result in serde_json::Deserializer::from_reader(reader).into_iter() {
let mut buf = String::new(); let object = result
.map_err(Error::Json)
while reader.read_line(&mut buf)? > 0 {
// skip empty lines
if buf == "\n" {
buf.clear();
continue;
}
builder
.extend_from_json(Cursor::new(&buf.as_bytes()))
.map_err(|e| (PayloadType::Ndjson, e))?; .map_err(|e| (PayloadType::Ndjson, e))?;
buf.clear(); builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
} }
let count = builder.finish().map_err(|e| (PayloadType::Ndjson, e))?; let count = builder.documents_count();
let _ = builder
.into_inner()
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
Ok(count) Ok(count as usize)
} }
/// reads json from input and write an obkv batch to writer. /// Reads JSON from input and write an obkv batch to writer.
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> { pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let writer = BufWriter::new(writer); let mut builder = DocumentsBatchBuilder::new(writer);
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Json, e))?; let mut reader = BufReader::new(input);
builder
.extend_from_json(input) let objects: Vec<_> = serde_json::from_reader(reader)
.map_err(Error::Json)
.map_err(|e| (PayloadType::Json, e))?; .map_err(|e| (PayloadType::Json, e))?;
let count = builder.finish().map_err(|e| (PayloadType::Json, e))?; for object in objects {
builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
}
Ok(count) let count = builder.documents_count();
let _ = builder
.into_inner()
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
Ok(count as usize)
} }

View File

@ -4,7 +4,7 @@ use std::path::Path;
use anyhow::Context; use anyhow::Context;
use indexmap::IndexMap; use indexmap::IndexMap;
use milli::documents::DocumentBatchReader; use milli::documents::DocumentsBatchReader;
use milli::heed::{EnvOpenOptions, RoTxn}; use milli::heed::{EnvOpenOptions, RoTxn};
use milli::update::{IndexDocumentsConfig, IndexerConfig}; use milli::update::{IndexDocumentsConfig, IndexerConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -135,19 +135,20 @@ impl Index {
if !empty { if !empty {
tmp_doc_file.seek(SeekFrom::Start(0))?; tmp_doc_file.seek(SeekFrom::Start(0))?;
let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?; let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?;
//If the document file is empty, we don't perform the document addition, to prevent //If the document file is empty, we don't perform the document addition, to prevent
//a primary key error to be thrown. //a primary key error to be thrown.
let config = IndexDocumentsConfig::default(); let config = IndexDocumentsConfig::default();
let mut builder = milli::update::IndexDocuments::new( let builder = milli::update::IndexDocuments::new(
&mut txn, &mut txn,
&index, &index,
indexer_config, indexer_config,
config, config,
|_| (), |_| (),
)?; )?;
builder.add_documents(documents_reader)?; let (builder, user_error) = builder.add_documents(documents_reader)?;
user_error?;
builder.execute()?; builder.execute()?;
} }

View File

@ -3,7 +3,7 @@ use std::marker::PhantomData;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use log::{debug, info, trace}; use log::{debug, info, trace};
use milli::documents::DocumentBatchReader; use milli::documents::DocumentsBatchReader;
use milli::update::{ use milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Setting, Setting,
@ -315,7 +315,7 @@ impl Index {
}; };
let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step); let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
let mut builder = milli::update::IndexDocuments::new( let builder = milli::update::IndexDocuments::new(
&mut txn, &mut txn,
self, self,
self.indexer_config.as_ref(), self.indexer_config.as_ref(),
@ -325,8 +325,9 @@ impl Index {
for content_uuid in contents.into_iter() { for content_uuid in contents.into_iter() {
let content_file = file_store.get_update(content_uuid)?; let content_file = file_store.get_update(content_uuid)?;
let reader = DocumentBatchReader::from_reader(content_file)?; let reader = DocumentsBatchReader::from_reader(content_file)?;
builder.add_documents(reader)?; let (builder, user_error) = builder.add_documents(reader)?;
todo!("use the user_error here");
} }
let addition = builder.execute()?; let addition = builder.execute()?;

View File

@ -3,7 +3,7 @@ use std::io::{self, BufReader, BufWriter, Write};
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use milli::documents::DocumentBatchReader; use milli::documents::DocumentsBatchReader;
use serde_json::Map; use serde_json::Map;
use tempfile::{NamedTempFile, PersistError}; use tempfile::{NamedTempFile, PersistError};
use uuid::Uuid; use uuid::Uuid;
@ -44,7 +44,8 @@ into_update_store_error!(
PersistError, PersistError,
io::Error, io::Error,
serde_json::Error, serde_json::Error,
milli::documents::Error milli::documents::Error,
milli::documents::DocumentsBatchCursorError
); );
impl UpdateFile { impl UpdateFile {
@ -149,12 +150,13 @@ mod store {
let update_file = File::open(update_file_path)?; let update_file = File::open(update_file_path)?;
let mut dst_file = NamedTempFile::new_in(&dump_path)?; let mut dst_file = NamedTempFile::new_in(&dump_path)?;
let mut document_reader = DocumentBatchReader::from_reader(update_file)?; let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
let index = document_cursor.documents_batch_index();
let mut document_buffer = Map::new(); let mut document_buffer = Map::new();
// TODO: we need to find a way to do this more efficiently. (create a custom serializer // TODO: we need to find a way to do this more efficiently. (create a custom serializer
// for jsonl for example...) // for jsonl for example...)
while let Some((index, document)) = document_reader.next_document_with_index()? { while let Some(document) = document_cursor.next_document()? {
for (field_id, content) in document.iter() { for (field_id, content) in document.iter() {
if let Some(field_name) = index.name(field_id) { if let Some(field_name) = index.name(field_id) {
let content = serde_json::from_slice(content)?; let content = serde_json::from_slice(content)?;