Better handle task abortion

This commit is contained in:
Kerollmops 2025-06-27 12:23:55 +02:00
parent 657bbf5d1e
commit 7219299436
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F

View file

@ -16,7 +16,7 @@ use meilisearch_types::milli::{self, obkv_to_json, Filter, InternalError};
use meilisearch_types::settings::{self, SecretPolicy}; use meilisearch_types::settings::{self, SecretPolicy};
use meilisearch_types::tasks::ExportIndexSettings; use meilisearch_types::tasks::ExportIndexSettings;
use serde::Deserialize; use serde::Deserialize;
use ureq::{json, Agent}; use ureq::{json, Response};
use super::MustStopProcessing; use super::MustStopProcessing;
use crate::processing::AtomicDocumentStep; use crate::processing::AtomicDocumentStep;
@ -45,7 +45,7 @@ impl IndexScheduler {
}) })
.collect(); .collect();
let agent: Agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build(); let agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build();
let must_stop_processing = self.scheduler.must_stop_processing.clone(); let must_stop_processing = self.scheduler.must_stop_processing.clone();
for (i, (uid, settings)) in indexes.iter().enumerate() { for (i, (uid, settings)) in indexes.iter().enumerate() {
if must_stop_processing.get() { if must_stop_processing.get() {
@ -272,11 +272,16 @@ fn retry<F>(must_stop_processing: &MustStopProcessing, send_request: F) -> Resul
where where
F: Fn() -> Result<ureq::Response, backoff::Error<ureq::Error>>, F: Fn() -> Result<ureq::Response, backoff::Error<ureq::Error>>,
{ {
match backoff::retry(ExponentialBackoff::default(), || {
if must_stop_processing.get() { if must_stop_processing.get() {
return Err(Error::AbortedTask); return Err(backoff::Error::Permanent(ureq::Error::Status(
u16::MAX,
// 444: Connection Closed Without Response
Response::new(444, "Abort", "Aborted task").unwrap(),
)));
} }
send_request()
match backoff::retry(ExponentialBackoff::default(), send_request) { }) {
Ok(response) => Ok(response), Ok(response) => Ok(response),
Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)), Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)),
Err(backoff::Error::Transient { err, retry_after: _ }) => Err(ureq_error_into_error(err)), Err(backoff::Error::Transient { err, retry_after: _ }) => Err(ureq_error_into_error(err)),
@ -306,6 +311,9 @@ fn ureq_error_into_error(error: ureq::Error) -> Error {
} }
match error { match error {
// This is a workaround to handle task abortion - the error propagation path
// makes it difficult to cleanly surface the abortion at this level.
ureq::Error::Status(u16::MAX, _) => Error::AbortedTask,
ureq::Error::Status(_, response) => match response.into_json() { ureq::Error::Status(_, response) => match response.into_json() {
Ok(MeiliError { message, code, r#type, link }) => { Ok(MeiliError { message, code, r#type, link }) => {
Error::FromRemoteWhenExporting { message, code, r#type, link } Error::FromRemoteWhenExporting { message, code, r#type, link }