5223: Limit batched tasks total size r=curquiza a=Kerollmops

Introduce a new engine parameter (env and config, too) to limit the maximum payload size processed by the engine in batches. You can [review the Discussion and usage on GitHub](https://github.com/orgs/meilisearch/discussions/801).

Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-01-09 16:13:17 +00:00 committed by GitHub
commit 42854c0bca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 93 additions and 35 deletions

View File

@ -115,6 +115,9 @@ pub struct IndexSchedulerOptions {
/// If the autobatcher is allowed to automatically batch tasks /// If the autobatcher is allowed to automatically batch tasks
/// it will only batch this defined number of tasks at once. /// it will only batch this defined number of tasks at once.
pub max_number_of_batched_tasks: usize, pub max_number_of_batched_tasks: usize,
/// If the autobatcher is allowed to automatically batch tasks
/// it will only batch this defined maximum size (in bytes) of tasks at once.
pub batched_tasks_size_limit: u64,
/// The experimental features enabled for this instance. /// The experimental features enabled for this instance.
pub instance_features: InstanceTogglableFeatures, pub instance_features: InstanceTogglableFeatures,
} }

View File

@ -497,17 +497,26 @@ impl IndexScheduler {
1 1
}; };
let enqueued = index_tasks let mut enqueued = Vec::new();
.into_iter() let mut total_size: u64 = 0;
.take(tasks_limit) for task_id in index_tasks.into_iter().take(tasks_limit) {
.map(|task_id| { let task = self
self.queue .queue
.tasks .tasks
.get_task(rtxn, task_id) .get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) .and_then(|task| task.ok_or(Error::CorruptedTaskQueue))?;
.map(|task| (task.uid, task.kind))
}) if let Some(uuid) = task.content_uuid() {
.collect::<Result<Vec<_>>>()?; let content_size = self.queue.file_store.compute_size(uuid)?;
total_size = total_size.saturating_add(content_size);
}
if total_size > self.scheduler.batched_tasks_size_limit && !enqueued.is_empty() {
break;
}
enqueued.push((task.uid, task.kind));
}
if let Some((batchkind, create_index)) = if let Some((batchkind, create_index)) =
autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref()) autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())

View File

