Add the options into the IndexScheduler

Kerollmops 2023-09-04 16:38:05 +02:00
parent 966cbdab69
commit 76657af1f9
5 changed files with 24 additions and 18 deletions


@@ -135,7 +135,7 @@ impl IndexMapper {
         index_growth_amount: usize,
         index_count: usize,
         enable_mdb_writemap: bool,
-        indexer_config: IndexerConfig,
+        indexer_config: Arc<IndexerConfig>,
     ) -> Result<Self> {
         let mut wtxn = env.write_txn()?;
         let index_mapping = env.create_database(&mut wtxn, Some(INDEX_MAPPING))?;
@@ -150,7 +150,7 @@ impl IndexMapper {
             index_base_map_size,
             index_growth_amount,
             enable_mdb_writemap,
-            indexer_config: Arc::new(indexer_config),
+            indexer_config,
         })
     }
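
This hunk moves the `Arc` construction out of `IndexMapper::new`: the caller now passes an already-shared `Arc<IndexerConfig>`, so the mapper stores a handle to the same configuration other components hold instead of owning a private copy. A minimal sketch of the pattern, using a hypothetical trimmed-down `IndexerConfig`:

use std::sync::Arc;

// Hypothetical stand-in for milli's IndexerConfig, reduced to one field.
#[derive(Default)]
struct IndexerConfig {
    max_memory: Option<usize>,
}

struct IndexMapper {
    indexer_config: Arc<IndexerConfig>,
}

impl IndexMapper {
    // The caller owns the Arc; `new` only stores another handle to it.
    fn new(indexer_config: Arc<IndexerConfig>) -> Self {
        Self { indexer_config }
    }
}

fn main() {
    let config = Arc::new(IndexerConfig::default());
    let _mapper = IndexMapper::new(Arc::clone(&config)); // a refcount bump, not a copy
    assert_eq!(Arc::strong_count(&config), 2);
}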


@@ -265,7 +265,7 @@ pub struct IndexSchedulerOptions {
     /// The number of indexes that can be concurrently opened in memory.
     pub index_count: usize,
     /// Configuration used during indexing for each meilisearch index.
-    pub indexer_config: IndexerConfig,
+    pub indexer_config: Arc<IndexerConfig>,
     /// Set to `true` iff the index scheduler is allowed to automatically
     /// batch tasks together, to process multiple tasks at once.
     pub autobatching_enabled: bool,
@@ -290,7 +290,7 @@ pub struct IndexScheduler {
 impl IndexScheduler {
     /// Create an index scheduler and start its run loop.
     pub fn new(
-        options: IndexSchedulerOptions,
+        options: Arc<IndexSchedulerOptions>,
         #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
         #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
     ) -> Result<Self> {
@@ -898,6 +898,9 @@ pub struct IndexSchedulerInner {
     /// The LMDB environment which the DBs are associated with.
     pub(crate) env: Env,
+    /// The options to open an IndexScheduler.
+    pub(crate) options: Arc<IndexSchedulerOptions>,
     /// A boolean that can be set to true to stop the currently processing tasks.
     pub(crate) must_stop_processing: MustStopProcessing,
@@ -982,7 +985,7 @@ pub struct IndexSchedulerInner {
 impl IndexSchedulerInner {
     fn new(
-        options: IndexSchedulerOptions,
+        options: Arc<IndexSchedulerOptions>,
         #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
         #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
     ) -> Result<Self> {
@@ -1015,7 +1018,7 @@ impl IndexSchedulerInner {
         let env = heed::EnvOpenOptions::new()
             .max_dbs(11)
             .map_size(budget.task_db_size)
-            .open(options.tasks_path)?;
+            .open(&options.tasks_path)?;
         let features = features::FeatureData::new(&env, options.instance_features)?;
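
The `&` added here is what keeps the now-shared options intact: `options.tasks_path` can no longer be moved out from behind the `Arc`, but heed's `open` takes the path through an `AsRef<Path>`-style generic, so borrowing is enough. A sketch under that assumption, with a hypothetical `open_env` standing in for `heed::EnvOpenOptions::open`:

use std::path::{Path, PathBuf};
use std::sync::Arc;

// Hypothetical stand-in for heed's open(); the AsRef<Path> bound is the
// assumption that makes the borrow in the hunk above sufficient.
fn open_env(path: impl AsRef<Path>) -> std::io::Result<()> {
    let _ = path.as_ref();
    Ok(())
}

struct Options {
    tasks_path: PathBuf,
}

fn main() -> std::io::Result<()> {
    let options = Arc::new(Options { tasks_path: PathBuf::from("/tmp/tasks") });
    // Moving `options.tasks_path` would not compile behind an Arc;
    // lending it with `&` satisfies the AsRef<Path> parameter.
    open_env(&options.tasks_path)
}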
@@ -1047,23 +1050,24 @@ impl IndexSchedulerInner {
             finished_at,
             index_mapper: IndexMapper::new(
                 &env,
-                options.indexes_path,
+                options.indexes_path.clone(),
                 budget.map_size,
                 options.index_growth_amount,
                 budget.index_count,
                 options.enable_mdb_writemap,
-                options.indexer_config,
+                options.indexer_config.clone(),
             )?,
             env,
             // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
             wake_up: Arc::new(SignalEvent::auto(true)),
             autobatching_enabled: options.autobatching_enabled,
             max_number_of_tasks: options.max_number_of_tasks,
-            dumps_path: options.dumps_path,
-            snapshots_path: options.snapshots_path,
-            auth_path: options.auth_path,
-            version_file_path: options.version_file_path,
-            zookeeper: options.zookeeper,
+            dumps_path: options.dumps_path.clone(),
+            snapshots_path: options.snapshots_path.clone(),
+            auth_path: options.auth_path.clone(),
+            version_file_path: options.version_file_path.clone(),
+            zookeeper: options.zookeeper.clone(),
+            options,
             #[cfg(test)]
             test_breakpoint_sdr,
             #[cfg(test)]
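
The constructor change boils down to one ownership rule: nothing can be moved out from behind an `Arc`, so owned fields such as the `PathBuf`s are cloned out of the shared options, `Copy` fields are read directly, and the `Arc` itself is moved into the new `options` field last. A hedged sketch with trimmed-down, hypothetical types:

use std::path::PathBuf;
use std::sync::Arc;

// Hypothetical stand-ins for IndexSchedulerOptions / IndexSchedulerInner.
struct Options {
    dumps_path: PathBuf,
    autobatching_enabled: bool,
}

struct Inner {
    dumps_path: PathBuf,
    autobatching_enabled: bool,
    options: Arc<Options>,
}

impl Inner {
    fn new(options: Arc<Options>) -> Self {
        Self {
            // Owned values are cloned out of the shared options...
            dumps_path: options.dumps_path.clone(),
            // ...Copy values are simply read...
            autobatching_enabled: options.autobatching_enabled,
            // ...and the Arc is moved in after every field it feeds.
            options,
        }
    }
}

fn main() {
    let options = Arc::new(Options {
        dumps_path: PathBuf::from("/tmp/dumps"),
        autobatching_enabled: true,
    });
    let inner = Inner::new(Arc::clone(&options));
    assert_eq!(inner.dumps_path, options.dumps_path);
}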


@@ -224,7 +224,7 @@ fn open_or_create_database_unchecked(
     // wrap our two builders in a closure that'll be executed later.
     let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zookeeper.clone());
     let instance_features = opt.to_instance_features();
-    let index_scheduler = IndexScheduler::new(IndexSchedulerOptions {
+    let index_scheduler = IndexScheduler::new(Arc::new(IndexSchedulerOptions {
         version_file_path: opt.db_path.join(VERSION_FILE_NAME),
         auth_path: opt.db_path.join("auth"),
         tasks_path: opt.db_path.join("tasks"),
@@ -235,14 +235,14 @@ fn open_or_create_database_unchecked(
         task_db_size: opt.max_task_db_size.get_bytes() as usize,
         index_base_map_size: opt.max_index_size.get_bytes() as usize,
         enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
-        indexer_config: (&opt.indexer_options).try_into()?,
+        indexer_config: (&opt.indexer_options).try_into().map(Arc::new)?,
         autobatching_enabled: true,
         max_number_of_tasks: 1_000_000,
         index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
         index_count: DEFAULT_INDEX_COUNT,
         instance_features,
         zookeeper: zookeeper.clone(),
-    })
+    }))
     .map_err(anyhow::Error::from);
     match (
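
The `indexer_config` line above chains the fallible conversion with the new `Arc` wrapping: `try_into()` yields a `Result`, `.map(Arc::new)` wraps only the success value, and `?` still propagates the error unchanged. A small sketch of the idiom with hypothetical `Opts`/`Config` types:

use std::sync::Arc;

// Hypothetical stand-ins for IndexerOpts / IndexerConfig.
struct Opts {
    max_memory: Option<usize>,
}

struct Config {
    max_memory: Option<usize>,
}

impl TryFrom<&Opts> for Config {
    type Error = String;
    fn try_from(opts: &Opts) -> Result<Self, Self::Error> {
        Ok(Config { max_memory: opts.max_memory })
    }
}

fn build(opts: &Opts) -> Result<Arc<Config>, String> {
    // try_into() -> Result<Config, String>; map(Arc::new) -> Result<Arc<Config>, String>;
    // `?` unwraps the Ok value or returns the error to the caller.
    let config = opts.try_into().map(Arc::new)?;
    Ok(config)
}

fn main() {
    let opts = Opts { max_memory: Some(2 << 30) };
    assert_eq!(build(&opts).unwrap().max_memory, Some(2 << 30));
}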


@@ -557,7 +557,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
         Ok(Self {
             log_every_n: Some(DEFAULT_LOG_EVERY_N),
             max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
-            thread_pool: Some(thread_pool),
+            thread_pool: Some(Arc::new(thread_pool)),
             max_positions_per_attributes: None,
             skip_index_budget: other.skip_index_budget,
             ..Default::default()
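
The `Arc::new(thread_pool)` here exists because rayon's `ThreadPool` owns OS threads and is not `Clone`: once `IndexerConfig` is shared, the pool has to be reference-counted too, and every holder dispatches work onto the same threads. A sketch of that sharing (the pool size and the closure body are illustrative):

use std::sync::Arc;
use rayon::ThreadPoolBuilder;

fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    // One pool, wrapped so several owners can keep a handle to it.
    let thread_pool = Arc::new(ThreadPoolBuilder::new().num_threads(4).build()?);

    // Cloning the Arc clones the handle, never the threads.
    let for_indexing = Arc::clone(&thread_pool);
    for_indexing.install(|| {
        println!("running on {} threads", for_indexing.current_num_threads());
    });
    Ok(())
}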


@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use grenad::CompressionType;
 use rayon::ThreadPool;
@@ -9,7 +11,7 @@ pub struct IndexerConfig {
     pub max_memory: Option<usize>,
     pub chunk_compression_type: CompressionType,
     pub chunk_compression_level: Option<u32>,
-    pub thread_pool: Option<ThreadPool>,
+    pub thread_pool: Option<Arc<ThreadPool>>,
     pub max_positions_per_attributes: Option<u32>,
     pub skip_index_budget: bool,
 }
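
After this change the configuration nests two reference counts: callers hold an `Arc<IndexerConfig>` whose `thread_pool` field is itself an `Option<Arc<ThreadPool>>`, so handing the whole setup to a worker thread is a counter bump. A final hedged sketch with a trimmed-down, hypothetical config:

use std::sync::Arc;
use std::thread;

// Hypothetical, reduced IndexerConfig: both layers are Send + Sync,
// which is what lets the handles cross thread boundaries below.
struct IndexerConfig {
    max_memory: Option<usize>,
    thread_pool: Option<Arc<rayon::ThreadPool>>,
}

fn main() {
    let config = Arc::new(IndexerConfig {
        max_memory: Some(1 << 30),
        thread_pool: rayon::ThreadPoolBuilder::new().build().ok().map(Arc::new),
    });

    // Each worker receives its own Arc handle; the config is never copied.
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let config = Arc::clone(&config);
            thread::spawn(move || config.max_memory)
        })
        .collect();

    for handle in handles {
        assert_eq!(handle.join().unwrap(), Some(1 << 30));
    }
}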