optimize document transform

fix error types

bump milli
This commit is contained in:
marin postma 2021-10-20 21:20:28 +02:00
parent e87146b0d9
commit 4ac005b094
No known key found for this signature in database
GPG key ID: 6088B7721C3E39F9
6 changed files with 60 additions and 372 deletions

View file

@ -149,7 +149,7 @@ impl UpdateFileStore {
// for jsonl for example...)
while let Some((index, document)) = document_reader.next_document_with_index()? {
for (field_id, content) in document.iter() {
if let Some(field_name) = index.get_by_left(&field_id) {
if let Some(field_name) = index.name(field_id) {
let content = serde_json::from_slice(content)?;
document_buffer.insert(field_name.to_string(), content);
}

View file

@ -3,15 +3,13 @@ mod message;
pub mod status;
pub mod store;
use std::io::{self, BufRead, BufReader};
use std::io::Cursor;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use actix_web::error::PayloadError;
use async_stream::stream;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::StreamExt;
use log::trace;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
@ -51,48 +49,6 @@ where
Ok(sender)
}
/// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`.
struct StreamReader<S> {
stream: S,
current: Option<Bytes>,
}
impl<S> StreamReader<S> {
fn new(stream: S) -> Self {
Self {
stream,
current: None,
}
}
}
impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Read
for StreamReader<S>
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// TODO: optimize buf filling
match self.current.take() {
Some(mut bytes) => {
let split_at = bytes.len().min(buf.len());
let copied = bytes.split_to(split_at);
buf[..split_at].copy_from_slice(&copied);
if !bytes.is_empty() {
self.current.replace(bytes);
}
Ok(copied.len())
}
None => match tokio::runtime::Handle::current().block_on(self.stream.next()) {
Some(Ok(bytes)) => {
self.current.replace(bytes);
self.read(buf)
}
Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
None => Ok(0),
},
}
}
}
pub struct UpdateLoop {
store: Arc<UpdateStore>,
inbox: Option<mpsc::Receiver<UpdateMsg>>,
@ -196,20 +152,28 @@ impl UpdateLoop {
async fn handle_update(&self, index_uuid: Uuid, update: Update) -> Result<UpdateStatus> {
let registration = match update {
Update::DocumentAddition {
payload,
mut payload,
primary_key,
method,
format,
} => {
let mut reader = BufReader::new(StreamReader::new(payload));
let mut buffer = Vec::new();
while let Some(bytes) = payload.next().await {
match bytes {
Ok(bytes) => {
buffer.extend_from_slice(&bytes);
}
Err(e) => return Err(e.into()),
}
}
let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
tokio::task::spawn_blocking(move || -> Result<_> {
// check if the payload is empty, and return an error
reader.fill_buf()?;
if reader.buffer().is_empty() {
if buffer.is_empty() {
return Err(UpdateLoopError::MissingPayload(format));
}
let reader = Cursor::new(buffer);
match format {
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?,