mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-23 05:14:27 +01:00
implement review suggestions
This commit is contained in:
parent
f9445c1d90
commit
baddd80069
@ -1,7 +1,7 @@
|
|||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use std::fs::{create_dir_all, remove_dir_all, File};
|
use std::fs::{create_dir_all, remove_dir_all, File};
|
||||||
use std::io::{self, Cursor, Read, Seek};
|
use std::io::{self, BufRead, BufReader, Cursor, Read, Seek};
|
||||||
use std::num::ParseFloatError;
|
use std::num::ParseFloatError;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
@ -146,44 +146,34 @@ pub fn documents_from(filename: &str, filetype: &str) -> DocumentBatchReader<imp
|
|||||||
DocumentBatchReader::from_reader(Cursor::new(documents)).unwrap()
|
DocumentBatchReader::from_reader(Cursor::new(documents)).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn documents_from_jsonl(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
fn documents_from_jsonl(reader: impl Read) -> anyhow::Result<Vec<u8>> {
|
||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
||||||
|
|
||||||
let values = serde_json::Deserializer::from_reader(reader)
|
let mut buf = String::new();
|
||||||
.into_iter::<serde_json::Map<String, serde_json::Value>>();
|
let mut reader = BufReader::new(reader);
|
||||||
for document in values {
|
|
||||||
let document = document?;
|
while reader.read_line(&mut buf)? > 0 {
|
||||||
documents.add_documents(document)?;
|
documents.extend_from_json(&mut buf.as_bytes())?;
|
||||||
}
|
}
|
||||||
documents.finish()?;
|
documents.finish()?;
|
||||||
|
|
||||||
Ok(writer.into_inner())
|
Ok(writer.into_inner())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
fn documents_from_json(reader: impl Read) -> anyhow::Result<Vec<u8>> {
|
||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
||||||
|
|
||||||
let json: serde_json::Value = serde_json::from_reader(reader)?;
|
documents.extend_from_json(reader)?;
|
||||||
documents.add_documents(json)?;
|
|
||||||
documents.finish()?;
|
documents.finish()?;
|
||||||
|
|
||||||
Ok(writer.into_inner())
|
Ok(writer.into_inner())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn documents_from_csv(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
fn documents_from_csv(reader: impl Read) -> anyhow::Result<Vec<u8>> {
|
||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
|
||||||
|
|
||||||
let iter = CSVDocumentDeserializer::from_reader(reader)?;
|
|
||||||
|
|
||||||
for doc in iter {
|
|
||||||
let doc = doc?;
|
|
||||||
documents.add_documents(doc)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
documents.finish()?;
|
|
||||||
|
|
||||||
Ok(writer.into_inner())
|
Ok(writer.into_inner())
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{stdin, Cursor, Read};
|
use std::io::{stdin, BufRead, BufReader, Cursor, Read};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
@ -9,7 +9,6 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
|
|||||||
use milli::update::UpdateIndexingStep::{
|
use milli::update::UpdateIndexingStep::{
|
||||||
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
|
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
|
||||||
};
|
};
|
||||||
use serde_json::{Map, Value};
|
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
@ -202,11 +201,11 @@ fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> {
|
|||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
||||||
|
|
||||||
let values = serde_json::Deserializer::from_reader(reader)
|
let mut buf = String::new();
|
||||||
.into_iter::<serde_json::Map<String, serde_json::Value>>();
|
let mut reader = BufReader::new(reader);
|
||||||
for document in values {
|
|
||||||
let document = document?;
|
while reader.read_line(&mut buf)? > 0 {
|
||||||
documents.add_documents(document)?;
|
documents.extend_from_json(&mut buf.as_bytes())?;
|
||||||
}
|
}
|
||||||
documents.finish()?;
|
documents.finish()?;
|
||||||
|
|
||||||
@ -217,8 +216,7 @@ fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
|
|||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
||||||
|
|
||||||
let json: serde_json::Value = serde_json::from_reader(reader)?;
|
documents.extend_from_json(reader)?;
|
||||||
documents.add_documents(json)?;
|
|
||||||
documents.finish()?;
|
documents.finish()?;
|
||||||
|
|
||||||
Ok(writer.into_inner())
|
Ok(writer.into_inner())
|
||||||
@ -226,17 +224,7 @@ fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
|
|||||||
|
|
||||||
fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> {
|
fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> {
|
||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
|
||||||
|
|
||||||
let mut records = csv::Reader::from_reader(reader);
|
|
||||||
let iter = records.deserialize::<Map<String, Value>>();
|
|
||||||
|
|
||||||
for doc in iter {
|
|
||||||
let doc = doc?;
|
|
||||||
documents.add_documents(doc)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
documents.finish()?;
|
|
||||||
|
|
||||||
Ok(writer.into_inner())
|
Ok(writer.into_inner())
|
||||||
}
|
}
|
||||||
|
@ -1,285 +0,0 @@
|
|||||||
use std::io::{Read, Result as IoResult};
|
|
||||||
use std::num::ParseFloatError;
|
|
||||||
|
|
||||||
use serde_json::{Map, Value};
|
|
||||||
|
|
||||||
enum AllowedType {
|
|
||||||
String,
|
|
||||||
Number,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_csv_header(header: &str) -> (String, AllowedType) {
|
|
||||||
// if there are several separators we only split on the last one.
|
|
||||||
match header.rsplit_once(':') {
|
|
||||||
Some((field_name, field_type)) => match field_type {
|
|
||||||
"string" => (field_name.to_string(), AllowedType::String),
|
|
||||||
"number" => (field_name.to_string(), AllowedType::Number),
|
|
||||||
// we may return an error in this case.
|
|
||||||
_otherwise => (header.to_string(), AllowedType::String),
|
|
||||||
},
|
|
||||||
None => (header.to_string(), AllowedType::String),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct CSVDocumentDeserializer<R>
|
|
||||||
where
|
|
||||||
R: Read,
|
|
||||||
{
|
|
||||||
documents: csv::StringRecordsIntoIter<R>,
|
|
||||||
headers: Vec<(String, AllowedType)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: Read> CSVDocumentDeserializer<R> {
|
|
||||||
pub fn from_reader(reader: R) -> IoResult<Self> {
|
|
||||||
let mut records = csv::Reader::from_reader(reader);
|
|
||||||
|
|
||||||
let headers = records.headers()?.into_iter().map(parse_csv_header).collect();
|
|
||||||
|
|
||||||
Ok(Self { documents: records.into_records(), headers })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: Read> Iterator for CSVDocumentDeserializer<R> {
|
|
||||||
type Item = anyhow::Result<Map<String, Value>>;
|
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
let csv_document = self.documents.next()?;
|
|
||||||
|
|
||||||
match csv_document {
|
|
||||||
Ok(csv_document) => {
|
|
||||||
let mut document = Map::new();
|
|
||||||
|
|
||||||
for ((field_name, field_type), value) in
|
|
||||||
self.headers.iter().zip(csv_document.into_iter())
|
|
||||||
{
|
|
||||||
let parsed_value: Result<Value, ParseFloatError> = match field_type {
|
|
||||||
AllowedType::Number => {
|
|
||||||
value.parse::<f64>().map(Value::from).map_err(Into::into)
|
|
||||||
}
|
|
||||||
AllowedType::String => Ok(Value::String(value.to_string())),
|
|
||||||
};
|
|
||||||
|
|
||||||
match parsed_value {
|
|
||||||
Ok(value) => drop(document.insert(field_name.to_string(), value)),
|
|
||||||
Err(_e) => {
|
|
||||||
return Some(Err(anyhow::anyhow!(
|
|
||||||
"Value '{}' is not a valid number",
|
|
||||||
value
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(Ok(document))
|
|
||||||
}
|
|
||||||
Err(e) => Some(Err(anyhow::anyhow!("Error parsing csv document: {}", e))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn simple_csv_document() {
|
|
||||||
let documents = r#"city,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn coma_in_field() {
|
|
||||||
let documents = r#"city,country,pop
|
|
||||||
"Boston","United, States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United, States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn quote_in_field() {
|
|
||||||
let documents = r#"city,country,pop
|
|
||||||
"Boston","United"" States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United\" States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn integer_in_field() {
|
|
||||||
let documents = r#"city,country,pop:number
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": 4628910.0,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn float_in_field() {
|
|
||||||
let documents = r#"city,country,pop:number
|
|
||||||
"Boston","United States","4628910.01""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": 4628910.01,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn several_double_dot_in_header() {
|
|
||||||
let documents = r#"city:love:string,country:state,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city:love": "Boston",
|
|
||||||
"country:state": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn ending_by_double_dot_in_header() {
|
|
||||||
let documents = r#"city:,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city:": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn starting_by_double_dot_in_header() {
|
|
||||||
let documents = r#":city,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
":city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn starting_by_double_dot_in_header2() {
|
|
||||||
let documents = r#":string,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn double_double_dot_in_header() {
|
|
||||||
let documents = r#"city::string,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Value::Object(csv_iter.next().unwrap().unwrap()),
|
|
||||||
json!({
|
|
||||||
"city:": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_type_in_header() {
|
|
||||||
let documents = r#"city,country:number,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert!(csv_iter.next().unwrap().is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_column_count1() {
|
|
||||||
let documents = r#"city,country,pop
|
|
||||||
"Boston","United States","4628910", "too much""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert!(csv_iter.next().unwrap().is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_column_count2() {
|
|
||||||
let documents = r#"city,country,pop
|
|
||||||
"Boston","United States""#;
|
|
||||||
|
|
||||||
let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
|
|
||||||
|
|
||||||
assert!(csv_iter.next().unwrap().is_err());
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,10 +1,9 @@
|
|||||||
mod documents_from_csv;
|
|
||||||
mod update_store;
|
mod update_store;
|
||||||
|
|
||||||
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
|
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::fs::{create_dir_all, File};
|
use std::fs::{create_dir_all, File};
|
||||||
use std::io::Cursor;
|
use std::io::{BufRead, BufReader, Cursor};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::num::{NonZeroU32, NonZeroUsize};
|
use std::num::{NonZeroU32, NonZeroUsize};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@ -39,7 +38,6 @@ use warp::http::Response;
|
|||||||
use warp::Filter;
|
use warp::Filter;
|
||||||
|
|
||||||
use self::update_store::UpdateStore;
|
use self::update_store::UpdateStore;
|
||||||
use crate::documents_from_csv::CSVDocumentDeserializer;
|
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
@ -1041,11 +1039,11 @@ fn documents_from_jsonl(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
|||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
||||||
|
|
||||||
let values = serde_json::Deserializer::from_reader(reader)
|
let mut buf = String::new();
|
||||||
.into_iter::<serde_json::Map<String, serde_json::Value>>();
|
let mut reader = BufReader::new(reader);
|
||||||
for document in values {
|
|
||||||
let document = document?;
|
while reader.read_line(&mut buf)? > 0 {
|
||||||
documents.add_documents(document)?;
|
documents.extend_from_json(&mut buf.as_bytes())?;
|
||||||
}
|
}
|
||||||
documents.finish()?;
|
documents.finish()?;
|
||||||
|
|
||||||
@ -1056,8 +1054,7 @@ fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
|||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
||||||
|
|
||||||
let json: serde_json::Value = serde_json::from_reader(reader)?;
|
documents.extend_from_json(reader)?;
|
||||||
documents.add_documents(json)?;
|
|
||||||
documents.finish()?;
|
documents.finish()?;
|
||||||
|
|
||||||
Ok(writer.into_inner())
|
Ok(writer.into_inner())
|
||||||
@ -1065,16 +1062,7 @@ fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
|||||||
|
|
||||||
fn documents_from_csv(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
fn documents_from_csv(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
|
||||||
let mut writer = Cursor::new(Vec::new());
|
let mut writer = Cursor::new(Vec::new());
|
||||||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
|
||||||
|
|
||||||
let iter = CSVDocumentDeserializer::from_reader(reader)?;
|
|
||||||
|
|
||||||
for doc in iter {
|
|
||||||
let doc = doc?;
|
|
||||||
documents.add_documents(doc)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
documents.finish()?;
|
|
||||||
|
|
||||||
Ok(writer.into_inner())
|
Ok(writer.into_inner())
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,14 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::io::Cursor;
|
use std::io::{Cursor, Write};
|
||||||
use std::io::Write;
|
|
||||||
|
|
||||||
use byteorder::{BigEndian, WriteBytesExt};
|
use byteorder::{BigEndian, WriteBytesExt};
|
||||||
use serde::Deserializer;
|
use serde::Deserializer;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::FieldId;
|
|
||||||
|
|
||||||
use super::serde::DocumentVisitor;
|
use super::serde::DocumentVisitor;
|
||||||
use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
|
use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
|
||||||
|
use crate::FieldId;
|
||||||
|
|
||||||
/// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
|
/// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
|
||||||
/// format used by milli.
|
/// format used by milli.
|
||||||
@ -27,7 +25,7 @@ use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
|
|||||||
/// let json = r##"{"id": 1, "name": "foo"}"##;
|
/// let json = r##"{"id": 1, "name": "foo"}"##;
|
||||||
/// let mut writer = Cursor::new(Vec::new());
|
/// let mut writer = Cursor::new(Vec::new());
|
||||||
/// let mut builder = DocumentBatchBuilder::new(&mut writer).unwrap();
|
/// let mut builder = DocumentBatchBuilder::new(&mut writer).unwrap();
|
||||||
/// builder.extend_from_json(Cursor::new(json.as_bytes())).unwrap();
|
/// builder.extend_from_json(&mut json.as_bytes()).unwrap();
|
||||||
/// builder.finish().unwrap();
|
/// builder.finish().unwrap();
|
||||||
/// ```
|
/// ```
|
||||||
pub struct DocumentBatchBuilder<W> {
|
pub struct DocumentBatchBuilder<W> {
|
||||||
@ -46,16 +44,14 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
|
|||||||
// add space to write the offset of the metadata at the end of the writer
|
// add space to write the offset of the metadata at the end of the writer
|
||||||
writer.write_u64::<BigEndian>(0)?;
|
writer.write_u64::<BigEndian>(0)?;
|
||||||
|
|
||||||
let this = Self {
|
Ok(Self {
|
||||||
inner: writer,
|
inner: writer,
|
||||||
index,
|
index,
|
||||||
obkv_buffer: Vec::new(),
|
obkv_buffer: Vec::new(),
|
||||||
value_buffer: Vec::new(),
|
value_buffer: Vec::new(),
|
||||||
values: BTreeMap::new(),
|
values: BTreeMap::new(),
|
||||||
count: 0,
|
count: 0,
|
||||||
};
|
})
|
||||||
|
|
||||||
Ok(this)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of documents that have been written to the builder.
|
/// Returns the number of documents that have been written to the builder.
|
||||||
@ -117,27 +113,31 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
|
|||||||
|
|
||||||
for (i, record) in records.into_records().enumerate() {
|
for (i, record) in records.into_records().enumerate() {
|
||||||
let record = record?;
|
let record = record?;
|
||||||
let mut writer = obkv::KvWriter::new(Cursor::new(&mut this.obkv_buffer));
|
this.obkv_buffer.clear();
|
||||||
|
let mut writer = obkv::KvWriter::new(&mut this.obkv_buffer);
|
||||||
for (value, (fid, ty)) in record.into_iter().zip(headers.iter()) {
|
for (value, (fid, ty)) in record.into_iter().zip(headers.iter()) {
|
||||||
let value = match ty {
|
let value = match ty {
|
||||||
AllowedType::Number => value.parse::<f64>().map(Value::from).map_err(|error| Error::ParseFloat {
|
AllowedType::Number => {
|
||||||
error,
|
value.parse::<f64>().map(Value::from).map_err(|error| {
|
||||||
// +1 for the header offset.
|
Error::ParseFloat {
|
||||||
line: i + 1,
|
error,
|
||||||
value: value.to_string(),
|
// +1 for the header offset.
|
||||||
})?,
|
line: i + 1,
|
||||||
|
value: value.to_string(),
|
||||||
|
}
|
||||||
|
})?
|
||||||
|
}
|
||||||
AllowedType::String => Value::String(value.to_string()),
|
AllowedType::String => Value::String(value.to_string()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
this.value_buffer.clear();
|
||||||
serde_json::to_writer(Cursor::new(&mut this.value_buffer), &value)?;
|
serde_json::to_writer(Cursor::new(&mut this.value_buffer), &value)?;
|
||||||
writer.insert(*fid, &this.value_buffer)?;
|
writer.insert(*fid, &this.value_buffer)?;
|
||||||
this.value_buffer.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this.inner.write_u32::<BigEndian>(this.obkv_buffer.len() as u32)?;
|
this.inner.write_u32::<BigEndian>(this.obkv_buffer.len() as u32)?;
|
||||||
this.inner.write_all(&this.obkv_buffer)?;
|
this.inner.write_all(&this.obkv_buffer)?;
|
||||||
|
|
||||||
this.obkv_buffer.clear();
|
|
||||||
this.count += 1;
|
this.count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,7 +156,8 @@ fn parse_csv_header(header: &str) -> (String, AllowedType) {
|
|||||||
match header.rsplit_once(':') {
|
match header.rsplit_once(':') {
|
||||||
Some((field_name, field_type)) => match field_type {
|
Some((field_name, field_type)) => match field_type {
|
||||||
"string" => (field_name.to_string(), AllowedType::String),
|
"string" => (field_name.to_string(), AllowedType::String),
|
||||||
"number" => (field_name.to_string(), AllowedType::Number), // if the pattern isn't reconized, we keep the whole field.
|
"number" => (field_name.to_string(), AllowedType::Number),
|
||||||
|
// if the pattern isn't reconized, we keep the whole field.
|
||||||
_otherwise => (header.to_string(), AllowedType::String),
|
_otherwise => (header.to_string(), AllowedType::String),
|
||||||
},
|
},
|
||||||
None => (header.to_string(), AllowedType::String),
|
None => (header.to_string(), AllowedType::String),
|
||||||
@ -169,9 +170,8 @@ mod test {
|
|||||||
|
|
||||||
use serde_json::{json, Map};
|
use serde_json::{json, Map};
|
||||||
|
|
||||||
use crate::documents::DocumentBatchReader;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::documents::DocumentBatchReader;
|
||||||
|
|
||||||
fn obkv_to_value(obkv: &obkv::KvReader<FieldId>, index: &DocumentsBatchIndex) -> Value {
|
fn obkv_to_value(obkv: &obkv::KvReader<FieldId>, index: &DocumentsBatchIndex) -> Value {
|
||||||
let mut map = Map::new();
|
let mut map = Map::new();
|
||||||
@ -525,7 +525,9 @@ mod test {
|
|||||||
"Boston","United States","4628910""#;
|
"Boston","United States","4628910""#;
|
||||||
|
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
assert!(DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err());
|
assert!(
|
||||||
|
DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -534,7 +536,9 @@ mod test {
|
|||||||
"Boston","United States","4628910", "too much""#;
|
"Boston","United States","4628910", "too much""#;
|
||||||
|
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
assert!(DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err());
|
assert!(
|
||||||
|
DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -543,6 +547,8 @@ mod test {
|
|||||||
"Boston","United States""#;
|
"Boston","United States""#;
|
||||||
|
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
assert!(DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err());
|
assert!(
|
||||||
|
DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,9 +7,8 @@ mod builder;
|
|||||||
mod reader;
|
mod reader;
|
||||||
mod serde;
|
mod serde;
|
||||||
|
|
||||||
use std::num::ParseFloatError;
|
|
||||||
use std::io;
|
|
||||||
use std::fmt::{self, Debug};
|
use std::fmt::{self, Debug};
|
||||||
|
use std::io;
|
||||||
|
|
||||||
use ::serde::{Deserialize, Serialize};
|
use ::serde::{Deserialize, Serialize};
|
||||||
use bimap::BiHashMap;
|
use bimap::BiHashMap;
|
||||||
@ -24,7 +23,7 @@ pub struct DocumentsBatchIndex(pub BiHashMap<FieldId, String>);
|
|||||||
|
|
||||||
impl DocumentsBatchIndex {
|
impl DocumentsBatchIndex {
|
||||||
/// Insert the field in the map, or return it's field id if it doesn't already exists.
|
/// Insert the field in the map, or return it's field id if it doesn't already exists.
|
||||||
pub fn insert(&mut self, field: &str) -> FieldId {
|
pub fn insert(&mut self, field: &str) -> FieldId {
|
||||||
match self.0.get_by_right(field) {
|
match self.0.get_by_right(field) {
|
||||||
Some(field_id) => *field_id,
|
Some(field_id) => *field_id,
|
||||||
None => {
|
None => {
|
||||||
@ -43,7 +42,7 @@ impl DocumentsBatchIndex {
|
|||||||
self.0.len()
|
self.0.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iter(&self) -> impl Iterator<Item=(&FieldId, &String)> {
|
pub fn iter(&self) -> bimap::hash::Iter<FieldId, String> {
|
||||||
self.0.iter()
|
self.0.iter()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,11 +82,7 @@ impl<W: io::Write> io::Write for ByteCounter<W> {
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
ParseFloat {
|
ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
|
||||||
error: std::num::ParseFloatError,
|
|
||||||
line: usize,
|
|
||||||
value: String,
|
|
||||||
},
|
|
||||||
InvalidDocumentFormat,
|
InvalidDocumentFormat,
|
||||||
Custom(String),
|
Custom(String),
|
||||||
JsonError(serde_json::Error),
|
JsonError(serde_json::Error),
|
||||||
@ -124,7 +119,9 @@ impl From<serde_json::Error> for Error {
|
|||||||
impl fmt::Display for Error {
|
impl fmt::Display for Error {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
Error::ParseFloat { error, line, value} => write!(f, "Error parsing number {:?} at line {}: {}", value, line, error),
|
Error::ParseFloat { error, line, value } => {
|
||||||
|
write!(f, "Error parsing number {:?} at line {}: {}", value, line, error)
|
||||||
|
}
|
||||||
Error::Custom(s) => write!(f, "Unexpected serialization error: {}", s),
|
Error::Custom(s) => write!(f, "Unexpected serialization error: {}", s),
|
||||||
Error::InvalidDocumentFormat => f.write_str("Invalid document addition format."),
|
Error::InvalidDocumentFormat => f.write_str("Invalid document addition format."),
|
||||||
Error::JsonError(err) => write!(f, "Couldn't serialize document value: {}", err),
|
Error::JsonError(err) => write!(f, "Couldn't serialize document value: {}", err),
|
||||||
|
@ -1,18 +1,13 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::io::Cursor;
|
|
||||||
use std::io::Write;
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::io::{Cursor, Write};
|
||||||
|
|
||||||
use byteorder::WriteBytesExt;
|
use byteorder::WriteBytesExt;
|
||||||
|
use serde::de::{DeserializeSeed, MapAccess, SeqAccess, Visitor};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::de::DeserializeSeed;
|
|
||||||
use serde::de::MapAccess;
|
|
||||||
use serde::de::SeqAccess;
|
|
||||||
use serde::de::Visitor;
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use super::Error;
|
use super::{ByteCounter, DocumentsBatchIndex, Error};
|
||||||
use super::{ByteCounter, DocumentsBatchIndex};
|
|
||||||
use crate::FieldId;
|
use crate::FieldId;
|
||||||
|
|
||||||
macro_rules! tri {
|
macro_rules! tri {
|
||||||
@ -31,8 +26,9 @@ impl<'a, 'de> DeserializeSeed<'de> for FieldIdResolver<'a> {
|
|||||||
|
|
||||||
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
||||||
where
|
where
|
||||||
D: serde::Deserializer<'de> {
|
D: serde::Deserializer<'de>,
|
||||||
deserializer.deserialize_str(self)
|
{
|
||||||
|
deserializer.deserialize_str(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,7 +39,7 @@ impl<'a, 'de> Visitor<'de> for FieldIdResolver<'a> {
|
|||||||
where
|
where
|
||||||
E: serde::de::Error,
|
E: serde::de::Error,
|
||||||
{
|
{
|
||||||
Ok(self.0.insert(v))
|
Ok(self.0.insert(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
@ -58,8 +54,9 @@ impl<'de> DeserializeSeed<'de> for ValueDeserializer {
|
|||||||
|
|
||||||
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
||||||
where
|
where
|
||||||
D: serde::Deserializer<'de> {
|
D: serde::Deserializer<'de>,
|
||||||
serde_json::Value::deserialize(deserializer)
|
{
|
||||||
|
serde_json::Value::deserialize(deserializer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,7 +77,9 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
|
|||||||
where
|
where
|
||||||
A: SeqAccess<'de>,
|
A: SeqAccess<'de>,
|
||||||
{
|
{
|
||||||
while let Some(v) = seq.next_element_seed(&mut *self)? { tri!(v) }
|
while let Some(v) = seq.next_element_seed(&mut *self)? {
|
||||||
|
tri!(v)
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Ok(()))
|
Ok(Ok(()))
|
||||||
}
|
}
|
||||||
@ -89,7 +88,9 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
|
|||||||
where
|
where
|
||||||
A: MapAccess<'de>,
|
A: MapAccess<'de>,
|
||||||
{
|
{
|
||||||
while let Some((key, value)) = map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer)? {
|
while let Some((key, value)) =
|
||||||
|
map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer)?
|
||||||
|
{
|
||||||
self.values.insert(key, value);
|
self.values.insert(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,13 +120,15 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W>
|
impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W>
|
||||||
where W: Write,
|
where
|
||||||
|
W: Write,
|
||||||
{
|
{
|
||||||
type Value = Result<(), Error>;
|
type Value = Result<(), Error>;
|
||||||
|
|
||||||
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
||||||
where
|
where
|
||||||
D: serde::Deserializer<'de> {
|
D: serde::Deserializer<'de>,
|
||||||
deserializer.deserialize_map(self)
|
{
|
||||||
|
deserializer.deserialize_map(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -906,8 +906,9 @@ mod tests {
|
|||||||
|
|
||||||
let mut cursor = Cursor::new(Vec::new());
|
let mut cursor = Cursor::new(Vec::new());
|
||||||
|
|
||||||
|
let big_object = serde_json::to_string(&big_object).unwrap();
|
||||||
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
builder.add_documents(big_object).unwrap();
|
builder.extend_from_json(&mut big_object.as_bytes()).unwrap();
|
||||||
builder.finish().unwrap();
|
builder.finish().unwrap();
|
||||||
cursor.set_position(0);
|
cursor.set_position(0);
|
||||||
let content = DocumentBatchReader::from_reader(cursor).unwrap();
|
let content = DocumentBatchReader::from_reader(cursor).unwrap();
|
||||||
|
@ -544,7 +544,8 @@ mod test {
|
|||||||
mod primary_key_inference {
|
mod primary_key_inference {
|
||||||
use bimap::BiHashMap;
|
use bimap::BiHashMap;
|
||||||
|
|
||||||
use crate::{documents::DocumentsBatchIndex, update::index_documents::transform::find_primary_key};
|
use crate::documents::DocumentsBatchIndex;
|
||||||
|
use crate::update::index_documents::transform::find_primary_key;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn primary_key_infered_on_first_field() {
|
fn primary_key_infered_on_first_field() {
|
||||||
|
Loading…
Reference in New Issue
Block a user