@ -60,6 +60,9 @@ pub struct Scheduler {
/// The maximum number of tasks that will be batched together. /// The maximum number of tasks that will be batched together.
pub(crate) max_number_of_batched_tasks: usize, pub(crate) max_number_of_batched_tasks: usize,
/// The maximum size, in bytes, of tasks in a batch.
pub(crate) batched_tasks_size_limit: u64,
/// The path used to create the dumps. /// The path used to create the dumps.
pub(crate) dumps_path: PathBuf, pub(crate) dumps_path: PathBuf,
@ -80,6 +83,7 @@ impl Scheduler {
wake_up: self.wake_up.clone(), wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled, autobatching_enabled: self.autobatching_enabled,
max_number_of_batched_tasks: self.max_number_of_batched_tasks, max_number_of_batched_tasks: self.max_number_of_batched_tasks,
batched_tasks_size_limit: self.batched_tasks_size_limit,
dumps_path: self.dumps_path.clone(), dumps_path: self.dumps_path.clone(),
snapshots_path: self.snapshots_path.clone(), snapshots_path: self.snapshots_path.clone(),
auth_path: self.auth_path.clone(), auth_path: self.auth_path.clone(),
@ -94,6 +98,7 @@ impl Scheduler {
wake_up: Arc::new(SignalEvent::auto(true)), wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled: options.autobatching_enabled, autobatching_enabled: options.autobatching_enabled,
max_number_of_batched_tasks: options.max_number_of_batched_tasks, max_number_of_batched_tasks: options.max_number_of_batched_tasks,
batched_tasks_size_limit: options.batched_tasks_size_limit,
dumps_path: options.dumps_path.clone(), dumps_path: options.dumps_path.clone(),
snapshots_path: options.snapshots_path.clone(), snapshots_path: options.snapshots_path.clone(),
auth_path: options.auth_path.clone(), auth_path: options.auth_path.clone(),

View File

@ -107,6 +107,7 @@ impl IndexScheduler {
cleanup_enabled: true, cleanup_enabled: true,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: usize::MAX, max_number_of_batched_tasks: usize::MAX,
batched_tasks_size_limit: u64::MAX,
instance_features: Default::default(), instance_features: Default::default(),
}; };
configuration(&mut options); configuration(&mut options);

View File

@ -194,6 +194,7 @@ struct Infos {
experimental_enable_logs_route: bool, experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool, experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize, experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: u64,
gpu_enabled: bool, gpu_enabled: bool,
db_path: bool, db_path: bool,
import_dump: bool, import_dump: bool,
@ -239,6 +240,7 @@ impl Infos {
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks, experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size,
http_addr, http_addr,
master_key: _, master_key: _,
env, env,
@ -314,6 +316,7 @@ impl Infos {
http_addr: http_addr != default_http_addr(), http_addr: http_addr != default_http_addr(),
http_payload_size_limit, http_payload_size_limit,
experimental_max_number_of_batched_tasks, experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size,
task_queue_webhook: task_webhook_url.is_some(), task_queue_webhook: task_webhook_url.is_some(),
task_webhook_authorization_header: task_webhook_authorization_header.is_some(), task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
log_level: log_level.to_string(), log_level: log_level.to_string(),

View File

@ -312,6 +312,7 @@ fn open_or_create_database_unchecked(
cleanup_enabled: !opt.experimental_replication_parameters, cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
index_count: DEFAULT_INDEX_COUNT, index_count: DEFAULT_INDEX_COUNT,
instance_features, instance_features,

View File

@ -60,6 +60,8 @@ const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
"MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE"; "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str = const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
"MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS"; "MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS";
const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE: &str =
"MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_SIZE";
const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml"; const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
const DEFAULT_DB_PATH: &str = "./data.ms"; const DEFAULT_DB_PATH: &str = "./data.ms";
@ -200,20 +202,23 @@ pub struct Opt {
#[clap(long, env = MEILI_TASK_WEBHOOK_URL)] #[clap(long, env = MEILI_TASK_WEBHOOK_URL)]
pub task_webhook_url: Option<Url>, pub task_webhook_url: Option<Url>,
/// The Authorization header to send on the webhook URL whenever a task finishes so a third party can be notified. /// The Authorization header to send on the webhook URL whenever
/// a task finishes so a third party can be notified.
#[clap(long, env = MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER)] #[clap(long, env = MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER)]
pub task_webhook_authorization_header: Option<String>, pub task_webhook_authorization_header: Option<String>,
/// Deactivates Meilisearch's built-in telemetry when provided. /// Deactivates Meilisearch's built-in telemetry when provided.
/// ///
/// Meilisearch automatically collects data from all instances that do not opt out using this flag. /// Meilisearch automatically collects data from all instances that
/// All gathered data is used solely for the purpose of improving Meilisearch, and can be deleted /// do not opt out using this flag. All gathered data is used solely
/// for the purpose of improving Meilisearch, and can be deleted
/// at any time. /// at any time.
#[serde(default)] // we can't send true #[serde(default)] // we can't send true
#[clap(long, env = MEILI_NO_ANALYTICS)] #[clap(long, env = MEILI_NO_ANALYTICS)]
pub no_analytics: bool, pub no_analytics: bool,
/// Sets the maximum size of the index. Value must be given in bytes or explicitly stating a base unit (for instance: 107374182400, '107.7Gb', or '107374 Mb'). /// Sets the maximum size of the index. Value must be given in bytes or explicitly
/// stating a base unit (for instance: 107374182400, '107.7Gb', or '107374 Mb').
#[clap(skip = default_max_index_size())] #[clap(skip = default_max_index_size())]
#[serde(skip, default = "default_max_index_size")] #[serde(skip, default = "default_max_index_size")]
pub max_index_size: Byte, pub max_index_size: Byte,
@ -333,43 +338,53 @@ pub struct Opt {
/// Defines how much detail should be present in Meilisearch's logs. /// Defines how much detail should be present in Meilisearch's logs.
/// ///
/// Meilisearch currently supports six log levels, listed in order of increasing verbosity: OFF, ERROR, WARN, INFO, DEBUG, TRACE. /// Meilisearch currently supports six log levels, listed in order of
/// increasing verbosity: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
#[clap(long, env = MEILI_LOG_LEVEL, default_value_t)] #[clap(long, env = MEILI_LOG_LEVEL, default_value_t)]
#[serde(default)] #[serde(default)]
pub log_level: LogLevel, pub log_level: LogLevel,
/// Experimental contains filter feature. For more information, see: <https://github.com/orgs/meilisearch/discussions/763> /// Experimental contains filter feature. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/763>
/// ///
/// Enables the experimental contains filter operator. /// Enables the experimental contains filter operator.
#[clap(long, env = MEILI_EXPERIMENTAL_CONTAINS_FILTER)] #[clap(long, env = MEILI_EXPERIMENTAL_CONTAINS_FILTER)]
#[serde(default)] #[serde(default)]
pub experimental_contains_filter: bool, pub experimental_contains_filter: bool,
/// Experimental metrics feature. For more information, see: <https://github.com/meilisearch/meilisearch/discussions/3518> /// Experimental metrics feature. For more information,
/// see: <https://github.com/meilisearch/meilisearch/discussions/3518>
/// ///
/// Enables the Prometheus metrics on the `GET /metrics` endpoint. /// Enables the Prometheus metrics on the `GET /metrics` endpoint.
#[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_METRICS)] #[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_METRICS)]
#[serde(default)] #[serde(default)]
pub experimental_enable_metrics: bool, pub experimental_enable_metrics: bool,
/// Experimental search queue size. For more information, see: <https://github.com/orgs/meilisearch/discussions/729> /// Experimental search queue size. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/729>
///
/// Lets you customize the size of the search queue. Meilisearch processes
/// your search requests as fast as possible but once the queue is full
/// it starts returning HTTP 503, Service Unavailable.
/// ///
/// Lets you customize the size of the search queue. Meilisearch processes your search requests as fast as possible but once the
/// queue is full it starts returning HTTP 503, Service Unavailable.
/// The default value is 1000. /// The default value is 1000.
#[clap(long, env = MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, default_value_t = default_experimental_search_queue_size())] #[clap(long, env = MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, default_value_t = default_experimental_search_queue_size())]
#[serde(default = "default_experimental_search_queue_size")] #[serde(default = "default_experimental_search_queue_size")]
pub experimental_search_queue_size: usize, pub experimental_search_queue_size: usize,
/// Experimental drop search after. For more information, see: <https://github.com/orgs/meilisearch/discussions/783> /// Experimental drop search after. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/783>
///
/// Let you customize after how many seconds Meilisearch should consider
/// a search request irrelevant and drop it.
/// ///
/// Let you customize after how many seconds Meilisearch should consider a search request irrelevant and drop it.
/// The default value is 60. /// The default value is 60.
#[clap(long, env = MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER, default_value_t = default_drop_search_after())] #[clap(long, env = MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER, default_value_t = default_drop_search_after())]
#[serde(default = "default_drop_search_after")] #[serde(default = "default_drop_search_after")]
pub experimental_drop_search_after: NonZeroUsize, pub experimental_drop_search_after: NonZeroUsize,
/// Experimental number of searches per core. For more information, see: <https://github.com/orgs/meilisearch/discussions/784> /// Experimental number of searches per core. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/784>
/// ///
/// Lets you customize how many search requests can run on each core concurrently. /// Lets you customize how many search requests can run on each core concurrently.
/// The default value is 4. /// The default value is 4.
@ -377,16 +392,19 @@ pub struct Opt {
#[serde(default = "default_nb_searches_per_core")] #[serde(default = "default_nb_searches_per_core")]
pub experimental_nb_searches_per_core: NonZeroUsize, pub experimental_nb_searches_per_core: NonZeroUsize,
/// Experimental logs mode feature. For more information, see: <https://github.com/orgs/meilisearch/discussions/723> /// Experimental logs mode feature. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/723>
/// ///
/// Change the mode of the logs on the console. /// Change the mode of the logs on the console.
#[clap(long, env = MEILI_EXPERIMENTAL_LOGS_MODE, default_value_t)] #[clap(long, env = MEILI_EXPERIMENTAL_LOGS_MODE, default_value_t)]
#[serde(default)] #[serde(default)]
pub experimental_logs_mode: LogMode, pub experimental_logs_mode: LogMode,
/// Experimental logs route feature. For more information, see: <https://github.com/orgs/meilisearch/discussions/721> /// Experimental logs route feature. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/721>
/// ///
/// Enables the log routes on the `POST /logs/stream`, `POST /logs/stderr` endpoints, and the `DELETE /logs/stream` to stop receiving logs. /// Enables the log routes on the `POST /logs/stream`, `POST /logs/stderr` endpoints,
/// and the `DELETE /logs/stream` to stop receiving logs.
#[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE)] #[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE)]
#[serde(default)] #[serde(default)]
pub experimental_enable_logs_route: bool, pub experimental_enable_logs_route: bool,
@ -396,21 +414,30 @@ pub struct Opt {
/// ///
/// - /!\ Disable the automatic clean up of old processed tasks, you're in charge of that now /// - /!\ Disable the automatic clean up of old processed tasks, you're in charge of that now
/// - Lets you specify a custom task ID upon registering a task /// - Lets you specify a custom task ID upon registering a task
/// - Lets you execute dry-register a task (get an answer from the route but nothing is actually registered in meilisearch and it won't be processed) /// - Lets you execute dry-register a task (get an answer from the route but nothing is actually
/// registered in meilisearch and it won't be processed)
#[clap(long, env = MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS)] #[clap(long, env = MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS)]
#[serde(default)] #[serde(default)]
pub experimental_replication_parameters: bool, pub experimental_replication_parameters: bool,
/// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652> /// Experimental RAM reduction during indexing, do not use in production,
/// see: <https://github.com/meilisearch/product/discussions/652>
#[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)] #[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)]
#[serde(default)] #[serde(default)]
pub experimental_reduce_indexing_memory_usage: bool, pub experimental_reduce_indexing_memory_usage: bool,
/// Experimentally reduces the maximum number of tasks that will be processed at once, see: <https://github.com/orgs/meilisearch/discussions/713> /// Experimentally reduces the maximum number of tasks that will be processed at once,
/// see: <https://github.com/orgs/meilisearch/discussions/713>
#[clap(long, env = MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())] #[clap(long, env = MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())]
#[serde(default = "default_limit_batched_tasks")] #[serde(default = "default_limit_batched_tasks")]
pub experimental_max_number_of_batched_tasks: usize, pub experimental_max_number_of_batched_tasks: usize,
/// Experimentally reduces the maximum total size, in bytes, of tasks that will be processed at once,
/// see: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())]
#[serde(default = "default_limit_batched_tasks_total_size")]
pub experimental_limit_batched_tasks_total_size: u64,
#[serde(flatten)] #[serde(flatten)]
#[clap(flatten)] #[clap(flatten)]
pub indexer_options: IndexerOpts, pub indexer_options: IndexerOpts,
@ -482,7 +509,6 @@ impl Opt {
max_index_size: _, max_index_size: _,
max_task_db_size: _, max_task_db_size: _,
http_payload_size_limit, http_payload_size_limit,
experimental_max_number_of_batched_tasks,
ssl_cert_path, ssl_cert_path,
ssl_key_path, ssl_key_path,
ssl_auth_path, ssl_auth_path,
@ -512,6 +538,8 @@ impl Opt {
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_replication_parameters, experimental_replication_parameters,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size,
} = self; } = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr); export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@ -534,10 +562,6 @@ impl Opt {
MEILI_HTTP_PAYLOAD_SIZE_LIMIT, MEILI_HTTP_PAYLOAD_SIZE_LIMIT,
http_payload_size_limit.to_string(), http_payload_size_limit.to_string(),
); );
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
experimental_max_number_of_batched_tasks.to_string(),
);
if let Some(ssl_cert_path) = ssl_cert_path { if let Some(ssl_cert_path) = ssl_cert_path {
export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path); export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path);
} }
@ -596,6 +620,14 @@ impl Opt {
MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE, MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE,
experimental_reduce_indexing_memory_usage.to_string(), experimental_reduce_indexing_memory_usage.to_string(),
); );
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
experimental_max_number_of_batched_tasks.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
experimental_limit_batched_tasks_total_size.to_string(),
);
indexer_options.export_to_env(); indexer_options.export_to_env();
} }
@ -899,6 +931,10 @@ fn default_limit_batched_tasks() -> usize {
usize::MAX usize::MAX
} }
fn default_limit_batched_tasks_total_size() -> u64 {
u64::MAX
}
fn default_snapshot_dir() -> PathBuf { fn default_snapshot_dir() -> PathBuf {
PathBuf::from(DEFAULT_SNAPSHOT_DIR) PathBuf::from(DEFAULT_SNAPSHOT_DIR)
} }