document errors

This commit is contained in:
marin postma 2021-10-24 15:39:56 +02:00
parent 2e62925a6e
commit 53c79e85f2
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
3 changed files with 57 additions and 29 deletions

View File

@ -87,18 +87,13 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
count: &mut self.count, count: &mut self.count,
}; };
de.deserialize_any(&mut visitor).map_err(Error::JsonError)?; de.deserialize_any(&mut visitor).map_err(Error::JsonError)?
Ok(())
} }
/// Extends the builder with json documents from a reader. /// Creates a builder from a reader of CSV documents.
/// ///
/// This method can be only called once and is mutually exclusive with extend from json. This /// Since all fields in a CSV document are guaranteed to be ordered, we are able to perform
/// is because the fields in a csv are always guaranteed to come in order, and permits some /// optimisations, and extending from another CSV is not allowed.
/// optimizations.
///
/// From csv takes care to call finish in the end.
pub fn from_csv<R: io::Read>(reader: R, writer: W) -> Result<Self, Error> { pub fn from_csv<R: io::Read>(reader: R, writer: W) -> Result<Self, Error> {
let mut this = Self::new(writer)?; let mut this = Self::new(writer)?;
@ -108,8 +103,7 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
let mut records = csv::Reader::from_reader(reader); let mut records = csv::Reader::from_reader(reader);
let headers = records let headers = records
.headers() .headers()?
.unwrap()
.into_iter() .into_iter()
.map(parse_csv_header) .map(parse_csv_header)
.map(|(k, t)| (this.index.insert(&k), t)) .map(|(k, t)| (this.index.insert(&k), t))
@ -123,11 +117,11 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
let mut writer = obkv::KvWriter::new(Cursor::new(&mut this.obkv_buffer)); let mut writer = obkv::KvWriter::new(Cursor::new(&mut this.obkv_buffer));
for (value, (fid, ty)) in record.into_iter().zip(headers.iter()) { for (value, (fid, ty)) in record.into_iter().zip(headers.iter()) {
let value = match ty { let value = match ty {
AllowedType::Number => value.parse::<f64>().map(Value::from).unwrap(), AllowedType::Number => value.parse::<f64>().map(Value::from)?,
AllowedType::String => Value::String(value.to_string()), AllowedType::String => Value::String(value.to_string()),
}; };
serde_json::to_writer(Cursor::new(&mut this.value_buffer), &value).unwrap(); serde_json::to_writer(Cursor::new(&mut this.value_buffer), &value)?;
writer.insert(*fid, &this.value_buffer)?; writer.insert(*fid, &this.value_buffer)?;
this.value_buffer.clear(); this.value_buffer.clear();
} }

View File

@ -7,7 +7,9 @@ mod builder;
mod reader; mod reader;
mod serde; mod serde;
use std::{fmt, io}; use std::num::ParseFloatError;
use std::io;
use std::fmt::{self, Debug};
use ::serde::{Deserialize, Serialize}; use ::serde::{Deserialize, Serialize};
use bimap::BiHashMap; use bimap::BiHashMap;
@ -81,14 +83,22 @@ impl<W: io::Write> io::Write for ByteCounter<W> {
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
ParseFloat(std::num::ParseFloatError),
InvalidDocumentFormat, InvalidDocumentFormat,
Custom(String), Custom(String),
JsonError(serde_json::Error), JsonError(serde_json::Error),
CsvError(csv::Error),
Serialize(bincode::Error), Serialize(bincode::Error),
Io(io::Error), Io(io::Error),
DocumentTooLarge, DocumentTooLarge,
} }
impl From<csv::Error> for Error {
fn from(e: csv::Error) -> Self {
Self::CsvError(e)
}
}
impl From<io::Error> for Error { impl From<io::Error> for Error {
fn from(other: io::Error) -> Self { fn from(other: io::Error) -> Self {
Self::Io(other) Self::Io(other)
@ -101,15 +111,29 @@ impl From<bincode::Error> for Error {
} }
} }
impl From<serde_json::Error> for Error {
fn from(other: serde_json::Error) -> Self {
Self::JsonError(other)
}
}
impl From<ParseFloatError> for Error {
fn from(other: ParseFloatError) -> Self {
Self::ParseFloat(other)
}
}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Error::ParseFloat(e) => write!(f, "{}", e),
Error::Custom(s) => write!(f, "Unexpected serialization error: {}", s), Error::Custom(s) => write!(f, "Unexpected serialization error: {}", s),
Error::InvalidDocumentFormat => f.write_str("Invalid document addition format."), Error::InvalidDocumentFormat => f.write_str("Invalid document addition format."),
Error::JsonError(err) => write!(f, "Couldn't serialize document value: {}", err), Error::JsonError(err) => write!(f, "Couldn't serialize document value: {}", err),
Error::Io(e) => e.fmt(f), Error::Io(e) => write!(f, "{}", e),
Error::DocumentTooLarge => f.write_str("Provided document is too large (>2Gib)"), Error::DocumentTooLarge => f.write_str("Provided document is too large (>2Gib)"),
Error::Serialize(e) => e.fmt(f), Error::Serialize(e) => write!(f, "{}", e),
Error::CsvError(e) => write!(f, "{}", e),
} }
} }
} }

