1847: Optimize document transform r=MarinPostma a=MarinPostma

integrate the optimization from https://github.com/meilisearch/milli/pull/402.

optimize payload read, by reading it to RAM first instead of streaming it. This means that the payload must fit into RAM, which should not be a problem.

Add BufWriter to the obkv writer to improve write speed.

I have measured a gain of 40-45% in speed after these optimizations.


Co-authored-by: marin postma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2021-10-26 15:28:58 +00:00 committed by GitHub
commit 0a9d6e8210
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 60 additions and 372 deletions

14
Cargo.lock generated
View File

@ -1763,13 +1763,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memmap"
version = "0.7.0"
name = "memmap2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
checksum = "4647a11b578fead29cdbb34d4adef8dd3dc35b876c9c6d5240d83f205abfe96e"
dependencies = [
"libc",
"winapi",
]
[[package]]
@ -1783,8 +1782,8 @@ dependencies = [
[[package]]
name = "milli"
version = "0.17.2"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.3#1e8acaa20b323a198229ad8ede96d045072e45c8"
version = "0.19.0"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.19.0#d7943fe22553b8205b86c32a0f2656d9e42de351"
dependencies = [
"bimap",
"bincode",
@ -1793,6 +1792,7 @@ dependencies = [
"chrono",
"concat-arrays",
"crossbeam-channel",
"csv",
"either",
"flate2",
"fst",
@ -1807,7 +1807,7 @@ dependencies = [
"log",
"logging_timer",
"meilisearch-tokenizer",
"memmap",
"memmap2",
"obkv",
"once_cell",
"ordered-float",

View File

@ -30,7 +30,7 @@ lazy_static = "1.4.0"
log = "0.4.14"
meilisearch-error = { path = "../meilisearch-error" }
meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" }
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.3" }
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.19.0" }
mime = "0.3.16"
num_cpus = "1.13.0"
once_cell = "1.8.0"

View File

@ -1,10 +1,8 @@
use std::fmt;
use std::io::{self, Read, Result as IoResult, Seek, Write};
use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Seek, Write};
use csv::{Reader as CsvReader, StringRecordsIntoIter};
use meilisearch_error::{Code, ErrorCode};
use milli::documents::DocumentBatchBuilder;
use serde_json::{Deserializer, Map, Value};
type Result<T> = std::result::Result<T, DocumentFormatError>;
@ -36,6 +34,15 @@ pub enum DocumentFormatError {
),
}
impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError {
fn from((ty, error): (PayloadType, milli::documents::Error)) -> Self {
match error {
milli::documents::Error::Io(e) => Self::Internal(Box::new(e)),
e => Self::MalformedPayload(Box::new(e), ty),
}
}
}
impl ErrorCode for DocumentFormatError {
fn error_code(&self) -> Code {
match self {
@ -45,330 +52,47 @@ impl ErrorCode for DocumentFormatError {
}
}
internal_error!(DocumentFormatError: milli::documents::Error, io::Error);
macro_rules! malformed {
($type:path, $e:expr) => {
$e.map_err(|e| DocumentFormatError::MalformedPayload(Box::new(e), $type))
};
}
internal_error!(DocumentFormatError: io::Error);
/// reads csv from input and write an obkv batch to writer.
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer).unwrap();
let iter = CsvDocumentIter::from_reader(input)?;
for doc in iter {
let doc = doc?;
builder.add_documents(doc).unwrap();
}
builder.finish().unwrap();
let writer = BufWriter::new(writer);
DocumentBatchBuilder::from_csv(input, writer)
.map_err(|e| (PayloadType::Csv, e))?
.finish()
.map_err(|e| (PayloadType::Csv, e))?;
Ok(())
}
/// read jsonl from input and write an obkv batch to writer.
/// reads jsonl from input and write an obkv batch to writer.
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer)?;
let stream = Deserializer::from_reader(input).into_iter::<Map<String, Value>>();
let mut reader = BufReader::new(input);
let writer = BufWriter::new(writer);
for value in stream {
let value = malformed!(PayloadType::Ndjson, value)?;
builder.add_documents(&value)?;
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Ndjson, e))?;
let mut buf = String::new();
while reader.read_line(&mut buf)? > 0 {
builder
.extend_from_json(Cursor::new(&buf.as_bytes()))
.map_err(|e| (PayloadType::Ndjson, e))?;
buf.clear();
}
builder.finish()?;
builder.finish().map_err(|e| (PayloadType::Ndjson, e))?;
Ok(())
}
/// read json from input and write an obkv batch to writer.
/// reads json from input and write an obkv batch to writer.
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer).unwrap();
let documents: Vec<Map<String, Value>> =
malformed!(PayloadType::Json, serde_json::from_reader(input))?;
builder.add_documents(documents).unwrap();
builder.finish().unwrap();
let writer = BufWriter::new(writer);
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Json, e))?;
builder
.extend_from_json(input)
.map_err(|e| (PayloadType::Json, e))?;
builder.finish().map_err(|e| (PayloadType::Json, e))?;
Ok(())
}
enum AllowedType {
String,
Number,
}
fn parse_csv_header(header: &str) -> (String, AllowedType) {
// if there are several separators we only split on the last one.
match header.rsplit_once(':') {
Some((field_name, field_type)) => match field_type {
"string" => (field_name.to_string(), AllowedType::String),
"number" => (field_name.to_string(), AllowedType::Number),
// if the pattern isn't reconized, we keep the whole field.
_otherwise => (header.to_string(), AllowedType::String),
},
None => (header.to_string(), AllowedType::String),
}
}
pub struct CsvDocumentIter<R>
where
R: Read,
{
documents: StringRecordsIntoIter<R>,
headers: Vec<(String, AllowedType)>,
}
impl<R: Read> CsvDocumentIter<R> {
pub fn from_reader(reader: R) -> IoResult<Self> {
let mut records = CsvReader::from_reader(reader);
let headers = records
.headers()?
.into_iter()
.map(parse_csv_header)
.collect();
Ok(Self {
documents: records.into_records(),
headers,
})
}
}
impl<R: Read> Iterator for CsvDocumentIter<R> {
type Item = Result<Map<String, Value>>;
fn next(&mut self) -> Option<Self::Item> {
let csv_document = self.documents.next()?;
match csv_document {
Ok(csv_document) => {
let mut document = Map::new();
for ((field_name, field_type), value) in
self.headers.iter().zip(csv_document.into_iter())
{
let parsed_value = match field_type {
AllowedType::Number => {
malformed!(PayloadType::Csv, value.parse::<f64>().map(Value::from))
}
AllowedType::String => Ok(Value::String(value.to_string())),
};
match parsed_value {
Ok(value) => drop(document.insert(field_name.to_string(), value)),
Err(e) => return Some(Err(e)),
}
}
Some(Ok(document))
}
Err(e) => Some(Err(DocumentFormatError::MalformedPayload(
Box::new(e),
PayloadType::Csv,
))),
}
}
}
#[cfg(test)]
mod test {
use serde_json::json;
use super::*;
#[test]
fn simple_csv_document() {
let documents = r#"city,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn coma_in_field() {
let documents = r#"city,country,pop
"Boston","United, States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United, States",
"pop": "4628910",
})
);
}
#[test]
fn quote_in_field() {
let documents = r#"city,country,pop
"Boston","United"" States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United\" States",
"pop": "4628910",
})
);
}
#[test]
fn integer_in_field() {
let documents = r#"city,country,pop:number
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910.0,
})
);
}
#[test]
fn float_in_field() {
let documents = r#"city,country,pop:number
"Boston","United States","4628910.01""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910.01,
})
);
}
#[test]
fn several_colon_in_header() {
let documents = r#"city:love:string,country:state,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:love": "Boston",
"country:state": "United States",
"pop": "4628910",
})
);
}
#[test]
fn ending_by_colon_in_header() {
let documents = r#"city:,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn starting_by_colon_in_header() {
let documents = r#":city,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
":city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[ignore]
#[test]
fn starting_by_colon_in_header2() {
let documents = r#":string,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn double_colon_in_header() {
let documents = r#"city::string,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn bad_type_in_header() {
let documents = r#"city,country:number,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn bad_column_count1() {
let documents = r#"city,country,pop
"Boston","United States","4628910", "too much""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn bad_column_count2() {
let documents = r#"city,country,pop
"Boston","United States""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
}

View File

@ -149,7 +149,7 @@ impl UpdateFileStore {
// for jsonl for example...)
while let Some((index, document)) = document_reader.next_document_with_index()? {
for (field_id, content) in document.iter() {
if let Some(field_name) = index.get_by_left(&field_id) {
if let Some(field_name) = index.name(field_id) {
let content = serde_json::from_slice(content)?;
document_buffer.insert(field_name.to_string(), content);
}

View File

@ -3,15 +3,13 @@ mod message;
pub mod status;
pub mod store;
use std::io::{self, BufRead, BufReader};
use std::io::Cursor;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use actix_web::error::PayloadError;
use async_stream::stream;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::StreamExt;
use log::trace;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
@ -51,48 +49,6 @@ where
Ok(sender)
}
/// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`.
struct StreamReader<S> {
stream: S,
current: Option<Bytes>,
}
impl<S> StreamReader<S> {
fn new(stream: S) -> Self {
Self {
stream,
current: None,
}
}
}
impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Read
for StreamReader<S>
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// TODO: optimize buf filling
match self.current.take() {
Some(mut bytes) => {
let split_at = bytes.len().min(buf.len());
let copied = bytes.split_to(split_at);
buf[..split_at].copy_from_slice(&copied);
if !bytes.is_empty() {
self.current.replace(bytes);
}
Ok(copied.len())
}
None => match tokio::runtime::Handle::current().block_on(self.stream.next()) {
Some(Ok(bytes)) => {
self.current.replace(bytes);
self.read(buf)
}
Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
None => Ok(0),
},
}
}
}
pub struct UpdateLoop {
store: Arc<UpdateStore>,
inbox: Option<mpsc::Receiver<UpdateMsg>>,
@ -196,20 +152,28 @@ impl UpdateLoop {
async fn handle_update(&self, index_uuid: Uuid, update: Update) -> Result<UpdateStatus> {
let registration = match update {
Update::DocumentAddition {
payload,
mut payload,
primary_key,
method,
format,
} => {
let mut reader = BufReader::new(StreamReader::new(payload));
let mut buffer = Vec::new();
while let Some(bytes) = payload.next().await {
match bytes {
Ok(bytes) => {
buffer.extend_from_slice(&bytes);
}
Err(e) => return Err(e.into()),
}
}
let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
tokio::task::spawn_blocking(move || -> Result<_> {
// check if the payload is empty, and return an error
reader.fill_buf()?;
if reader.buffer().is_empty() {
if buffer.is_empty() {
return Err(UpdateLoopError::MissingPayload(format));
}
let reader = Cursor::new(buffer);
match format {
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?,

View File

@ -11,7 +11,7 @@ pub use index_controller::MeiliSearch;
pub use milli;
mod compression;
mod document_formats;
pub mod document_formats;
use walkdir::WalkDir;