diff --git a/index-scheduler/src/index_mapper/mod.rs b/index-scheduler/src/index_mapper/mod.rs index 00bf023b1..c9c7f9f53 100644 --- a/index-scheduler/src/index_mapper/mod.rs +++ b/index-scheduler/src/index_mapper/mod.rs @@ -135,7 +135,7 @@ impl IndexMapper { index_growth_amount: usize, index_count: usize, enable_mdb_writemap: bool, - indexer_config: IndexerConfig, + indexer_config: Arc<IndexerConfig>, ) -> Result<Self> { let mut wtxn = env.write_txn()?; let index_mapping = env.create_database(&mut wtxn, Some(INDEX_MAPPING))?; @@ -150,7 +150,7 @@ impl IndexMapper { index_base_map_size, index_growth_amount, enable_mdb_writemap, - indexer_config: Arc::new(indexer_config), + indexer_config, }) } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 21dd3683c..fb7f1c6e9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -265,7 +265,7 @@ pub struct IndexSchedulerOptions { /// The number of indexes that can be concurrently opened in memory. pub index_count: usize, /// Configuration used during indexing for each meilisearch index. - pub indexer_config: IndexerConfig, + pub indexer_config: Arc<IndexerConfig>, /// Set to `true` iff the index scheduler is allowed to automatically /// batch tasks together, to process multiple tasks at once. pub autobatching_enabled: bool, @@ -290,7 +290,7 @@ pub struct IndexScheduler { impl IndexScheduler { /// Create an index scheduler and start its run loop. pub fn new( - options: IndexSchedulerOptions, + options: Arc<IndexSchedulerOptions>, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, ) -> Result<Self> { @@ -898,6 +898,9 @@ pub struct IndexSchedulerInner { /// The LMDB environment which the DBs are associated with. pub(crate) env: Env, + /// The options to open an IndexScheduler. + pub(crate) options: Arc<IndexSchedulerOptions>, + /// A boolean that can be set to true to stop the currently processing tasks. 
pub(crate) must_stop_processing: MustStopProcessing, @@ -982,7 +985,7 @@ pub struct IndexSchedulerInner { impl IndexSchedulerInner { fn new( - options: IndexSchedulerOptions, + options: Arc<IndexSchedulerOptions>, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, ) -> Result<Self> { @@ -1015,7 +1018,7 @@ impl IndexSchedulerInner { let env = heed::EnvOpenOptions::new() .max_dbs(11) .map_size(budget.task_db_size) - .open(options.tasks_path)?; + .open(&options.tasks_path)?; let features = features::FeatureData::new(&env, options.instance_features)?; @@ -1047,23 +1050,24 @@ impl IndexSchedulerInner { finished_at, index_mapper: IndexMapper::new( &env, - options.indexes_path, + options.indexes_path.clone(), budget.map_size, options.index_growth_amount, budget.index_count, options.enable_mdb_writemap, - options.indexer_config, + options.indexer_config.clone(), )?, env, // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things wake_up: Arc::new(SignalEvent::auto(true)), autobatching_enabled: options.autobatching_enabled, max_number_of_tasks: options.max_number_of_tasks, - dumps_path: options.dumps_path, - snapshots_path: options.snapshots_path, - auth_path: options.auth_path, - version_file_path: options.version_file_path, - zookeeper: options.zookeeper, + dumps_path: options.dumps_path.clone(), + snapshots_path: options.snapshots_path.clone(), + auth_path: options.auth_path.clone(), + version_file_path: options.version_file_path.clone(), + zookeeper: options.zookeeper.clone(), + options, #[cfg(test)] test_breakpoint_sdr, #[cfg(test)] diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index bb2ff0db1..c9fd4344a 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -224,7 +224,7 @@ fn open_or_create_database_unchecked( // wrap our two builders in a closure that'll be executed later. 
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zookeeper.clone()); let instance_features = opt.to_instance_features(); - let index_scheduler = IndexScheduler::new(IndexSchedulerOptions { + let index_scheduler = IndexScheduler::new(Arc::new(IndexSchedulerOptions { version_file_path: opt.db_path.join(VERSION_FILE_NAME), auth_path: opt.db_path.join("auth"), tasks_path: opt.db_path.join("tasks"), @@ -235,14 +235,14 @@ fn open_or_create_database_unchecked( task_db_size: opt.max_task_db_size.get_bytes() as usize, index_base_map_size: opt.max_index_size.get_bytes() as usize, enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, - indexer_config: (&opt.indexer_options).try_into()?, + indexer_config: (&opt.indexer_options).try_into().map(Arc::new)?, autobatching_enabled: true, max_number_of_tasks: 1_000_000, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_count: DEFAULT_INDEX_COUNT, instance_features, zookeeper: zookeeper.clone(), - }) + })) .map_err(anyhow::Error::from); match ( diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 86c3d3493..eea2637c3 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -557,7 +557,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { Ok(Self { log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize), - thread_pool: Some(thread_pool), + thread_pool: Some(Arc::new(thread_pool)), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/milli/src/update/indexer_config.rs b/milli/src/update/indexer_config.rs index ff7942fdb..77586e641 100644 --- a/milli/src/update/indexer_config.rs +++ b/milli/src/update/indexer_config.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use grenad::CompressionType; use rayon::ThreadPool; @@ -9,7 +11,7 @@ pub struct IndexerConfig { pub max_memory: Option<usize>, pub chunk_compression_type: 
CompressionType, pub chunk_compression_level: Option<u32>, - pub thread_pool: Option<ThreadPool>, + pub thread_pool: Option<Arc<ThreadPool>>, pub max_positions_per_attributes: Option<u32>, pub skip_index_budget: bool, }