diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs
index 97b15904f..c57cafac0 100644
--- a/index-scheduler/src/error.rs
+++ b/index-scheduler/src/error.rs
@@ -11,8 +11,6 @@ pub enum Error {
     IndexNotFound(String),
     #[error("Index `{0}` already exists.")]
     IndexAlreadyExists(String),
-    #[error("Corrupted task queue.")]
-    CorruptedTaskQueue,
     #[error("Corrupted dump.")]
     CorruptedDump,
     #[error("Task `{0}` not found.")]
@@ -29,7 +27,7 @@ pub enum Error {
     #[error(transparent)]
     Milli(#[from] milli::Error),
     #[error("An unexpected crash occurred when processing the task")]
-    MilliPanic,
+    ProcessBatchPanicked,
     #[error(transparent)]
     FileStore(#[from] file_store::Error),
     #[error(transparent)]
@@ -37,6 +35,16 @@
     #[error(transparent)]
     Anyhow(#[from] anyhow::Error),
+
+    // Irrecoverable errors:
+    #[error(transparent)]
+    CreateBatch(Box<Self>),
+    #[error("Corrupted task queue.")]
+    CorruptedTaskQueue,
+    #[error(transparent)]
+    TaskDatabaseUpdate(Box<Self>),
+    #[error(transparent)]
+    HeedTransaction(heed::Error),
 }

 impl ErrorCode for Error {
@@ -50,7 +58,7 @@
             Error::Dump(e) => e.error_code(),
             Error::Milli(e) => e.error_code(),
-            Error::MilliPanic => Code::Internal,
+            Error::ProcessBatchPanicked => Code::Internal,
             // TODO: TAMO: are all these errors really internal?
             Error::Heed(_) => Code::Internal,
             Error::FileStore(_) => Code::Internal,
@@ -58,6 +66,9 @@
             Error::Anyhow(_) => Code::Internal,
             Error::CorruptedTaskQueue => Code::Internal,
             Error::CorruptedDump => Code::Internal,
+            Error::TaskDatabaseUpdate(_) => Code::Internal,
+            Error::CreateBatch(_) => Code::Internal,
+            Error::HeedTransaction(_) => Code::Internal,
         }
     }
 }
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 28ec79e14..905888436 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -41,6 +41,7 @@ use std::path::PathBuf;
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering::Relaxed;
 use std::sync::{Arc, RwLock};
+use std::time::Duration;

 use file_store::FileStore;
 use meilisearch_types::error::ResponseError;
@@ -294,6 +295,9 @@ pub enum Breakpoint {
     BatchCreated,
     BeforeProcessing,
     AfterProcessing,
+    AbortedIndexation,
+    ProcessBatchSucceeded,
+    ProcessBatchFailed,
 }

 impl IndexScheduler {
@@ -377,7 +381,19 @@ impl IndexScheduler {
                 match run.tick() {
                     Ok(0) => (),
                     Ok(_) => run.wake_up.signal(),
-                    Err(e) => log::error!("{}", e),
+                    Err(e) => {
+                        log::error!("{}", e);
+                        // Wait one second when an irrecoverable error occurs.
+                        match e {
+                            Error::CorruptedTaskQueue
+                            | Error::TaskDatabaseUpdate(_)
+                            | Error::HeedTransaction(_)
+                            | Error::CreateBatch(_) => {
+                                std::thread::sleep(Duration::from_secs(1));
+                            }
+                            _ => {}
+                        }
+                    }
                 }
             });
     }
@@ -761,11 +777,12 @@ impl IndexScheduler {
             self.breakpoint(Breakpoint::Start);
         }

-        let rtxn = self.env.read_txn()?;
-        let batch = match self.create_next_batch(&rtxn)? {
-            Some(batch) => batch,
-            None => return Ok(0),
-        };
+        let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
+        let batch =
+            match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
+                Some(batch) => batch,
+                None => return Ok(0),
+            };
         drop(rtxn);

         // 1. store the starting date with the bitmap of processing tasks.
@@ -786,17 +803,19 @@ impl IndexScheduler {
         let res = {
             let cloned_index_scheduler = self.private_clone();
             let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch));
-            handle.join().unwrap_or_else(|_| Err(Error::MilliPanic))
+            handle.join().unwrap_or_else(|_| Err(Error::ProcessBatchPanicked))
         };

         #[cfg(test)]
         self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;

-        let mut wtxn = self.env.write_txn()?;
+        let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;

         let finished_at = OffsetDateTime::now_utc();
         match res {
             Ok(tasks) => {
+                #[cfg(test)]
+                self.breakpoint(Breakpoint::ProcessBatchSucceeded);
                 #[allow(unused_variables)]
                 for (i, mut task) in tasks.into_iter().enumerate() {
                     task.started_at = Some(started_at);
@@ -809,8 +828,11 @@ impl IndexScheduler {
                         },
                     )?;

-                    self.update_task(&mut wtxn, &task)?;
-                    self.delete_persisted_task_data(&task)?;
+                    self.update_task(&mut wtxn, &task)
+                        .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
+                    if let Err(e) = self.delete_persisted_task_data(&task) {
+                        log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
+                    }
                 }
                 log::info!("A batch of tasks was successfully completed.");
             }
@@ -818,15 +840,21 @@ impl IndexScheduler {
             Err(Error::Milli(milli::Error::InternalError(
                 milli::InternalError::AbortedIndexation,
             ))) => {
-                // TODO should we add a breakpoint here?
-                wtxn.abort()?;
+                #[cfg(test)]
+                self.breakpoint(Breakpoint::AbortedIndexation);
+                wtxn.abort().map_err(Error::HeedTransaction)?;
                 return Ok(0);
             }
             // In case of a failure we must get back and patch all the tasks with the error.
             Err(err) => {
+                #[cfg(test)]
+                self.breakpoint(Breakpoint::ProcessBatchFailed);
                 let error: ResponseError = err.into();
                 for id in ids {
-                    let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                    let mut task = self
+                        .get_task(&wtxn, id)
+                        .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
+                        .ok_or(Error::CorruptedTaskQueue)?;
                     task.started_at = Some(started_at);
                     task.finished_at = Some(finished_at);
                     task.status = Status::Failed;
@@ -835,8 +863,11 @@ impl IndexScheduler {
                     #[cfg(test)]
                     self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?;

-                    self.delete_persisted_task_data(&task)?;
-                    self.update_task(&mut wtxn, &task)?;
+                    if let Err(e) = self.delete_persisted_task_data(&task) {
+                        log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
+                    }
+                    self.update_task(&mut wtxn, &task)
+                        .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
                 }
             }
         }
@@ -845,7 +876,7 @@ impl IndexScheduler {
         #[cfg(test)]
         self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;

-        wtxn.commit()?;
+        wtxn.commit().map_err(Error::HeedTransaction)?;

         #[cfg(test)]
         self.breakpoint(Breakpoint::AfterProcessing);
@@ -875,6 +906,8 @@ impl IndexScheduler {

 #[cfg(test)]
 mod tests {
+    use std::time::Instant;
+
     use big_s::S;
     use file_store::File;
     use meili_snap::snapshot;
@@ -1762,13 +1795,6 @@ mod tests {

     #[test]
     fn fail_in_update_task_after_process_batch_success_for_document_addition() {
-        // TODO: this is not the correct behaviour, because we process the
-        // same tasks twice.
-        //
-        // I wonder whether the run loop should maybe be paused if we encounter a failure
-        // at that point. Alternatively, maybe we should preemptively mark the tasks
-        // as failed at the beginning of `IndexScheduler::tick`.
-
         let (index_scheduler, handle) = IndexScheduler::test(
             true,
             vec![(1, FailureLocation::UpdatingTaskAfterProcessBatchSuccess { task_uid: 0 })],
@@ -1795,6 +1821,10 @@ mod tests {
                 allow_index_creation: true,
             })
             .unwrap();
+
+        // This tests that the index scheduler pauses for one second when an irrecoverable failure occurs
+        let start_time = Instant::now();
+
         index_scheduler.assert_internally_consistent();
         handle.wait_till(Breakpoint::Start);
@@ -1803,7 +1833,10 @@ mod tests {
         handle.wait_till(Breakpoint::AfterProcessing);
         index_scheduler.assert_internally_consistent();

-        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteratino");
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteration");
+
+        let test_duration = start_time.elapsed();
+        assert!(test_duration.as_millis() > 1000);
     }

     #[test]
diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteratino.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteration.snap
similarity index 100%
rename from index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteratino.snap
rename to index-scheduler/src/snapshots/lib.rs/fail_in_update_task_after_process_batch_success_for_document_addition/second_iteration.snap
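The core pattern of this patch, seen in isolation: `process_batch` runs on a dedicated thread so that a panic is caught by `join()` and surfaced as `Error::ProcessBatchPanicked` instead of unwinding through the scheduler, and the run loop sleeps for one second when it hits one of the irrecoverable errors rather than busy-retrying the same batch. Below is a minimal self-contained sketch of that pattern; the trimmed-down `Error` enum and the `run_one_batch` helper are stand-ins for illustration, not code from the patch.

```rust
use std::time::Duration;

// Stand-in for the scheduler's error type; only the variants this sketch needs.
#[derive(Debug)]
enum Error {
    ProcessBatchPanicked,
    CorruptedTaskQueue,
}

// Run a batch on its own thread so a panic is caught by `join()` instead of
// tearing down the run loop that called it.
fn run_one_batch(
    process: impl FnOnce() -> Result<usize, Error> + Send + 'static,
) -> Result<usize, Error> {
    let handle = std::thread::spawn(process);
    // `join()` returns `Err` when the spawned thread panicked; map that to an
    // ordinary error, as the patch does with `Error::ProcessBatchPanicked`.
    handle.join().unwrap_or_else(|_| Err(Error::ProcessBatchPanicked))
}

fn main() {
    // A batch that crashes mid-processing becomes an ordinary error.
    let res = run_one_batch(|| panic!("simulated crash while indexing"));
    assert!(matches!(res, Err(Error::ProcessBatchPanicked)));

    // Mirroring the run loop: only the irrecoverable errors (here just
    // `CorruptedTaskQueue`) trigger the one-second pause before the next tick.
    if let Err(e) = run_one_batch(|| Err(Error::CorruptedTaskQueue)) {
        eprintln!("{e:?}");
        if matches!(e, Error::CorruptedTaskQueue) {
            std::thread::sleep(Duration::from_secs(1));
        }
    }
}
```

The one-second sleep is what the new test asserts on: after the injected `UpdatingTaskAfterProcessBatchSuccess` failure, the whole run must have taken more than 1000 ms, proving the scheduler backed off instead of immediately reprocessing the batch.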