Support task cancelation

Clément Renault 2025-06-16 15:34:05 +02:00
parent ba84865dbc
commit ed96556296

@ -15,6 +15,7 @@ use meilisearch_types::tasks::ExportIndexSettings;
 use serde::Deserialize;
 use ureq::{json, Agent};
+use super::MustStopProcessing;
 use crate::processing::AtomicDocumentStep;
 use crate::{Error, IndexScheduler, Result};
@ -41,9 +42,8 @@ impl IndexScheduler {
             .collect();
         let agent: Agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build();
+        let must_stop_processing = self.scheduler.must_stop_processing.clone();
         for (i, (uid, settings)) in indexes.iter().enumerate() {
-            let must_stop_processing = self.scheduler.must_stop_processing.clone();
             if must_stop_processing.get() {
                 return Err(Error::AbortedTask);
             }
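The export loop polls a shared flag at the top of each iteration and aborts with `Error::AbortedTask` once it is set; presumably the scheduler sets that flag when a cancel request targets the running task. A minimal sketch of what such a flag amounts to, assuming `MustStopProcessing` wraps an `Arc<AtomicBool>` (the real type lives elsewhere in the scheduler; the setter name below is hypothetical):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the scheduler's shared cancellation flag.
#[derive(Clone, Default)]
pub struct MustStopProcessing(Arc<AtomicBool>);

impl MustStopProcessing {
    /// Polled by long-running batch processing, as in the loop above.
    pub fn get(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }

    /// Hypothetical setter: flipped when a cancel request targets the running task.
    pub fn must_stop(&self) {
        self.0.store(true, Ordering::Relaxed);
    }
}
```

Assuming the flag really is an `Arc`, cloning it once before the loop (as the hunk above now does) is equivalent to cloning it on every iteration, just cheaper.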
@ -59,9 +59,9 @@ impl IndexScheduler {
             let index_rtxn = index.read_txn()?;
             // Send the primary key
-            let primary_key = index.primary_key(&index_rtxn).unwrap();
+            let primary_key = index.primary_key(&index_rtxn)?;
             let url = format!("{base_url}/indexes");
-            retry(|| {
+            retry(&must_stop_processing, || {
                 let mut request = agent.post(&url);
                 if let Some(api_key) = api_key {
                     request = request.set("Authorization", &format!("Bearer {api_key}"));
@ -79,7 +79,7 @@ impl IndexScheduler {
             }
             // Retry logic for sending settings
             let url = format!("{base_url}/indexes/{uid}/settings");
-            retry(|| {
+            retry(&must_stop_processing, || {
                 let mut request = agent.patch(&url);
                 if let Some(api_key) = api_key {
                     request = request.set("Authorization", &format!("Bearer {api_key}"));
@ -115,6 +115,8 @@ impl IndexScheduler {
             progress.update_progress(progress_step);
             let limit = 50 * 1024 * 1024; // 50 MiB
+            let documents_url = format!("{base_url}/indexes/{uid}/documents");
             let mut buffer = Vec::new();
             let mut tmp_buffer = Vec::new();
             for (i, docid) in universe.into_iter().enumerate() {
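The loop that starts here builds up `tmp_buffer` with one serialized document per line, which is why the uploads below send `Content-Type: application/x-ndjson`. A self-contained illustration of that NDJSON shape (the helper and the sample documents are hypothetical, not the file's code):

```rust
use serde_json::{json, Value};

// Hypothetical illustration of what ends up in `tmp_buffer`: one JSON
// document per line (NDJSON).
fn append_ndjson_line(tmp_buffer: &mut Vec<u8>, document: &Value) -> serde_json::Result<()> {
    serde_json::to_writer(&mut *tmp_buffer, document)?;
    tmp_buffer.push(b'\n');
    Ok(())
}

fn main() -> serde_json::Result<()> {
    let mut tmp_buffer = Vec::new();
    append_ndjson_line(&mut tmp_buffer, &json!({ "id": 1, "title": "Ride the wind" }))?;
    append_ndjson_line(&mut tmp_buffer, &json!({ "id": 2, "title": "Shazam!" }))?;
    // Two documents, two lines.
    assert_eq!(tmp_buffer.iter().filter(|&&b| b == b'\n').count(), 2);
    Ok(())
}
```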
@ -185,7 +187,14 @@ impl IndexScheduler {
                    .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
                if buffer.len() + tmp_buffer.len() > limit {
-                    post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap();
+                    retry(&must_stop_processing, || {
+                        let mut request = agent.post(&documents_url);
+                        request = request.set("Content-Type", "application/x-ndjson");
+                        if let Some(api_key) = api_key {
+                            request = request.set("Authorization", &(format!("Bearer {api_key}")));
+                        }
+                        request.send_bytes(&buffer).map_err(into_backoff_error)
+                    })?;
                    buffer.clear();
                }
                buffer.extend_from_slice(&tmp_buffer);
@ -195,7 +204,14 @@ impl IndexScheduler {
                }
            }
-            post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap();
+            retry(&must_stop_processing, || {
+                let mut request = agent.post(&documents_url);
+                request = request.set("Content-Type", "application/x-ndjson");
+                if let Some(api_key) = api_key {
+                    request = request.set("Authorization", &(format!("Bearer {api_key}")));
+                }
+                request.send_bytes(&buffer).map_err(into_backoff_error)
+            })?;
            step.store(total_documents, atomic::Ordering::Relaxed);
        }
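Both inlined `retry` calls above reproduce what `post_serialized_documents` used to do: POST the accumulated NDJSON batch to `documents_url`, flushing whenever the next document would push the payload past the 50 MiB `limit`, plus one final flush after the loop. Each closure can now borrow `buffer`, `documents_url`, and `must_stop_processing` directly, and the now-unused helper is removed further down in this diff. A standalone sketch of that batching shape, with a hypothetical `send` callback standing in for the `retry`/`ureq` call:

```rust
/// Hypothetical sketch: accumulate NDJSON lines and flush when the next line
/// would exceed `limit`, mirroring the buffer/tmp_buffer dance above.
fn upload_in_batches<E>(
    lines: impl IntoIterator<Item = Vec<u8>>,
    limit: usize,
    mut send: impl FnMut(&[u8]) -> Result<(), E>,
) -> Result<(), E> {
    let mut buffer = Vec::new();
    for line in lines {
        // Flush the accumulated batch before it grows past the limit.
        if !buffer.is_empty() && buffer.len() + line.len() > limit {
            send(&buffer)?;
            buffer.clear();
        }
        buffer.extend_from_slice(&line);
    }
    // Final flush for the last, possibly partial, batch.
    if !buffer.is_empty() {
        send(&buffer)?;
    }
    Ok(())
}
```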
@ -203,10 +219,14 @@ impl IndexScheduler {
     }
 }
-fn retry<F>(send_request: F) -> Result<ureq::Response>
+fn retry<F>(must_stop_processing: &MustStopProcessing, send_request: F) -> Result<ureq::Response>
 where
     F: Fn() -> Result<ureq::Response, backoff::Error<ureq::Error>>,
 {
+    if must_stop_processing.get() {
+        return Err(Error::AbortedTask);
+    }
     match backoff::retry(ExponentialBackoff::default(), || send_request()) {
         Ok(response) => Ok(response),
         Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)),
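With the extra `&MustStopProcessing` parameter, a canceled task now short-circuits to `Error::AbortedTask` before any request is issued; otherwise the call falls through to `backoff::retry`, which keeps re-invoking the closure while it reports transient errors and gives up on permanent ones. A self-contained illustration of that backoff behaviour (the simulated operation is hypothetical, not project code):

```rust
use backoff::{Error as BackoffError, ExponentialBackoff};

// Simulated operation: the first two attempts fail with a retryable error,
// the third succeeds. `backoff::retry` sleeps between attempts according to
// the ExponentialBackoff policy, the same combinator used above.
fn main() {
    let mut attempts = 0;
    let result: Result<&str, BackoffError<String>> =
        backoff::retry(ExponentialBackoff::default(), || {
            attempts += 1;
            if attempts < 3 {
                // Transient errors are retried...
                Err(BackoffError::transient(format!("attempt {attempts} failed")))
            } else {
                // ...until the operation succeeds (or a permanent error is returned).
                Ok("uploaded")
            }
        });

    assert_eq!(result.unwrap(), "uploaded");
    assert_eq!(attempts, 3);
}
```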
@ -214,24 +234,6 @@ where
     }
 }
-fn post_serialized_documents(
-    agent: &Agent,
-    base_url: &str,
-    uid: &str,
-    api_key: Option<&str>,
-    buffer: &[u8],
-) -> Result<ureq::Response> {
-    let url = format!("{base_url}/indexes/{uid}/documents");
-    retry(|| {
-        let mut request = agent.post(&url);
-        request = request.set("Content-Type", "application/x-ndjson");
-        if let Some(api_key) = api_key {
-            request = request.set("Authorization", &(format!("Bearer {api_key}")));
-        }
-        request.send_bytes(buffer).map_err(into_backoff_error)
-    })
-}
 fn into_backoff_error(err: ureq::Error) -> backoff::Error<ureq::Error> {
     match err {
         // Those code status must trigger an automatic retry
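The truncated helper above is the other half of the retry story: it decides which `ureq` errors are worth retrying at all. A hedged sketch of what such a classifier typically looks like; the concrete set of retryable status codes below is an assumption, not a quote of this file:

```rust
use backoff::Error as BackoffError;

// Hedged sketch of an `into_backoff_error`-style classifier: retryable HTTP
// statuses and transport failures become transient errors, everything else
// is permanent. The status list is an assumption.
fn classify(err: ureq::Error) -> BackoffError<ureq::Error> {
    match err {
        // Timeouts, rate limiting, and upstream failures: let the backoff retry.
        ureq::Error::Status(408 | 429 | 500 | 502 | 503 | 504, _) => BackoffError::transient(err),
        // Any other status is a genuine failure: surface it immediately.
        ureq::Error::Status(_, _) => BackoffError::permanent(err),
        // Network-level problems (DNS, connection reset, ...) are transient.
        ureq::Error::Transport(_) => BackoffError::transient(err),
    }
}
```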