From e41ebd3047fabe57537324c92cf36f74744a2aec Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 22 Jan 2025 15:23:07 +0100 Subject: [PATCH] expose the number of database in the index-scheduler and rewrite the lib.rs to use the value provided in the options instead of a magic number --- crates/index-scheduler/src/features.rs | 18 +++- .../index-scheduler/src/index_mapper/mod.rs | 17 +++- crates/index-scheduler/src/lib.rs | 6 +- crates/index-scheduler/src/queue/batches.rs | 6 ++ crates/index-scheduler/src/queue/mod.rs | 6 ++ crates/index-scheduler/src/queue/tasks.rs | 7 ++ crates/index-scheduler/src/upgrade/mod.rs | 15 ++-- crates/meilisearch/src/lib.rs | 84 ++++++++++--------- 8 files changed, 105 insertions(+), 54 deletions(-) diff --git a/crates/index-scheduler/src/features.rs b/crates/index-scheduler/src/features.rs index e29e52d44..80da67f3e 100644 --- a/crates/index-scheduler/src/features.rs +++ b/crates/index-scheduler/src/features.rs @@ -7,7 +7,12 @@ use meilisearch_types::heed::{Database, Env, RwTxn}; use crate::error::FeatureNotEnabledError; use crate::Result; -const EXPERIMENTAL_FEATURES: &str = "experimental-features"; +/// The number of databases used by features +const NUMBER_OF_DATABASES: u32 = 1; +/// Database const names for the `FeatureData`. 
+mod db_name { + pub const EXPERIMENTAL_FEATURES: &str = "experimental-features"; +} #[derive(Clone)] pub(crate) struct FeatureData { @@ -84,14 +89,19 @@ impl RoFeatures { } impl FeatureData { + pub(crate) const fn nb_db() -> u32 { + NUMBER_OF_DATABASES + } + pub fn new(env: &Env, instance_features: InstanceTogglableFeatures) -> Result { let mut wtxn = env.write_txn()?; - let runtime_features_db = env.create_database(&mut wtxn, Some(EXPERIMENTAL_FEATURES))?; + let runtime_features_db = + env.create_database(&mut wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?; wtxn.commit()?; let txn = env.read_txn()?; let persisted_features: RuntimeTogglableFeatures = - runtime_features_db.get(&txn, EXPERIMENTAL_FEATURES)?.unwrap_or_default(); + runtime_features_db.get(&txn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default(); let InstanceTogglableFeatures { metrics, logs_route, contains_filter } = instance_features; let runtime = Arc::new(RwLock::new(RuntimeTogglableFeatures { metrics: metrics || persisted_features.metrics, @@ -108,7 +118,7 @@ impl FeatureData { mut wtxn: RwTxn, features: RuntimeTogglableFeatures, ) -> Result<()> { - self.persisted.put(&mut wtxn, EXPERIMENTAL_FEATURES, &features)?; + self.persisted.put(&mut wtxn, db_name::EXPERIMENTAL_FEATURES, &features)?; wtxn.commit()?; // safe to unwrap, the lock will only fail if: diff --git a/crates/index-scheduler/src/index_mapper/mod.rs b/crates/index-scheduler/src/index_mapper/mod.rs index 77cccf9b1..cc5e616ed 100644 --- a/crates/index-scheduler/src/index_mapper/mod.rs +++ b/crates/index-scheduler/src/index_mapper/mod.rs @@ -20,8 +20,13 @@ use crate::{Error, IndexBudget, IndexSchedulerOptions, Result}; mod index_map; -const INDEX_MAPPING: &str = "index-mapping"; -const INDEX_STATS: &str = "index-stats"; +/// The number of databases used by the index mapper +const NUMBER_OF_DATABASES: u32 = 2; +/// Database const names for the `IndexMapper`. 
+mod db_name { + pub const INDEX_MAPPING: &str = "index-mapping"; + pub const INDEX_STATS: &str = "index-stats"; +} /// Structure managing meilisearch's indexes. /// @@ -138,6 +143,10 @@ impl IndexStats { } impl IndexMapper { + pub(crate) const fn nb_db() -> u32 { + NUMBER_OF_DATABASES + } + pub fn new( env: &Env, wtxn: &mut RwTxn, @@ -146,8 +155,8 @@ impl IndexMapper { ) -> Result { Ok(Self { index_map: Arc::new(RwLock::new(IndexMap::new(budget.index_count))), - index_mapping: env.create_database(wtxn, Some(INDEX_MAPPING))?, - index_stats: env.create_database(wtxn, Some(INDEX_STATS))?, + index_mapping: env.create_database(wtxn, Some(db_name::INDEX_MAPPING))?, + index_stats: env.create_database(wtxn, Some(db_name::INDEX_STATS))?, base_path: options.indexes_path.clone(), index_base_map_size: budget.map_size, index_growth_amount: options.index_growth_amount, diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index b423c47d4..2c7b3e075 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -197,6 +197,10 @@ impl IndexScheduler { } } + pub(crate) const fn nb_db() -> u32 { + Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db() + } + /// Create an index scheduler and start its run loop. 
#[allow(private_interfaces)] // because test_utils is private pub fn new( @@ -232,7 +236,7 @@ impl IndexScheduler { let env = unsafe { heed::EnvOpenOptions::new() - .max_dbs(19) + .max_dbs(Self::nb_db()) .map_size(budget.task_db_size) .open(&options.tasks_path) }?; diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index b45524786..b77602e1e 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -17,6 +17,8 @@ use crate::utils::{ }; use crate::{Error, Result, BEI128}; +/// The number of databases used by the batch queue +const NUMBER_OF_DATABASES: u32 = 7; /// Database const names for the `IndexScheduler`. mod db_name { pub const ALL_BATCHES: &str = "all-batches"; @@ -60,6 +62,10 @@ impl BatchQueue { } } + pub(crate) const fn nb_db() -> u32 { + NUMBER_OF_DATABASES + } + pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { Ok(Self { all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?, diff --git a/crates/index-scheduler/src/queue/mod.rs b/crates/index-scheduler/src/queue/mod.rs index f97218a21..c6a79fbb2 100644 --- a/crates/index-scheduler/src/queue/mod.rs +++ b/crates/index-scheduler/src/queue/mod.rs @@ -28,6 +28,8 @@ use crate::utils::{ }; use crate::{Error, IndexSchedulerOptions, Result, TaskId}; +/// The number of databases used by the queue itself +const NUMBER_OF_DATABASES: u32 = 1; /// Database const names for the `IndexScheduler`. mod db_name { pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping"; @@ -148,6 +150,10 @@ impl Queue { } } + pub(crate) const fn nb_db() -> u32 { + tasks::TaskQueue::nb_db() + batches::BatchQueue::nb_db() + NUMBER_OF_DATABASES + } + /// Create an index scheduler and start its run loop. 
pub(crate) fn new( env: &Env, diff --git a/crates/index-scheduler/src/queue/tasks.rs b/crates/index-scheduler/src/queue/tasks.rs index 00e745e71..913ebcb30 100644 --- a/crates/index-scheduler/src/queue/tasks.rs +++ b/crates/index-scheduler/src/queue/tasks.rs @@ -14,9 +14,12 @@ use crate::utils::{ }; use crate::{Error, Result, TaskId, BEI128}; +/// The number of databases used by the task queue +const NUMBER_OF_DATABASES: u32 = 8; /// Database const names for the `IndexScheduler`. mod db_name { pub const ALL_TASKS: &str = "all-tasks"; + pub const STATUS: &str = "status"; pub const KIND: &str = "kind"; pub const INDEX_TASKS: &str = "index-tasks"; @@ -61,6 +64,10 @@ impl TaskQueue { } } + pub(crate) const fn nb_db() -> u32 { + NUMBER_OF_DATABASES + } + pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { Ok(Self { all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?, diff --git a/crates/index-scheduler/src/upgrade/mod.rs b/crates/index-scheduler/src/upgrade/mod.rs index 989ed6ea5..6881b8b73 100644 --- a/crates/index-scheduler/src/upgrade/mod.rs +++ b/crates/index-scheduler/src/upgrade/mod.rs @@ -8,8 +8,12 @@ use time::OffsetDateTime; use tracing::info; use crate::queue::TaskQueue; +use crate::IndexSchedulerOptions; -pub fn upgrade_task_queue(tasks_path: &Path, from: (u32, u32, u32)) -> anyhow::Result<()> { +pub fn upgrade_task_queue( + opt: &IndexSchedulerOptions, + from: (u32, u32, u32), +) -> anyhow::Result<()> { let current_major: u32 = VERSION_MAJOR.parse().unwrap(); let current_minor: u32 = VERSION_MINOR.parse().unwrap(); let current_patch: u32 = VERSION_PATCH.parse().unwrap(); @@ -40,15 +44,14 @@ pub fn upgrade_task_queue(tasks_path: &Path, from: (u32, u32, u32)) -> anyhow::R info!("Upgrading the task queue"); for (upgrade, upgrade_name) in upgrade_functions[start..].iter() { info!("{upgrade_name}"); - (upgrade)(tasks_path)?; + (upgrade)(&opt.tasks_path)?; } let env = unsafe { heed::EnvOpenOptions::new() - .max_dbs(19) - // Since that's the only 
database memory-mapped currently we don't need to check the budget yet - .map_size(100 * 1024 * 1024) - .open(tasks_path) + .max_dbs(TaskQueue::nb_db()) + .map_size(opt.task_db_size) + .open(&opt.tasks_path) }?; let mut wtxn = env.write_txn()?; let queue = TaskQueue::new(&env, &mut wtxn)?; diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index f7ef01f97..ef270670b 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -210,13 +210,42 @@ enum OnFailure { } pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc)> { + let index_scheduler_opt = IndexSchedulerOptions { + version_file_path: opt.db_path.join(VERSION_FILE_NAME), + auth_path: opt.db_path.join("auth"), + tasks_path: opt.db_path.join("tasks"), + update_file_path: opt.db_path.join("update_files"), + indexes_path: opt.db_path.join("indexes"), + snapshots_path: opt.snapshot_dir.clone(), + dumps_path: opt.dump_dir.clone(), + webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()), + webhook_authorization_header: opt.task_webhook_authorization_header.clone(), + task_db_size: opt.max_task_db_size.as_u64() as usize, + index_base_map_size: opt.max_index_size.as_u64() as usize, + enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, + indexer_config: Arc::new((&opt.indexer_options).try_into()?), + autobatching_enabled: true, + cleanup_enabled: !opt.experimental_replication_parameters, + max_number_of_tasks: 1_000_000, + max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, + batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size, + index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, + index_count: DEFAULT_INDEX_COUNT, + instance_features: opt.to_instance_features(), + auto_upgrade: opt.experimental_dumpless_upgrade, + }; + let empty_db = is_empty_db(&opt.db_path); let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot { 
let snapshot_path_exists = snapshot_path.exists(); // the db is empty and the snapshot exists, import it if empty_db && snapshot_path_exists { match compression::from_tar_gz(snapshot_path, &opt.db_path) { - Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?, + Ok(()) => open_or_create_database_unchecked( + opt, + index_scheduler_opt, + OnFailure::RemoveDb, + )?, Err(e) => { std::fs::remove_dir_all(&opt.db_path)?; return Err(e); @@ -233,14 +262,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< bail!("snapshot doesn't exist at {}", snapshot_path.display()) // the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag } else { - open_or_create_database(opt, empty_db)? + open_or_create_database(opt, index_scheduler_opt, empty_db)? } } else if let Some(ref path) = opt.import_dump { let src_path_exists = path.exists(); // the db is empty and the dump exists, import it if empty_db && src_path_exists { let (mut index_scheduler, mut auth_controller) = - open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?; + open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::RemoveDb)?; match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) { Ok(()) => (index_scheduler, auth_controller), Err(e) => { @@ -260,10 +289,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< // the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag // or, the dump is missing but we can ignore that because of the ignore_missing_dump flag } else { - open_or_create_database(opt, empty_db)? + open_or_create_database(opt, index_scheduler_opt, empty_db)? } } else { - open_or_create_database(opt, empty_db)? + open_or_create_database(opt, index_scheduler_opt, empty_db)? 
}; // We create a loop in a thread that registers snapshotCreation tasks @@ -291,38 +320,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< /// Try to start the IndexScheduler and AuthController without checking the VERSION file or anything. fn open_or_create_database_unchecked( opt: &Opt, + index_scheduler_opt: IndexSchedulerOptions, on_failure: OnFailure, ) -> anyhow::Result<(IndexScheduler, AuthController)> { // we don't want to create anything in the data.ms yet, thus we // wrap our two builders in a closure that'll be executed later. let auth_controller = AuthController::new(&opt.db_path, &opt.master_key); - let instance_features = opt.to_instance_features(); - let index_scheduler_builder = || -> anyhow::Result<_> { - Ok(IndexScheduler::new(IndexSchedulerOptions { - version_file_path: opt.db_path.join(VERSION_FILE_NAME), - auth_path: opt.db_path.join("auth"), - tasks_path: opt.db_path.join("tasks"), - update_file_path: opt.db_path.join("update_files"), - indexes_path: opt.db_path.join("indexes"), - snapshots_path: opt.snapshot_dir.clone(), - dumps_path: opt.dump_dir.clone(), - webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()), - webhook_authorization_header: opt.task_webhook_authorization_header.clone(), - task_db_size: opt.max_task_db_size.as_u64() as usize, - index_base_map_size: opt.max_index_size.as_u64() as usize, - enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, - indexer_config: Arc::new((&opt.indexer_options).try_into()?), - autobatching_enabled: true, - cleanup_enabled: !opt.experimental_replication_parameters, - max_number_of_tasks: 1_000_000, - max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, - batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size, - index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, - index_count: DEFAULT_INDEX_COUNT, - instance_features, - auto_upgrade: opt.experimental_dumpless_upgrade, - })?) 
- }; + let index_scheduler_builder = + || -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt)?) }; match ( index_scheduler_builder(), @@ -341,18 +346,18 @@ fn open_or_create_database_unchecked( /// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch. fn check_version_and_update_task_queue( - db_path: &Path, - experimental_dumpless_upgrade: bool, + opt: &Opt, + index_scheduler_opt: &IndexSchedulerOptions, ) -> anyhow::Result<()> { - let (major, minor, patch) = get_version(db_path)?; + let (major, minor, patch) = get_version(&opt.db_path)?; let version_major: u32 = VERSION_MAJOR.parse().unwrap(); let version_minor: u32 = VERSION_MINOR.parse().unwrap(); let version_patch: u32 = VERSION_PATCH.parse().unwrap(); if major != version_major || minor != version_minor || patch > version_patch { - if experimental_dumpless_upgrade { - return upgrade_task_queue(&db_path.join("tasks"), (major, minor, patch)); + if opt.experimental_dumpless_upgrade { + return upgrade_task_queue(index_scheduler_opt, (major, minor, patch)); } else { return Err(VersionFileError::VersionMismatch { major, minor, patch }.into()); } @@ -364,13 +369,14 @@ fn check_version_and_update_task_queue( /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. fn open_or_create_database( opt: &Opt, + index_scheduler_opt: IndexSchedulerOptions, empty_db: bool, ) -> anyhow::Result<(IndexScheduler, AuthController)> { if !empty_db { - check_version_and_update_task_queue(&opt.db_path, opt.experimental_dumpless_upgrade)?; + check_version_and_update_task_queue(opt, &index_scheduler_opt)?; } - open_or_create_database_unchecked(opt, OnFailure::KeepDb) + open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb) } fn import_dump(