Implement a retry strategy

Clément Renault 2025-06-16 11:35:47 +02:00
parent 87cf0970d1
commit a7685feabc
5 changed files with 89 additions and 24 deletions
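The export task previously fired each ureq request once and unwrapped the result; this commit wraps the index-creation, settings, and document-batch requests in an exponential-backoff retry loop built on the `backoff` crate (0.4), and surfaces remote HTTP failures through a new `FromRemoteWhenExporting` error variant. For readers unfamiliar with `backoff`, the core pattern looks roughly like the self-contained sketch below; the names and the String error type are illustrative, not taken from the diff.

use std::sync::atomic::{AtomicU32, Ordering};
use backoff::{retry, Error as BackoffError, ExponentialBackoff};

// Illustrative only: a flaky operation that succeeds on its third attempt.
// `retry` keeps re-invoking the closure with exponentially growing delays
// for as long as it returns a transient error, which is the same shape the
// export code now uses around each ureq call.
fn main() {
    let attempts = AtomicU32::new(0);
    let result = retry(ExponentialBackoff::default(), || {
        let n = attempts.fetch_add(1, Ordering::Relaxed) + 1;
        if n < 3 {
            // Transient errors are retried after a backoff delay.
            Err(BackoffError::Transient { err: format!("attempt {n} failed"), retry_after: None })
        } else {
            // A success (or a permanent error) stops the loop.
            Ok("exported")
        }
    });
    assert_eq!(result.unwrap(), "exported");
}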

Cargo.lock (generated)

@@ -2999,6 +2999,7 @@ name = "index-scheduler"
version = "1.15.1"
dependencies = [
"anyhow",
"backoff",
"big_s",
"bincode",
"bumpalo",


@@ -44,6 +44,7 @@ time = { version = "0.3.37", features = [
tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.11.0", features = ["serde", "v4"] }
backoff = "0.4.0"
[dev-dependencies]
big_s = "1.0.2"


@@ -153,6 +153,8 @@ pub enum Error {
DatabaseUpgrade(Box<Self>),
#[error(transparent)]
Export(Box<Self>),
#[error("Failed to export documents to remote server {code} ({type}): {message} <{link}>")]
FromRemoteWhenExporting { message: String, code: String, r#type: String, link: String },
#[error("Failed to rollback for index `{index}`: {rollback_outcome} ")]
RollbackFailed { index: String, rollback_outcome: RollbackOutcome },
#[error(transparent)]

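The new variant carries the four fields of a standard Meilisearch error payload (message, code, type, link), so an export failure reports the remote server's own diagnostics. A minimal sketch of how such a variant renders, using a reduced stand-in enum and made-up field values (the real enum names the field `r#type`, renamed to `kind` here to keep the sketch simple):

use thiserror::Error;

// Reduced stand-in for the new variant, purely to show the Display output
// produced by the #[error(...)] format string above.
#[derive(Debug, Error)]
enum ExportError {
    #[error("Failed to export documents to remote server {code} ({kind}): {message} <{link}>")]
    FromRemoteWhenExporting { message: String, code: String, kind: String, link: String },
}

fn main() {
    let err = ExportError::FromRemoteWhenExporting {
        message: "Index `movies` not found.".to_string(),
        code: "index_not_found".to_string(),
        kind: "invalid_request".to_string(),
        link: "https://docs.meilisearch.com/errors#index_not_found".to_string(),
    };
    // Failed to export documents to remote server index_not_found (invalid_request): Index `movies` not found. <https://docs.meilisearch.com/errors#index_not_found>
    println!("{err}");
}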

@@ -1,14 +1,18 @@
use std::collections::BTreeMap;
use std::io;
use std::sync::atomic;
use std::time::Duration;
use backoff::ExponentialBackoff;
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::update::Setting;
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use meilisearch_types::milli::{obkv_to_json, Filter};
use meilisearch_types::settings::{self, SecretPolicy};
use meilisearch_types::tasks::ExportIndexSettings;
use serde::Deserialize;
use ureq::{json, Agent};
use crate::processing::AtomicDocumentStep;
@@ -17,7 +21,7 @@ use crate::{Error, IndexScheduler, Result};
impl IndexScheduler {
pub(super) fn process_export(
&self,
url: &str,
base_url: &str,
indexes: &BTreeMap<IndexUidPattern, ExportIndexSettings>,
api_key: Option<&str>,
progress: Progress,
@@ -56,24 +60,34 @@ impl IndexScheduler {
// Send the primary key
let primary_key = index.primary_key(&index_rtxn).unwrap();
// TODO implement retry logic
let mut request = agent.post(&format!("{url}/indexes"));
if let Some(api_key) = api_key {
request = request.set("Authorization", &format!("Bearer {api_key}"));
}
request.send_json(&json!({ "uid": uid, "primaryKey": primary_key })).unwrap();
let url = format!("{base_url}/indexes");
retry(|| {
let mut request = agent.post(&url);
if let Some(api_key) = api_key {
request = request.set("Authorization", &format!("Bearer {api_key}"));
}
let index_param = json!({ "uid": uid, "primaryKey": primary_key });
request.send_json(&index_param).map_err(into_backoff_error)
})?;
// Send the index settings
let settings = settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets)
let mut settings = settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
// TODO implement retry logic
// improve error reporting (get error message)
let mut request = agent.patch(&format!("{url}/indexes/{uid}/settings"));
if let Some(api_key) = api_key {
request = request.set("Authorization", &format!("Bearer {api_key}"));
// Remove the experimental chat setting if not enabled
if self.features().check_chat_completions("exporting chat settings").is_err() {
settings.chat = Setting::NotSet;
}
request.send_json(settings).unwrap();
// Retry logic for sending settings
let url = format!("{base_url}/indexes/{uid}/settings");
retry(|| {
let mut request = agent.patch(&url);
if let Some(api_key) = api_key {
request = request.set("Authorization", &format!("Bearer {api_key}"));
}
request.send_json(settings.clone()).map_err(into_backoff_error)
})?;
// TODO support JSON Value objects
let filter = filter
.as_deref()
.map(Filter::from_str)
@@ -171,8 +185,7 @@ impl IndexScheduler {
.map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
if buffer.len() + tmp_buffer.len() > limit {
// TODO implement retry logic
post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap();
post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap();
buffer.clear();
}
buffer.extend_from_slice(&tmp_buffer);
@@ -182,7 +195,7 @@ impl IndexScheduler {
}
}
post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap();
post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap();
step.store(total_documents, atomic::Ordering::Relaxed);
}
@@ -190,19 +203,66 @@ impl IndexScheduler {
}
}
fn retry<F>(send_request: F) -> Result<ureq::Response>
where
F: Fn() -> Result<ureq::Response, backoff::Error<ureq::Error>>,
{
match backoff::retry(ExponentialBackoff::default(), || send_request()) {
Ok(response) => Ok(response),
Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)),
Err(backoff::Error::Transient { err, retry_after: _ }) => Err(ureq_error_into_error(err)),
}
}
fn post_serialized_documents(
agent: &Agent,
url: &str,
base_url: &str,
uid: &str,
api_key: Option<&str>,
buffer: &[u8],
) -> Result<ureq::Response, ureq::Error> {
let mut request = agent.post(&format!("{url}/indexes/{uid}/documents"));
request = request.set("Content-Type", "application/x-ndjson");
if let Some(api_key) = api_key {
request = request.set("Authorization", &format!("Bearer {api_key}"));
) -> Result<ureq::Response> {
let url = format!("{base_url}/indexes/{uid}/documents");
retry(|| {
let mut request = agent.post(&url);
request = request.set("Content-Type", "application/x-ndjson");
if let Some(api_key) = api_key {
request = request.set("Authorization", &(format!("Bearer {api_key}")));
}
request.send_bytes(buffer).map_err(into_backoff_error)
})
}
fn into_backoff_error(err: ureq::Error) -> backoff::Error<ureq::Error> {
match err {
// Those code status must trigger an automatic retry
// <https://www.restapitutorial.com/advanced/responses/retries>
ureq::Error::Status(408 | 429 | 500 | 502 | 503 | 504, _) => {
backoff::Error::Transient { err, retry_after: None }
}
ureq::Error::Status(_, _) => backoff::Error::Permanent(err),
ureq::Error::Transport(_) => backoff::Error::Transient { err, retry_after: None },
}
}
/// Converts a `ureq::Error` into an `Error`.
fn ureq_error_into_error(error: ureq::Error) -> Error {
#[derive(Deserialize)]
struct MeiliError {
message: String,
code: String,
r#type: String,
link: String,
}
match error {
ureq::Error::Status(_, response) => match response.into_json() {
Ok(MeiliError { message, code, r#type, link }) => {
Error::FromRemoteWhenExporting { message, code, r#type, link }
}
Err(e) => io::Error::from(e).into(),
},
ureq::Error::Transport(transport) => io::Error::new(io::ErrorKind::Other, transport).into(),
}
request.send_bytes(buffer)
}
enum ExportIndex {}
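Taken together, the helpers above retry 408, 429, 500, 502, 503, and 504 responses as well as transport failures, and treat every other status as permanent, failing the export immediately with the decoded remote error. The policy itself is left at `ExponentialBackoff::default()`; since it is a plain struct, it could be tightened without touching the retry loop. A speculative sketch of such a policy, not something this commit ships:

use std::time::Duration;
use backoff::ExponentialBackoff;

// Illustrative only: a tighter policy than the crate's default that the
// `retry` helper could accept as a parameter instead of hard-coding
// `ExponentialBackoff::default()`.
fn short_backoff() -> ExponentialBackoff {
    ExponentialBackoff {
        initial_interval: Duration::from_millis(250),
        max_interval: Duration::from_secs(10),
        // Give up entirely after one minute of retries.
        max_elapsed_time: Some(Duration::from_secs(60)),
        ..ExponentialBackoff::default()
    }
}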


@@ -968,6 +968,7 @@ pub fn settings(
if let SecretPolicy::HideSecrets = secret_policy {
settings.hide_secrets()
}
Ok(settings)
}