Improve the tasks error reporting

This commit is contained in:
Kerollmops 2022-06-16 15:58:39 +02:00
parent 75c7fd6afa
commit 669a3ff85f
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
17 changed files with 92 additions and 89 deletions

View File

@ -1,17 +0,0 @@
use meilisearch_lib::heed::Env;
use walkdir::WalkDir;
pub trait EnvSizer {
fn size(&self) -> u64;
}
impl EnvSizer for Env {
fn size(&self) -> u64 {
WalkDir::new(self.path())
.into_iter()
.filter_map(|entry| entry.ok())
.filter_map(|entry| entry.metadata().ok())
.filter(|metadata| metadata.is_file())
.fold(0, |acc, m| acc + m.len())
}
}

View File

@ -1,3 +0,0 @@
mod env;
pub use env::EnvSizer;

View File

@ -5,7 +5,6 @@ pub mod analytics;
pub mod task; pub mod task;
#[macro_use] #[macro_use]
pub mod extractors; pub mod extractors;
pub mod helpers;
pub mod option; pub mod option;
pub mod routes; pub mod routes;

View File

@ -326,7 +326,7 @@ async fn error_add_malformed_json_documents() {
assert_eq!( assert_eq!(
response["message"], response["message"],
json!( json!(
r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...890123456789", expected a documents, or a sequence of documents. at line 1 column 102`."# r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...890123456789012345678901234567890123456789", expected a sequence at line 1 column 102`."#
) )
); );
assert_eq!(response["code"], json!("malformed_payload")); assert_eq!(response["code"], json!("malformed_payload"));
@ -349,9 +349,7 @@ async fn error_add_malformed_json_documents() {
assert_eq!(status_code, 400); assert_eq!(status_code, 400);
assert_eq!( assert_eq!(
response["message"], response["message"],
json!( json!("The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string \"0123456789012345678901234567...90123456789012345678901234567890123456789m\", expected a sequence at line 1 column 103`.")
r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...90123456789m", expected a documents, or a sequence of documents. at line 1 column 103`."#
)
); );
assert_eq!(response["code"], json!("malformed_payload")); assert_eq!(response["code"], json!("malformed_payload"));
assert_eq!(response["type"], json!("invalid_request")); assert_eq!(response["type"], json!("invalid_request"));
@ -388,7 +386,7 @@ async fn error_add_malformed_ndjson_documents() {
assert_eq!( assert_eq!(
response["message"], response["message"],
json!( json!(
r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 1 column 2`."# r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`."#
) )
); );
assert_eq!(response["code"], json!("malformed_payload")); assert_eq!(response["code"], json!("malformed_payload"));
@ -411,9 +409,7 @@ async fn error_add_malformed_ndjson_documents() {
assert_eq!(status_code, 400); assert_eq!(status_code, 400);
assert_eq!( assert_eq!(
response["message"], response["message"],
json!( json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`.")
r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 1 column 2`."#
)
); );
assert_eq!(response["code"], json!("malformed_payload")); assert_eq!(response["code"], json!("malformed_payload"));
assert_eq!(response["type"], json!("invalid_request")); assert_eq!(response["type"], json!("invalid_request"));
@ -1020,7 +1016,7 @@ async fn add_documents_invalid_geo_field() {
index.wait_task(2).await; index.wait_task(2).await;
let (response, code) = index.get_task(2).await; let (response, code) = index.get_task(2).await;
assert_eq!(code, 200); assert_eq!(code, 200);
assert_eq!(response["status"], "succeeded"); assert_eq!(response["status"], "failed");
} }
#[actix_rt::test] #[actix_rt::test]

View File

@ -708,9 +708,7 @@ async fn faceting_max_values_per_facet() {
}), }),
|response, code| { |response, code| {
assert_eq!(code, 200, "{}", response); assert_eq!(code, 200, "{}", response);
let numbers = dbg!(&response)["facetDistribution"]["number"] let numbers = &response["facetDistribution"]["number"].as_object().unwrap();
.as_object()
.unwrap();
assert_eq!(numbers.len(), 10_000); assert_eq!(numbers.len(), 10_000);
}, },
) )

