diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs
index 7284b4588..9fefb4231 100644
--- a/meilisearch-http/src/option.rs
+++ b/meilisearch-http/src/option.rs
@@ -145,8 +145,8 @@ pub struct Opt {
     #[clap(long, env = "MEILI_LOG_LEVEL", default_value = "info")]
     pub log_level: String,

-    #[serde(skip)]
-    #[clap(skip)]
+    #[serde(flatten)]
+    #[clap(flatten)]
     pub indexer_options: IndexerOpts,

     #[serde(flatten)]
diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs
index dcb4b6266..8ed106dd3 100644
--- a/meilisearch-http/tests/common/server.rs
+++ b/meilisearch-http/tests/common/server.rs
@@ -152,7 +152,7 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
         ignore_dump_if_db_exists: false,
         indexer_options: IndexerOpts {
             // memory has to be unlimited because several meilisearch are running in test context.
-            max_memory: MaxMemory::unlimited(),
+            max_indexing_memory: MaxMemory::unlimited(),
             ..Default::default()
         },
         log_level: "off".into(),
diff --git a/meilisearch-lib/src/options.rs b/meilisearch-lib/src/options.rs
index 195576799..bb24d94b0 100644
--- a/meilisearch-lib/src/options.rs
+++ b/meilisearch-lib/src/options.rs
@@ -1,5 +1,5 @@
 use core::fmt;
-use std::{convert::TryFrom, ops::Deref, str::FromStr};
+use std::{convert::TryFrom, num::ParseIntError, ops::Deref, str::FromStr};

 use byte_unit::{Byte, ByteError};
 use clap::Parser;
@@ -7,15 +7,17 @@ use milli::{update::IndexerConfig, CompressionType};
 use serde::Serialize;
 use sysinfo::{RefreshKind, System, SystemExt};

-#[derive(Debug, Clone, Parser)]
+#[derive(Debug, Clone, Parser, Serialize)]
 pub struct IndexerOpts {
     /// The amount of documents to skip before printing
     /// a log regarding the indexing advancement.
-    #[clap(long, default_value = "100000")] // 100k
+    #[serde(skip)]
+    #[clap(long, default_value = "100000", hide = true)] // 100k
     pub log_every_n: usize,

     /// Grenad max number of chunks in bytes.
-    #[clap(long)]
+    #[serde(skip)]
+    #[clap(long, hide = true)]
     pub max_nb_chunks: Option<usize>,

     /// The maximum amount of memory the indexer will use. It defaults to 2/3
@@ -25,22 +27,30 @@ pub struct IndexerOpts {
     /// In case the engine is unable to retrieve the available memory the engine will
     /// try to use the memory it needs but without real limit, this can lead to
     /// Out-Of-Memory issues and it is recommended to specify the amount of memory to use.
-    #[clap(long, default_value_t)]
-    pub max_memory: MaxMemory,
+    #[clap(long, env = "MEILI_MAX_INDEXING_MEMORY", default_value_t)]
+    pub max_indexing_memory: MaxMemory,
+
+    /// The maximum number of threads the indexer will use.
+    /// It defaults to half of the available threads.
+    #[clap(long, env = "MEILI_MAX_INDEXING_THREADS", default_value_t)]
+    pub max_indexing_threads: MaxThreads,

     /// The name of the compression algorithm to use when compressing intermediate
     /// Grenad chunks while indexing documents.
     ///
     /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
-    #[clap(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
+    #[serde(skip)]
+    #[clap(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"], hide = true)]
     pub chunk_compression_type: CompressionType,

     /// The level of compression of the chosen algorithm.
-    #[clap(long, requires = "chunk-compression-type")]
+    #[serde(skip)]
+    #[clap(long, requires = "chunk-compression-type", hide = true)]
     pub chunk_compression_level: Option<u32>,

     /// Number of parallel jobs for indexing, defaults to # of CPUs.
-    #[clap(long)]
+    #[serde(skip)]
+    #[clap(long, hide = true)]
     pub indexing_jobs: Option<usize>,
 }

@@ -74,13 +84,13 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
     fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
         let thread_pool = rayon::ThreadPoolBuilder::new()
-            .num_threads(other.indexing_jobs.unwrap_or(num_cpus::get() / 2))
+            .num_threads(other.indexing_jobs.unwrap_or(*other.max_indexing_threads))
             .build()?;

         Ok(Self {
             log_every_n: Some(other.log_every_n),
             max_nb_chunks: other.max_nb_chunks,
-            max_memory: (*other.max_memory).map(|b| b.get_bytes() as usize),
+            max_memory: (*other.max_indexing_memory).map(|b| b.get_bytes() as usize),
             chunk_compression_type: other.chunk_compression_type,
             chunk_compression_level: other.chunk_compression_level,
             thread_pool: Some(thread_pool),
@@ -95,7 +105,8 @@ impl Default for IndexerOpts {
         Self {
             log_every_n: 100_000,
             max_nb_chunks: None,
-            max_memory: MaxMemory::default(),
+            max_indexing_memory: MaxMemory::default(),
+            max_indexing_threads: MaxThreads::default(),
             chunk_compression_type: CompressionType::None,
             chunk_compression_level: None,
             indexing_jobs: None,
@@ -104,7 +115,7 @@ impl Default for IndexerOpts {
 }

 /// A type used to detect the max memory available and use 2/3 of it.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Serialize)]
 pub struct MaxMemory(Option<Byte>);

 impl FromStr for MaxMemory {
@@ -159,3 +170,34 @@ fn total_memory_bytes() -> Option<u64> {
         None
     }
 }
+
+#[derive(Debug, Clone, Copy, Serialize)]
+pub struct MaxThreads(usize);
+
+impl FromStr for MaxThreads {
+    type Err = ParseIntError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        usize::from_str(s).map(Self)
+    }
+}
+
+impl Default for MaxThreads {
+    fn default() -> Self {
+        MaxThreads(num_cpus::get() / 2)
+    }
+}
+
+impl fmt::Display for MaxThreads {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl Deref for MaxThreads {
+    type Target = usize;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
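
A note on what the `#[serde(flatten)]` / `#[serde(skip)]` combination above buys: once `IndexerOpts` derives `Serialize` and is flattened into `Opt`, the two user-facing options serialize as top-level keys while the hidden tuning knobs are omitted entirely. Below is a minimal standalone sketch of that behavior, assuming the `serde` (with the `derive` feature) and `serde_json` crates; the field types are hypothetical simplifications standing in for `MaxMemory` and `MaxThreads`.

use serde::Serialize;

// Trimmed stand-in for the diff's IndexerOpts: internal tuning knobs carry
// #[serde(skip)], the two user-facing options do not.
#[derive(Serialize)]
struct IndexerOpts {
    #[serde(skip)]
    log_every_n: usize,
    max_indexing_memory: Option<u64>,
    max_indexing_threads: usize,
}

// Stand-in for meilisearch-http's Opt: #[serde(flatten)] hoists the indexer
// fields to the top level instead of nesting them under "indexer_options".
#[derive(Serialize)]
struct Opt {
    log_level: String,
    #[serde(flatten)]
    indexer_options: IndexerOpts,
}

fn main() {
    let opt = Opt {
        log_level: "info".into(),
        indexer_options: IndexerOpts {
            log_every_n: 100_000,
            max_indexing_memory: Some(2_147_483_648),
            max_indexing_threads: 4,
        },
    };
    // Prints:
    // {"log_level":"info","max_indexing_memory":2147483648,"max_indexing_threads":4}
    // log_every_n is absent (serde(skip)); the flattened fields sit at the top level.
    println!("{}", serde_json::to_string(&opt).unwrap());
}

The same split shows up on the CLI side: `hide = true` keeps the internal knobs out of `--help` output, while the `env = "MEILI_MAX_INDEXING_MEMORY"` and `env = "MEILI_MAX_INDEXING_THREADS"` attributes let clap read the two public options from the environment as well as from flags.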