diff --git a/Cargo.lock b/Cargo.lock index a883b749f..be6aa4b21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3855,6 +3855,7 @@ dependencies = [ "anyhow", "bumpalo", "bumparaw-collections", + "byte-unit", "convert_case 0.8.0", "csv", "deserr", diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index 5c67d7a94..7fd0ea376 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -4,6 +4,7 @@ use std::collections::BTreeMap; use meilisearch_types::batches::BatchId; +use meilisearch_types::byte_unit::Byte; use meilisearch_types::error::ResponseError; use meilisearch_types::keys::Key; use meilisearch_types::milli::update::IndexDocumentsMethod; @@ -148,6 +149,7 @@ pub enum KindDump { Export { url: String, api_key: Option, + payload_size: Option, indexes: BTreeMap, }, UpgradeDatabase { @@ -222,9 +224,10 @@ impl From for KindDump { KindDump::DumpCreation { keys, instance_uid } } KindWithContent::SnapshotCreation => KindDump::SnapshotCreation, - KindWithContent::Export { url, api_key, indexes } => KindDump::Export { + KindWithContent::Export { url, api_key, payload_size, indexes } => KindDump::Export { url, api_key, + payload_size, indexes: indexes .into_iter() .map(|(pattern, settings)| (pattern.to_string(), settings)) diff --git a/crates/index-scheduler/src/dump.rs b/crates/index-scheduler/src/dump.rs index 2a99a74aa..1e681c8e8 100644 --- a/crates/index-scheduler/src/dump.rs +++ b/crates/index-scheduler/src/dump.rs @@ -212,20 +212,23 @@ impl<'a> Dump<'a> { KindWithContent::DumpCreation { keys, instance_uid } } KindDump::SnapshotCreation => KindWithContent::SnapshotCreation, - KindDump::Export { url, indexes, api_key } => KindWithContent::Export { - url, - api_key, - indexes: indexes - .into_iter() - .map(|(pattern, settings)| { - Ok(( - IndexUidPattern::try_from(pattern) - .map_err(|_| Error::CorruptedDump)?, - settings, - )) - }) - .collect::>()?, - }, + KindDump::Export { url, api_key, payload_size, indexes } => { + KindWithContent::Export { + url, + api_key, + payload_size, + indexes: indexes + .into_iter() + .map(|(pattern, settings)| { + Ok(( + IndexUidPattern::try_from(pattern) + .map_err(|_| Error::CorruptedDump)?, + settings, + )) + }) + .collect::>()?, + } + } KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from }, }, }; diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index 138b591ff..f48821520 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -289,8 +289,8 @@ fn snapshot_details(d: &Details) -> String { Details::IndexSwap { swaps } => { format!("{{ swaps: {swaps:?} }}") } - Details::Export { url, api_key, indexes } => { - format!("{{ url: {url:?}, api_key: {api_key:?}, indexes: {indexes:?} }}") + Details::Export { url, api_key, payload_size, indexes } => { + format!("{{ url: {url:?}, api_key: {api_key:?}, payload_size: {payload_size:?}, indexes: {indexes:?} }}") } Details::UpgradeDatabase { from, to } => { format!("{{ from: {from:?}, to: {to:?} }}") diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 99278756d..e56b8e13a 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -362,12 +362,19 @@ impl IndexScheduler { Ok((vec![task], ProcessBatchInfo::default())) } Batch::Export { mut task } => { - let KindWithContent::Export { url, indexes, api_key } = &task.kind else { + let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind + else { unreachable!() }; let ret = catch_unwind(AssertUnwindSafe(|| { - self.process_export(url, indexes, api_key.as_deref(), progress) + self.process_export( + url, + api_key.as_deref(), + payload_size.as_ref(), + indexes, + progress, + ) })); match ret { diff --git a/crates/index-scheduler/src/scheduler/process_export.rs b/crates/index-scheduler/src/scheduler/process_export.rs index 180162eda..e777809fd 100644 --- a/crates/index-scheduler/src/scheduler/process_export.rs +++ b/crates/index-scheduler/src/scheduler/process_export.rs @@ -4,6 +4,7 @@ use std::sync::atomic; use std::time::Duration; use backoff::ExponentialBackoff; +use byte_unit::Byte; use flate2::write::GzEncoder; use flate2::Compression; use meilisearch_types::index_uid_pattern::IndexUidPattern; @@ -25,8 +26,9 @@ impl IndexScheduler { pub(super) fn process_export( &self, base_url: &str, - indexes: &BTreeMap, api_key: Option<&str>, + payload_size: Option<&Byte>, + indexes: &BTreeMap, progress: Progress, ) -> Result<()> { #[cfg(test)] @@ -122,7 +124,7 @@ impl IndexScheduler { let (step, progress_step) = AtomicDocumentStep::new(total_documents); progress.update_progress(progress_step); - let limit = 50 * 1024 * 1024; // 50 MiB + let limit = payload_size.map(|ps| ps.as_u64() as usize).unwrap_or(50 * 1024 * 1024); // defaults to 50 MiB let documents_url = format!("{base_url}/indexes/{uid}/documents"); request_threads() diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 79571745b..594023145 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -601,7 +601,7 @@ impl crate::IndexScheduler { Details::Dump { dump_uid: _ } => { assert_eq!(kind.as_kind(), Kind::DumpCreation); } - Details::Export { url: _, api_key: _, indexes: _ } => { + Details::Export { url: _, api_key: _, payload_size: _, indexes: _ } => { assert_eq!(kind.as_kind(), Kind::Export); } Details::UpgradeDatabase { from: _, to: _ } => { diff --git a/crates/meilisearch-types/Cargo.toml b/crates/meilisearch-types/Cargo.toml index f76044078..faf59643f 100644 --- a/crates/meilisearch-types/Cargo.toml +++ b/crates/meilisearch-types/Cargo.toml @@ -15,6 +15,7 @@ actix-web = { version = "4.11.0", default-features = false } anyhow = "1.0.98" bumpalo = "3.18.1" bumparaw-collections = "0.1.4" +byte-unit = { version = "5.1.6", features = ["serde"] } convert_case = "0.8.0" csv = "1.3.1" deserr = { version = "0.6.3", features = ["actix-web"] } diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 08ee803ef..a8f45b4ef 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -392,6 +392,7 @@ InvalidSettingsIndexChat , InvalidRequest , BAD_REQU // Export InvalidExportUrl , InvalidRequest , BAD_REQUEST ; InvalidExportApiKey , InvalidRequest , BAD_REQUEST ; +InvalidExportPayloadSize , InvalidRequest , BAD_REQUEST ; InvalidExportIndexesPatterns , InvalidRequest , BAD_REQUEST ; InvalidExportIndexSkipEmbeddings , InvalidRequest , BAD_REQUEST ; InvalidExportIndexFilter , InvalidRequest , BAD_REQUEST ; diff --git a/crates/meilisearch-types/src/lib.rs b/crates/meilisearch-types/src/lib.rs index a1a57b7e6..fe69da526 100644 --- a/crates/meilisearch-types/src/lib.rs +++ b/crates/meilisearch-types/src/lib.rs @@ -18,7 +18,7 @@ pub mod versioning; pub use milli::{heed, Index}; use uuid::Uuid; pub use versioning::VERSION_FILE_NAME; -pub use {milli, serde_cs}; +pub use {byte_unit, milli, serde_cs}; pub type Document = serde_json::Map; pub type InstanceUid = Uuid; diff --git a/crates/meilisearch-types/src/task_view.rs b/crates/meilisearch-types/src/task_view.rs index 0a8d7b8fe..1dbd5637b 100644 --- a/crates/meilisearch-types/src/task_view.rs +++ b/crates/meilisearch-types/src/task_view.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use byte_unit::UnitType; use milli::Object; use serde::{Deserialize, Serialize}; use time::{Duration, OffsetDateTime}; @@ -128,6 +129,8 @@ pub struct DetailsView { #[serde(skip_serializing_if = "Option::is_none")] pub api_key: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub payload_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub indexes: Option>, } @@ -263,6 +266,13 @@ impl DetailsView { // So we return the first one we encounter but that shouldn't be an issue anyway. (Some(left), Some(_right)) => Some(left), }, + payload_size: match (self.payload_size.clone(), other.payload_size.clone()) { + (None, None) => None, + (None, Some(size)) | (Some(size), None) => Some(size), + // We should never be able to batch multiple exports at the same time. + // So we return the first one we encounter but that shouldn't be an issue anyway. + (Some(left), Some(_right)) => Some(left), + }, indexes: match (self.indexes.clone(), other.indexes.clone()) { (None, None) => None, (None, Some(indexes)) | (Some(indexes), None) => Some(indexes), @@ -359,9 +369,11 @@ impl From
for DetailsView { Details::IndexSwap { swaps } => { DetailsView { swaps: Some(swaps), ..Default::default() } } - Details::Export { url, api_key, indexes } => DetailsView { + Details::Export { url, api_key, payload_size, indexes } => DetailsView { url: Some(url), api_key, + payload_size: payload_size + .map(|ps| ps.get_appropriate_unit(UnitType::Both).to_string()), indexes: Some( indexes .into_iter() diff --git a/crates/meilisearch-types/src/tasks.rs b/crates/meilisearch-types/src/tasks.rs index 86951192c..508035bb7 100644 --- a/crates/meilisearch-types/src/tasks.rs +++ b/crates/meilisearch-types/src/tasks.rs @@ -3,6 +3,7 @@ use std::collections::{BTreeMap, HashSet}; use std::fmt::{Display, Write}; use std::str::FromStr; +use byte_unit::Byte; use enum_iterator::Sequence; use milli::update::IndexDocumentsMethod; use milli::Object; @@ -159,6 +160,7 @@ pub enum KindWithContent { Export { url: String, api_key: Option, + payload_size: Option, indexes: BTreeMap, }, UpgradeDatabase { @@ -286,11 +288,14 @@ impl KindWithContent { }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, - KindWithContent::Export { url, api_key, indexes } => Some(Details::Export { - url: url.clone(), - api_key: api_key.clone(), - indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), - }), + KindWithContent::Export { url, api_key, payload_size, indexes } => { + Some(Details::Export { + url: url.clone(), + api_key: api_key.clone(), + payload_size: payload_size.clone(), + indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), + }) + } KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase { from: (from.0, from.1, from.2), to: ( @@ -357,11 +362,14 @@ impl KindWithContent { }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, - KindWithContent::Export { url, api_key, indexes } => Some(Details::Export { - url: url.clone(), - api_key: api_key.clone(), - indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), - }), + KindWithContent::Export { url, api_key, payload_size, indexes } => { + Some(Details::Export { + url: url.clone(), + api_key: api_key.clone(), + payload_size: payload_size.clone(), + indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), + }) + } KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase { from: *from, to: ( @@ -410,11 +418,14 @@ impl From<&KindWithContent> for Option
{ }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, - KindWithContent::Export { url, api_key, indexes } => Some(Details::Export { - url: url.clone(), - api_key: api_key.clone(), - indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), - }), + KindWithContent::Export { url, api_key, payload_size, indexes } => { + Some(Details::Export { + url: url.clone(), + api_key: api_key.clone(), + payload_size: payload_size.clone(), + indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), + }) + } KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase { from: *from, to: ( @@ -681,6 +692,7 @@ pub enum Details { Export { url: String, api_key: Option, + payload_size: Option, indexes: BTreeMap, }, UpgradeDatabase { diff --git a/crates/meilisearch/src/routes/export.rs b/crates/meilisearch/src/routes/export.rs index de1fe2c38..1c519224c 100644 --- a/crates/meilisearch/src/routes/export.rs +++ b/crates/meilisearch/src/routes/export.rs @@ -1,7 +1,10 @@ use std::collections::BTreeMap; +use std::convert::Infallible; +use std::str::FromStr as _; use actix_web::web::{self, Data}; use actix_web::{HttpRequest, HttpResponse}; +use byte_unit::Byte; use deserr::actix_web::AwebJson; use deserr::Deserr; use index_scheduler::IndexScheduler; @@ -72,7 +75,7 @@ async fn export( let export = export.into_inner(); debug!(returns = ?export, "Trigger export"); - let Export { url, api_key, indexes } = export; + let Export { url, api_key, payload_size, indexes } = export; let indexes = if indexes.is_empty() { BTreeMap::from([(IndexUidPattern::new_unchecked("*"), DbExportIndexSettings::default())]) @@ -85,7 +88,12 @@ async fn export( .collect() }; - let task = KindWithContent::Export { url, api_key, indexes }; + let task = KindWithContent::Export { + url, + api_key, + payload_size: payload_size.map(|ByteWithDeserr(bytes)| bytes), + indexes, + }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = @@ -109,12 +117,51 @@ pub struct Export { #[serde(default)] #[deserr(default, error = DeserrJsonError)] pub api_key: Option, + #[schema(value_type = Option, example = json!("24MiB"))] + #[serde(default)] + #[deserr(default, error = DeserrJsonError)] + pub payload_size: Option, #[schema(value_type = Option>, example = json!(["movies", "steam-*"]))] #[deserr(default)] #[serde(default)] pub indexes: BTreeMap, } +/// A wrapper around the `Byte` type that implements `Deserr`. +#[derive(Debug, Serialize)] +#[serde(transparent)] +pub struct ByteWithDeserr(pub Byte); + +impl deserr::Deserr for ByteWithDeserr +where + E: deserr::DeserializeError, +{ + fn deserialize_from_value( + value: deserr::Value, + location: deserr::ValuePointerRef, + ) -> Result { + use deserr::{ErrorKind, Value, ValueKind}; + match value { + Value::Integer(integer) => Ok(ByteWithDeserr(Byte::from_u64(integer))), + Value::String(string) => Byte::from_str(&string).map(ByteWithDeserr).map_err(|e| { + deserr::take_cf_content(E::error::( + None, + ErrorKind::Unexpected { msg: e.to_string() }, + location, + )) + }), + actual => Err(deserr::take_cf_content(E::error( + None, + ErrorKind::IncorrectValueKind { + actual, + accepted: &[ValueKind::Integer, ValueKind::String], + }, + location, + ))), + } + } +} + #[derive(Debug, Deserr, ToSchema, Serialize)] #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] #[serde(rename_all = "camelCase")]