Mirror of https://github.com/meilisearch/MeiliSearch, synced 2024-11-30 00:34:26 +01:00
Merge #3164
3164: Improve the way we receive the documents payload r=Kerollmops a=jiangbo212

# Pull Request

## Related issue
Fixes #3037

## What does this PR do?
- Write the payload to a temporary file via a `BufWriter`.
- Deserialise the JSON temporary file into an array of `Object`s by means of a memory map.
- Deserialise the CSV temporary file by means of a memory map.
- Adapt some `read_json` tests.

## PR checklist
Please check if your PR fulfills the following requirements:
- [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)?
- [x] Have you read the contributing guidelines?
- [x] Have you made sure that the title is accurate and descriptive of the changes?

Thank you so much for contributing to Meilisearch!

Co-authored-by: jiangbo212 <peiyaoliukuan@gmail.com>
Co-authored-by: jiangbo212 <peiyaoliukuan@126.com>
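At a glance, the approach is: stream the request body into a temporary file through a `BufWriter`, then memory-map that file and deserialise the documents from the mapped bytes, instead of collecting the whole payload into an in-memory `Vec`. The snippet below is a minimal sketch of that flow, not code from this PR; the `count_json_documents` helper is hypothetical and it assumes the `tempfile`, `memmap2`, and `serde_json` crates.

```rust
use std::error::Error;
use std::io::{BufWriter, Write};

use memmap2::MmapOptions;
use tempfile::NamedTempFile;

/// Hypothetical helper (not part of the PR): buffer a JSON payload into a
/// temporary file, then deserialise it through a memory map.
fn count_json_documents(payload: &[u8]) -> Result<usize, Box<dyn Error>> {
    // Stream the payload into a temporary file through a BufWriter instead of
    // accumulating the whole body in a growing Vec<u8>.
    let temp_file = NamedTempFile::new()?;
    let mut writer = BufWriter::new(temp_file.reopen()?);
    writer.write_all(payload)?;
    writer.flush()?;

    // Memory-map the file and deserialise from the mapped bytes. The PR goes one
    // step further for JSON and walks the array element by element (see
    // `array_each` in the diff below) instead of collecting it into a Vec.
    let mmap = unsafe { MmapOptions::new().map(temp_file.as_file())? };
    let documents: Vec<serde_json::Map<String, serde_json::Value>> =
        serde_json::from_slice(&mmap)?;
    Ok(documents.len())
}

fn main() -> Result<(), Box<dyn Error>> {
    let count = count_json_documents(br#"[{"id": 1}, {"id": 2}]"#)?;
    println!("received {count} documents");
    Ok(())
}
```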
This commit is contained in:
commit dab2634ca8

Cargo.lock (generated)
@@ -2378,6 +2378,7 @@ dependencies = [
  "fst",
  "insta",
  "meili-snap",
+ "memmap2",
  "milli",
  "proptest",
  "proptest-derive",
@@ -1065,19 +1065,21 @@ impl IndexScheduler {

 #[cfg(test)]
 mod tests {
+    use std::io::{BufWriter, Seek, Write};
     use std::time::Instant;

     use big_s::S;
     use crossbeam::channel::RecvTimeoutError;
     use file_store::File;
     use meili_snap::snapshot;
+    use meilisearch_types::document_formats::DocumentFormatError;
     use meilisearch_types::milli::obkv_to_json;
     use meilisearch_types::milli::update::IndexDocumentsMethod::{
         ReplaceDocuments, UpdateDocuments,
     };
     use meilisearch_types::tasks::IndexSwap;
     use meilisearch_types::VERSION_FILE_NAME;
-    use tempfile::TempDir;
+    use tempfile::{NamedTempFile, TempDir};
     use time::Duration;
     use uuid::Uuid;
     use Breakpoint::*;
@@ -1184,6 +1186,18 @@ mod tests {
         }
     }

+    /// Adapting to the new json reading interface
+    pub fn read_json(
+        bytes: &[u8],
+        write: impl Write + Seek,
+    ) -> std::result::Result<usize, DocumentFormatError> {
+        let temp_file = NamedTempFile::new().unwrap();
+        let mut buffer = BufWriter::new(temp_file.reopen().unwrap());
+        buffer.write_all(bytes).unwrap();
+        buffer.flush().unwrap();
+        meilisearch_types::document_formats::read_json(temp_file.as_file(), write)
+    }
+
     /// Create an update file with the given file uuid.
     ///
     /// The update file contains just one simple document whose id is given by `document_id`.
@@ -1202,9 +1216,7 @@ mod tests {
         );

         let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         (file, documents_count)
     }

@@ -1584,9 +1596,7 @@ mod tests {
         }"#;

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -1623,9 +1633,7 @@ mod tests {
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -1792,9 +1800,7 @@ mod tests {
         }"#;

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -1952,11 +1958,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2003,11 +2005,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2056,11 +2054,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2110,11 +2104,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2165,11 +2155,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2616,9 +2602,7 @@ mod tests {
         }"#;

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2656,9 +2640,7 @@ mod tests {
         }"#;

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2714,11 +2696,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2766,11 +2744,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2824,11 +2798,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2887,11 +2857,7 @@ mod tests {
         );

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2955,11 +2921,7 @@ mod tests {
         let allow_index_creation = i % 2 != 0;

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -3012,11 +2974,7 @@ mod tests {
         let allow_index_creation = i % 2 != 0;

         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-        let documents_count = meilisearch_types::document_formats::read_json(
-            content.as_bytes(),
-            file.as_file_mut(),
-        )
-        .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -12,6 +12,7 @@ either = { version = "1.6.1", features = ["serde"] }
 enum-iterator = "1.1.3"
 flate2 = "1.0.24"
 fst = "0.4.7"
+memmap2 = "0.5.7"
 milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.37.2", default-features = false }
 proptest = { version = "1.0.0", optional = true }
 proptest-derive = { version = "0.3.0", optional = true }
@@ -1,11 +1,15 @@
 use std::borrow::Borrow;
 use std::fmt::{self, Debug, Display};
-use std::io::{self, BufReader, Read, Seek, Write};
+use std::fs::File;
+use std::io::{self, Seek, Write};
+use std::marker::PhantomData;

 use either::Either;
+use memmap2::MmapOptions;
 use milli::documents::{DocumentsBatchBuilder, Error};
 use milli::Object;
-use serde::Deserialize;
+use serde::de::{SeqAccess, Visitor};
+use serde::{Deserialize, Deserializer};
 use serde_json::error::Category;

 use crate::error::{Code, ErrorCode};
@@ -99,10 +103,10 @@ impl ErrorCode for DocumentFormatError {
 internal_error!(DocumentFormatError: io::Error);

 /// Reads CSV from input and write an obkv batch to writer.
-pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
+pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
     let mut builder = DocumentsBatchBuilder::new(writer);
+    let mmap = unsafe { MmapOptions::new().map(file)? };

-    let csv = csv::Reader::from_reader(input);
+    let csv = csv::Reader::from_reader(mmap.as_ref());
     builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;

     let count = builder.documents_count();
@@ -111,30 +115,38 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
     Ok(count as usize)
 }

-/// Reads JSON Lines from input and write an obkv batch to writer.
-pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
-    let mut builder = DocumentsBatchBuilder::new(writer);
-    let reader = BufReader::new(input);
-
-    for result in serde_json::Deserializer::from_reader(reader).into_iter() {
-        let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?;
-        builder
-            .append_json_object(&object)
-            .map_err(Into::into)
-            .map_err(DocumentFormatError::Internal)?;
-    }
-
-    let count = builder.documents_count();
-    let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?;
-
-    Ok(count as usize)
+/// Reads JSON from temporary file and write an obkv batch to writer.
+pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<usize> {
+    read_json_inner(file, writer, PayloadType::Json)
 }

-/// Reads JSON from input and write an obkv batch to writer.
-pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
-    let mut builder = DocumentsBatchBuilder::new(writer);
-    let reader = BufReader::new(input);
+/// Reads JSON from temporary file and write an obkv batch to writer.
+pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<usize> {
+    read_json_inner(file, writer, PayloadType::Ndjson)
+}
+
+/// Reads JSON from temporary file and write an obkv batch to writer.
+fn read_json_inner(
+    file: &File,
+    writer: impl Write + Seek,
+    payload_type: PayloadType,
+) -> Result<usize> {
+    let mut builder = DocumentsBatchBuilder::new(writer);
+    let mmap = unsafe { MmapOptions::new().map(file)? };
+    let mut deserializer = serde_json::Deserializer::from_slice(&mmap);

+    match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) {
+        // The json data has been successfully deserialised and does not need to be processed again.
+        // the data has been successfully transferred to the "update_file" during the deserialisation process.
+        // count == 0 means an empty array
+        Ok(Ok(count)) => {
+            if count == 0 {
+                return Ok(count as usize);
+            }
+        }
+        Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))),
+        // Prefer deserialization as a json array. Failure to do deserialisation using the traditional method.
+        Err(_e) => {
     #[derive(Deserialize, Debug)]
     #[serde(transparent)]
     struct ArrayOrSingleObject {
@@ -142,8 +154,9 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
         inner: Either<Vec<Object>, Object>,
     }

-    let content: ArrayOrSingleObject =
-        serde_json::from_reader(reader).map_err(Error::Json).map_err(|e| (PayloadType::Json, e))?;
+    let content: ArrayOrSingleObject = serde_json::from_reader(file)
+        .map_err(Error::Json)
+        .map_err(|e| (payload_type, e))?;

     for object in content.inner.map_right(|o| vec![o]).into_inner() {
         builder
@@ -151,9 +164,57 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
             .map_err(Into::into)
             .map_err(DocumentFormatError::Internal)?;
     }
+        }
+    }

     let count = builder.documents_count();
     let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?;

     Ok(count as usize)
 }

+/**
+ * The actual handling of the deserialization process in the serde avoids storing the deserialized object in memory.
+ * Reference:
+ * https://serde.rs/stream-array.html
+ * https://github.com/serde-rs/json/issues/160
+ */
+fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result<io::Result<u64>, D::Error>
+where
+    D: Deserializer<'de>,
+    T: Deserialize<'de>,
+    F: FnMut(T) -> io::Result<()>,
+{
+    struct SeqVisitor<T, F>(F, PhantomData<T>);
+
+    impl<'de, T, F> Visitor<'de> for SeqVisitor<T, F>
+    where
+        T: Deserialize<'de>,
+        F: FnMut(T) -> io::Result<()>,
+    {
+        type Value = io::Result<u64>;
+
+        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+            formatter.write_str("a nonempty sequence")
+        }
+
+        fn visit_seq<A>(
+            mut self,
+            mut seq: A,
+        ) -> std::result::Result<io::Result<u64>, <A as SeqAccess<'de>>::Error>
+        where
+            A: SeqAccess<'de>,
+        {
+            let mut max: u64 = 0;
+            while let Some(value) = seq.next_element::<T>()? {
+                match self.0(value) {
+                    Ok(()) => max += 1,
+                    Err(e) => return Ok(Err(e)),
+                };
+            }
+            Ok(Ok(max))
+        }
+    }
+    let visitor = SeqVisitor(f, PhantomData);
+    deserializer.deserialize_seq(visitor)
+}
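The `array_each` helper added above follows serde's streaming-array pattern (see the links in its comment): the deserialiser drives a `SeqAccess` visitor that hands each array element to a callback as soon as it is parsed, so the array is never collected into a `Vec`. Separately from the patch, a minimal, simplified sketch of the same idea (illustrative names only, assuming just the `serde` and `serde_json` crates):

```rust
use std::fmt;
use std::marker::PhantomData;

use serde::de::{Deserializer, SeqAccess, Visitor};
use serde::Deserialize;

/// Visit a JSON array element by element, calling `f` on each parsed element
/// and returning how many elements were seen.
fn for_each_element<'de, D, T, F>(deserializer: D, f: F) -> Result<u64, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
    F: FnMut(T),
{
    struct SeqVisitor<T, F>(F, PhantomData<T>);

    impl<'de, T, F> Visitor<'de> for SeqVisitor<T, F>
    where
        T: Deserialize<'de>,
        F: FnMut(T),
    {
        type Value = u64;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("a JSON array")
        }

        fn visit_seq<A>(mut self, mut seq: A) -> Result<Self::Value, A::Error>
        where
            A: SeqAccess<'de>,
        {
            let mut count = 0;
            // Each element is deserialised and handed to the callback right away,
            // so the whole array never lives in memory at once.
            while let Some(value) = seq.next_element::<T>()? {
                (self.0)(value);
                count += 1;
            }
            Ok(count)
        }
    }

    deserializer.deserialize_seq(SeqVisitor(f, PhantomData))
}

fn main() -> Result<(), serde_json::Error> {
    let data: &[u8] = br#"[{"id": 1}, {"id": 2}]"#;
    let mut deserializer = serde_json::Deserializer::from_slice(data);
    let count = for_each_element(&mut deserializer, |doc: serde_json::Value| {
        println!("got document: {doc}");
    })?;
    println!("{count} documents streamed");
    Ok(())
}
```

The patch's `array_each` additionally lets the callback return an `io::Result<()>`, so a write error from the batch builder can be reported as `Ok(Err(e))` instead of being folded into serde's error type.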
@@ -95,6 +95,8 @@ pub enum PayloadError {
     MalformedPayload(serde_json::error::Error),
     #[error("A json payload is missing.")]
     MissingPayload,
+    #[error("Error while writing the playload to disk: `{0}`.")]
+    ReceivePayloadErr(Box<dyn std::error::Error + Send + Sync + 'static>),
 }

 impl ErrorCode for PayloadError {
@@ -126,6 +128,7 @@ impl ErrorCode for PayloadError {
             },
             PayloadError::MissingPayload => Code::MissingPayload,
             PayloadError::MalformedPayload(_) => Code::MalformedPayload,
+            PayloadError::ReceivePayloadErr(_) => Code::Internal,
         }
     }
 }
@@ -1,4 +1,4 @@
-use std::io::{Cursor, ErrorKind};
+use std::io::ErrorKind;

 use actix_web::http::header::CONTENT_TYPE;
 use actix_web::web::Data;
@@ -20,9 +20,13 @@ use once_cell::sync::Lazy;
 use serde::Deserialize;
 use serde_cs::vec::CS;
 use serde_json::Value;
+use tempfile::tempfile;
+use tokio::fs::File;
+use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};

 use crate::analytics::{Analytics, DocumentDeletionKind};
 use crate::error::MeilisearchHttpError;
+use crate::error::PayloadError::ReceivePayloadErr;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
 use crate::extractors::payload::Payload;
@@ -227,26 +231,52 @@ async fn document_addition(

     let (uuid, mut update_file) = index_scheduler.create_update_file()?;

-    // TODO: this can be slow, maybe we should spawn a thread? But the payload isn't Send+Sync :weary:
-    // push the entire stream into a `Vec`.
-    // If someone sends us a never ending stream we're going to block the thread.
-    // TODO: Maybe we should write it to a file to reduce the RAM consumption
-    // and then reread it to convert it to obkv?
-    let mut buffer = Vec::new();
-    while let Some(bytes) = body.next().await {
-        buffer.extend_from_slice(&bytes?);
-    }
-    if buffer.is_empty() {
-        return Err(MeilisearchHttpError::MissingPayload(format));
-    }
-    let reader = Cursor::new(buffer);
+    let temp_file = match tempfile() {
+        Ok(temp_file) => temp_file,
+        Err(e) => {
+            return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+        }
+    };
+
+    let async_file = File::from_std(temp_file);
+    let mut buffer = BufWriter::new(async_file);
+
+    let mut buffer_write_size: usize = 0;
+    while let Some(bytes) = body.next().await {
+        let byte = &bytes?;
+
+        if byte.is_empty() && buffer_write_size == 0 {
+            return Err(MeilisearchHttpError::MissingPayload(format));
+        }
+
+        match buffer.write_all(byte).await {
+            Ok(()) => buffer_write_size += 1,
+            Err(e) => {
+                return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+            }
+        };
+    }
+
+    if let Err(e) = buffer.flush().await {
+        return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+    }
+
+    if buffer_write_size == 0 {
+        return Err(MeilisearchHttpError::MissingPayload(format));
+    }
+
+    if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
+        return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+    };
+
+    let read_file = buffer.into_inner().into_std().await;

     let documents_count =
         tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {
             let documents_count = match format {
-                PayloadType::Json => read_json(reader, update_file.as_file_mut())?,
-                PayloadType::Csv => read_csv(reader, update_file.as_file_mut())?,
-                PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?,
+                PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
+                PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?,
+                PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
             };
             // we NEED to persist the file here because we moved the `udpate_file` in another task.
             update_file.persist()?;
@@ -436,7 +436,7 @@ async fn error_add_malformed_ndjson_documents() {
     assert_eq!(
         response["message"],
         json!(
-            r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`."#
+            r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: trailing characters at line 2 column 1`."#
         )
     );
     assert_eq!(response["code"], json!("malformed_payload"));
@@ -456,7 +456,7 @@ async fn error_add_malformed_ndjson_documents() {
     assert_eq!(status_code, 400);
     assert_eq!(
         response["message"],
-        json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`.")
+        json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: trailing characters at line 2 column 1`.")
     );
     assert_eq!(response["code"], json!("malformed_payload"));
     assert_eq!(response["type"], json!("invalid_request"));