mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 20:07:09 +02:00
feat: Make MaxThreads None by default
This commit is contained in:
parent
2ac826edca
commit
47a7ed93d3
3 changed files with 44 additions and 31 deletions
|
@ -504,18 +504,22 @@ fn import_dump(
|
||||||
let network = dump_reader.network()?.cloned().unwrap_or_default();
|
let network = dump_reader.network()?.cloned().unwrap_or_default();
|
||||||
index_scheduler.put_network(network)?;
|
index_scheduler.put_network(network)?;
|
||||||
|
|
||||||
let mut indexer_config = index_scheduler.indexer_config().clone_no_threadpool();
|
// 3.1 Use all cpus to process dump if max_indexing_threads not configured
|
||||||
|
let backup_config;
|
||||||
// 3.1 Use all cpus to index the import dump
|
let indexer_config = if index_scheduler.indexer_config().max_threads.is_none() {
|
||||||
indexer_config.thread_pool = {
|
let mut _config = index_scheduler.indexer_config().clone_no_threadpool();
|
||||||
let all_cpus = num_cpus::get();
|
_config.thread_pool = {
|
||||||
|
Some(
|
||||||
let temp_pool = ThreadPoolNoAbortBuilder::new()
|
ThreadPoolNoAbortBuilder::new()
|
||||||
.thread_name(|index| format!("indexing-thread:{index}"))
|
.thread_name(|index| format!("indexing-thread:{index}"))
|
||||||
.num_threads(all_cpus)
|
.num_threads(num_cpus::get())
|
||||||
.build()?;
|
.build()?,
|
||||||
|
)
|
||||||
Some(temp_pool)
|
};
|
||||||
|
backup_config = _config;
|
||||||
|
&backup_config
|
||||||
|
} else {
|
||||||
|
index_scheduler.indexer_config()
|
||||||
};
|
};
|
||||||
|
|
||||||
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
||||||
|
@ -533,7 +537,7 @@ fn import_dump(
|
||||||
|
|
||||||
let mut wtxn = index.write_txn()?;
|
let mut wtxn = index.write_txn()?;
|
||||||
|
|
||||||
let mut builder = milli::update::Settings::new(&mut wtxn, &index, &indexer_config);
|
let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
|
||||||
// 4.1 Import the primary key if there is one.
|
// 4.1 Import the primary key if there is one.
|
||||||
if let Some(ref primary_key) = metadata.primary_key {
|
if let Some(ref primary_key) = metadata.primary_key {
|
||||||
builder.set_primary_key(primary_key.to_string());
|
builder.set_primary_key(primary_key.to_string());
|
||||||
|
@ -568,7 +572,7 @@ fn import_dump(
|
||||||
let builder = milli::update::IndexDocuments::new(
|
let builder = milli::update::IndexDocuments::new(
|
||||||
&mut wtxn,
|
&mut wtxn,
|
||||||
&index,
|
&index,
|
||||||
&indexer_config,
|
indexer_config,
|
||||||
IndexDocumentsConfig {
|
IndexDocumentsConfig {
|
||||||
update_method: IndexDocumentsMethod::ReplaceDocuments,
|
update_method: IndexDocumentsMethod::ReplaceDocuments,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
|
|
|
@ -746,25 +746,31 @@ impl IndexerOpts {
|
||||||
max_indexing_memory.to_string(),
|
max_indexing_memory.to_string(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
if let Some(max_indexing_threads) = max_indexing_threads.0 {
|
||||||
export_to_env_if_not_present(
|
export_to_env_if_not_present(
|
||||||
MEILI_MAX_INDEXING_THREADS,
|
MEILI_MAX_INDEXING_THREADS,
|
||||||
max_indexing_threads.0.to_string(),
|
max_indexing_threads.to_string(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl TryFrom<&IndexerOpts> for IndexerConfig {
|
impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||||
type Error = anyhow::Error;
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
||||||
|
// use 1/2 cpu threads if no value specified
|
||||||
|
let max_indexing_threads = other.max_indexing_threads.unwrap_or(num_cpus::get() / 2);
|
||||||
|
|
||||||
let thread_pool = ThreadPoolNoAbortBuilder::new()
|
let thread_pool = ThreadPoolNoAbortBuilder::new()
|
||||||
.thread_name(|index| format!("indexing-thread:{index}"))
|
.thread_name(|index| format!("indexing-thread:{index}"))
|
||||||
.num_threads(*other.max_indexing_threads)
|
.num_threads(max_indexing_threads)
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
||||||
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
|
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
|
||||||
|
max_threads: *other.max_indexing_threads,
|
||||||
thread_pool: Some(thread_pool),
|
thread_pool: Some(thread_pool),
|
||||||
max_positions_per_attributes: None,
|
max_positions_per_attributes: None,
|
||||||
skip_index_budget: other.skip_index_budget,
|
skip_index_budget: other.skip_index_budget,
|
||||||
|
@ -828,31 +834,31 @@ fn total_memory_bytes() -> Option<u64> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
|
#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize)]
|
||||||
pub struct MaxThreads(usize);
|
pub struct MaxThreads(Option<usize>);
|
||||||
|
|
||||||
impl FromStr for MaxThreads {
|
impl FromStr for MaxThreads {
|
||||||
type Err = ParseIntError;
|
type Err = ParseIntError;
|
||||||
|
|
||||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
fn from_str(s: &str) -> Result<MaxThreads, Self::Err> {
|
||||||
usize::from_str(s).map(Self)
|
if s.is_empty() {
|
||||||
|
return Ok(MaxThreads::default());
|
||||||
}
|
}
|
||||||
}
|
usize::from_str(s).map(Some).map(MaxThreads)
|
||||||
|
|
||||||
impl Default for MaxThreads {
|
|
||||||
fn default() -> Self {
|
|
||||||
MaxThreads(num_cpus::get() / 2)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for MaxThreads {
|
impl fmt::Display for MaxThreads {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
write!(f, "{}", self.0)
|
match self.0 {
|
||||||
|
Some(threads) => write!(f, "{}", threads),
|
||||||
|
None => Ok(()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for MaxThreads {
|
impl Deref for MaxThreads {
|
||||||
type Target = usize;
|
type Target = Option<usize>;
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.0
|
&self.0
|
||||||
|
|
|
@ -9,6 +9,7 @@ pub struct IndexerConfig {
|
||||||
pub max_nb_chunks: Option<usize>,
|
pub max_nb_chunks: Option<usize>,
|
||||||
pub documents_chunk_size: Option<usize>,
|
pub documents_chunk_size: Option<usize>,
|
||||||
pub max_memory: Option<usize>,
|
pub max_memory: Option<usize>,
|
||||||
|
pub max_threads: Option<usize>,
|
||||||
pub chunk_compression_type: CompressionType,
|
pub chunk_compression_type: CompressionType,
|
||||||
pub chunk_compression_level: Option<u32>,
|
pub chunk_compression_level: Option<u32>,
|
||||||
pub thread_pool: Option<ThreadPoolNoAbort>,
|
pub thread_pool: Option<ThreadPoolNoAbort>,
|
||||||
|
@ -32,6 +33,7 @@ impl IndexerConfig {
|
||||||
max_nb_chunks: self.max_nb_chunks,
|
max_nb_chunks: self.max_nb_chunks,
|
||||||
documents_chunk_size: self.documents_chunk_size,
|
documents_chunk_size: self.documents_chunk_size,
|
||||||
max_memory: self.max_memory,
|
max_memory: self.max_memory,
|
||||||
|
max_threads: self.max_threads,
|
||||||
chunk_compression_type: self.chunk_compression_type,
|
chunk_compression_type: self.chunk_compression_type,
|
||||||
chunk_compression_level: self.chunk_compression_level,
|
chunk_compression_level: self.chunk_compression_level,
|
||||||
max_positions_per_attributes: self.max_positions_per_attributes,
|
max_positions_per_attributes: self.max_positions_per_attributes,
|
||||||
|
@ -48,6 +50,7 @@ impl Default for IndexerConfig {
|
||||||
max_nb_chunks: None,
|
max_nb_chunks: None,
|
||||||
documents_chunk_size: None,
|
documents_chunk_size: None,
|
||||||
max_memory: None,
|
max_memory: None,
|
||||||
|
max_threads: None,
|
||||||
chunk_compression_type: CompressionType::None,
|
chunk_compression_type: CompressionType::None,
|
||||||
chunk_compression_level: None,
|
chunk_compression_level: None,
|
||||||
thread_pool: None,
|
thread_pool: None,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue