From d0bdff7b7b64b981f30233765ef1c4d87ce1c9a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 9 Jan 2025 11:59:35 +0100 Subject: [PATCH] Make the batched tasks size limit effectively work --- crates/index-scheduler/src/lib.rs | 3 ++ .../src/scheduler/create_batch.rs | 31 ++++++++++++------- crates/index-scheduler/src/scheduler/mod.rs | 5 +++ crates/index-scheduler/src/test_utils.rs | 1 + .../src/analytics/segment_analytics.rs | 2 +- crates/meilisearch/src/lib.rs | 1 + crates/meilisearch/src/option.rs | 6 ++-- 7 files changed, 34 insertions(+), 15 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index d5b12e99f..7ee0aa80c 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -115,6 +115,9 @@ pub struct IndexSchedulerOptions { /// If the autobatcher is allowed to automatically batch tasks /// it will only batch this defined number of tasks at once. pub max_number_of_batched_tasks: usize, + /// If the autobatcher is allowed to automatically batch tasks + /// it will only batch this defined maximum size (in bytes) of tasks at once. + pub batched_tasks_size_limit: u64, /// The experimental features enabled for this instance. pub instance_features: InstanceTogglableFeatures, } diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index e9755c1a7..b224ee6eb 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -497,17 +497,26 @@ impl IndexScheduler { 1 }; - let enqueued = index_tasks - .into_iter() - .take(tasks_limit) - .map(|task_id| { - self.queue - .tasks - .get_task(rtxn, task_id) - .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - .map(|task| (task.uid, task.kind)) - }) - .collect::>>()?; + let mut enqueued = Vec::new(); + let mut total_size: u64 = 0; + for task_id in index_tasks.into_iter().take(tasks_limit) { + let task = self + .queue + .tasks + .get_task(rtxn, task_id) + .and_then(|task| task.ok_or(Error::CorruptedTaskQueue))?; + + if let Some(uuid) = task.content_uuid() { + let content_size = self.queue.file_store.compute_size(uuid)?; + total_size = total_size.saturating_add(content_size); + } + + if total_size > self.scheduler.batched_tasks_size_limit && !enqueued.is_empty() { + break; + } + + enqueued.push((task.uid, task.kind)); + } if let Some((batchkind, create_index)) = autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref()) diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 2c76e2f38..2d20c4d55 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -60,6 +60,9 @@ pub struct Scheduler { /// The maximum number of tasks that will be batched together. pub(crate) max_number_of_batched_tasks: usize, + /// The maximum size, in bytes, of tasks in a batch. + pub(crate) batched_tasks_size_limit: u64, + /// The path used to create the dumps. pub(crate) dumps_path: PathBuf, @@ -80,6 +83,7 @@ impl Scheduler { wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, max_number_of_batched_tasks: self.max_number_of_batched_tasks, + batched_tasks_size_limit: self.batched_tasks_size_limit, dumps_path: self.dumps_path.clone(), snapshots_path: self.snapshots_path.clone(), auth_path: self.auth_path.clone(), @@ -94,6 +98,7 @@ impl Scheduler { wake_up: Arc::new(SignalEvent::auto(true)), autobatching_enabled: options.autobatching_enabled, max_number_of_batched_tasks: options.max_number_of_batched_tasks, + batched_tasks_size_limit: options.batched_tasks_size_limit, dumps_path: options.dumps_path.clone(), snapshots_path: options.snapshots_path.clone(), auth_path: options.auth_path.clone(), diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index f4779eea9..4be944037 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -107,6 +107,7 @@ impl IndexScheduler { cleanup_enabled: true, max_number_of_tasks: 1_000_000, max_number_of_batched_tasks: usize::MAX, + batched_tasks_size_limit: u64::MAX, instance_features: Default::default(), }; configuration(&mut options); diff --git a/crates/meilisearch/src/analytics/segment_analytics.rs b/crates/meilisearch/src/analytics/segment_analytics.rs index 57f746581..eef805e27 100644 --- a/crates/meilisearch/src/analytics/segment_analytics.rs +++ b/crates/meilisearch/src/analytics/segment_analytics.rs @@ -194,7 +194,7 @@ struct Infos { experimental_enable_logs_route: bool, experimental_reduce_indexing_memory_usage: bool, experimental_max_number_of_batched_tasks: usize, - experimental_limit_batched_tasks_total_size: usize, + experimental_limit_batched_tasks_total_size: u64, gpu_enabled: bool, db_path: bool, import_dump: bool, diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 3ea8c06c6..e01447b36 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -312,6 +312,7 @@ fn open_or_create_database_unchecked( cleanup_enabled: !opt.experimental_replication_parameters, max_number_of_tasks: 1_000_000, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, + batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_count: DEFAULT_INDEX_COUNT, instance_features, diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index e95985d53..f686ddef7 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -436,7 +436,7 @@ pub struct Opt { /// see: #[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())] #[serde(default = "default_limit_batched_tasks_total_size")] - pub experimental_limit_batched_tasks_total_size: usize, + pub experimental_limit_batched_tasks_total_size: u64, #[serde(flatten)] #[clap(flatten)] @@ -931,8 +931,8 @@ fn default_limit_batched_tasks() -> usize { usize::MAX } -fn default_limit_batched_tasks_total_size() -> usize { - usize::MAX +fn default_limit_batched_tasks_total_size() -> u64 { + u64::MAX } fn default_snapshot_dir() -> PathBuf {