From 7e259cb0d2e382c8b2a77c1ed56597d9278fb97c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 11 Dec 2023 16:08:39 +0100 Subject: [PATCH] Expose the --max-number-of-batched-tasks argument --- index-scheduler/src/batch.rs | 4 +++- index-scheduler/src/insta_snapshot.rs | 1 + index-scheduler/src/lib.rs | 9 ++++++++ .../src/analytics/segment_analytics.rs | 3 +++ meilisearch/src/lib.rs | 1 + meilisearch/src/option.rs | 23 +++++++++++++++---- 6 files changed, 36 insertions(+), 5 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 9089acb69..94a8b3f07 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -584,7 +584,9 @@ impl IndexScheduler { let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued; // If autobatching is disabled we only take one task at a time. - let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 }; + // Otherwise, we take only a maximum of tasks to create batches. + let tasks_limit = + if self.autobatching_enabled { self.max_number_of_batched_tasks } else { 1 }; let enqueued = index_tasks .into_iter() diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 885a66f49..bd8fa5148 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -30,6 +30,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { index_mapper, features: _, max_number_of_tasks: _, + max_number_of_batched_tasks: _, puffin_frame: _, wake_up: _, dumps_path: _, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 446db8eae..a1b6497d9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -258,6 +258,9 @@ pub struct IndexSchedulerOptions { /// The maximum number of tasks stored in the task queue before starting /// to auto schedule task deletions. pub max_number_of_tasks: usize, + /// If the autobatcher is allowed to automatically batch tasks + /// it will only batch this defined number of tasks at once. + pub max_number_of_batched_tasks: usize, /// The experimental features enabled for this instance. pub instance_features: InstanceTogglableFeatures, } @@ -316,6 +319,9 @@ pub struct IndexScheduler { /// the finished tasks automatically. pub(crate) max_number_of_tasks: usize, + /// The maximum number of tasks that will be batched together. + pub(crate) max_number_of_batched_tasks: usize, + /// A frame to output the indexation profiling files to disk. pub(crate) puffin_frame: Arc, @@ -373,6 +379,7 @@ impl IndexScheduler { wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, max_number_of_tasks: self.max_number_of_tasks, + max_number_of_batched_tasks: self.max_number_of_batched_tasks, puffin_frame: self.puffin_frame.clone(), snapshots_path: self.snapshots_path.clone(), dumps_path: self.dumps_path.clone(), @@ -471,6 +478,7 @@ impl IndexScheduler { puffin_frame: Arc::new(puffin::GlobalFrameView::default()), autobatching_enabled: options.autobatching_enabled, max_number_of_tasks: options.max_number_of_tasks, + max_number_of_batched_tasks: options.max_number_of_batched_tasks, dumps_path: options.dumps_path, snapshots_path: options.snapshots_path, auth_path: options.auth_path, @@ -1638,6 +1646,7 @@ mod tests { indexer_config, autobatching_enabled: true, max_number_of_tasks: 1_000_000, + max_number_of_batched_tasks: usize::MAX, instance_features: Default::default(), }; configuration(&mut options); diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index 2f0014ab7..54f5813f0 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -263,6 +263,7 @@ struct Infos { ignore_snapshot_if_db_exists: bool, http_addr: bool, http_payload_size_limit: Byte, + max_number_of_batched_tasks: usize, log_level: String, max_indexing_memory: MaxMemory, max_indexing_threads: MaxThreads, @@ -291,6 +292,7 @@ impl From for Infos { max_index_size: _, max_task_db_size: _, http_payload_size_limit, + max_number_of_batched_tasks, ssl_cert_path, ssl_key_path, ssl_auth_path, @@ -340,6 +342,7 @@ impl From for Infos { ignore_snapshot_if_db_exists, http_addr: http_addr != default_http_addr(), http_payload_size_limit, + max_number_of_batched_tasks, log_level: log_level.to_string(), max_indexing_memory, max_indexing_threads, diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 0dba77e08..896375108 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -234,6 +234,7 @@ fn open_or_create_database_unchecked( indexer_config: (&opt.indexer_options).try_into()?, autobatching_enabled: true, max_number_of_tasks: 1_000_000, + max_number_of_batched_tasks: opt.max_number_of_batched_tasks, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_count: DEFAULT_INDEX_COUNT, instance_features, diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index b8489c3e3..564a4084c 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -30,6 +30,7 @@ const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY"; const MEILI_ENV: &str = "MEILI_ENV"; #[cfg(feature = "analytics")] const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS"; +const MEILI_MAX_NUMBER_OF_BATCHED_TASKS: &str = "MEILI_MAX_NUMBER_OF_BATCHED_TASKS"; const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT"; const MEILI_SSL_CERT_PATH: &str = "MEILI_SSL_CERT_PATH"; const MEILI_SSL_KEY_PATH: &str = "MEILI_SSL_KEY_PATH"; @@ -175,6 +176,11 @@ pub struct Opt { #[serde(skip, default = "default_max_task_db_size")] pub max_task_db_size: Byte, + /// Defines the maximum number of tasks that will be processed at once. + #[clap(long, env = MEILI_MAX_NUMBER_OF_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())] + #[serde(default = "default_limit_batched_tasks")] + pub max_number_of_batched_tasks: usize, + /// Sets the maximum size of accepted payloads. Value must be given in bytes or explicitly stating a /// base unit (for instance: 107374182400, '107.7Gb', or '107374 Mb'). #[clap(long, env = MEILI_HTTP_PAYLOAD_SIZE_LIMIT, default_value_t = default_http_payload_size_limit())] @@ -371,6 +377,7 @@ impl Opt { max_index_size: _, max_task_db_size: _, http_payload_size_limit, + max_number_of_batched_tasks, ssl_cert_path, ssl_key_path, ssl_auth_path, @@ -392,8 +399,8 @@ impl Opt { config_file_path: _, #[cfg(feature = "analytics")] no_analytics, - experimental_enable_metrics: enable_metrics_route, - experimental_reduce_indexing_memory_usage: reduce_indexing_memory_usage, + experimental_enable_metrics, + experimental_reduce_indexing_memory_usage, } = self; export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr); @@ -409,6 +416,10 @@ impl Opt { MEILI_HTTP_PAYLOAD_SIZE_LIMIT, http_payload_size_limit.to_string(), ); + export_to_env_if_not_present( + MEILI_MAX_NUMBER_OF_BATCHED_TASKS, + max_number_of_batched_tasks.to_string(), + ); if let Some(ssl_cert_path) = ssl_cert_path { export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path); } @@ -433,11 +444,11 @@ impl Opt { export_to_env_if_not_present(MEILI_LOG_LEVEL, log_level.to_string()); export_to_env_if_not_present( MEILI_EXPERIMENTAL_ENABLE_METRICS, - enable_metrics_route.to_string(), + experimental_enable_metrics.to_string(), ); export_to_env_if_not_present( MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE, - reduce_indexing_memory_usage.to_string(), + experimental_reduce_indexing_memory_usage.to_string(), ); indexer_options.export_to_env(); } @@ -727,6 +738,10 @@ fn default_http_payload_size_limit() -> Byte { Byte::from_str(DEFAULT_HTTP_PAYLOAD_SIZE_LIMIT).unwrap() } +fn default_limit_batched_tasks() -> usize { + usize::MAX +} + fn default_snapshot_dir() -> PathBuf { PathBuf::from(DEFAULT_SNAPSHOT_DIR) }