mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 05:54:30 +01:00
Introduce an options struct to create the IndexScheduler
This commit is contained in:
parent
7074872a78
commit
71b50853dc
@ -190,6 +190,46 @@ mod db_name {
|
|||||||
pub const FINISHED_AT: &str = "finished-at";
|
pub const FINISHED_AT: &str = "finished-at";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum Breakpoint {
|
||||||
|
Start,
|
||||||
|
BatchCreated,
|
||||||
|
BeforeProcessing,
|
||||||
|
AfterProcessing,
|
||||||
|
AbortedIndexation,
|
||||||
|
ProcessBatchSucceeded,
|
||||||
|
ProcessBatchFailed,
|
||||||
|
InsideProcessBatch,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct IndexSchedulerOptions {
|
||||||
|
/// The path to the version file of Meilisearch.
|
||||||
|
version_file_path: PathBuf,
|
||||||
|
/// The path to the folder containing the auth LMDB env.
|
||||||
|
auth_path: PathBuf,
|
||||||
|
/// The path to the folder containing the task databases.
|
||||||
|
tasks_path: PathBuf,
|
||||||
|
/// The path to the file store containing the files associated to the tasks.
|
||||||
|
update_file_path: PathBuf,
|
||||||
|
/// The path to the folder containing meilisearch's indexes.
|
||||||
|
indexes_path: PathBuf,
|
||||||
|
/// The path to the folder containing the snapshots.
|
||||||
|
snapshots_path: PathBuf,
|
||||||
|
/// The path to the folder containing the dumps.
|
||||||
|
dumps_path: PathBuf,
|
||||||
|
/// The maximum size, in bytes, of each meilisearch index.
|
||||||
|
task_db_size: usize,
|
||||||
|
/// The maximum size, in bytes, of the tasks index.
|
||||||
|
index_size: usize,
|
||||||
|
/// Configuration used during indexing for each meilisearch index.
|
||||||
|
indexer_config: IndexerConfig,
|
||||||
|
/// Set to `true` iff the index scheduler is allowed to automatically
|
||||||
|
/// batch tasks together, to process multiple tasks at once.
|
||||||
|
autobatching_enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
/// Structure which holds meilisearch's indexes and schedules the tasks
|
/// Structure which holds meilisearch's indexes and schedules the tasks
|
||||||
/// to be performed on them.
|
/// to be performed on them.
|
||||||
pub struct IndexScheduler {
|
pub struct IndexScheduler {
|
||||||
@ -269,6 +309,7 @@ pub struct IndexScheduler {
|
|||||||
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
|
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
|
||||||
run_loop_iteration: Arc<RwLock<usize>>,
|
run_loop_iteration: Arc<RwLock<usize>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IndexScheduler {
|
impl IndexScheduler {
|
||||||
fn private_clone(&self) -> IndexScheduler {
|
fn private_clone(&self) -> IndexScheduler {
|
||||||
IndexScheduler {
|
IndexScheduler {
|
||||||
@ -300,62 +341,24 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
||||||
pub enum Breakpoint {
|
|
||||||
Start,
|
|
||||||
BatchCreated,
|
|
||||||
BeforeProcessing,
|
|
||||||
AfterProcessing,
|
|
||||||
AbortedIndexation,
|
|
||||||
ProcessBatchSucceeded,
|
|
||||||
ProcessBatchFailed,
|
|
||||||
InsideProcessBatch,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IndexScheduler {
|
impl IndexScheduler {
|
||||||
// TODO create a struct of options with a documented field for each required option instead
|
|
||||||
/// Create an index scheduler and start its run loop.
|
/// Create an index scheduler and start its run loop.
|
||||||
///
|
|
||||||
/// ## Arguments
|
|
||||||
/// - `version_file_path`: the path to the version file of Meilisearch
|
|
||||||
/// - `auth_path`: the path to the folder containing the auth LMDB env
|
|
||||||
/// - `tasks_path`: the path to the folder containing the task databases
|
|
||||||
/// - `update_file_path`: the path to the file store containing the files associated to the tasks
|
|
||||||
/// - `indexes_path`: the path to the folder containing meilisearch's indexes
|
|
||||||
/// - `snapshots_path`: the path to the folder containing the snapshots
|
|
||||||
/// - `dumps_path`: the path to the folder containing the dumps
|
|
||||||
/// - `index_size`: the maximum size, in bytes, of each meilisearch index
|
|
||||||
/// - `indexer_config`: configuration used during indexing for each meilisearch index
|
|
||||||
/// - `autobatching_enabled`: `true` iff the index scheduler is allowed to automatically batch tasks
|
|
||||||
/// together, to process multiple tasks at once.
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
version_file_path: PathBuf,
|
options: IndexSchedulerOptions,
|
||||||
auth_path: PathBuf,
|
|
||||||
tasks_path: PathBuf,
|
|
||||||
update_file_path: PathBuf,
|
|
||||||
indexes_path: PathBuf,
|
|
||||||
snapshots_path: PathBuf,
|
|
||||||
dumps_path: PathBuf,
|
|
||||||
task_db_size: usize,
|
|
||||||
index_size: usize,
|
|
||||||
indexer_config: IndexerConfig,
|
|
||||||
autobatching_enabled: bool,
|
|
||||||
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
|
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
|
||||||
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
|
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
std::fs::create_dir_all(&tasks_path)?;
|
std::fs::create_dir_all(&options.tasks_path)?;
|
||||||
std::fs::create_dir_all(&update_file_path)?;
|
std::fs::create_dir_all(&options.update_file_path)?;
|
||||||
std::fs::create_dir_all(&indexes_path)?;
|
std::fs::create_dir_all(&options.indexes_path)?;
|
||||||
std::fs::create_dir_all(&dumps_path)?;
|
std::fs::create_dir_all(&options.dumps_path)?;
|
||||||
|
|
||||||
let mut options = heed::EnvOpenOptions::new();
|
let env = heed::EnvOpenOptions::new()
|
||||||
options.max_dbs(9);
|
.max_dbs(9)
|
||||||
options.map_size(task_db_size);
|
.map_size(options.task_db_size)
|
||||||
|
.open(options.tasks_path)?;
|
||||||
let env = options.open(tasks_path)?;
|
let file_store = FileStore::new(&options.update_file_path)?;
|
||||||
let file_store = FileStore::new(&update_file_path)?;
|
|
||||||
|
|
||||||
// allow unreachable_code to get rids of the warning in the case of a test build.
|
// allow unreachable_code to get rids of the warning in the case of a test build.
|
||||||
let this = Self {
|
let this = Self {
|
||||||
@ -369,15 +372,20 @@ impl IndexScheduler {
|
|||||||
enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?,
|
enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?,
|
||||||
started_at: env.create_database(Some(db_name::STARTED_AT))?,
|
started_at: env.create_database(Some(db_name::STARTED_AT))?,
|
||||||
finished_at: env.create_database(Some(db_name::FINISHED_AT))?,
|
finished_at: env.create_database(Some(db_name::FINISHED_AT))?,
|
||||||
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?,
|
index_mapper: IndexMapper::new(
|
||||||
|
&env,
|
||||||
|
options.indexes_path,
|
||||||
|
options.index_size,
|
||||||
|
options.indexer_config,
|
||||||
|
)?,
|
||||||
env,
|
env,
|
||||||
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
|
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
|
||||||
wake_up: Arc::new(SignalEvent::auto(true)),
|
wake_up: Arc::new(SignalEvent::auto(true)),
|
||||||
autobatching_enabled,
|
autobatching_enabled: options.autobatching_enabled,
|
||||||
dumps_path,
|
dumps_path: options.dumps_path,
|
||||||
snapshots_path,
|
snapshots_path: options.snapshots_path,
|
||||||
auth_path,
|
auth_path: options.auth_path,
|
||||||
version_file_path,
|
version_file_path: options.version_file_path,
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
test_breakpoint_sdr,
|
test_breakpoint_sdr,
|
||||||
@ -976,28 +984,27 @@ mod tests {
|
|||||||
|
|
||||||
impl IndexScheduler {
|
impl IndexScheduler {
|
||||||
pub fn test(
|
pub fn test(
|
||||||
autobatching: bool,
|
autobatching_enabled: bool,
|
||||||
planned_failures: Vec<(usize, FailureLocation)>,
|
planned_failures: Vec<(usize, FailureLocation)>,
|
||||||
) -> (Self, IndexSchedulerHandle) {
|
) -> (Self, IndexSchedulerHandle) {
|
||||||
let tempdir = TempDir::new().unwrap();
|
let tempdir = TempDir::new().unwrap();
|
||||||
let (sender, receiver) = crossbeam::channel::bounded(0);
|
let (sender, receiver) = crossbeam::channel::bounded(0);
|
||||||
|
|
||||||
let index_scheduler = Self::new(
|
let options = IndexSchedulerOptions {
|
||||||
tempdir.path().join(VERSION_FILE_NAME),
|
version_file_path: tempdir.path().join(VERSION_FILE_NAME),
|
||||||
tempdir.path().join("auth"),
|
auth_path: tempdir.path().join("auth"),
|
||||||
tempdir.path().join("db_path"),
|
tasks_path: tempdir.path().join("db_path"),
|
||||||
tempdir.path().join("file_store"),
|
update_file_path: tempdir.path().join("file_store"),
|
||||||
tempdir.path().join("indexes"),
|
indexes_path: tempdir.path().join("indexes"),
|
||||||
tempdir.path().join("snapshots"),
|
snapshots_path: tempdir.path().join("snapshots"),
|
||||||
tempdir.path().join("dumps"),
|
dumps_path: tempdir.path().join("dumps"),
|
||||||
1024 * 1024,
|
task_db_size: 1024 * 1024, // 1 MiB
|
||||||
1024 * 1024,
|
index_size: 1024 * 1024, // 1 MiB
|
||||||
IndexerConfig::default(),
|
indexer_config: IndexerConfig::default(),
|
||||||
autobatching, // enable autobatching
|
autobatching_enabled,
|
||||||
sender,
|
};
|
||||||
planned_failures,
|
|
||||||
)
|
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let index_scheduler_handle =
|
let index_scheduler_handle =
|
||||||
IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver };
|
IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver };
|
||||||
|
Loading…
x
Reference in New Issue
Block a user