Gzip-compress the content

Kerollmops 2025-06-25 12:29:14 +02:00
parent c6216517c7
commit a743da3061

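For context, here is a minimal, self-contained sketch (not code from this commit) of the pattern being introduced: gzip-compress a buffered NDJSON payload with flate2's GzEncoder and POST it with a Content-Encoding: gzip header. The send_compressed helper, its parameters, and the error handling are illustrative assumptions; only the flate2 and ureq calls mirror what the diff below uses.

use std::io::Write as _;

use flate2::write::GzEncoder;
use flate2::Compression;

// Hypothetical helper, not part of the commit: gzip-compress an NDJSON
// payload and send it, mirroring the retry closure in the diff below.
fn send_compressed(
    agent: &ureq::Agent,
    url: &str,
    api_key: Option<&str>,
    payload: &[u8],
) -> Result<ureq::Response, Box<dyn std::error::Error>> {
    // Write the raw payload through a gzip encoder; `finish` flushes the
    // stream and returns the Vec<u8> holding the compressed bytes.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(payload)?;
    let compressed = encoder.finish()?;

    // Declare both the original media type and the compression applied,
    // so the receiving instance knows how to decode the body.
    let mut request = agent
        .post(url)
        .set("Content-Type", "application/x-ndjson")
        .set("Content-Encoding", "gzip");
    if let Some(key) = api_key {
        request = request.set("Authorization", &format!("Bearer {key}"));
    }
    Ok(request.send_bytes(&compressed)?)
}

Unlike this sketch, the change below reuses a single compressed_buffer across batches and clears it after each send, avoiding a reallocation per payload.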

@@ -1,9 +1,11 @@
 use std::collections::BTreeMap;
-use std::io;
+use std::io::{self, Write as _};
 use std::sync::atomic;
 use std::time::Duration;
 
 use backoff::ExponentialBackoff;
+use flate2::write::GzEncoder;
+use flate2::Compression;
 use meilisearch_types::index_uid_pattern::IndexUidPattern;
 use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
@@ -131,6 +133,7 @@ impl IndexScheduler {
 
 let mut buffer = Vec::new();
 let mut tmp_buffer = Vec::new();
+let mut compressed_buffer = Vec::new();
 for (i, docid) in universe.iter().enumerate() {
     if i % ctx.num_threads() != ctx.index() {
         continue;
@@ -205,17 +208,31 @@ impl IndexScheduler {
     .map_err(milli::InternalError::from)
     .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
-if buffer.len() + tmp_buffer.len() > limit {
+// Make sure we put at least one document in the buffer even
+// though we might go above the buffer limit before sending
+if !buffer.is_empty() && buffer.len() + tmp_buffer.len() > limit {
+    // We compress the documents before sending them
+    let mut encoder =
+        GzEncoder::new(&mut compressed_buffer, Compression::default());
+    encoder
+        .write_all(&buffer)
+        .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?;
+    encoder
+        .finish()
+        .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?;
     retry(&must_stop_processing, || {
         let mut request = agent.post(&documents_url);
         request = request.set("Content-Type", "application/x-ndjson");
+        request = request.set("Content-Encoding", "gzip");
         if let Some(api_key) = api_key {
             request = request
                 .set("Authorization", &(format!("Bearer {api_key}")));
         }
-        request.send_bytes(&buffer).map_err(into_backoff_error)
+        request.send_bytes(&compressed_buffer).map_err(into_backoff_error)
     })?;
     buffer.clear();
+    compressed_buffer.clear();
 }
 buffer.extend_from_slice(&tmp_buffer);