View File

@ -98,7 +98,7 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
/// Reads JSON Lines from input and write an obkv batch to writer. /// Reads JSON Lines from input and write an obkv batch to writer.
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> { pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(writer);
let mut reader = BufReader::new(input); let reader = BufReader::new(input);
for result in serde_json::Deserializer::from_reader(reader).into_iter() { for result in serde_json::Deserializer::from_reader(reader).into_iter() {
let object = result let object = result
@ -122,7 +122,7 @@ pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize>
/// Reads JSON from input and write an obkv batch to writer. /// Reads JSON from input and write an obkv batch to writer.
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> { pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(writer);
let mut reader = BufReader::new(input); let reader = BufReader::new(input);
let objects: Vec<_> = serde_json::from_reader(reader) let objects: Vec<_> = serde_json::from_reader(reader)
.map_err(Error::Json) .map_err(Error::Json)

View File

@ -11,7 +11,7 @@ pub enum DumpError {
#[error("An internal error has occurred. `{0}`.")] #[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>), Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}")] #[error("{0}")]
IndexResolver(#[from] IndexResolverError), IndexResolver(#[from] Box<IndexResolverError>),
} }
internal_error!( internal_error!(

View File

@ -32,7 +32,9 @@ impl ErrorCode for MilliError<'_> {
UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded, UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
UserError::InvalidFilter(_) => Code::Filter, UserError::InvalidFilter(_) => Code::Filter,
UserError::MissingDocumentId { .. } => Code::MissingDocumentId, UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
UserError::InvalidDocumentId { .. } => Code::InvalidDocumentId, UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
Code::InvalidDocumentId
}
UserError::MissingPrimaryKey => Code::MissingPrimaryKey, UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent, UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
UserError::SortRankingRuleMissing => Code::Sort, UserError::SortRankingRuleMissing => Code::Sort,

View File

@ -27,7 +27,7 @@ const DATA_FILE_NAME: &str = "documents.jsonl";
impl Index { impl Index {
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> { pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
// acquire write txn make sure any ongoing write is finished before we start. // acquire write txn make sure any ongoing write is finished before we start.
let txn = self.env.write_txn()?; let txn = self.write_txn()?;
let path = path.as_ref().join(format!("indexes/{}", self.uuid)); let path = path.as_ref().join(format!("indexes/{}", self.uuid));
create_dir_all(&path)?; create_dir_all(&path)?;

View File

@ -40,6 +40,12 @@ impl ErrorCode for IndexError {
} }
} }
impl From<milli::UserError> for IndexError {
fn from(error: milli::UserError) -> IndexError {
IndexError::Milli(error.into())
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum FacetError { pub enum FacetError {
#[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))] #[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))]

View File

@ -6,16 +6,16 @@ use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use fst::IntoStreamer; use fst::IntoStreamer;
use milli::heed::{EnvOpenOptions, RoTxn}; use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
use milli::update::{IndexerConfig, Setting}; use milli::update::{IndexerConfig, Setting};
use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET}; use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use walkdir::WalkDir;
use crate::index::search::DEFAULT_PAGINATION_LIMITED_TO; use crate::index::search::DEFAULT_PAGINATION_LIMITED_TO;
use crate::EnvSizer;
use super::error::IndexError; use super::error::IndexError;
use super::error::Result; use super::error::Result;
@ -245,11 +245,8 @@ impl Index {
let fields_ids_map = self.fields_ids_map(&txn)?; let fields_ids_map = self.fields_ids_map(&txn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
let mut documents = Vec::new(); let mut documents = Vec::new();
for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
for entry in iter {
let (_id, obkv) = entry?; let (_id, obkv) = entry?;
let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?; let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
let document = match &attributes_to_retrieve { let document = match &attributes_to_retrieve {
@ -302,7 +299,12 @@ impl Index {
} }
pub fn size(&self) -> u64 { pub fn size(&self) -> u64 {
self.env.size() WalkDir::new(self.path())
.into_iter()
.filter_map(|entry| entry.ok())
.filter_map(|entry| entry.metadata().ok())
.filter(|metadata| metadata.is_file())
.fold(0, |acc, m| acc + m.len())
} }
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> { pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
@ -310,9 +312,8 @@ impl Index {
create_dir_all(&dst)?; create_dir_all(&dst)?;
dst.push("data.mdb"); dst.push("data.mdb");
let _txn = self.write_txn()?; let _txn = self.write_txn()?;
self.inner self.inner.copy_to_path(dst, CompactionOption::Enabled)?;
.env
.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
Ok(()) Ok(())
} }
} }

View File

@ -24,12 +24,12 @@ pub use test::MockIndex as Index;
/// code for unit testing, in places where an index would normally be used. /// code for unit testing, in places where an index would normally be used.
#[cfg(test)] #[cfg(test)]
pub mod test { pub mod test {
use std::path::Path; use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use milli::update::IndexerConfig; use milli::update::{
use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod}; DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
};
use nelson::Mocker; use nelson::Mocker;
use uuid::Uuid; use uuid::Uuid;
@ -162,7 +162,7 @@ pub mod test {
primary_key: Option<String>, primary_key: Option<String>,
file_store: UpdateFileStore, file_store: UpdateFileStore,
contents: impl Iterator<Item = Uuid>, contents: impl Iterator<Item = Uuid>,
) -> Result<DocumentAdditionResult> { ) -> Result<Vec<Result<DocumentAdditionResult>>> {
match self { match self {
MockIndex::Real(index) => { MockIndex::Real(index) => {
index.update_documents(method, primary_key, file_store, contents) index.update_documents(method, primary_key, file_store, contents)

View File

@ -11,7 +11,7 @@ use milli::update::{
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid; use uuid::Uuid;
use super::error::Result; use super::error::{IndexError, Result};
use super::index::{Index, IndexMeta}; use super::index::{Index, IndexMeta};
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
@ -299,7 +299,7 @@ impl Index {
primary_key: Option<String>, primary_key: Option<String>,
file_store: UpdateFileStore, file_store: UpdateFileStore,
contents: impl IntoIterator<Item = Uuid>, contents: impl IntoIterator<Item = Uuid>,
) -> Result<DocumentAdditionResult> { ) -> Result<Vec<Result<DocumentAdditionResult>>> {
trace!("performing document addition"); trace!("performing document addition");
let mut txn = self.write_txn()?; let mut txn = self.write_txn()?;
@ -315,7 +315,7 @@ impl Index {
}; };
let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step); let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
let builder = milli::update::IndexDocuments::new( let mut builder = milli::update::IndexDocuments::new(
&mut txn, &mut txn,
self, self,
self.indexer_config.as_ref(), self.indexer_config.as_ref(),
@ -323,20 +323,34 @@ impl Index {
indexing_callback, indexing_callback,
)?; )?;
let mut results = Vec::new();
for content_uuid in contents.into_iter() { for content_uuid in contents.into_iter() {
let content_file = file_store.get_update(content_uuid)?; let content_file = file_store.get_update(content_uuid)?;
let reader = DocumentsBatchReader::from_reader(content_file)?; let reader = DocumentsBatchReader::from_reader(content_file)?;
let (builder, user_error) = builder.add_documents(reader)?; let (new_builder, user_result) = builder.add_documents(reader)?;
todo!("use the user_error here"); builder = new_builder;
let user_result = match user_result {
Ok(count) => {
let addition = DocumentAdditionResult {
indexed_documents: count,
number_of_documents: count,
};
info!("document addition done: {:?}", addition);
Ok(addition)
}
Err(e) => Err(IndexError::from(e)),
};
results.push(user_result);
} }
let addition = builder.execute()?; if results.iter().any(Result::is_ok) {
let _addition = builder.execute()?;
txn.commit()?; txn.commit()?;
}
info!("document addition done: {:?}", addition); Ok(results)
Ok(addition)
} }
pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> { pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {

View File

@ -150,27 +150,36 @@ mod real {
}) })
.await; .await;
match result {
Ok(Ok(results)) => {
for (task, result) in tasks.iter_mut().zip(results) {
let event = match result { let event = match result {
Ok(Ok(result)) => TaskEvent::Succeeded { Ok(addition) => {
timestamp: OffsetDateTime::now_utc(), TaskEvent::succeeded(TaskResult::DocumentAddition {
result: TaskResult::DocumentAddition { indexed_documents: addition.indexed_documents,
indexed_documents: result.indexed_documents, })
}, }
}, Err(error) => {
Ok(Err(e)) => TaskEvent::Failed { TaskEvent::failed(IndexResolverError::from(error))
timestamp: OffsetDateTime::now_utc(), }
error: e.into(),
},
Err(e) => TaskEvent::Failed {
timestamp: OffsetDateTime::now_utc(),
error: IndexResolverError::from(e).into(),
},
}; };
task.events.push(event);
}
}
Ok(Err(e)) => {
let event = TaskEvent::failed(e);
for task in tasks.iter_mut() { for task in tasks.iter_mut() {
task.events.push(event.clone()); task.events.push(event.clone());
} }
} }
Err(e) => {
let event = TaskEvent::failed(IndexResolverError::from(e));
for task in tasks.iter_mut() {
task.events.push(event.clone());
}
}
}
}
_ => panic!("invalid batch!"), _ => panic!("invalid batch!"),
} }
} }

View File

@ -181,9 +181,7 @@ impl SnapshotJob {
let mut options = milli::heed::EnvOpenOptions::new(); let mut options = milli::heed::EnvOpenOptions::new();
options.map_size(self.index_size); options.map_size(self.index_size);
let index = milli::Index::new(options, entry.path())?; let index = milli::Index::new(options, entry.path())?;
index index.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
.env
.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
} }
Ok(()) Ok(())

View File

@ -151,7 +151,7 @@ mod store {
let update_file = File::open(update_file_path)?; let update_file = File::open(update_file_path)?;
let mut dst_file = NamedTempFile::new_in(&dump_path)?; let mut dst_file = NamedTempFile::new_in(&dump_path)?;
let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor(); let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
let index = document_cursor.documents_batch_index(); let index = document_cursor.documents_batch_index().clone();
let mut document_buffer = Map::new(); let mut document_buffer = Map::new();
// TODO: we need to find a way to do this more efficiently. (create a custom serializer // TODO: we need to find a way to do this more efficiently. (create a custom serializer

View File

@ -49,7 +49,7 @@ fn contained_in(selector: &str, key: &str) -> bool {
/// map_leaf_values( /// map_leaf_values(
/// value.as_object_mut().unwrap(), /// value.as_object_mut().unwrap(),
/// ["jean.race.name"], /// ["jean.race.name"],
/// |key, value| match (value, dbg!(key)) { /// |key, value| match (value, key) {
/// (Value::String(name), "jean.race.name") => *name = "patou".to_string(), /// (Value::String(name), "jean.race.name") => *name = "patou".to_string(),
/// _ => unreachable!(), /// _ => unreachable!(),
/// }, /// },
@ -729,7 +729,7 @@ mod tests {
map_leaf_values( map_leaf_values(
value.as_object_mut().unwrap(), value.as_object_mut().unwrap(),
["jean.race.name"], ["jean.race.name"],
|key, value| match (value, dbg!(key)) { |key, value| match (value, key) {
(Value::String(name), "jean.race.name") => *name = S("patou"), (Value::String(name), "jean.race.name") => *name = S("patou"),
_ => unreachable!(), _ => unreachable!(),
}, },