diff --git a/Cargo.lock b/Cargo.lock index eddb07937..b1a495f82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1647,6 +1647,7 @@ dependencies = [ "siphasher", "slice-group-by", "structopt", + "sysinfo 0.20.0", "tar", "tempdir", "tempfile", @@ -1739,7 +1740,7 @@ dependencies = [ "slice-group-by", "smallstr", "smallvec", - "sysinfo", + "sysinfo 0.19.2", "tempfile", "uuid", ] @@ -2859,6 +2860,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "sysinfo" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0af066e6272f2175c1783cfc2ebf3e2d8dfe2c182b00677fdeccbf8291af83fb" +dependencies = [ + "cfg-if 1.0.0", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "tar" version = "0.4.35" diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index ecb1d9992..e845edc13 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -76,6 +76,7 @@ pin-project = "1.0.7" whoami = { version = "1.1.2", optional = true } reqwest = { version = "0.11.3", features = ["json", "rustls-tls"], default-features = false, optional = true } serdeval = "0.1.0" +sysinfo = "0.20.0" [dev-dependencies] actix-rt = "2.1.0" diff --git a/meilisearch-http/src/index/update_handler.rs b/meilisearch-http/src/index/update_handler.rs index 2017a9fcf..0bca4f9b8 100644 --- a/meilisearch-http/src/index/update_handler.rs +++ b/meilisearch-http/src/index/update_handler.rs @@ -1,8 +1,8 @@ use std::fs::File; use crate::index::Index; -use milli::CompressionType; use milli::update::UpdateBuilder; +use milli::CompressionType; use rayon::ThreadPool; use crate::index_controller::UpdateMeta; @@ -14,7 +14,7 @@ pub struct UpdateHandler { chunk_compression_level: Option, thread_pool: ThreadPool, log_frequency: usize, - max_memory: usize, + max_memory: Option, linked_hash_map_size: usize, chunk_compression_type: CompressionType, chunk_fusing_shrink_size: u64, @@ -25,12 +25,13 @@ impl UpdateHandler { let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(opt.indexing_jobs.unwrap_or(num_cpus::get() / 2)) .build()?; + Ok(Self { max_nb_chunks: opt.max_nb_chunks, chunk_compression_level: opt.chunk_compression_level, thread_pool, log_frequency: opt.log_every_n, - max_memory: opt.max_memory.get_bytes() as usize, + max_memory: opt.max_memory.map(|m| m.get_bytes() as usize), linked_hash_map_size: opt.linked_hash_map_size, chunk_compression_type: opt.chunk_compression_type, chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(), @@ -48,7 +49,9 @@ impl UpdateHandler { } update_builder.thread_pool(&self.thread_pool); update_builder.log_every_n(self.log_frequency); - update_builder.max_memory(self.max_memory); + if let Some(max_memory) = self.max_memory { + update_builder.max_memory(max_memory); + } update_builder.linked_hash_map_size(self.linked_hash_map_size); update_builder.chunk_compression_type(self.chunk_compression_type); update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size); diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs index d87b98adb..14c9e5a13 100644 --- a/meilisearch-http/src/option.rs +++ b/meilisearch-http/src/option.rs @@ -1,5 +1,9 @@ +use byte_unit::ByteError; +use std::fmt; use std::io::{BufReader, Read}; +use std::ops::Deref; use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; use std::{error, fs}; @@ -11,6 +15,7 @@ use rustls::{ RootCertStore, }; use structopt::StructOpt; +use sysinfo::{RefreshKind, System, SystemExt}; #[derive(Debug, Clone, StructOpt)] pub struct IndexerOpts { @@ -23,13 +28,15 @@ pub struct IndexerOpts { #[structopt(long)] pub max_nb_chunks: Option, - /// The maximum amount of memory to use for the Grenad buffer. It is recommended - /// to use something like 80%-90% of the available memory. + /// The maximum amount of memory the indexer will use. It defaults to 2/3 + /// of the available memory. It is recommended to use something like 80%-90% + /// of the available memory, no more. /// - /// It is automatically split by the number of jobs e.g. if you use 7 jobs - /// and 7 GB of max memory, each thread will use a maximum of 1 GB. - #[structopt(long, default_value = "7 GiB")] - pub max_memory: Byte, + /// In case the engine is unable to retrieve the available memory the engine will + /// try to use the memory it needs but without real limit, this can lead to + /// Out-Of-Memory issues and it is recommended to specify the amount of memory to use. + #[structopt(long)] + pub max_memory: MaxMemory, /// Size of the linked hash map cache when indexing. /// The bigger it is, the faster the indexing is but the more memory it takes. @@ -69,7 +76,7 @@ impl Default for IndexerOpts { Self { log_every_n: 100_000, max_nb_chunks: None, - max_memory: Byte::from_str("1GiB").unwrap(), + max_memory: MaxMemory::default(), linked_hash_map_size: 500, chunk_compression_type: CompressionType::None, chunk_compression_level: None, @@ -240,6 +247,57 @@ impl Opt { } } +/// A type used to detect the max memory available and use 2/3 of it. +#[derive(Debug, Clone, Copy)] +pub struct MaxMemory(Option); + +impl FromStr for MaxMemory { + type Err = ByteError; + + fn from_str(s: &str) -> Result { + Byte::from_str(s).map(Some).map(MaxMemory) + } +} + +impl Default for MaxMemory { + fn default() -> MaxMemory { + MaxMemory( + total_memory_bytes() + .map(|bytes| bytes * 2 / 3) + .map(Byte::from_bytes), + ) + } +} + +impl fmt::Display for MaxMemory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.0 { + Some(memory) => write!(f, "{}", memory.get_appropriate_unit(true)), + None => f.write_str("unknown"), + } + } +} + +impl Deref for MaxMemory { + type Target = Option; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Returns the total amount of bytes available or `None` if this system isn't supported. +fn total_memory_bytes() -> Option { + if System::IS_SUPPORTED { + let memory_kind = RefreshKind::new().with_memory(); + let mut system = System::new_with_specifics(memory_kind); + system.refresh_memory(); + Some(system.total_memory() * 1024) // KiB into bytes + } else { + None + } +} + fn load_certs(filename: PathBuf) -> Result, Box> { let certfile = fs::File::open(filename).map_err(|_| "cannot open certificate file")?; let mut reader = BufReader::new(certfile);