mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-02-03 09:03:47 +01:00
reintroduce the unrecoverable error and use it where its supposed to be used
This commit is contained in:
parent
a475ea7619
commit
c35d7bf5f8
@ -147,7 +147,9 @@ pub enum Error {
|
|||||||
#[error("Corrupted task queue.")]
|
#[error("Corrupted task queue.")]
|
||||||
CorruptedTaskQueue,
|
CorruptedTaskQueue,
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
TaskDatabaseUpgrade(Box<Self>),
|
DatabaseUpgrade(Box<Self>),
|
||||||
|
#[error(transparent)]
|
||||||
|
UnrecoverableError(Box<Self>),
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
HeedTransaction(heed::Error),
|
HeedTransaction(heed::Error),
|
||||||
|
|
||||||
@ -202,7 +204,8 @@ impl Error {
|
|||||||
| Error::Anyhow(_) => true,
|
| Error::Anyhow(_) => true,
|
||||||
Error::CreateBatch(_)
|
Error::CreateBatch(_)
|
||||||
| Error::CorruptedTaskQueue
|
| Error::CorruptedTaskQueue
|
||||||
| Error::TaskDatabaseUpgrade(_)
|
| Error::DatabaseUpgrade(_)
|
||||||
|
| Error::UnrecoverableError(_)
|
||||||
| Error::HeedTransaction(_) => false,
|
| Error::HeedTransaction(_) => false,
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
Error::PlannedFailure => false,
|
Error::PlannedFailure => false,
|
||||||
@ -266,7 +269,8 @@ impl ErrorCode for Error {
|
|||||||
Error::Anyhow(_) => Code::Internal,
|
Error::Anyhow(_) => Code::Internal,
|
||||||
Error::CorruptedTaskQueue => Code::Internal,
|
Error::CorruptedTaskQueue => Code::Internal,
|
||||||
Error::CorruptedDump => Code::Internal,
|
Error::CorruptedDump => Code::Internal,
|
||||||
Error::TaskDatabaseUpgrade(_) => Code::Internal,
|
Error::DatabaseUpgrade(_) => Code::Internal,
|
||||||
|
Error::UnrecoverableError(_) => Code::Internal,
|
||||||
Error::CreateBatch(_) => Code::Internal,
|
Error::CreateBatch(_) => Code::Internal,
|
||||||
|
|
||||||
// This one should never be seen by the end user
|
// This one should never be seen by the end user
|
||||||
|
@ -223,7 +223,7 @@ impl IndexScheduler {
|
|||||||
self.queue
|
self.queue
|
||||||
.tasks
|
.tasks
|
||||||
.update_task(&mut wtxn, &task)
|
.update_task(&mut wtxn, &task)
|
||||||
.map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?;
|
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
|
||||||
}
|
}
|
||||||
if let Some(canceled_by) = canceled_by {
|
if let Some(canceled_by) = canceled_by {
|
||||||
self.queue.tasks.canceled_by.put(&mut wtxn, &canceled_by, &canceled)?;
|
self.queue.tasks.canceled_by.put(&mut wtxn, &canceled_by, &canceled)?;
|
||||||
@ -274,7 +274,7 @@ impl IndexScheduler {
|
|||||||
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32);
|
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32);
|
||||||
progress.update_progress(task_progress_obj);
|
progress.update_progress(task_progress_obj);
|
||||||
|
|
||||||
if matches!(err, Error::TaskDatabaseUpgrade(_)) {
|
if matches!(err, Error::DatabaseUpgrade(_)) {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
"Upgrade task failed, tasks won't be processed until the following issue is fixed: {err}"
|
"Upgrade task failed, tasks won't be processed until the following issue is fixed: {err}"
|
||||||
);
|
);
|
||||||
@ -287,7 +287,7 @@ impl IndexScheduler {
|
|||||||
.queue
|
.queue
|
||||||
.tasks
|
.tasks
|
||||||
.get_task(&wtxn, id)
|
.get_task(&wtxn, id)
|
||||||
.map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?
|
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?
|
||||||
.ok_or(Error::CorruptedTaskQueue)?;
|
.ok_or(Error::CorruptedTaskQueue)?;
|
||||||
task.status = Status::Failed;
|
task.status = Status::Failed;
|
||||||
task.error = Some(error.clone());
|
task.error = Some(error.clone());
|
||||||
@ -304,7 +304,7 @@ impl IndexScheduler {
|
|||||||
self.queue
|
self.queue
|
||||||
.tasks
|
.tasks
|
||||||
.update_task(&mut wtxn, &task)
|
.update_task(&mut wtxn, &task)
|
||||||
.map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?;
|
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -334,7 +334,7 @@ impl IndexScheduler {
|
|||||||
.queue
|
.queue
|
||||||
.tasks
|
.tasks
|
||||||
.get_task(&rtxn, id)
|
.get_task(&rtxn, id)
|
||||||
.map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?
|
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?
|
||||||
.ok_or(Error::CorruptedTaskQueue)?;
|
.ok_or(Error::CorruptedTaskQueue)?;
|
||||||
if let Err(e) = self.queue.delete_persisted_task_data(&task) {
|
if let Err(e) = self.queue.delete_persisted_task_data(&task) {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
|
@ -319,11 +319,9 @@ impl IndexScheduler {
|
|||||||
let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(progress)));
|
let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(progress)));
|
||||||
match ret {
|
match ret {
|
||||||
Ok(Ok(())) => (),
|
Ok(Ok(())) => (),
|
||||||
Ok(Err(e)) => return Err(Error::TaskDatabaseUpgrade(Box::new(e))),
|
Ok(Err(e)) => return Err(Error::DatabaseUpgrade(Box::new(e))),
|
||||||
Err(_e) => {
|
Err(_e) => {
|
||||||
return Err(Error::TaskDatabaseUpgrade(Box::new(
|
return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked)));
|
||||||
Error::ProcessBatchPanicked,
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user