View File

@ -11,9 +11,19 @@ use serde::de::SeqAccess;
use serde::de::Visitor; use serde::de::Visitor;
use serde_json::Value; use serde_json::Value;
use super::Error;
use super::{ByteCounter, DocumentsBatchIndex}; use super::{ByteCounter, DocumentsBatchIndex};
use crate::FieldId; use crate::FieldId;
/// Evaluates a `Result` expression inside a function whose return type is a
/// nested `Result<Result<_, E>, _>`.
///
/// * On `Ok(value)` the macro evaluates to `value`.
/// * On `Err(err)` it returns early from the enclosing function with
///   `Ok(Err(err.into()))`, i.e. the failure is carried in the *inner*
///   result after being converted with `Into`, while the outer result stays
///   `Ok`.
///
/// NOTE(review): the early return abandons whatever the caller was iterating
/// over (e.g. the remaining elements in `visit_seq`) — confirm the
/// deserializer driving the visitor still surfaces the inner error.
macro_rules! tri {
    ($result:expr) => {
        match $result {
            Ok(value) => value,
            Err(err) => return Ok(Err(err.into())),
        }
    };
}
struct FieldIdResolver<'a>(&'a mut DocumentsBatchIndex); struct FieldIdResolver<'a>(&'a mut DocumentsBatchIndex);
impl<'a, 'de> DeserializeSeed<'de> for FieldIdResolver<'a> { impl<'a, 'de> DeserializeSeed<'de> for FieldIdResolver<'a> {
@ -36,8 +46,8 @@ impl<'a, 'de> Visitor<'de> for FieldIdResolver<'a> {
Ok(self.0.insert(v)) Ok(self.0.insert(v))
} }
fn expecting(&self, _formatter: &mut fmt::Formatter) -> fmt::Result { fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
todo!() write!(f, "a string")
} }
} }
@ -64,22 +74,22 @@ pub struct DocumentVisitor<'a, W> {
impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> { impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
/// This Visitor value is nothing, since it writes the value to a file. /// This Visitor value is nothing, since it writes the value to a file.
type Value = (); type Value = Result<(), Error>;
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where where
A: SeqAccess<'de>, A: SeqAccess<'de>,
{ {
while let Some(_) = seq.next_element_seed(&mut *self)? { } while let Some(v) = seq.next_element_seed(&mut *self)? { tri!(v) }
Ok(()) Ok(Ok(()))
} }
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where where
A: MapAccess<'de>, A: MapAccess<'de>,
{ {
while let Some((key, value)) = map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer).unwrap() { while let Some((key, value)) = map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer)? {
self.values.insert(key, value); self.values.insert(key, value);
} }
@ -88,19 +98,19 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
for (key, value) in self.values.iter() { for (key, value) in self.values.iter() {
self.value_buffer.clear(); self.value_buffer.clear();
// This is guaranteed to work // This is guaranteed to work
serde_json::to_writer(Cursor::new(&mut *self.value_buffer), value).unwrap(); tri!(serde_json::to_writer(Cursor::new(&mut *self.value_buffer), value));
obkv.insert(*key, &self.value_buffer).unwrap(); tri!(obkv.insert(*key, &self.value_buffer));
} }
let reader = obkv.into_inner().unwrap().into_inner(); let reader = tri!(obkv.into_inner()).into_inner();
self.inner.write_u32::<byteorder::BigEndian>(reader.len() as u32).unwrap(); tri!(self.inner.write_u32::<byteorder::BigEndian>(reader.len() as u32));
self.inner.write_all(reader).unwrap(); tri!(self.inner.write_all(reader));
*self.count += 1; *self.count += 1;
self.values.clear(); self.values.clear();
Ok(()) Ok(Ok(()))
} }
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result { fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
@ -111,7 +121,7 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W> impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W>
where W: Write, where W: Write,
{ {
type Value = (); type Value = Result<(), Error>;
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error> fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where where