From a743da30618850e6e6e302b1c7e009d932d7a8b6 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 25 Jun 2025 12:29:14 +0200 Subject: [PATCH] Gzip-compress the content --- .../src/scheduler/process_export.rs | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_export.rs b/crates/index-scheduler/src/scheduler/process_export.rs index 3054c919b..180162eda 100644 --- a/crates/index-scheduler/src/scheduler/process_export.rs +++ b/crates/index-scheduler/src/scheduler/process_export.rs @@ -1,9 +1,11 @@ use std::collections::BTreeMap; -use std::io; +use std::io::{self, Write as _}; use std::sync::atomic; use std::time::Duration; use backoff::ExponentialBackoff; +use flate2::write::GzEncoder; +use flate2::Compression; use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; @@ -131,6 +133,7 @@ impl IndexScheduler { let mut buffer = Vec::new(); let mut tmp_buffer = Vec::new(); + let mut compressed_buffer = Vec::new(); for (i, docid) in universe.iter().enumerate() { if i % ctx.num_threads() != ctx.index() { continue; @@ -205,17 +208,31 @@ impl IndexScheduler { .map_err(milli::InternalError::from) .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; - if buffer.len() + tmp_buffer.len() > limit { + // Make sure we put at least one document in the buffer even + // though we might go above the buffer limit before sending + if !buffer.is_empty() && buffer.len() + tmp_buffer.len() > limit { + // We compress the documents before sending them + let mut encoder = + GzEncoder::new(&mut compressed_buffer, Compression::default()); + encoder + .write_all(&buffer) + .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?; + encoder + .finish() + .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?; + retry(&must_stop_processing, || { let mut request = agent.post(&documents_url); request = request.set("Content-Type", "application/x-ndjson"); + request = request.set("Content-Encoding", "gzip"); if let Some(api_key) = api_key { request = request .set("Authorization", &(format!("Bearer {api_key}"))); } - request.send_bytes(&buffer).map_err(into_backoff_error) + request.send_bytes(&compressed_buffer).map_err(into_backoff_error) })?; buffer.clear(); + compressed_buffer.clear(); } buffer.extend_from_slice(&tmp_buffer);