Use the Error enum everywhere in the project

Kerollmops 2021-06-14 16:46:19 +02:00
parent ca78cb5aca
commit 312c2d1d8e
GPG key ID: 92ADA4E935E71FA4
35 changed files with 385 additions and 300 deletions
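
The diff below shows a single file out of the 35 changed, but it exercises most of the new error surface. Pieced together from the variants that appear in the hunks, the new types in `crate::error` look roughly like this; the variant names are taken from the diff, while the field types and the completeness of each enum are assumptions:

```rust
use serde_json::Value;

// Sketch of the error hierarchy implied by the hunks below; variant names
// come from the diff, field types and completeness are assumptions.
pub enum UserError {
    AttributeLimitReached,
    Csv(csv::Error),
    DocumentLimitReached,
    InvalidDocumentId { document_id: Value },
    MissingPrimaryKey,
    SerdeJson(serde_json::Error),
}

pub enum InternalError {
    DatabaseMissingEntry { db_name: &'static str, key: Option<&'static str> },
    IndexingMergingKeys { process: &'static str },
    SerdeJson(serde_json::Error),
}

pub enum Error {
    InternalError(InternalError),
    UserError(UserError),
}

// Every `.into()` in the diff relies on conversions of this shape.
impl From<UserError> for Error {
    fn from(error: UserError) -> Error {
        Error::UserError(error)
    }
}

impl From<InternalError> for Error {
    fn from(error: InternalError) -> Error {
        Error::InternalError(error)
    }
}
```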

@@ -2,17 +2,19 @@ use std::borrow::Cow;
 use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
 use std::iter::Peekable;
+use std::result::Result as StdResult;
 use std::time::Instant;
 
-use anyhow::{anyhow, Context};
 use grenad::CompressionType;
 use log::info;
 use roaring::RoaringBitmap;
 use serde_json::{Map, Value};
 
+use crate::error::{Error, UserError, InternalError};
 use crate::update::index_documents::merge_function::{merge_obkvs, keep_latest_obkv};
 use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
-use crate::{Index, BEU32, MergeFn, FieldsIdsMap, ExternalDocumentsIds, FieldId, FieldsDistribution};
+use crate::{BEU32, MergeFn, FieldsIdsMap, ExternalDocumentsIds, FieldId, FieldsDistribution};
+use crate::{Index, Result};
 
 use super::merge_function::merge_two_obkvs;
 use super::{create_writer, create_sorter, IndexDocumentsMethod};
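
A note on the new imports: `use crate::{Index, Result};` implies a crate-level alias, presumably the usual one-liner below. Because that alias fixes the error type, the bare name `Result` no longer unifies with the `StdResult<Map<String, Value>, serde_json::Error>` items produced while deserializing, which is why the file also pulls in `std::result::Result as StdResult` and switches to `StdResult::as_ref` further down.

```rust
// Presumed definition; this is what lets every signature below shrink from
// `anyhow::Result<TransformOutput>` to `Result<TransformOutput>`.
pub type Result<T> = std::result::Result<T, Error>;
```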
@@ -53,7 +55,7 @@ fn is_primary_key(field: impl AsRef<str>) -> bool {
 }
 
 impl Transform<'_, '_> {
-    pub fn output_from_json<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<TransformOutput>
+    pub fn output_from_json<R, F>(self, reader: R, progress_callback: F) -> Result<TransformOutput>
     where
         R: Read,
         F: Fn(UpdateIndexingStep) + Sync,
@@ -61,7 +63,7 @@ impl Transform<'_, '_> {
         self.output_from_generic_json(reader, false, progress_callback)
     }
 
-    pub fn output_from_json_stream<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<TransformOutput>
+    pub fn output_from_json_stream<R, F>(self, reader: R, progress_callback: F) -> Result<TransformOutput>
     where
         R: Read,
         F: Fn(UpdateIndexingStep) + Sync,
@@ -74,7 +76,7 @@ impl Transform<'_, '_> {
         reader: R,
         is_stream: bool,
         progress_callback: F,
-    ) -> anyhow::Result<TransformOutput>
+    ) -> Result<TransformOutput>
     where
         R: Read,
         F: Fn(UpdateIndexingStep) + Sync,
@@ -88,7 +90,7 @@ impl Transform<'_, '_> {
             let iter = Box::new(iter) as Box<dyn Iterator<Item=_>>;
             iter.peekable()
         } else {
-            let vec: Vec<_> = serde_json::from_reader(reader)?;
+            let vec: Vec<_> = serde_json::from_reader(reader).map_err(UserError::SerdeJson)?;
             let iter = vec.into_iter().map(Ok);
             let iter = Box::new(iter) as Box<dyn Iterator<Item=_>>;
             iter.peekable()
@@ -96,9 +98,12 @@ impl Transform<'_, '_> {
 
         // We extract the primary key from the first document in
         // the batch if it hasn't already been defined in the index
-        let first = match documents.peek().map(Result::as_ref).transpose() {
+        let first = match documents.peek().map(StdResult::as_ref).transpose() {
             Ok(first) => first,
-            Err(_) => return Err(documents.next().unwrap().unwrap_err().into()),
+            Err(_) => {
+                let error = documents.next().unwrap().unwrap_err();
+                return Err(UserError::SerdeJson(error).into());
+            },
         };
 
         let alternative_name = first.and_then(|doc| doc.keys().find(|f| is_primary_key(f)).cloned());
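
The expanded `Err(_)` arm is worth a second look: `peek()` only lends a reference to the first item, so the match can detect the failure but cannot move the error out. Advancing the iterator with `next()` takes ownership, and the `unwrap()`/`unwrap_err()` calls are safe because the peek already proved the item exists and is an `Err`. A standalone sketch of the same move, with a toy item type standing in for the deserialized documents:

```rust
// Standalone sketch of the peek-then-consume pattern used above.
fn report_first(items: Vec<Result<u32, String>>) {
    let mut documents = items.into_iter().peekable();

    let first = match documents.peek().map(Result::as_ref).transpose() {
        Ok(first) => first.copied(),
        Err(_) => {
            // The peeked reference is released here, so the item can be
            // consumed and the error taken by value.
            let error = documents.next().unwrap().unwrap_err();
            eprintln!("first document is invalid: {error}");
            return;
        }
    };

    println!("first document: {first:?}");
}
```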
@@ -145,7 +150,7 @@ impl Transform<'_, '_> {
         let mut documents_count = 0;
 
         for result in documents {
-            let document = result?;
+            let document = result.map_err(UserError::SerdeJson)?;
 
             if self.log_every_n.map_or(false, |len| documents_count % len == 0) {
                 progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
@@ -158,7 +163,7 @@ impl Transform<'_, '_> {
 
             // We prepare the fields ids map with the documents keys.
             for (key, _value) in &document {
-                fields_ids_map.insert(&key).context("field id limit reached")?;
+                fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
             }
 
             // We retrieve the user id from the document based on the primary key name,
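
`fields_ids_map.insert` evidently returns an `Option<FieldId>`, with `None` meaning the id space is exhausted, so the old string `context(...)` becomes `ok_or(...)` with a dedicated variant, and `?` lifts `UserError` into `Error` through the `From` impl sketched earlier. In miniature, with a hypothetical helper around the real crate types:

```rust
// Hypothetical helper showing the `ok_or(...)?` idiom from the hunk above:
// a `None` from the map becomes a typed error, and `?` converts it into the
// crate-wide `Error` via `From<UserError>`.
fn field_id_or_error(fields_ids_map: &mut FieldsIdsMap, key: &str) -> Result<FieldId, Error> {
    let id = fields_ids_map.insert(key).ok_or(UserError::AttributeLimitReached)?;
    Ok(id)
}
```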
@@ -167,11 +172,13 @@ impl Transform<'_, '_> {
                 Some(value) => match value {
                     Value::String(string) => Cow::Borrowed(string.as_str()),
                     Value::Number(number) => Cow::Owned(number.to_string()),
-                    _ => return Err(anyhow!("documents ids must be either strings or numbers")),
+                    content => return Err(UserError::InvalidDocumentId {
+                        document_id: content.clone(),
+                    }.into()),
                 },
                 None => {
                     if !self.autogenerate_docids {
-                        return Err(anyhow!("missing primary key"));
+                        return Err(UserError::MissingPrimaryKey.into());
                    }
                     let uuid = uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer);
                     Cow::Borrowed(uuid)
@@ -186,13 +193,15 @@ impl Transform<'_, '_> {
                 // and this should be the document id we return the one we generated.
                 if let Some(value) = document.get(name) {
                     // We serialize the attribute values.
-                    serde_json::to_writer(&mut json_buffer, value)?;
+                    serde_json::to_writer(&mut json_buffer, value).map_err(InternalError::SerdeJson)?;
                     writer.insert(field_id, &json_buffer)?;
                 }
 
                 // We validate the document id [a-zA-Z0-9\-_].
                 if field_id == primary_key_id && validate_document_id(&external_id).is_none() {
-                    return Err(anyhow!("invalid document id: {:?}", external_id));
+                    return Err(UserError::InvalidDocumentId {
+                        document_id: Value::from(external_id),
+                    }.into());
                 }
             }
 
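
`validate_document_id` itself is not part of this hunk; going by the `[a-zA-Z0-9\-_]` comment, a plausible (assumed) shape is:

```rust
// Assumed reconstruction, not taken from the commit: accept an id only when
// it is non-empty and every byte is ASCII alphanumeric, `-`, or `_`.
fn validate_document_id(document_id: &str) -> Option<&str> {
    if !document_id.is_empty()
        && document_id
            .bytes()
            .all(|b| matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_'))
    {
        Some(document_id)
    } else {
        None
    }
}
```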
@@ -217,7 +226,7 @@ impl Transform<'_, '_> {
         )
     }
 
-    pub fn output_from_csv<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<TransformOutput>
+    pub fn output_from_csv<R, F>(self, reader: R, progress_callback: F) -> Result<TransformOutput>
     where
         R: Read,
         F: Fn(UpdateIndexingStep) + Sync,
@@ -226,12 +235,12 @@ impl Transform<'_, '_> {
 
         let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
         let mut csv = csv::Reader::from_reader(reader);
-        let headers = csv.headers()?;
+        let headers = csv.headers().map_err(UserError::Csv)?;
 
         let mut fields_ids = Vec::new();
         // Generate the new fields ids based on the current fields ids and this CSV headers.
         for (i, header) in headers.iter().enumerate() {
-            let id = fields_ids_map.insert(header).context("field id limit reached)")?;
+            let id = fields_ids_map.insert(header).ok_or(UserError::AttributeLimitReached)?;
             fields_ids.push((id, i));
         }
 
@@ -281,7 +290,7 @@ impl Transform<'_, '_> {
 
         let mut documents_count = 0;
         let mut record = csv::StringRecord::new();
-        while csv.read_record(&mut record)? {
+        while csv.read_record(&mut record).map_err(UserError::Csv)? {
             obkv_buffer.clear();
             let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
 
@@ -298,7 +307,9 @@ impl Transform<'_, '_> {
                     // We validate the document id [a-zA-Z0-9\-_].
                     match validate_document_id(&external_id) {
                         Some(valid) => valid,
-                        None => return Err(anyhow!("invalid document id: {:?}", external_id)),
+                        None => return Err(UserError::InvalidDocumentId {
+                            document_id: Value::from(external_id),
+                        }.into()),
                     }
                 },
                 None => uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer),
@@ -316,7 +327,7 @@ impl Transform<'_, '_> {
             for (field_id, field) in iter {
                 // We serialize the attribute values as JSON strings.
                 json_buffer.clear();
-                serde_json::to_writer(&mut json_buffer, &field)?;
+                serde_json::to_writer(&mut json_buffer, &field).map_err(InternalError::SerdeJson)?;
                 writer.insert(*field_id, &json_buffer)?;
             }
 
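
Note the asymmetry between this hunk and the earlier ones: the same `serde_json::Error` is classified as `UserError::SerdeJson` when the user's documents fail to parse, but as `InternalError::SerdeJson` here, where the indexer re-serializes values it already parsed; a failure at this point is a bug, not bad input. Using the enums sketched at the top:

```rust
// One underlying error type, two classifications, mirroring the split in
// the diff (the boolean parameter is only for illustration).
fn classify_serde_error(reading_user_input: bool, error: serde_json::Error) -> Error {
    if reading_user_input {
        UserError::SerdeJson(error).into()
    } else {
        InternalError::SerdeJson(error).into()
    }
}
```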
@@ -344,17 +355,18 @@ impl Transform<'_, '_> {
     /// Generate the `TransformOutput` based on the given sorter that can be generated from any
     /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
     /// id for the user side and the value must be an obkv where keys are valid fields ids.
-    fn output_from_sorter<F>(
+    fn output_from_sorter<F, E>(
         self,
-        sorter: grenad::Sorter<MergeFn>,
+        sorter: grenad::Sorter<MergeFn<E>>,
         primary_key: String,
         fields_ids_map: FieldsIdsMap,
         approximate_number_of_documents: usize,
         mut external_documents_ids: ExternalDocumentsIds<'_>,
         progress_callback: F,
-    ) -> anyhow::Result<TransformOutput>
+    ) -> Result<TransformOutput>
     where
         F: Fn(UpdateIndexingStep) + Sync,
+        Error: From<E>,
     {
         let documents_ids = self.index.documents_ids(self.rtxn)?;
         let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
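
`output_from_sorter` is now generic over the merge error `E`, constrained by `Error: From<E>`, so the same function accepts sorters whose merge functions fail with either error family. That in turn implies `MergeFn` gained a type parameter, presumably along these lines (an assumption; the alias is defined outside this file):

```rust
use std::borrow::Cow;
use std::result::Result as StdResult;

// Assumed post-commit shape of the alias: a grenad merge function that can
// fail with any error type `E`, lifted into `Error` by the caller through
// the `Error: From<E>` bound.
pub type MergeFn<E> = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Vec<u8>, E>;
```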
@@ -362,7 +374,7 @@ impl Transform<'_, '_> {
 
         // Once we have sort and deduplicated the documents we write them into a final file.
         let mut final_sorter = create_sorter(
-            |_docid, _obkvs| Err(anyhow!("cannot merge two documents")),
+            |_id, _obkvs| Err(InternalError::IndexingMergingKeys { process: "merging documents" }),
             self.chunk_compression_type,
             self.chunk_compression_level,
             self.chunk_fusing_shrink_size,
@@ -398,7 +410,10 @@ impl Transform<'_, '_> {
                 IndexDocumentsMethod::UpdateDocuments => {
                     let key = BEU32::new(docid);
                     let base_obkv = self.index.documents.get(&self.rtxn, &key)?
-                        .context("document not found")?;
+                        .ok_or(InternalError::DatabaseMissingEntry {
+                            db_name: "documents",
+                            key: None,
+                        })?;
                     let update_obkv = obkv::KvReader::new(update_obkv);
                     merge_two_obkvs(base_obkv, update_obkv, &mut obkv_buffer);
                     (docid, obkv_buffer.as_slice())
@@ -409,7 +424,7 @@ impl Transform<'_, '_> {
                     // If this user id is new we add it to the external documents ids map
                     // for new ids and into the list of new documents.
                     let new_docid = available_documents_ids.next()
-                        .context("no more available documents ids")?;
+                        .ok_or(UserError::DocumentLimitReached)?;
                     new_external_documents_ids_builder.insert(external_id, new_docid as u64)?;
                     new_documents_ids.insert(new_docid);
                     (new_docid, update_obkv)
@@ -469,7 +484,7 @@ impl Transform<'_, '_> {
         primary_key: String,
         old_fields_ids_map: FieldsIdsMap,
         new_fields_ids_map: FieldsIdsMap,
-    ) -> anyhow::Result<TransformOutput>
+    ) -> Result<TransformOutput>
     {
         let fields_distribution = self.index.fields_distribution(self.rtxn)?;
         let external_documents_ids = self.index.external_documents_ids(self.rtxn)?;
@@ -529,10 +544,10 @@ fn compute_primary_key_pair(
     fields_ids_map: &mut FieldsIdsMap,
     alternative_name: Option<String>,
     autogenerate_docids: bool,
-) -> anyhow::Result<(FieldId, String)> {
+) -> Result<(FieldId, String)> {
     match primary_key {
         Some(primary_key) => {
-            let id = fields_ids_map.insert(primary_key).ok_or(anyhow!("Maximum number of fields exceeded"))?;
+            let id = fields_ids_map.insert(primary_key).ok_or(UserError::AttributeLimitReached)?;
             Ok((id, primary_key.to_string()))
         }
         None => {
@@ -542,12 +557,12 @@ fn compute_primary_key_pair(
                 if !autogenerate_docids {
                     // If there is no primary key in the current document batch, we must
                     // return an error and not automatically generate any document id.
-                    anyhow::bail!("missing primary key")
+                    return Err(UserError::MissingPrimaryKey.into());
                 }
                 DEFAULT_PRIMARY_KEY_NAME.to_string()
             },
         };
-        let id = fields_ids_map.insert(&name).context("field id limit reached")?;
+        let id = fields_ids_map.insert(&name).ok_or(UserError::AttributeLimitReached)?;
         Ok((id, name))
     },
 }
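
Taken together, the hunks replace stringly-typed `anyhow!`/`context` failures with variants a caller can actually match on. A hypothetical consumer, again using the enums sketched at the top:

```rust
// Hypothetical caller: with typed variants, user mistakes and internal bugs
// can be routed differently, something the old anyhow strings only allowed
// via fragile message comparison.
fn status_code(error: &Error) -> u16 {
    match error {
        Error::UserError(UserError::InvalidDocumentId { document_id }) => {
            eprintln!("rejecting invalid document id: {document_id}");
            400
        }
        Error::UserError(_) => 400,
        Error::InternalError(_) => 500,
    }
}
```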