From 4ce1621bb36bbef91c738978eaae5b7e4ef6ef40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 16 Jun 2025 11:35:47 +0200 Subject: [PATCH] Implement a retry strategy --- Cargo.lock | 1 + crates/index-scheduler/Cargo.toml | 1 + crates/index-scheduler/src/error.rs | 4 + .../src/scheduler/process_export.rs | 108 ++++++++++++++---- crates/meilisearch-types/src/settings.rs | 1 + 5 files changed, 91 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c487b6ac4..979ba2703 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2999,6 +2999,7 @@ name = "index-scheduler" version = "1.15.1" dependencies = [ "anyhow", + "backoff", "big_s", "bincode", "bumpalo", diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index b4f187729..042d9364e 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -44,6 +44,7 @@ time = { version = "0.3.37", features = [ tracing = "0.1.41" ureq = "2.12.1" uuid = { version = "1.11.0", features = ["serde", "v4"] } +backoff = "0.4.0" [dev-dependencies] big_s = "1.0.2" diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index 2020ac597..60669ff2d 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -153,6 +153,8 @@ pub enum Error { DatabaseUpgrade(Box), #[error(transparent)] Export(Box), + #[error("Failed to export documents to remote server {code} ({type}): {message} <{link}>")] + FromRemoteWhenExporting { message: String, code: String, r#type: String, link: String }, #[error("Failed to rollback for index `{index}`: {rollback_outcome} ")] RollbackFailed { index: String, rollback_outcome: RollbackOutcome }, #[error(transparent)] @@ -214,6 +216,7 @@ impl Error { | Error::BatchNotFound(_) | Error::TaskDeletionWithEmptyQuery | Error::TaskCancelationWithEmptyQuery + | Error::FromRemoteWhenExporting { .. 
} | Error::AbortedTask | Error::Dump(_) | Error::Heed(_) @@ -285,6 +288,7 @@ impl ErrorCode for Error { Error::Dump(e) => e.error_code(), Error::Milli { error, .. } => error.error_code(), Error::ProcessBatchPanicked(_) => Code::Internal, + Error::FromRemoteWhenExporting { .. } => Code::Internal, Error::Heed(e) => e.error_code(), Error::HeedTransaction(e) => e.error_code(), Error::FileStore(e) => e.error_code(), diff --git a/crates/index-scheduler/src/scheduler/process_export.rs b/crates/index-scheduler/src/scheduler/process_export.rs index 1686472ab..7501c260e 100644 --- a/crates/index-scheduler/src/scheduler/process_export.rs +++ b/crates/index-scheduler/src/scheduler/process_export.rs @@ -1,14 +1,18 @@ use std::collections::BTreeMap; +use std::io; use std::sync::atomic; use std::time::Duration; +use backoff::ExponentialBackoff; use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; +use meilisearch_types::milli::update::Setting; use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use meilisearch_types::milli::{obkv_to_json, Filter}; use meilisearch_types::settings::{self, SecretPolicy}; use meilisearch_types::tasks::ExportIndexSettings; +use serde::Deserialize; use ureq::{json, Agent}; use crate::processing::AtomicDocumentStep; @@ -17,7 +21,7 @@ use crate::{Error, IndexScheduler, Result}; impl IndexScheduler { pub(super) fn process_export( &self, - url: &str, + base_url: &str, indexes: &BTreeMap, api_key: Option<&str>, progress: Progress, @@ -56,24 +60,34 @@ impl IndexScheduler { // Send the primary key let primary_key = index.primary_key(&index_rtxn).unwrap(); - // TODO implement retry logic - let mut request = agent.post(&format!("{url}/indexes")); - if let Some(api_key) = api_key { - request = request.set("Authorization", &format!("Bearer {api_key}")); - } - 
request.send_json(&json!({ "uid": uid, "primaryKey": primary_key })).unwrap(); + let url = format!("{base_url}/indexes"); + retry(|| { + let mut request = agent.post(&url); + if let Some(api_key) = api_key { + request = request.set("Authorization", &format!("Bearer {api_key}")); + } + let index_param = json!({ "uid": uid, "primaryKey": primary_key }); + request.send_json(&index_param).map_err(into_backoff_error) + })?; // Send the index settings - let settings = settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets) + let mut settings = settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; - // TODO implement retry logic - // improve error reporting (get error message) - let mut request = agent.patch(&format!("{url}/indexes/{uid}/settings")); - if let Some(api_key) = api_key { - request = request.set("Authorization", &format!("Bearer {api_key}")); + // Remove the experimental chat setting if not enabled + if self.features().check_chat_completions("exporting chat settings").is_err() { + settings.chat = Setting::NotSet; } - request.send_json(settings).unwrap(); + // Retry logic for sending settings + let url = format!("{base_url}/indexes/{uid}/settings"); + retry(|| { + let mut request = agent.patch(&url); + if let Some(api_key) = api_key { + request = request.set("Authorization", &format!("Bearer {api_key}")); + } + request.send_json(settings.clone()).map_err(into_backoff_error) + })?; + // TODO support JSON Value objects let filter = filter .as_deref() .map(Filter::from_str) @@ -171,8 +185,7 @@ impl IndexScheduler { .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; if buffer.len() + tmp_buffer.len() > limit { - // TODO implement retry logic - post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap(); + post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap(); buffer.clear(); } buffer.extend_from_slice(&tmp_buffer); @@ -182,7 
+195,7 @@ impl IndexScheduler { } } - post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap(); + post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap(); step.store(total_documents, atomic::Ordering::Relaxed); } @@ -190,19 +203,66 @@ impl IndexScheduler { } } +fn retry<F>(send_request: F) -> Result<ureq::Response> +where + F: Fn() -> Result<ureq::Response, backoff::Error<ureq::Error>>, +{ + match backoff::retry(ExponentialBackoff::default(), || send_request()) { + Ok(response) => Ok(response), + Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)), + Err(backoff::Error::Transient { err, retry_after: _ }) => Err(ureq_error_into_error(err)), + } +} + fn post_serialized_documents( agent: &Agent, - url: &str, + base_url: &str, uid: &str, api_key: Option<&str>, buffer: &[u8], -) -> Result<ureq::Response, ureq::Error> { - let mut request = agent.post(&format!("{url}/indexes/{uid}/documents")); - request = request.set("Content-Type", "application/x-ndjson"); - if let Some(api_key) = api_key { - request = request.set("Authorization", &format!("Bearer {api_key}")); +) -> Result<ureq::Response> { + let url = format!("{base_url}/indexes/{uid}/documents"); + retry(|| { + let mut request = agent.post(&url); + request = request.set("Content-Type", "application/x-ndjson"); + if let Some(api_key) = api_key { + request = request.set("Authorization", &(format!("Bearer {api_key}"))); + } + request.send_bytes(buffer).map_err(into_backoff_error) + }) +} + +fn into_backoff_error(err: ureq::Error) -> backoff::Error<ureq::Error> { + match err { + // These status codes must trigger an automatic retry + // + ureq::Error::Status(408 | 429 | 500 | 502 | 503 | 504, _) => { + backoff::Error::Transient { err, retry_after: None } + } + ureq::Error::Status(_, _) => backoff::Error::Permanent(err), + ureq::Error::Transport(_) => backoff::Error::Transient { err, retry_after: None }, + } +} + +/// Converts a `ureq::Error` into an `Error`. 
+fn ureq_error_into_error(error: ureq::Error) -> Error { + #[derive(Deserialize)] + struct MeiliError { + message: String, + code: String, + r#type: String, + link: String, + } + + match error { + ureq::Error::Status(_, response) => match response.into_json() { + Ok(MeiliError { message, code, r#type, link }) => { + Error::FromRemoteWhenExporting { message, code, r#type, link } + } + Err(e) => io::Error::from(e).into(), + }, + ureq::Error::Transport(transport) => io::Error::new(io::ErrorKind::Other, transport).into(), } - request.send_bytes(buffer) } enum ExportIndex {} diff --git a/crates/meilisearch-types/src/settings.rs b/crates/meilisearch-types/src/settings.rs index 1c225b355..295318f4b 100644 --- a/crates/meilisearch-types/src/settings.rs +++ b/crates/meilisearch-types/src/settings.rs @@ -968,6 +968,7 @@ pub fn settings( if let SecretPolicy::HideSecrets = secret_policy { settings.hide_secrets() } + Ok(settings) }