From 71b50853dc502d576bc3a0a6f7f4eebb3872a8b0 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 26 Oct 2022 11:41:59 +0200 Subject: [PATCH] Introduce an options struct to create the IndexScheduler --- index-scheduler/src/lib.rs | 149 +++++++++++++++++++------------------ 1 file changed, 78 insertions(+), 71 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 3853478f9..f461b274b 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -190,6 +190,46 @@ mod db_name { pub const FINISHED_AT: &str = "finished-at"; } +#[cfg(test)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Breakpoint { + Start, + BatchCreated, + BeforeProcessing, + AfterProcessing, + AbortedIndexation, + ProcessBatchSucceeded, + ProcessBatchFailed, + InsideProcessBatch, +} + +#[derive(Debug)] +pub struct IndexSchedulerOptions { + /// The path to the version file of Meilisearch. + version_file_path: PathBuf, + /// The path to the folder containing the auth LMDB env. + auth_path: PathBuf, + /// The path to the folder containing the task databases. + tasks_path: PathBuf, + /// The path to the file store containing the files associated to the tasks. + update_file_path: PathBuf, + /// The path to the folder containing meilisearch's indexes. + indexes_path: PathBuf, + /// The path to the folder containing the snapshots. + snapshots_path: PathBuf, + /// The path to the folder containing the dumps. + dumps_path: PathBuf, + /// The maximum size, in bytes, of each meilisearch index. + task_db_size: usize, + /// The maximum size, in bytes, of the tasks index. + index_size: usize, + /// Configuration used during indexing for each meilisearch index. + indexer_config: IndexerConfig, + /// Set to `true` iff the index scheduler is allowed to automatically + /// batch tasks together, to process multiple tasks at once. + autobatching_enabled: bool, +} + /// Structure which holds meilisearch's indexes and schedules the tasks /// to be performed on them. pub struct IndexScheduler { @@ -269,6 +309,7 @@ pub struct IndexScheduler { /// A counter that is incremented before every call to [`tick`](IndexScheduler::tick) run_loop_iteration: Arc>, } + impl IndexScheduler { fn private_clone(&self) -> IndexScheduler { IndexScheduler { @@ -300,62 +341,24 @@ impl IndexScheduler { } } -#[cfg(test)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Breakpoint { - Start, - BatchCreated, - BeforeProcessing, - AfterProcessing, - AbortedIndexation, - ProcessBatchSucceeded, - ProcessBatchFailed, - InsideProcessBatch, -} - impl IndexScheduler { - // TODO create a struct of options with a documented field for each required option instead /// Create an index scheduler and start its run loop. - /// - /// ## Arguments - /// - `version_file_path`: the path to the version file of Meilisearch - /// - `auth_path`: the path to the folder containing the auth LMDB env - /// - `tasks_path`: the path to the folder containing the task databases - /// - `update_file_path`: the path to the file store containing the files associated to the tasks - /// - `indexes_path`: the path to the folder containing meilisearch's indexes - /// - `snapshots_path`: the path to the folder containing the snapshots - /// - `dumps_path`: the path to the folder containing the dumps - /// - `index_size`: the maximum size, in bytes, of each meilisearch index - /// - `indexer_config`: configuration used during indexing for each meilisearch index - /// - `autobatching_enabled`: `true` iff the index scheduler is allowed to automatically batch tasks - /// together, to process multiple tasks at once. #[allow(clippy::too_many_arguments)] pub fn new( - version_file_path: PathBuf, - auth_path: PathBuf, - tasks_path: PathBuf, - update_file_path: PathBuf, - indexes_path: PathBuf, - snapshots_path: PathBuf, - dumps_path: PathBuf, - task_db_size: usize, - index_size: usize, - indexer_config: IndexerConfig, - autobatching_enabled: bool, + options: IndexSchedulerOptions, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, ) -> Result { - std::fs::create_dir_all(&tasks_path)?; - std::fs::create_dir_all(&update_file_path)?; - std::fs::create_dir_all(&indexes_path)?; - std::fs::create_dir_all(&dumps_path)?; + std::fs::create_dir_all(&options.tasks_path)?; + std::fs::create_dir_all(&options.update_file_path)?; + std::fs::create_dir_all(&options.indexes_path)?; + std::fs::create_dir_all(&options.dumps_path)?; - let mut options = heed::EnvOpenOptions::new(); - options.max_dbs(9); - options.map_size(task_db_size); - - let env = options.open(tasks_path)?; - let file_store = FileStore::new(&update_file_path)?; + let env = heed::EnvOpenOptions::new() + .max_dbs(9) + .map_size(options.task_db_size) + .open(options.tasks_path)?; + let file_store = FileStore::new(&options.update_file_path)?; // allow unreachable_code to get rids of the warning in the case of a test build. let this = Self { @@ -369,15 +372,20 @@ impl IndexScheduler { enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?, started_at: env.create_database(Some(db_name::STARTED_AT))?, finished_at: env.create_database(Some(db_name::FINISHED_AT))?, - index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?, + index_mapper: IndexMapper::new( + &env, + options.indexes_path, + options.index_size, + options.indexer_config, + )?, env, // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things wake_up: Arc::new(SignalEvent::auto(true)), - autobatching_enabled, - dumps_path, - snapshots_path, - auth_path, - version_file_path, + autobatching_enabled: options.autobatching_enabled, + dumps_path: options.dumps_path, + snapshots_path: options.snapshots_path, + auth_path: options.auth_path, + version_file_path: options.version_file_path, #[cfg(test)] test_breakpoint_sdr, @@ -976,28 +984,27 @@ mod tests { impl IndexScheduler { pub fn test( - autobatching: bool, + autobatching_enabled: bool, planned_failures: Vec<(usize, FailureLocation)>, ) -> (Self, IndexSchedulerHandle) { let tempdir = TempDir::new().unwrap(); let (sender, receiver) = crossbeam::channel::bounded(0); - let index_scheduler = Self::new( - tempdir.path().join(VERSION_FILE_NAME), - tempdir.path().join("auth"), - tempdir.path().join("db_path"), - tempdir.path().join("file_store"), - tempdir.path().join("indexes"), - tempdir.path().join("snapshots"), - tempdir.path().join("dumps"), - 1024 * 1024, - 1024 * 1024, - IndexerConfig::default(), - autobatching, // enable autobatching - sender, - planned_failures, - ) - .unwrap(); + let options = IndexSchedulerOptions { + version_file_path: tempdir.path().join(VERSION_FILE_NAME), + auth_path: tempdir.path().join("auth"), + tasks_path: tempdir.path().join("db_path"), + update_file_path: tempdir.path().join("file_store"), + indexes_path: tempdir.path().join("indexes"), + snapshots_path: tempdir.path().join("snapshots"), + dumps_path: tempdir.path().join("dumps"), + task_db_size: 1024 * 1024, // 1 MiB + index_size: 1024 * 1024, // 1 MiB + indexer_config: IndexerConfig::default(), + autobatching_enabled, + }; + + let index_scheduler = Self::new(options, sender, planned_failures).unwrap(); let index_scheduler_handle = IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver };