Merge pull request #5753 from meilisearch/export-fixes

Various fixes on the export route:

- build the `Bearer` header value once and reuse it across retries
- lower the default export payload size from 50 MiB to 20 MiB
- propagate the `Result`s returned by the export worker threads
- stop crediting 100 documents of progress before any are sent
- pick binary or decimal units in the payload-too-large message
Clément Renault 2025-07-11 19:15:42 +00:00 committed by GitHub
commit f4f333dbf6
2 changed files with 18 additions and 16 deletions


@@ -62,13 +62,14 @@ impl IndexScheduler {
             let ExportIndexSettings { filter, override_settings } = export_settings;
             let index = self.index(uid)?;
             let index_rtxn = index.read_txn()?;
+            let bearer = api_key.map(|api_key| format!("Bearer {api_key}"));

             // First, check if the index already exists
             let url = format!("{base_url}/indexes/{uid}");
             let response = retry(&must_stop_processing, || {
                 let mut request = agent.get(&url);
-                if let Some(api_key) = api_key {
-                    request = request.set("Authorization", &format!("Bearer {api_key}"));
+                if let Some(bearer) = &bearer {
+                    request = request.set("Authorization", bearer);
                 }

                 request.send_bytes(Default::default()).map_err(into_backoff_error)
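
Note: the `Bearer` value is now built once per index and borrowed by every retry closure, instead of re-allocating `format!("Bearer {api_key}")` on each attempt. A minimal sketch of the pattern, assuming the ureq 2.x agent API used above and a hypothetical fixed three-attempt loop in place of the scheduler's backoff helper:

use ureq::Agent;

// Hypothetical helper showing the hoisted-header pattern; the real code
// uses the scheduler's `retry` with exponential backoff instead of this loop.
fn get_index(
    agent: &Agent,
    base_url: &str,
    uid: &str,
    api_key: Option<&str>,
) -> Result<ureq::Response, ureq::Error> {
    // Allocate the header value once; every retry attempt borrows it.
    let bearer = api_key.map(|api_key| format!("Bearer {api_key}"));
    let url = format!("{base_url}/indexes/{uid}");

    let mut last_err = None;
    for _attempt in 0..3 {
        let mut request = agent.get(&url);
        if let Some(bearer) = &bearer {
            request = request.set("Authorization", bearer);
        }
        match request.send_bytes(Default::default()) {
            Ok(response) => return Ok(response),
            Err(err) => last_err = Some(err),
        }
    }
    Err(last_err.expect("at least one attempt ran"))
}
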
@@ -90,8 +91,8 @@ impl IndexScheduler {
             let url = format!("{base_url}/indexes");
             retry(&must_stop_processing, || {
                 let mut request = agent.post(&url);
-                if let Some(api_key) = api_key {
-                    request = request.set("Authorization", &format!("Bearer {api_key}"));
+                if let Some(bearer) = &bearer {
+                    request = request.set("Authorization", bearer);
                 }
                 let index_param = json!({ "uid": uid, "primaryKey": primary_key });
                 request.send_json(&index_param).map_err(into_backoff_error)
@@ -103,8 +104,8 @@ impl IndexScheduler {
             let url = format!("{base_url}/indexes/{uid}");
             retry(&must_stop_processing, || {
                 let mut request = agent.patch(&url);
-                if let Some(api_key) = api_key {
-                    request = request.set("Authorization", &format!("Bearer {api_key}"));
+                if let Some(bearer) = &bearer {
+                    request = request.set("Authorization", bearer);
                 }
                 let index_param = json!({ "primaryKey": primary_key });
                 request.send_json(&index_param).map_err(into_backoff_error)
@@ -122,7 +123,6 @@ impl IndexScheduler {
             }
             // Retry logic for sending settings
             let url = format!("{base_url}/indexes/{uid}/settings");
-            let bearer = api_key.map(|api_key| format!("Bearer {api_key}"));
             retry(&must_stop_processing, || {
                 let mut request = agent.patch(&url);
                 if let Some(bearer) = bearer.as_ref() {
@@ -167,10 +167,10 @@ impl IndexScheduler {
                 },
             );

-            let limit = payload_size.map(|ps| ps.as_u64() as usize).unwrap_or(50 * 1024 * 1024); // defaults to 50 MiB
+            let limit = payload_size.map(|ps| ps.as_u64() as usize).unwrap_or(20 * 1024 * 1024); // defaults to 20 MiB
             let documents_url = format!("{base_url}/indexes/{uid}/documents");

-            request_threads()
+            let results = request_threads()
                 .broadcast(|ctx| {
                     let index_rtxn = index
                         .read_txn()
@@ -265,9 +265,8 @@ impl IndexScheduler {
                         let mut request = agent.post(&documents_url);
                         request = request.set("Content-Type", "application/x-ndjson");
                         request = request.set("Content-Encoding", "gzip");
-                        if let Some(api_key) = api_key {
-                            request = request
-                                .set("Authorization", &(format!("Bearer {api_key}")));
+                        if let Some(bearer) = &bearer {
+                            request = request.set("Authorization", bearer);
                         }
                         request.send_bytes(&compressed_buffer).map_err(into_backoff_error)
                     })?;
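
Note: the chunking loop above accumulates serialized documents and flushes a gzip-compressed NDJSON payload before the buffer grows past `limit` (now 20 MiB by default). A simplified sketch of that accumulate-and-flush shape, with a hypothetical `send_chunk` standing in for the gzip + POST + retry logic in the diff:

use std::error::Error;

// Simplified sketch; `send_chunk` is a hypothetical stand-in for the
// compressed, retried POST shown above.
fn send_in_chunks(
    documents: impl Iterator<Item = Vec<u8>>, // one NDJSON line each, '\n'-terminated
    limit: usize,                             // e.g. 20 * 1024 * 1024
    mut send_chunk: impl FnMut(&[u8]) -> Result<(), Box<dyn Error>>,
) -> Result<(), Box<dyn Error>> {
    let mut buffer = Vec::new();
    for line in documents {
        if !buffer.is_empty() && buffer.len() + line.len() > limit {
            // Flush before appending so each payload stays under `limit`;
            // a single oversized document is still sent on its own.
            send_chunk(&buffer)?;
            buffer.clear();
        }
        buffer.extend_from_slice(&line);
    }
    if !buffer.is_empty() {
        send_chunk(&buffer)?; // final partial chunk
    }
    Ok(())
}
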
@@ -276,7 +275,7 @@ impl IndexScheduler {
                     }

                     buffer.extend_from_slice(&tmp_buffer);
-                    if i % 100 == 0 {
+                    if i > 0 && i % 100 == 0 {
                         step.fetch_add(100, atomic::Ordering::Relaxed);
                     }
                 }
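
Note: the added `i > 0` guard fixes an off-by-100 in the progress step: since `0 % 100 == 0`, the old condition bumped the counter on the very first document, before anything had been uploaded. A quick check of which iterations fire under each condition:

fn main() {
    let old: Vec<usize> = (0..250).filter(|&i| i % 100 == 0).collect();
    let new: Vec<usize> = (0..250).filter(|&i| i > 0 && i % 100 == 0).collect();
    assert_eq!(old, vec![0, 100, 200]); // credits 100 documents at i == 0
    assert_eq!(new, vec![100, 200]);    // first bump after 100 real documents
}
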
@@ -284,8 +283,8 @@ impl IndexScheduler {
                 retry(&must_stop_processing, || {
                     let mut request = agent.post(&documents_url);
                     request = request.set("Content-Type", "application/x-ndjson");
-                    if let Some(api_key) = api_key {
-                        request = request.set("Authorization", &(format!("Bearer {api_key}")));
+                    if let Some(bearer) = &bearer {
+                        request = request.set("Authorization", bearer);
                     }
                     request.send_bytes(&buffer).map_err(into_backoff_error)
                 })?;
@@ -298,6 +297,9 @@ impl IndexScheduler {
                     Some(uid.to_string()),
                 )
             })?;
+            for result in results {
+                result?;
+            }

             step.store(total_documents, atomic::Ordering::Relaxed);
         }
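
Note: rayon's `broadcast` runs the closure once on every thread of the pool and returns a `Vec` with each thread's return value; previously those `Result`s were dropped, so a failed chunk upload could leave the export task looking successful. The new `for result in results { result?; }` surfaces the first worker error. A standalone sketch of the pattern, assuming a rayon `ThreadPool`:

use rayon::ThreadPoolBuilder;

fn main() -> Result<(), String> {
    let pool = ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .map_err(|e| e.to_string())?;

    // `broadcast` runs the closure once per thread and collects one
    // return value per thread into a Vec.
    let results: Vec<Result<(), String>> = pool.broadcast(|ctx| {
        if ctx.index() == 2 {
            Err(format!("worker {} failed", ctx.index()))
        } else {
            Ok(())
        }
    });

    // Propagate the first worker error instead of silently dropping it.
    for result in results {
        result?;
    }
    Ok(())
}
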


@@ -49,7 +49,7 @@ pub enum MeilisearchHttpError {
     TooManySearchRequests(usize),
     #[error("Internal error: Search limiter is down.")]
     SearchLimiterIsDown,
-    #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_u64(*.0 as u64).get_appropriate_unit(UnitType::Binary))]
+    #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_u64(*.0 as u64).get_appropriate_unit(if *.0 % 1024 == 0 { UnitType::Binary } else { UnitType::Decimal }))]
    PayloadTooLarge(usize),
    #[error("Two indexes must be given for each swap. The list `[{}]` contains {} indexes.",
        .0.iter().map(|uid| format!("\"{uid}\"")).collect::<Vec<_>>().join(", "), .0.len()
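
Note: the error message now chooses its display unit from the configured limit itself: multiples of 1024 (like the 20 MiB default) keep binary units, while round decimal values a user might set no longer render as an awkward binary approximation. A worked example, assuming the `byte_unit` 5.x crate that `Byte` and `UnitType` come from:

use byte_unit::{Byte, UnitType};

fn human_limit(limit: usize) -> String {
    // Same selection logic as the #[error] attribute above.
    let unit_type = if limit % 1024 == 0 { UnitType::Binary } else { UnitType::Decimal };
    format!("{:.2}", Byte::from_u64(limit as u64).get_appropriate_unit(unit_type))
}

fn main() {
    println!("{}", human_limit(20 * 1024 * 1024)); // "20.00 MiB"
    println!("{}", human_limit(10_000_000));       // "10.00 MB", not "9.54 MiB"
}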