move json reader to document_formats module

This commit is contained in:
mpostma 2021-09-28 20:13:26 +02:00
parent df4e9f4e1e
commit 6f8e670dee
3 changed files with 30 additions and 25 deletions

View File

@ -8,12 +8,14 @@ type Result<T> = std::result::Result<T, DocumentFormatError>;
#[derive(Debug)] #[derive(Debug)]
pub enum PayloadType { pub enum PayloadType {
Jsonl, Jsonl,
Json,
} }
impl fmt::Display for PayloadType { impl fmt::Display for PayloadType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
PayloadType::Jsonl => write!(f, "ndjson"), PayloadType::Jsonl => write!(f, "ndjson"),
PayloadType::Json => write!(f, "json"),
} }
} }
} }
@ -50,3 +52,14 @@ pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> {
Ok(()) Ok(())
} }
/// read json from input and write an obkv batch to writer.
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer).unwrap();
let documents: Vec<Map<String, Value>> = malformed!(PayloadType::Json, serde_json::from_reader(input))?;
builder.add_documents(documents).unwrap();
builder.finish().unwrap();
Ok(())
}

View File

@ -3,7 +3,7 @@ use std::error::Error;
use meilisearch_error::{Code, ErrorCode}; use meilisearch_error::{Code, ErrorCode};
use crate::index_controller::update_file_store::UpdateFileStoreError; use crate::{document_formats::DocumentFormatError, index_controller::update_file_store::UpdateFileStoreError};
pub type Result<T> = std::result::Result<T, UpdateLoopError>; pub type Result<T> = std::result::Result<T, UpdateLoopError>;
@ -21,7 +21,8 @@ pub enum UpdateLoopError {
)] )]
FatalUpdateStoreError, FatalUpdateStoreError,
#[error("{0}")] #[error("{0}")]
InvalidPayload(Box<dyn Error + Send + Sync + 'static>), InvalidPayload(#[from] DocumentFormatError),
// TODO: The reference to actix has to go.
#[error("{0}")] #[error("{0}")]
PayloadError(#[from] actix_web::error::PayloadError), PayloadError(#[from] actix_web::error::PayloadError),
} }

View File

@ -13,22 +13,21 @@ use async_stream::stream;
use bytes::Bytes; use bytes::Bytes;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use log::trace; use log::trace;
use milli::documents::DocumentBatchBuilder;
use milli::update::IndexDocumentsMethod; use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use self::error::{Result, UpdateLoopError}; use self::error::{Result, UpdateLoopError};
pub use self::message::UpdateMsg; pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo}; use self::store::{UpdateStore, UpdateStoreInfo};
use crate::document_formats::read_json;
use crate::index::{Index, Settings, Unchecked}; use crate::index::{Index, Settings, Unchecked};
use crate::index_controller::update_file_store::UpdateFileStore; use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus; use status::UpdateStatus;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::HardStateIndexResolver;
use super::{DocumentAdditionFormat, Payload, Update}; use super::{DocumentAdditionFormat, Update};
pub type UpdateSender = mpsc::Sender<UpdateMsg>; pub type UpdateSender = mpsc::Sender<UpdateMsg>;
@ -197,9 +196,18 @@ impl UpdateLoop {
method, method,
format, format,
} => { } => {
let content_uuid = match format { let reader = StreamReader::new(payload);
DocumentAdditionFormat::Json => self.documents_from_json(payload).await?, let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
}; tokio::task::spawn_blocking(move || -> Result<_> {
match format {
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
}
update_file.persist()?;
Ok(())
}).await??;
RegisterUpdate::DocumentAddition { RegisterUpdate::DocumentAddition {
primary_key, primary_key,
@ -220,23 +228,6 @@ impl UpdateLoop {
Ok(status.into()) Ok(status.into())
} }
async fn documents_from_json(&self, payload: Payload) -> Result<Uuid> {
let file_store = self.update_file_store.clone();
tokio::task::spawn_blocking(move || {
let (uuid, mut file) = file_store.new_update().unwrap();
let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap();
let documents: Vec<Map<String, Value>> =
serde_json::from_reader(StreamReader::new(payload))?;
builder.add_documents(documents).unwrap();
builder.finish().unwrap();
file.persist()?;
Ok(uuid)
})
.await?
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.clone(); let update_store = self.store.clone();