diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index d3fb5b737..3c50283d9 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -115,6 +115,9 @@ pub struct IndexSchedulerOptions { /// If the autobatcher is allowed to automatically batch tasks /// it will only batch this defined number of tasks at once. pub max_number_of_batched_tasks: usize, + /// If the autobatcher is allowed to automatically batch tasks + /// it will only batch this defined maximum size (in bytes) of tasks at once. + pub batched_tasks_size_limit: u64, /// The experimental features enabled for this instance. pub instance_features: InstanceTogglableFeatures, } diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index e9755c1a7..b224ee6eb 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -497,17 +497,26 @@ impl IndexScheduler { 1 }; - let enqueued = index_tasks - .into_iter() - .take(tasks_limit) - .map(|task_id| { - self.queue - .tasks - .get_task(rtxn, task_id) - .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - .map(|task| (task.uid, task.kind)) - }) - .collect::>>()?; + let mut enqueued = Vec::new(); + let mut total_size: u64 = 0; + for task_id in index_tasks.into_iter().take(tasks_limit) { + let task = self + .queue + .tasks + .get_task(rtxn, task_id) + .and_then(|task| task.ok_or(Error::CorruptedTaskQueue))?; + + if let Some(uuid) = task.content_uuid() { + let content_size = self.queue.file_store.compute_size(uuid)?; + total_size = total_size.saturating_add(content_size); + } + + if total_size > self.scheduler.batched_tasks_size_limit && !enqueued.is_empty() { + break; + } + + enqueued.push((task.uid, task.kind)); + } if let Some((batchkind, create_index)) = autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref()) diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 2c76e2f38..2d20c4d55 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -60,6 +60,9 @@ pub struct Scheduler { /// The maximum number of tasks that will be batched together. pub(crate) max_number_of_batched_tasks: usize, + /// The maximum size, in bytes, of tasks in a batch. + pub(crate) batched_tasks_size_limit: u64, + /// The path used to create the dumps. pub(crate) dumps_path: PathBuf, @@ -80,6 +83,7 @@ impl Scheduler { wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, max_number_of_batched_tasks: self.max_number_of_batched_tasks, + batched_tasks_size_limit: self.batched_tasks_size_limit, dumps_path: self.dumps_path.clone(), snapshots_path: self.snapshots_path.clone(), auth_path: self.auth_path.clone(), @@ -94,6 +98,7 @@ impl Scheduler { wake_up: Arc::new(SignalEvent::auto(true)), autobatching_enabled: options.autobatching_enabled, max_number_of_batched_tasks: options.max_number_of_batched_tasks, + batched_tasks_size_limit: options.batched_tasks_size_limit, dumps_path: options.dumps_path.clone(), snapshots_path: options.snapshots_path.clone(), auth_path: options.auth_path.clone(), diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index f4779eea9..4be944037 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -107,6 +107,7 @@ impl IndexScheduler { cleanup_enabled: true, max_number_of_tasks: 1_000_000, max_number_of_batched_tasks: usize::MAX, + batched_tasks_size_limit: u64::MAX, instance_features: Default::default(), }; configuration(&mut options); diff --git a/crates/meilisearch/src/analytics/segment_analytics.rs b/crates/meilisearch/src/analytics/segment_analytics.rs index 9d187a52c..646bff532 100644 --- a/crates/meilisearch/src/analytics/segment_analytics.rs +++ b/crates/meilisearch/src/analytics/segment_analytics.rs @@ -194,6 +194,7 @@ struct Infos { experimental_enable_logs_route: bool, experimental_reduce_indexing_memory_usage: bool, experimental_max_number_of_batched_tasks: usize, + experimental_limit_batched_tasks_total_size: u64, gpu_enabled: bool, db_path: bool, import_dump: bool, @@ -239,6 +240,7 @@ impl Infos { experimental_enable_logs_route, experimental_reduce_indexing_memory_usage, experimental_max_number_of_batched_tasks, + experimental_limit_batched_tasks_total_size, http_addr, master_key: _, env, @@ -314,6 +316,7 @@ impl Infos { http_addr: http_addr != default_http_addr(), http_payload_size_limit, experimental_max_number_of_batched_tasks, + experimental_limit_batched_tasks_total_size, task_queue_webhook: task_webhook_url.is_some(), task_webhook_authorization_header: task_webhook_authorization_header.is_some(), log_level: log_level.to_string(), diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 8359c5255..a8b8b8eba 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -312,6 +312,7 @@ fn open_or_create_database_unchecked( cleanup_enabled: !opt.experimental_replication_parameters, max_number_of_tasks: 1_000_000, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, + batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_count: DEFAULT_INDEX_COUNT, instance_features, diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index 4405031ae..b5aa6b9e7 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -60,6 +60,8 @@ const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str = "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE"; const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str = "MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS"; +const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE: &str = + "MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_SIZE"; const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml"; const DEFAULT_DB_PATH: &str = "./data.ms"; @@ -200,20 +202,23 @@ pub struct Opt { #[clap(long, env = MEILI_TASK_WEBHOOK_URL)] pub task_webhook_url: Option, - /// The Authorization header to send on the webhook URL whenever a task finishes so a third party can be notified. + /// The Authorization header to send on the webhook URL whenever + /// a task finishes so a third party can be notified. #[clap(long, env = MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER)] pub task_webhook_authorization_header: Option, /// Deactivates Meilisearch's built-in telemetry when provided. /// - /// Meilisearch automatically collects data from all instances that do not opt out using this flag. - /// All gathered data is used solely for the purpose of improving Meilisearch, and can be deleted + /// Meilisearch automatically collects data from all instances that + /// do not opt out using this flag. All gathered data is used solely + /// for the purpose of improving Meilisearch, and can be deleted /// at any time. #[serde(default)] // we can't send true #[clap(long, env = MEILI_NO_ANALYTICS)] pub no_analytics: bool, - /// Sets the maximum size of the index. Value must be given in bytes or explicitly stating a base unit (for instance: 107374182400, '107.7Gb', or '107374 Mb'). + /// Sets the maximum size of the index. Value must be given in bytes or explicitly + /// stating a base unit (for instance: 107374182400, '107.7Gb', or '107374 Mb'). #[clap(skip = default_max_index_size())] #[serde(skip, default = "default_max_index_size")] pub max_index_size: Byte, @@ -333,43 +338,53 @@ pub struct Opt { /// Defines how much detail should be present in Meilisearch's logs. /// - /// Meilisearch currently supports six log levels, listed in order of increasing verbosity: OFF, ERROR, WARN, INFO, DEBUG, TRACE. + /// Meilisearch currently supports six log levels, listed in order of + /// increasing verbosity: OFF, ERROR, WARN, INFO, DEBUG, TRACE. #[clap(long, env = MEILI_LOG_LEVEL, default_value_t)] #[serde(default)] pub log_level: LogLevel, - /// Experimental contains filter feature. For more information, see: + /// Experimental contains filter feature. For more information, + /// see: /// /// Enables the experimental contains filter operator. #[clap(long, env = MEILI_EXPERIMENTAL_CONTAINS_FILTER)] #[serde(default)] pub experimental_contains_filter: bool, - /// Experimental metrics feature. For more information, see: + /// Experimental metrics feature. For more information, + /// see: /// /// Enables the Prometheus metrics on the `GET /metrics` endpoint. #[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_METRICS)] #[serde(default)] pub experimental_enable_metrics: bool, - /// Experimental search queue size. For more information, see: + /// Experimental search queue size. For more information, + /// see: + /// + /// Lets you customize the size of the search queue. Meilisearch processes + /// your search requests as fast as possible but once the queue is full + /// it starts returning HTTP 503, Service Unavailable. /// - /// Lets you customize the size of the search queue. Meilisearch processes your search requests as fast as possible but once the - /// queue is full it starts returning HTTP 503, Service Unavailable. /// The default value is 1000. #[clap(long, env = MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, default_value_t = default_experimental_search_queue_size())] #[serde(default = "default_experimental_search_queue_size")] pub experimental_search_queue_size: usize, - /// Experimental drop search after. For more information, see: + /// Experimental drop search after. For more information, + /// see: + /// + /// Let you customize after how many seconds Meilisearch should consider + /// a search request irrelevant and drop it. /// - /// Let you customize after how many seconds Meilisearch should consider a search request irrelevant and drop it. /// The default value is 60. #[clap(long, env = MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER, default_value_t = default_drop_search_after())] #[serde(default = "default_drop_search_after")] pub experimental_drop_search_after: NonZeroUsize, - /// Experimental number of searches per core. For more information, see: + /// Experimental number of searches per core. For more information, + /// see: /// /// Lets you customize how many search requests can run on each core concurrently. /// The default value is 4. @@ -377,16 +392,19 @@ pub struct Opt { #[serde(default = "default_nb_searches_per_core")] pub experimental_nb_searches_per_core: NonZeroUsize, - /// Experimental logs mode feature. For more information, see: + /// Experimental logs mode feature. For more information, + /// see: /// /// Change the mode of the logs on the console. #[clap(long, env = MEILI_EXPERIMENTAL_LOGS_MODE, default_value_t)] #[serde(default)] pub experimental_logs_mode: LogMode, - /// Experimental logs route feature. For more information, see: + /// Experimental logs route feature. For more information, + /// see: /// - /// Enables the log routes on the `POST /logs/stream`, `POST /logs/stderr` endpoints, and the `DELETE /logs/stream` to stop receiving logs. + /// Enables the log routes on the `POST /logs/stream`, `POST /logs/stderr` endpoints, + /// and the `DELETE /logs/stream` to stop receiving logs. #[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE)] #[serde(default)] pub experimental_enable_logs_route: bool, @@ -396,21 +414,30 @@ pub struct Opt { /// /// - /!\ Disable the automatic clean up of old processed tasks, you're in charge of that now /// - Lets you specify a custom task ID upon registering a task - /// - Lets you execute dry-register a task (get an answer from the route but nothing is actually registered in meilisearch and it won't be processed) + /// - Lets you execute dry-register a task (get an answer from the route but nothing is actually + /// registered in meilisearch and it won't be processed) #[clap(long, env = MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS)] #[serde(default)] pub experimental_replication_parameters: bool, - /// Experimental RAM reduction during indexing, do not use in production, see: + /// Experimental RAM reduction during indexing, do not use in production, + /// see: #[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)] #[serde(default)] pub experimental_reduce_indexing_memory_usage: bool, - /// Experimentally reduces the maximum number of tasks that will be processed at once, see: + /// Experimentally reduces the maximum number of tasks that will be processed at once, + /// see: #[clap(long, env = MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())] #[serde(default = "default_limit_batched_tasks")] pub experimental_max_number_of_batched_tasks: usize, + /// Experimentally reduces the maximum total size, in bytes, of tasks that will be processed at once, + /// see: + #[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())] + #[serde(default = "default_limit_batched_tasks_total_size")] + pub experimental_limit_batched_tasks_total_size: u64, + #[serde(flatten)] #[clap(flatten)] pub indexer_options: IndexerOpts, @@ -482,7 +509,6 @@ impl Opt { max_index_size: _, max_task_db_size: _, http_payload_size_limit, - experimental_max_number_of_batched_tasks, ssl_cert_path, ssl_key_path, ssl_auth_path, @@ -512,6 +538,8 @@ impl Opt { experimental_enable_logs_route, experimental_replication_parameters, experimental_reduce_indexing_memory_usage, + experimental_max_number_of_batched_tasks, + experimental_limit_batched_tasks_total_size, } = self; export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr); @@ -534,10 +562,6 @@ impl Opt { MEILI_HTTP_PAYLOAD_SIZE_LIMIT, http_payload_size_limit.to_string(), ); - export_to_env_if_not_present( - MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, - experimental_max_number_of_batched_tasks.to_string(), - ); if let Some(ssl_cert_path) = ssl_cert_path { export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path); } @@ -596,6 +620,14 @@ impl Opt { MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE, experimental_reduce_indexing_memory_usage.to_string(), ); + export_to_env_if_not_present( + MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, + experimental_max_number_of_batched_tasks.to_string(), + ); + export_to_env_if_not_present( + MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, + experimental_limit_batched_tasks_total_size.to_string(), + ); indexer_options.export_to_env(); } @@ -899,6 +931,10 @@ fn default_limit_batched_tasks() -> usize { usize::MAX } +fn default_limit_batched_tasks_total_size() -> u64 { + u64::MAX +} + fn default_snapshot_dir() -> PathBuf { PathBuf::from(DEFAULT_SNAPSHOT_DIR) }