Merge pull request #4249 from meilisearch/flag-limit-batch-size

Introduce parameters to limit the number of batched tasks
Authored by Clément Renault on 2023-12-13 10:32:14 +01:00; committed by GitHub
commit 005800634d
7 changed files with 40 additions and 5 deletions

View File

@@ -129,3 +129,6 @@ experimental_enable_metrics = false
 # Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652>
 experimental_reduce_indexing_memory_usage = false
+# Experimentally reduces the maximum number of tasks that will be processed at once, see: <https://github.com/orgs/meilisearch/discussions/713>
+# experimental_max_number_of_batched_tasks = 100

View File

@@ -584,7 +584,9 @@ impl IndexScheduler {
         let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
         // If autobatching is disabled we only take one task at a time.
-        let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
+        // Otherwise, we take only a maximum of tasks to create batches.
+        let tasks_limit =
+            if self.autobatching_enabled { self.max_number_of_batched_tasks } else { 1 };
         let enqueued = index_tasks
             .into_iter()
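This hunk is the behavioural core of the change: with autobatching enabled, the scheduler used to drain an effectively unlimited number of enqueued tasks into a single batch (usize::MAX); it now stops at max_number_of_batched_tasks. A minimal standalone sketch of that selection rule, where take_for_batch is a hypothetical helper rather than the scheduler's real API:

// Standalone sketch of the selection rule above; `take_for_batch` is a
// hypothetical helper, not part of the real index scheduler.
fn take_for_batch(enqueued: &[u32], autobatching_enabled: bool, max_batched: usize) -> Vec<u32> {
    // One task at a time without autobatching, otherwise at most `max_batched` per batch.
    let tasks_limit = if autobatching_enabled { max_batched } else { 1 };
    enqueued.iter().copied().take(tasks_limit).collect()
}

fn main() {
    let enqueued: Vec<u32> = (0..10).collect();
    // Old behaviour: autobatching meant an effectively unlimited batch.
    assert_eq!(take_for_batch(&enqueued, true, usize::MAX).len(), 10);
    // New behaviour: the experimental option caps how many tasks land in one batch.
    assert_eq!(take_for_batch(&enqueued, true, 3).len(), 3);
    // With autobatching disabled, batches still contain a single task.
    assert_eq!(take_for_batch(&enqueued, false, 3).len(), 1);
}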

View File

@@ -30,6 +30,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         index_mapper,
         features: _,
         max_number_of_tasks: _,
+        max_number_of_batched_tasks: _,
         puffin_frame: _,
         wake_up: _,
         dumps_path: _,

View File

@@ -258,6 +258,9 @@ pub struct IndexSchedulerOptions {
     /// The maximum number of tasks stored in the task queue before starting
     /// to auto schedule task deletions.
     pub max_number_of_tasks: usize,
+    /// If the autobatcher is allowed to automatically batch tasks
+    /// it will only batch this defined number of tasks at once.
+    pub max_number_of_batched_tasks: usize,
     /// The experimental features enabled for this instance.
     pub instance_features: InstanceTogglableFeatures,
 }
@@ -316,6 +319,9 @@ pub struct IndexScheduler {
     /// the finished tasks automatically.
     pub(crate) max_number_of_tasks: usize,
+    /// The maximum number of tasks that will be batched together.
+    pub(crate) max_number_of_batched_tasks: usize,
     /// A frame to output the indexation profiling files to disk.
     pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
@@ -373,6 +379,7 @@ impl IndexScheduler {
             wake_up: self.wake_up.clone(),
             autobatching_enabled: self.autobatching_enabled,
             max_number_of_tasks: self.max_number_of_tasks,
+            max_number_of_batched_tasks: self.max_number_of_batched_tasks,
             puffin_frame: self.puffin_frame.clone(),
             snapshots_path: self.snapshots_path.clone(),
             dumps_path: self.dumps_path.clone(),
@@ -471,6 +478,7 @@ impl IndexScheduler {
             puffin_frame: Arc::new(puffin::GlobalFrameView::default()),
             autobatching_enabled: options.autobatching_enabled,
             max_number_of_tasks: options.max_number_of_tasks,
+            max_number_of_batched_tasks: options.max_number_of_batched_tasks,
             dumps_path: options.dumps_path,
             snapshots_path: options.snapshots_path,
             auth_path: options.auth_path,
@@ -1638,6 +1646,7 @@ mod tests {
             indexer_config,
             autobatching_enabled: true,
             max_number_of_tasks: 1_000_000,
+            max_number_of_batched_tasks: usize::MAX,
             instance_features: Default::default(),
         };
         configuration(&mut options);
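The new option is a plain usize: it is declared on IndexSchedulerOptions, copied onto a private field of IndexScheduler, and the test setup passes usize::MAX so existing behaviour is unchanged. A simplified sketch of that plumbing using stand-in types (these are not the real Meilisearch structs):

// Stand-in types sketching how the option travels from the builder options
// onto the scheduler; not the real IndexSchedulerOptions/IndexScheduler.
struct SchedulerOptions {
    autobatching_enabled: bool,
    max_number_of_batched_tasks: usize,
}

struct Scheduler {
    autobatching_enabled: bool,
    max_number_of_batched_tasks: usize,
}

impl Scheduler {
    fn new(options: SchedulerOptions) -> Self {
        Self {
            autobatching_enabled: options.autobatching_enabled,
            max_number_of_batched_tasks: options.max_number_of_batched_tasks,
        }
    }
}

fn main() {
    // Mirrors the test defaults above: usize::MAX keeps batches effectively unbounded.
    let scheduler = Scheduler::new(SchedulerOptions {
        autobatching_enabled: true,
        max_number_of_batched_tasks: usize::MAX,
    });
    assert!(scheduler.autobatching_enabled);
    assert_eq!(scheduler.max_number_of_batched_tasks, usize::MAX);
}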

View File

@@ -251,6 +251,7 @@ struct Infos {
     env: String,
     experimental_enable_metrics: bool,
     experimental_reduce_indexing_memory_usage: bool,
+    experimental_max_number_of_batched_tasks: usize,
     db_path: bool,
     import_dump: bool,
     dump_dir: bool,
@@ -285,6 +286,7 @@ impl From<Opt> for Infos {
             db_path,
             experimental_enable_metrics,
             experimental_reduce_indexing_memory_usage,
+            experimental_max_number_of_batched_tasks,
             http_addr,
             master_key: _,
             env,
@@ -340,6 +342,7 @@
             ignore_snapshot_if_db_exists,
             http_addr: http_addr != default_http_addr(),
             http_payload_size_limit,
+            experimental_max_number_of_batched_tasks,
             log_level: log_level.to_string(),
             max_indexing_memory,
             max_indexing_threads,

View File

@@ -234,6 +234,7 @@ fn open_or_create_database_unchecked(
         indexer_config: (&opt.indexer_options).try_into()?,
         autobatching_enabled: true,
         max_number_of_tasks: 1_000_000,
+        max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
         index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
         index_count: DEFAULT_INDEX_COUNT,
         instance_features,

View File

@@ -51,6 +51,8 @@ const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
 const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
 const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
     "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
+const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
+    "MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS";
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
@@ -301,6 +303,11 @@ pub struct Opt {
     #[serde(default)]
     pub experimental_reduce_indexing_memory_usage: bool,
+    /// Experimentally reduces the maximum number of tasks that will be processed at once, see: <https://github.com/orgs/meilisearch/discussions/713>
+    #[clap(long, env = MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())]
+    #[serde(default = "default_limit_batched_tasks")]
+    pub experimental_max_number_of_batched_tasks: usize,
     #[serde(flatten)]
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
@@ -371,6 +378,7 @@ impl Opt {
             max_index_size: _,
             max_task_db_size: _,
             http_payload_size_limit,
+            experimental_max_number_of_batched_tasks,
             ssl_cert_path,
             ssl_key_path,
             ssl_auth_path,
@@ -392,8 +400,8 @@
             config_file_path: _,
             #[cfg(feature = "analytics")]
             no_analytics,
-            experimental_enable_metrics: enable_metrics_route,
-            experimental_reduce_indexing_memory_usage: reduce_indexing_memory_usage,
+            experimental_enable_metrics,
+            experimental_reduce_indexing_memory_usage,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -409,6 +417,10 @@
             MEILI_HTTP_PAYLOAD_SIZE_LIMIT,
             http_payload_size_limit.to_string(),
         );
+        export_to_env_if_not_present(
+            MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
+            experimental_max_number_of_batched_tasks.to_string(),
+        );
         if let Some(ssl_cert_path) = ssl_cert_path {
             export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path);
         }
@@ -433,11 +445,11 @@ impl Opt {
         export_to_env_if_not_present(MEILI_LOG_LEVEL, log_level.to_string());
         export_to_env_if_not_present(
             MEILI_EXPERIMENTAL_ENABLE_METRICS,
-            enable_metrics_route.to_string(),
+            experimental_enable_metrics.to_string(),
         );
         export_to_env_if_not_present(
             MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE,
-            reduce_indexing_memory_usage.to_string(),
+            experimental_reduce_indexing_memory_usage.to_string(),
         );
         indexer_options.export_to_env();
     }
@@ -727,6 +739,10 @@ fn default_http_payload_size_limit() -> Byte {
     Byte::from_str(DEFAULT_HTTP_PAYLOAD_SIZE_LIMIT).unwrap()
 }
+fn default_limit_batched_tasks() -> usize {
+    usize::MAX
+}
 fn default_snapshot_dir() -> PathBuf {
     PathBuf::from(DEFAULT_SNAPSHOT_DIR)
 }
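The default is usize::MAX, so unless the CLI flag, the MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS environment variable, or the experimental_max_number_of_batched_tasks config key overrides it, batches remain effectively unbounded. The real value is parsed by clap/serde; the following is only a rough, hand-rolled sketch of the same fallback semantics for the environment variable:

// Hand-rolled sketch of the fallback semantics only; the real option is parsed by
// clap/serde (see the Opt struct above), not read from the environment like this.
use std::env;

fn resolve_max_batched_tasks() -> usize {
    env::var("MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        // Same spirit as default_limit_batched_tasks(): usize::MAX means no practical cap.
        .unwrap_or(usize::MAX)
}

fn main() {
    println!("max number of batched tasks: {}", resolve_max_batched_tasks());
}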