mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 14:54:27 +01:00
Pause the index scheduler for one second when a fatal error occurs
This commit is contained in:
parent
4a35eb9849
commit
424202d773
@ -11,8 +11,6 @@ pub enum Error {
|
||||
IndexNotFound(String),
|
||||
#[error("Index `{0}` already exists.")]
|
||||
IndexAlreadyExists(String),
|
||||
#[error("Corrupted task queue.")]
|
||||
CorruptedTaskQueue,
|
||||
#[error("Corrupted dump.")]
|
||||
CorruptedDump,
|
||||
#[error("Task `{0}` not found.")]
|
||||
@ -29,7 +27,7 @@ pub enum Error {
|
||||
#[error(transparent)]
|
||||
Milli(#[from] milli::Error),
|
||||
#[error("An unexpected crash occurred when processing the task")]
|
||||
MilliPanic,
|
||||
ProcessBatchPanicked,
|
||||
#[error(transparent)]
|
||||
FileStore(#[from] file_store::Error),
|
||||
#[error(transparent)]
|
||||
@ -37,6 +35,16 @@ pub enum Error {
|
||||
|
||||
#[error(transparent)]
|
||||
Anyhow(#[from] anyhow::Error),
|
||||
|
||||
// Irrecoverable errors:
|
||||
#[error(transparent)]
|
||||
CreateBatch(Box<Self>),
|
||||
#[error("Corrupted task queue.")]
|
||||
CorruptedTaskQueue,
|
||||
#[error(transparent)]
|
||||
TaskDatabaseUpdate(Box<Self>),
|
||||
#[error(transparent)]
|
||||
HeedTransaction(heed::Error),
|
||||
}
|
||||
|
||||
impl ErrorCode for Error {
|
||||
@ -50,7 +58,7 @@ impl ErrorCode for Error {
|
||||
|
||||
Error::Dump(e) => e.error_code(),
|
||||
Error::Milli(e) => e.error_code(),
|
||||
Error::MilliPanic => Code::Internal,
|
||||
Error::ProcessBatchPanicked => Code::Internal,
|
||||
// TODO: TAMO: are all these errors really internal?
|
||||
Error::Heed(_) => Code::Internal,
|
||||
Error::FileStore(_) => Code::Internal,
|
||||
@ -58,6 +66,9 @@ impl ErrorCode for Error {
|
||||
Error::Anyhow(_) => Code::Internal,
|
||||
Error::CorruptedTaskQueue => Code::Internal,
|
||||
Error::CorruptedDump => Code::Internal,
|
||||
Error::TaskDatabaseUpdate(_) => Code::Internal,
|
||||
Error::CreateBatch(_) => Code::Internal,
|
||||
Error::HeedTransaction(_) => Code::Internal,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
use file_store::FileStore;
|
||||
use meilisearch_types::error::ResponseError;
|
||||
@ -294,6 +295,9 @@ pub enum Breakpoint {
|
||||
BatchCreated,
|
||||
BeforeProcessing,
|
||||
AfterProcessing,
|
||||
AbortedIndexation,
|
||||
ProcessBatchSucceeded,
|
||||
ProcessBatchFailed,
|
||||
}
|
||||
|
||||
impl IndexScheduler {
|
||||
@ -377,7 +381,19 @@ impl IndexScheduler {
|
||||
match run.tick() {
|
||||
Ok(0) => (),
|
||||
Ok(_) => run.wake_up.signal(),
|
||||
Err(e) => log::error!("{}", e),
|
||||
Err(e) => {
|
||||
log::error!("{}", e);
|
||||
// Wait one second when an irrecoverable error occurs.
|
||||
match e {
|
||||
Error::CorruptedTaskQueue
|
||||
| Error::TaskDatabaseUpdate(_)
|
||||
| Error::HeedTransaction(_)
|
||||
| Error::CreateBatch(_) => {
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -761,11 +777,12 @@ impl IndexScheduler {
|
||||
self.breakpoint(Breakpoint::Start);
|
||||
}
|
||||
|
||||
let rtxn = self.env.read_txn()?;
|
||||
let batch = match self.create_next_batch(&rtxn)? {
|
||||
Some(batch) => batch,
|
||||
None => return Ok(0),
|
||||
};
|
||||
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
||||
let batch =
|
||||
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
|
||||
Some(batch) => batch,
|
||||
None => return Ok(0),
|
||||
};
|
||||
drop(rtxn);
|
||||
|
||||
// 1. store the starting date with the bitmap of processing tasks.
|
||||
@ -786,17 +803,19 @@ impl IndexScheduler {
|
||||
let res = {
|
||||
let cloned_index_scheduler = self.private_clone();
|
||||
let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch));
|
||||
handle.join().unwrap_or_else(|_| Err(Error::MilliPanic))
|
||||
handle.join().unwrap_or_else(|_| Err(Error::ProcessBatchPanicked))
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
||||
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
|
||||
|
||||
let finished_at = OffsetDateTime::now_utc();
|
||||
match res {
|
||||
Ok(tasks) => {
|
||||
#[cfg(test)]
|
||||
self.breakpoint(Breakpoint::ProcessBatchSucceeded);
|
||||
#[allow(unused_variables)]
|
||||
for (i, mut task) in tasks.into_iter().enumerate() {
|
||||
task.started_at = Some(started_at);
|
||||
@ -809,8 +828,11 @@ impl IndexScheduler {
|
||||
},
|
||||
)?;
|
||||
|
||||
self.update_task(&mut wtxn, &task)?;
|
||||
self.delete_persisted_task_data(&task)?;
|
||||
self.update_task(&mut wtxn, &task)
|
||||
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
|
||||
if let Err(e) = self.delete_persisted_task_data(&task) {
|
||||
log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
|
||||
}
|
||||
}
|
||||
log::info!("A batch of tasks was successfully completed.");
|
||||
}
|
||||
@ -818,15 +840,21 @@ impl IndexScheduler {
|
||||
Err(Error::Milli(milli::Error::InternalError(
|
||||
milli::InternalError::AbortedIndexation,
|
||||
))) => {
|
||||
// TODO should we add a breakpoint here?
|
||||
wtxn.abort()?;
|
||||
#[cfg(test)]
|
||||
self.breakpoint(Breakpoint::AbortedIndexation);
|
||||
wtxn.abort().map_err(Error::HeedTransaction)?;
|
||||
return Ok(0);
|
||||
}
|
||||
// In case of a failure we must get back and patch all the tasks with the error.
|
||||
Err(err) => {
|
||||
#[cfg(test)]
|
||||
self.breakpoint(Breakpoint::ProcessBatchFailed);
|
||||
let error: ResponseError = err.into();
|
||||
for id in ids {
|
||||
let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
let mut task = self
|
||||
.get_task(&wtxn, id)
|
||||
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
|
||||
.ok_or(Error::CorruptedTaskQueue)?;
|
||||
task.started_at = Some(started_at);
|
||||
task.finished_at = Some(finished_at);
|
||||
task.status = Status::Failed;
|
||||
@ -835,8 +863,11 @@ impl IndexScheduler {
|
||||
#[cfg(test)]
|
||||
self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?;
|
||||
|
||||
self.delete_persisted_task_data(&task)?;
|
||||
self.update_task(&mut wtxn, &task)?;
|
||||
if let Err(e) = self.delete_persisted_task_data(&task) {
|
||||
log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
|
||||
}
|
||||
self.update_task(&mut wtxn, &task)
|
||||
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -845,7 +876,7 @@ impl IndexScheduler {
|
||||
#[cfg(test)]
|
||||
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
|
||||
|
||||
wtxn.commit()?;
|
||||
wtxn.commit().map_err(Error::HeedTransaction)?;
|
||||
|
||||
#[cfg(test)]
|
||||
self.breakpoint(Breakpoint::AfterProcessing);
|
||||
@ -875,6 +906,8 @@ impl IndexScheduler {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Instant;
|
||||
|
||||
use big_s::S;
|
||||
use file_store::File;
|
||||
use meili_snap::snapshot;
|
||||
@ -1762,13 +1795,6 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn fail_in_update_task_after_process_batch_success_for_document_addition() {
|
||||
// TODO: this is not the correct behaviour, because we process the
|
||||
// same tasks twice.
|
||||
//
|
||||
// I wonder whether the run loop should maybe be paused if we encounter a failure
|
||||
// at that point. Alternatively, maybe we should preemptively mark the tasks
|
||||
// as failed at the beginning of `IndexScheduler::tick`.
|
||||
|
||||
let (index_scheduler, handle) = IndexScheduler::test(
|
||||
true,
|
||||
vec![(1, FailureLocation::UpdatingTaskAfterProcessBatchSuccess { task_uid: 0 })],
|
||||
@ -1795,6 +1821,10 @@ mod tests {
|
||||
allow_index_creation: true,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// This tests that the index scheduler pauses for one second when an irrecoverable failure occurs
|
||||
let start_time = Instant::now();
|
||||
|
||||
index_scheduler.assert_internally_consistent();
|
||||
handle.wait_till(Breakpoint::Start);
|
||||
|
||||
@ -1803,7 +1833,10 @@ mod tests {
|
||||
|
||||
handle.wait_till(Breakpoint::AfterProcessing);
|
||||
index_scheduler.assert_internally_consistent();
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteratino");
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteration");
|
||||
|
||||
let test_duration = start_time.elapsed();
|
||||
assert!(test_duration.as_millis() > 1000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Loading…
Reference in New Issue
Block a user