mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 11:57:07 +02:00
Improve the tasks error reporting
This commit is contained in:
parent
73d4869e5e
commit
e3426d5b7a
14 changed files with 83 additions and 81 deletions
|
@ -98,7 +98,7 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
|||
/// Reads JSON Lines from input and write an obkv batch to writer.
|
||||
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||
let mut reader = BufReader::new(input);
|
||||
let reader = BufReader::new(input);
|
||||
|
||||
for result in serde_json::Deserializer::from_reader(reader).into_iter() {
|
||||
let object = result
|
||||
|
@ -122,7 +122,7 @@ pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize>
|
|||
/// Reads JSON from input and write an obkv batch to writer.
|
||||
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||
let mut reader = BufReader::new(input);
|
||||
let reader = BufReader::new(input);
|
||||
|
||||
let objects: Vec<_> = serde_json::from_reader(reader)
|
||||
.map_err(Error::Json)
|
||||
|
|
|
@ -25,6 +25,7 @@ impl ErrorCode for MilliError<'_> {
|
|||
// TODO: wait for spec for new error codes.
|
||||
UserError::SerdeJson(_)
|
||||
| UserError::DocumentLimitReached
|
||||
| UserError::AccessingSoftDeletedDocument { .. }
|
||||
| UserError::UnknownInternalDocumentId { .. } => Code::Internal,
|
||||
UserError::InvalidStoreFile => Code::InvalidStore,
|
||||
UserError::NoSpaceLeftOnDevice => Code::NoSpaceLeftOnDevice,
|
||||
|
@ -32,7 +33,9 @@ impl ErrorCode for MilliError<'_> {
|
|||
UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
|
||||
UserError::InvalidFilter(_) => Code::Filter,
|
||||
UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
|
||||
UserError::InvalidDocumentId { .. } => Code::InvalidDocumentId,
|
||||
UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
|
||||
Code::InvalidDocumentId
|
||||
}
|
||||
UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
|
||||
UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
|
||||
UserError::SortRankingRuleMissing => Code::Sort,
|
||||
|
|
|
@ -40,6 +40,12 @@ impl ErrorCode for IndexError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<milli::UserError> for IndexError {
|
||||
fn from(error: milli::UserError) -> IndexError {
|
||||
IndexError::Milli(error.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FacetError {
|
||||
#[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))]
|
||||
|
|
|
@ -4,7 +4,6 @@ use std::marker::PhantomData;
|
|||
use std::ops::Deref;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use fst::IntoStreamer;
|
||||
use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
|
||||
|
@ -14,6 +13,7 @@ use serde::{Deserialize, Serialize};
|
|||
use serde_json::{Map, Value};
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
|
||||
|
||||
|
@ -245,11 +245,8 @@ impl Index {
|
|||
let fields_ids_map = self.fields_ids_map(&txn)?;
|
||||
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||
|
||||
let iter = self.all_documents(&txn)?.skip(offset).take(limit);
|
||||
|
||||
let mut documents = Vec::new();
|
||||
|
||||
for entry in iter {
|
||||
for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
|
||||
let (_id, obkv) = entry?;
|
||||
let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
|
||||
let document = match &attributes_to_retrieve {
|
||||
|
@ -302,7 +299,7 @@ impl Index {
|
|||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
WalkDir::new(self.inner.path())
|
||||
WalkDir::new(self.path())
|
||||
.into_iter()
|
||||
.filter_map(|entry| entry.ok())
|
||||
.filter_map(|entry| entry.metadata().ok())
|
||||
|
|
|
@ -24,12 +24,12 @@ pub use test::MockIndex as Index;
|
|||
/// code for unit testing, in places where an index would normally be used.
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use milli::update::IndexerConfig;
|
||||
use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod};
|
||||
use milli::update::{
|
||||
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
|
||||
};
|
||||
use nelson::Mocker;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -162,7 +162,7 @@ pub mod test {
|
|||
primary_key: Option<String>,
|
||||
file_store: UpdateFileStore,
|
||||
contents: impl Iterator<Item = Uuid>,
|
||||
) -> Result<DocumentAdditionResult> {
|
||||
) -> Result<Vec<Result<DocumentAdditionResult>>> {
|
||||
match self {
|
||||
MockIndex::Real(index) => {
|
||||
index.update_documents(method, primary_key, file_store, contents)
|
||||
|
|
|
@ -11,7 +11,7 @@ use milli::update::{
|
|||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::error::Result;
|
||||
use super::error::{IndexError, Result};
|
||||
use super::index::{Index, IndexMeta};
|
||||
use crate::update_file_store::UpdateFileStore;
|
||||
|
||||
|
@ -299,7 +299,7 @@ impl Index {
|
|||
primary_key: Option<String>,
|
||||
file_store: UpdateFileStore,
|
||||
contents: impl IntoIterator<Item = Uuid>,
|
||||
) -> Result<DocumentAdditionResult> {
|
||||
) -> Result<Vec<Result<DocumentAdditionResult>>> {
|
||||
trace!("performing document addition");
|
||||
let mut txn = self.write_txn()?;
|
||||
|
||||
|
@ -315,7 +315,7 @@ impl Index {
|
|||
};
|
||||
|
||||
let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
|
||||
let builder = milli::update::IndexDocuments::new(
|
||||
let mut builder = milli::update::IndexDocuments::new(
|
||||
&mut txn,
|
||||
self,
|
||||
self.indexer_config.as_ref(),
|
||||
|
@ -323,20 +323,34 @@ impl Index {
|
|||
indexing_callback,
|
||||
)?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
for content_uuid in contents.into_iter() {
|
||||
let content_file = file_store.get_update(content_uuid)?;
|
||||
let reader = DocumentsBatchReader::from_reader(content_file)?;
|
||||
let (builder, user_error) = builder.add_documents(reader)?;
|
||||
todo!("use the user_error here");
|
||||
let (new_builder, user_result) = builder.add_documents(reader)?;
|
||||
builder = new_builder;
|
||||
|
||||
let user_result = match user_result {
|
||||
Ok(count) => {
|
||||
let addition = DocumentAdditionResult {
|
||||
indexed_documents: count,
|
||||
number_of_documents: count,
|
||||
};
|
||||
info!("document addition done: {:?}", addition);
|
||||
Ok(addition)
|
||||
}
|
||||
Err(e) => Err(IndexError::from(e)),
|
||||
};
|
||||
|
||||
results.push(user_result);
|
||||
}
|
||||
|
||||
let addition = builder.execute()?;
|
||||
if results.iter().any(Result::is_ok) {
|
||||
let _addition = builder.execute()?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
txn.commit()?;
|
||||
|
||||
info!("document addition done: {:?}", addition);
|
||||
|
||||
Ok(addition)
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
|
||||
|
|
|
@ -150,25 +150,34 @@ mod real {
|
|||
})
|
||||
.await;
|
||||
|
||||
let event = match result {
|
||||
Ok(Ok(result)) => TaskEvent::Succeeded {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
result: TaskResult::DocumentAddition {
|
||||
indexed_documents: result.indexed_documents,
|
||||
},
|
||||
},
|
||||
Ok(Err(e)) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: e.into(),
|
||||
},
|
||||
Err(e) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: IndexResolverError::from(e).into(),
|
||||
},
|
||||
};
|
||||
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
match result {
|
||||
Ok(Ok(results)) => {
|
||||
for (task, result) in tasks.iter_mut().zip(results) {
|
||||
let event = match result {
|
||||
Ok(addition) => {
|
||||
TaskEvent::succeeded(TaskResult::DocumentAddition {
|
||||
indexed_documents: addition.indexed_documents,
|
||||
})
|
||||
}
|
||||
Err(error) => {
|
||||
TaskEvent::failed(IndexResolverError::from(error))
|
||||
}
|
||||
};
|
||||
task.events.push(event);
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
let event = TaskEvent::failed(e);
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let event = TaskEvent::failed(IndexResolverError::from(e));
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => panic!("invalid batch!"),
|
||||
|
|
|
@ -150,8 +150,8 @@ mod store {
|
|||
|
||||
let update_file = File::open(update_file_path)?;
|
||||
let mut dst_file = NamedTempFile::new_in(&dump_path)?;
|
||||
let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
|
||||
let index = document_cursor.documents_batch_index();
|
||||
let (mut document_cursor, index) =
|
||||
DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index();
|
||||
|
||||
let mut document_buffer = Map::new();
|
||||
// TODO: we need to find a way to do this more efficiently. (create a custom serializer
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue