From ed96556296aa54d8e384f9f7a1a02a3887bbd502 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 16 Jun 2025 15:34:05 +0200 Subject: [PATCH] Support task cancelation --- .../src/scheduler/process_export.rs | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_export.rs b/crates/index-scheduler/src/scheduler/process_export.rs index 7501c260e..ceac18632 100644 --- a/crates/index-scheduler/src/scheduler/process_export.rs +++ b/crates/index-scheduler/src/scheduler/process_export.rs @@ -15,6 +15,7 @@ use meilisearch_types::tasks::ExportIndexSettings; use serde::Deserialize; use ureq::{json, Agent}; +use super::MustStopProcessing; use crate::processing::AtomicDocumentStep; use crate::{Error, IndexScheduler, Result}; @@ -41,9 +42,8 @@ impl IndexScheduler { .collect(); let agent: Agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build(); - + let must_stop_processing = self.scheduler.must_stop_processing.clone(); for (i, (uid, settings)) in indexes.iter().enumerate() { - let must_stop_processing = self.scheduler.must_stop_processing.clone(); if must_stop_processing.get() { return Err(Error::AbortedTask); } @@ -59,9 +59,9 @@ impl IndexScheduler { let index_rtxn = index.read_txn()?; // Send the primary key - let primary_key = index.primary_key(&index_rtxn).unwrap(); + let primary_key = index.primary_key(&index_rtxn)?; let url = format!("{base_url}/indexes"); - retry(|| { + retry(&must_stop_processing, || { let mut request = agent.post(&url); if let Some(api_key) = api_key { request = request.set("Authorization", &format!("Bearer {api_key}")); @@ -79,7 +79,7 @@ impl IndexScheduler { } // Retry logic for sending settings let url = format!("{base_url}/indexes/{uid}/settings"); - retry(|| { + retry(&must_stop_processing, || { let mut request = agent.patch(&url); if let Some(api_key) = api_key { request = request.set("Authorization", &format!("Bearer {api_key}")); @@ -115,6 +115,8 @@ impl IndexScheduler { progress.update_progress(progress_step); let limit = 50 * 1024 * 1024; // 50 MiB + let documents_url = format!("{base_url}/indexes/{uid}/documents"); + let mut buffer = Vec::new(); let mut tmp_buffer = Vec::new(); for (i, docid) in universe.into_iter().enumerate() { @@ -185,7 +187,14 @@ impl IndexScheduler { .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; if buffer.len() + tmp_buffer.len() > limit { - post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap(); + retry(&must_stop_processing, || { + let mut request = agent.post(&documents_url); + request = request.set("Content-Type", "application/x-ndjson"); + if let Some(api_key) = api_key { + request = request.set("Authorization", &(format!("Bearer {api_key}"))); + } + request.send_bytes(&buffer).map_err(into_backoff_error) + })?; buffer.clear(); } buffer.extend_from_slice(&tmp_buffer); @@ -195,7 +204,14 @@ impl IndexScheduler { } } - post_serialized_documents(&agent, base_url, uid, api_key, &buffer).unwrap(); + retry(&must_stop_processing, || { + let mut request = agent.post(&documents_url); + request = request.set("Content-Type", "application/x-ndjson"); + if let Some(api_key) = api_key { + request = request.set("Authorization", &(format!("Bearer {api_key}"))); + } + request.send_bytes(&buffer).map_err(into_backoff_error) + })?; step.store(total_documents, atomic::Ordering::Relaxed); } @@ -203,10 +219,14 @@ impl IndexScheduler { } } -fn retry(send_request: F) -> Result +fn retry(must_stop_processing: &MustStopProcessing, send_request: F) -> Result where F: Fn() -> Result>, { + if must_stop_processing.get() { + return Err(Error::AbortedTask); + } + match backoff::retry(ExponentialBackoff::default(), || send_request()) { Ok(response) => Ok(response), Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)), @@ -214,24 +234,6 @@ where } } -fn post_serialized_documents( - agent: &Agent, - base_url: &str, - uid: &str, - api_key: Option<&str>, - buffer: &[u8], -) -> Result { - let url = format!("{base_url}/indexes/{uid}/documents"); - retry(|| { - let mut request = agent.post(&url); - request = request.set("Content-Type", "application/x-ndjson"); - if let Some(api_key) = api_key { - request = request.set("Authorization", &(format!("Bearer {api_key}"))); - } - request.send_bytes(buffer).map_err(into_backoff_error) - }) -} - fn into_backoff_error(err: ureq::Error) -> backoff::Error { match err { // Those code status must trigger an automatic retry