2523: Improve the tasks error reporting when processed in batches r=irevoire a=Kerollmops

This fixes #2478 by changing the behavior of the task handler when there is an error in a batch of document addition or update.

What changes is that when there is a user error in a task in a batch we now report this task as failed with the right error message but we continue to process the other tasks. A user error can be when a geo field is invalid, a document id is invalid, or missing.

fixes #2582, #2478

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
This commit is contained in:
bors[bot] 2022-08-16 14:15:30 +00:00 committed by GitHub
commit b5f91b91c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 251 additions and 222 deletions

View file

@ -28,7 +28,7 @@ lazy_static = "1.4.0"
log = "0.4.14"
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.31.2" }
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.32.0" }
mime = "0.3.16"
num_cpus = "1.13.1"
obkv = "0.2.0"

View file

@ -1,10 +1,10 @@
use std::borrow::Borrow;
use std::fmt::{self, Debug, Display};
use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Seek, Write};
use std::io::{self, BufReader, Read, Seek, Write};
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::internal_error;
use milli::documents::DocumentBatchBuilder;
use milli::documents::{DocumentsBatchBuilder, Error};
type Result<T> = std::result::Result<T, DocumentFormatError>;
@ -18,9 +18,9 @@ pub enum PayloadType {
impl fmt::Display for PayloadType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PayloadType::Ndjson => write!(f, "ndjson"),
PayloadType::Json => write!(f, "json"),
PayloadType::Csv => write!(f, "csv"),
PayloadType::Ndjson => f.write_str("ndjson"),
PayloadType::Json => f.write_str("json"),
PayloadType::Csv => f.write_str("csv"),
}
}
}
@ -28,7 +28,7 @@ impl fmt::Display for PayloadType {
#[derive(Debug)]
pub enum DocumentFormatError {
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
MalformedPayload(Box<milli::documents::Error>, PayloadType),
MalformedPayload(Error, PayloadType),
}
impl Display for DocumentFormatError {
@ -36,7 +36,7 @@ impl Display for DocumentFormatError {
match self {
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
Self::MalformedPayload(me, b) => match me.borrow() {
milli::documents::Error::JsonError(se) => {
Error::Json(se) => {
// https://github.com/meilisearch/meilisearch/issues/2107
// The user input maybe insanely long. We need to truncate it.
let mut serde_msg = se.to_string();
@ -59,11 +59,11 @@ impl Display for DocumentFormatError {
impl std::error::Error for DocumentFormatError {}
impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError {
fn from((ty, error): (PayloadType, milli::documents::Error)) -> Self {
impl From<(PayloadType, Error)> for DocumentFormatError {
fn from((ty, error): (PayloadType, Error)) -> Self {
match error {
milli::documents::Error::Io(e) => Self::Internal(Box::new(e)),
e => Self::MalformedPayload(Box::new(e), ty),
Error::Io(e) => Self::Internal(Box::new(e)),
e => Self::MalformedPayload(e, ty),
}
}
}
@ -79,51 +79,67 @@ impl ErrorCode for DocumentFormatError {
internal_error!(DocumentFormatError: io::Error);
/// reads csv from input and write an obkv batch to writer.
/// Reads CSV from input and write an obkv batch to writer.
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let writer = BufWriter::new(writer);
let builder =
DocumentBatchBuilder::from_csv(input, writer).map_err(|e| (PayloadType::Csv, e))?;
let mut builder = DocumentsBatchBuilder::new(writer);
let count = builder.finish().map_err(|e| (PayloadType::Csv, e))?;
let csv = csv::Reader::from_reader(input);
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
Ok(count)
let count = builder.documents_count();
let _ = builder
.into_inner()
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
Ok(count as usize)
}
/// reads jsonl from input and write an obkv batch to writer.
/// Reads JSON Lines from input and write an obkv batch to writer.
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut reader = BufReader::new(input);
let writer = BufWriter::new(writer);
let mut builder = DocumentsBatchBuilder::new(writer);
let reader = BufReader::new(input);
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Ndjson, e))?;
let mut buf = String::new();
while reader.read_line(&mut buf)? > 0 {
// skip empty lines
if buf == "\n" {
buf.clear();
continue;
}
builder
.extend_from_json(Cursor::new(&buf.as_bytes()))
for result in serde_json::Deserializer::from_reader(reader).into_iter() {
let object = result
.map_err(Error::Json)
.map_err(|e| (PayloadType::Ndjson, e))?;
buf.clear();
builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
}
let count = builder.finish().map_err(|e| (PayloadType::Ndjson, e))?;
let count = builder.documents_count();
let _ = builder
.into_inner()
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
Ok(count)
Ok(count as usize)
}
/// reads json from input and write an obkv batch to writer.
/// Reads JSON from input and write an obkv batch to writer.
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let writer = BufWriter::new(writer);
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Json, e))?;
builder
.extend_from_json(input)
let mut builder = DocumentsBatchBuilder::new(writer);
let reader = BufReader::new(input);
let objects: Vec<_> = serde_json::from_reader(reader)
.map_err(Error::Json)
.map_err(|e| (PayloadType::Json, e))?;
let count = builder.finish().map_err(|e| (PayloadType::Json, e))?;
for object in objects {
builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
}
Ok(count)
let count = builder.documents_count();
let _ = builder
.into_inner()
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
Ok(count as usize)
}

View file

@ -11,7 +11,7 @@ pub enum DumpError {
#[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}")]
IndexResolver(#[from] IndexResolverError),
IndexResolver(Box<IndexResolverError>),
}
internal_error!(
@ -26,6 +26,12 @@ internal_error!(
TaskError
);
impl From<IndexResolverError> for DumpError {
fn from(e: IndexResolverError) -> Self {
Self::IndexResolver(Box::new(e))
}
}
impl ErrorCode for DumpError {
fn error_code(&self) -> Code {
match self {

View file

@ -25,6 +25,7 @@ impl ErrorCode for MilliError<'_> {
// TODO: wait for spec for new error codes.
UserError::SerdeJson(_)
| UserError::DocumentLimitReached
| UserError::AccessingSoftDeletedDocument { .. }
| UserError::UnknownInternalDocumentId { .. } => Code::Internal,
UserError::InvalidStoreFile => Code::InvalidStore,
UserError::NoSpaceLeftOnDevice => Code::NoSpaceLeftOnDevice,
@ -32,7 +33,9 @@ impl ErrorCode for MilliError<'_> {
UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
UserError::InvalidFilter(_) => Code::Filter,
UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
UserError::InvalidDocumentId { .. } => Code::InvalidDocumentId,
UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
Code::InvalidDocumentId
}
UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
UserError::SortRankingRuleMissing => Code::Sort,

View file

@ -4,7 +4,7 @@ use std::path::Path;
use anyhow::Context;
use indexmap::IndexMap;
use milli::documents::DocumentBatchReader;
use milli::documents::DocumentsBatchReader;
use milli::heed::{EnvOpenOptions, RoTxn};
use milli::update::{IndexDocumentsConfig, IndexerConfig};
use serde::{Deserialize, Serialize};
@ -135,19 +135,20 @@ impl Index {
if !empty {
tmp_doc_file.seek(SeekFrom::Start(0))?;
let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?;
//If the document file is empty, we don't perform the document addition, to prevent
//a primary key error to be thrown.
let config = IndexDocumentsConfig::default();
let mut builder = milli::update::IndexDocuments::new(
let builder = milli::update::IndexDocuments::new(
&mut txn,
&index,
indexer_config,
config,
|_| (),
)?;
builder.add_documents(documents_reader)?;
let (builder, user_error) = builder.add_documents(documents_reader)?;
user_error?;
builder.execute()?;
}

View file

@ -40,6 +40,12 @@ impl ErrorCode for IndexError {
}
}
impl From<milli::UserError> for IndexError {
fn from(error: milli::UserError) -> IndexError {
IndexError::Milli(error.into())
}
}
#[derive(Debug, thiserror::Error)]
pub enum FacetError {
#[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))]

View file

@ -4,7 +4,6 @@ use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use walkdir::WalkDir;
use fst::IntoStreamer;
use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
@ -14,6 +13,7 @@ use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use time::OffsetDateTime;
use uuid::Uuid;
use walkdir::WalkDir;
use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
@ -245,11 +245,8 @@ impl Index {
let fields_ids_map = self.fields_ids_map(&txn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let iter = self.all_documents(&txn)?.skip(offset).take(limit);
let mut documents = Vec::new();
for entry in iter {
for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
let (_id, obkv) = entry?;
let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
let document = match &attributes_to_retrieve {
@ -302,7 +299,7 @@ impl Index {
}
pub fn size(&self) -> u64 {
WalkDir::new(self.inner.path())
WalkDir::new(self.path())
.into_iter()
.filter_map(|entry| entry.ok())
.filter_map(|entry| entry.metadata().ok())

View file

@ -24,12 +24,12 @@ pub use test::MockIndex as Index;
/// code for unit testing, in places where an index would normally be used.
#[cfg(test)]
pub mod test {
use std::path::Path;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use milli::update::IndexerConfig;
use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod};
use milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
};
use nelson::Mocker;
use uuid::Uuid;
@ -162,7 +162,7 @@ pub mod test {
primary_key: Option<String>,
file_store: UpdateFileStore,
contents: impl Iterator<Item = Uuid>,
) -> Result<DocumentAdditionResult> {
) -> Result<Vec<Result<DocumentAdditionResult>>> {
match self {
MockIndex::Real(index) => {
index.update_documents(method, primary_key, file_store, contents)

View file

@ -3,7 +3,7 @@ use std::marker::PhantomData;
use std::num::NonZeroUsize;
use log::{debug, info, trace};
use milli::documents::DocumentBatchReader;
use milli::documents::DocumentsBatchReader;
use milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Setting,
@ -11,7 +11,7 @@ use milli::update::{
use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;
use super::error::Result;
use super::error::{IndexError, Result};
use super::index::{Index, IndexMeta};
use crate::update_file_store::UpdateFileStore;
@ -299,7 +299,7 @@ impl Index {
primary_key: Option<String>,
file_store: UpdateFileStore,
contents: impl IntoIterator<Item = Uuid>,
) -> Result<DocumentAdditionResult> {
) -> Result<Vec<Result<DocumentAdditionResult>>> {
trace!("performing document addition");
let mut txn = self.write_txn()?;
@ -323,19 +323,34 @@ impl Index {
indexing_callback,
)?;
let mut results = Vec::new();
for content_uuid in contents.into_iter() {
let content_file = file_store.get_update(content_uuid)?;
let reader = DocumentBatchReader::from_reader(content_file)?;
builder.add_documents(reader)?;
let reader = DocumentsBatchReader::from_reader(content_file)?;
let (new_builder, user_result) = builder.add_documents(reader)?;
builder = new_builder;
let user_result = match user_result {
Ok(count) => {
let addition = DocumentAdditionResult {
indexed_documents: count,
number_of_documents: count,
};
info!("document addition done: {:?}", addition);
Ok(addition)
}
Err(e) => Err(IndexError::from(e)),
};
results.push(user_result);
}
let addition = builder.execute()?;
if results.iter().any(Result::is_ok) {
let _addition = builder.execute()?;
txn.commit()?;
}
txn.commit()?;
info!("document addition done: {:?}", addition);
Ok(addition)
Ok(results)
}
pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {

View file

@ -150,25 +150,34 @@ mod real {
})
.await;
let event = match result {
Ok(Ok(result)) => TaskEvent::Succeeded {
timestamp: OffsetDateTime::now_utc(),
result: TaskResult::DocumentAddition {
indexed_documents: result.indexed_documents,
},
},
Ok(Err(e)) => TaskEvent::Failed {
timestamp: OffsetDateTime::now_utc(),
error: e.into(),
},
Err(e) => TaskEvent::Failed {
timestamp: OffsetDateTime::now_utc(),
error: IndexResolverError::from(e).into(),
},
};
for task in tasks.iter_mut() {
task.events.push(event.clone());
match result {
Ok(Ok(results)) => {
for (task, result) in tasks.iter_mut().zip(results) {
let event = match result {
Ok(addition) => {
TaskEvent::succeeded(TaskResult::DocumentAddition {
indexed_documents: addition.indexed_documents,
})
}
Err(error) => {
TaskEvent::failed(IndexResolverError::from(error))
}
};
task.events.push(event);
}
}
Ok(Err(e)) => {
let event = TaskEvent::failed(e);
for task in tasks.iter_mut() {
task.events.push(event.clone());
}
}
Err(e) => {
let event = TaskEvent::failed(IndexResolverError::from(e));
for task in tasks.iter_mut() {
task.events.push(event.clone());
}
}
}
}
_ => panic!("invalid batch!"),

View file

@ -41,27 +41,10 @@ pub struct IndexerOpts {
#[derive(Debug, Clone, Parser, Default, Serialize)]
pub struct SchedulerConfig {
/// enable the autobatching experimental feature
#[clap(long, hide = true)]
pub enable_auto_batching: bool,
// The maximum number of updates of the same type that can be batched together.
// If unspecified, this is unlimited. A value of 0 is interpreted as 1.
#[clap(long, requires = "enable-auto-batching", hide = true)]
pub max_batch_size: Option<usize>,
// The maximum number of documents in a document batch. Since batches must contain at least one
// update for the scheduler to make progress, the number of documents in a batch will be at
// least the number of documents of its first update.
#[clap(long, requires = "enable-auto-batching", hide = true)]
pub max_documents_per_batch: Option<usize>,
/// Debounce duration in seconds
///
/// When a new task is enqueued, the scheduler waits for `debounce_duration_sec` seconds for new updates before
/// starting to process a batch of updates.
#[clap(long, requires = "enable-auto-batching", hide = true)]
pub debounce_duration_sec: Option<u64>,
/// The engine will disable task auto-batching,
/// and will sequencialy compute each task one by one.
#[clap(long, env = "DISABLE_AUTO_BATCHING")]
pub disable_auto_batching: bool,
}
impl TryFrom<&IndexerOpts> for IndexerConfig {

View file

@ -3,7 +3,6 @@ use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque};
use std::ops::{Deref, DerefMut};
use std::slice;
use std::sync::Arc;
use std::time::Duration;
use atomic_refcell::AtomicRefCell;
use milli::update::IndexDocumentsMethod;
@ -248,17 +247,10 @@ impl Scheduler {
pub fn new(
store: TaskStore,
performers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>>,
mut config: SchedulerConfig,
config: SchedulerConfig,
) -> Result<Arc<RwLock<Self>>> {
let (notifier, rcv) = watch::channel(());
let debounce_time = config.debounce_duration_sec;
// Disable autobatching
if !config.enable_auto_batching {
config.max_batch_size = Some(1);
}
let this = Self {
snapshots: VecDeque::new(),
tasks: TaskQueue::default(),
@ -275,12 +267,7 @@ impl Scheduler {
let this = Arc::new(RwLock::new(this));
let update_loop = UpdateLoop::new(
this.clone(),
performers,
debounce_time.filter(|&v| v > 0).map(Duration::from_secs),
rcv,
);
let update_loop = UpdateLoop::new(this.clone(), performers, rcv);
tokio::task::spawn_local(update_loop.run());
@ -497,27 +484,17 @@ fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing {
match list.peek() {
Some(pending) if pending.kind == kind => {
// We always need to process at least one task for the scheduler to make progress.
if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1)
{
if config.disable_auto_batching && !task_list.is_empty() {
break;
}
let pending = list.pop().unwrap();
task_list.push(pending.id);
// We add the number of documents to the count if we are scheduling document additions and
// stop adding if we already have enough.
//
// We check that bound only after adding the current task to the batch, so that a batch contains at least one task.
// We add the number of documents to the count if we are scheduling document additions.
match pending.kind {
TaskType::DocumentUpdate { number }
| TaskType::DocumentAddition { number } => {
doc_count += number;
if doc_count
>= config.max_documents_per_batch.unwrap_or(usize::MAX)
{
break;
}
}
_ => (),
}

View file

@ -1,9 +1,7 @@
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::{watch, RwLock};
use tokio::time::interval_at;
use super::batch::Batch;
use super::error::Result;
@ -17,20 +15,17 @@ pub struct UpdateLoop {
performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
notifier: Option<watch::Receiver<()>>,
debounce_duration: Option<Duration>,
}
impl UpdateLoop {
pub fn new(
scheduler: Arc<RwLock<Scheduler>>,
performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
debuf_duration: Option<Duration>,
notifier: watch::Receiver<()>,
) -> Self {
Self {
scheduler,
performers,
debounce_duration: debuf_duration,
notifier: Some(notifier),
}
}
@ -43,11 +38,6 @@ impl UpdateLoop {
break;
}
if let Some(t) = self.debounce_duration {
let mut interval = interval_at(tokio::time::Instant::now() + t, t);
interval.tick().await;
};
if let Err(e) = self.process_next_batch().await {
log::error!("an error occurred while processing an update batch: {}", e);
}

View file

@ -3,7 +3,7 @@ use std::io::{self, BufReader, BufWriter, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use milli::documents::DocumentBatchReader;
use milli::documents::DocumentsBatchReader;
use serde_json::Map;
use tempfile::{NamedTempFile, PersistError};
use uuid::Uuid;
@ -44,7 +44,8 @@ into_update_store_error!(
PersistError,
io::Error,
serde_json::Error,
milli::documents::Error
milli::documents::Error,
milli::documents::DocumentsBatchCursorError
);
impl UpdateFile {
@ -149,12 +150,13 @@ mod store {
let update_file = File::open(update_file_path)?;
let mut dst_file = NamedTempFile::new_in(&dump_path)?;
let mut document_reader = DocumentBatchReader::from_reader(update_file)?;
let (mut document_cursor, index) =
DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index();
let mut document_buffer = Map::new();
// TODO: we need to find a way to do this more efficiently. (create a custom serializer
// for jsonl for example...)
while let Some((index, document)) = document_reader.next_document_with_index()? {
while let Some(document) = document_cursor.next_document()? {
for (field_id, content) in document.iter() {
if let Some(field_name) = index.name(field_id) {
let content = serde_json::from_slice(content)?;