mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-06-15 20:42:24 +02:00
Revert thread_pool type back to Option in config
This commit is contained in:
parent
648b2876f6
commit
3b773b3416
@ -115,8 +115,7 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
let local_pool;
|
let local_pool;
|
||||||
let indexer_config = self.index_mapper.indexer_config();
|
let indexer_config = self.index_mapper.indexer_config();
|
||||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
let pool = match &indexer_config.thread_pool {
|
||||||
let pool = match &*pool_guard {
|
|
||||||
Some(pool) => pool,
|
Some(pool) => pool,
|
||||||
None => {
|
None => {
|
||||||
local_pool = ThreadPoolNoAbortBuilder::new()
|
local_pool = ThreadPoolNoAbortBuilder::new()
|
||||||
@ -269,8 +268,7 @@ impl IndexScheduler {
|
|||||||
if task.error.is_none() {
|
if task.error.is_none() {
|
||||||
let local_pool;
|
let local_pool;
|
||||||
let indexer_config = self.index_mapper.indexer_config();
|
let indexer_config = self.index_mapper.indexer_config();
|
||||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
let pool = match &indexer_config.thread_pool {
|
||||||
let pool = match &*pool_guard {
|
|
||||||
Some(pool) => pool,
|
Some(pool) => pool,
|
||||||
None => {
|
None => {
|
||||||
local_pool = ThreadPoolNoAbortBuilder::new()
|
local_pool = ThreadPoolNoAbortBuilder::new()
|
||||||
@ -433,8 +431,7 @@ impl IndexScheduler {
|
|||||||
if !tasks.iter().all(|res| res.error.is_some()) {
|
if !tasks.iter().all(|res| res.error.is_some()) {
|
||||||
let local_pool;
|
let local_pool;
|
||||||
let indexer_config = self.index_mapper.indexer_config();
|
let indexer_config = self.index_mapper.indexer_config();
|
||||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
let pool = match &indexer_config.thread_pool {
|
||||||
let pool = match &*pool_guard {
|
|
||||||
Some(pool) => pool,
|
Some(pool) => pool,
|
||||||
None => {
|
None => {
|
||||||
local_pool = ThreadPoolNoAbortBuilder::new()
|
local_pool = ThreadPoolNoAbortBuilder::new()
|
||||||
|
@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
|
|||||||
use meilisearch_auth::{open_auth_store_env, AuthController};
|
use meilisearch_auth::{open_auth_store_env, AuthController};
|
||||||
use meilisearch_types::milli::constants::VERSION_MAJOR;
|
use meilisearch_types::milli::constants::VERSION_MAJOR;
|
||||||
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
|
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
|
||||||
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
|
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig};
|
||||||
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
|
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
|
||||||
use meilisearch_types::settings::apply_settings_to_builder;
|
use meilisearch_types::settings::apply_settings_to_builder;
|
||||||
use meilisearch_types::tasks::KindWithContent;
|
use meilisearch_types::tasks::KindWithContent;
|
||||||
@ -504,10 +504,10 @@ fn import_dump(
|
|||||||
let network = dump_reader.network()?.cloned().unwrap_or_default();
|
let network = dump_reader.network()?.cloned().unwrap_or_default();
|
||||||
index_scheduler.put_network(network)?;
|
index_scheduler.put_network(network)?;
|
||||||
|
|
||||||
let indexer_config = index_scheduler.indexer_config();
|
let mut indexer_config = IndexerConfig::clone_no_threadpool(index_scheduler.indexer_config());
|
||||||
|
|
||||||
// Use all cpus to index a dump
|
// 3.1 Use all cpus to index the import dump
|
||||||
let pool_before = {
|
indexer_config.thread_pool = {
|
||||||
let all_cpus = num_cpus::get();
|
let all_cpus = num_cpus::get();
|
||||||
|
|
||||||
let temp_pool = ThreadPoolNoAbortBuilder::new()
|
let temp_pool = ThreadPoolNoAbortBuilder::new()
|
||||||
@ -515,7 +515,7 @@ fn import_dump(
|
|||||||
.num_threads(all_cpus)
|
.num_threads(all_cpus)
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
indexer_config.thread_pool.write().unwrap().replace(temp_pool)
|
Some(temp_pool)
|
||||||
};
|
};
|
||||||
|
|
||||||
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
||||||
@ -533,7 +533,7 @@ fn import_dump(
|
|||||||
|
|
||||||
let mut wtxn = index.write_txn()?;
|
let mut wtxn = index.write_txn()?;
|
||||||
|
|
||||||
let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
|
let mut builder = milli::update::Settings::new(&mut wtxn, &index, &indexer_config);
|
||||||
// 4.1 Import the primary key if there is one.
|
// 4.1 Import the primary key if there is one.
|
||||||
if let Some(ref primary_key) = metadata.primary_key {
|
if let Some(ref primary_key) = metadata.primary_key {
|
||||||
builder.set_primary_key(primary_key.to_string());
|
builder.set_primary_key(primary_key.to_string());
|
||||||
@ -568,7 +568,7 @@ fn import_dump(
|
|||||||
let builder = milli::update::IndexDocuments::new(
|
let builder = milli::update::IndexDocuments::new(
|
||||||
&mut wtxn,
|
&mut wtxn,
|
||||||
&index,
|
&index,
|
||||||
indexer_config,
|
&indexer_config,
|
||||||
IndexDocumentsConfig {
|
IndexDocumentsConfig {
|
||||||
update_method: IndexDocumentsMethod::ReplaceDocuments,
|
update_method: IndexDocumentsMethod::ReplaceDocuments,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
@ -589,12 +589,6 @@ fn import_dump(
|
|||||||
index_scheduler.refresh_index_stats(&uid)?;
|
index_scheduler.refresh_index_stats(&uid)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore original thread pool after dump
|
|
||||||
{
|
|
||||||
let mut guard = indexer_config.thread_pool.write().unwrap();
|
|
||||||
*guard = pool_before;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 5. Import the queue
|
// 5. Import the queue
|
||||||
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
|
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
|
||||||
// 5.1. Import the batches
|
// 5.1. Import the batches
|
||||||
|
@ -6,7 +6,7 @@ use std::num::{NonZeroUsize, ParseIntError};
|
|||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::Arc;
|
||||||
use std::{env, fmt, fs};
|
use std::{env, fmt, fs};
|
||||||
|
|
||||||
use byte_unit::{Byte, ParseError, UnitType};
|
use byte_unit::{Byte, ParseError, UnitType};
|
||||||
@ -765,7 +765,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
||||||
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
|
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
|
||||||
thread_pool: RwLock::new(Some(thread_pool)),
|
thread_pool: Some(thread_pool),
|
||||||
max_positions_per_attributes: None,
|
max_positions_per_attributes: None,
|
||||||
skip_index_budget: other.skip_index_budget,
|
skip_index_budget: other.skip_index_budget,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
|
@ -1936,8 +1936,7 @@ pub(crate) mod tests {
|
|||||||
) -> Result<(), crate::error::Error> {
|
) -> Result<(), crate::error::Error> {
|
||||||
let local_pool;
|
let local_pool;
|
||||||
let indexer_config = &self.indexer_config;
|
let indexer_config = &self.indexer_config;
|
||||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
let pool = match &indexer_config.thread_pool {
|
||||||
let pool = match &*pool_guard {
|
|
||||||
Some(pool) => pool,
|
Some(pool) => pool,
|
||||||
None => {
|
None => {
|
||||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||||
@ -2031,8 +2030,7 @@ pub(crate) mod tests {
|
|||||||
) -> Result<(), crate::error::Error> {
|
) -> Result<(), crate::error::Error> {
|
||||||
let local_pool;
|
let local_pool;
|
||||||
let indexer_config = &self.indexer_config;
|
let indexer_config = &self.indexer_config;
|
||||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
let pool = match &indexer_config.thread_pool {
|
||||||
let pool = match &*pool_guard {
|
|
||||||
Some(pool) => pool,
|
Some(pool) => pool,
|
||||||
None => {
|
None => {
|
||||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||||
@ -2111,8 +2109,7 @@ pub(crate) mod tests {
|
|||||||
|
|
||||||
let local_pool;
|
let local_pool;
|
||||||
let indexer_config = &index.indexer_config;
|
let indexer_config = &index.indexer_config;
|
||||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
let pool = match &indexer_config.thread_pool {
|
||||||
let pool = match &*pool_guard {
|
|
||||||
Some(pool) => pool,
|
Some(pool) => pool,
|
||||||
None => {
|
None => {
|
||||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||||
|
@ -228,10 +228,8 @@ where
|
|||||||
let possible_embedding_mistakes =
|
let possible_embedding_mistakes =
|
||||||
crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);
|
crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);
|
||||||
|
|
||||||
let pool_guard = self.indexer_config.thread_pool.read().unwrap();
|
|
||||||
|
|
||||||
let backup_pool;
|
let backup_pool;
|
||||||
let pool = match &*pool_guard {
|
let pool = match self.indexer_config.thread_pool {
|
||||||
Some(ref pool) => pool,
|
Some(ref pool) => pool,
|
||||||
None => {
|
None => {
|
||||||
// We initialize a backup pool with the default
|
// We initialize a backup pool with the default
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
use std::sync::RwLock;
|
|
||||||
|
|
||||||
use grenad::CompressionType;
|
use grenad::CompressionType;
|
||||||
|
|
||||||
use super::GrenadParameters;
|
use super::GrenadParameters;
|
||||||
@ -13,7 +11,7 @@ pub struct IndexerConfig {
|
|||||||
pub max_memory: Option<usize>,
|
pub max_memory: Option<usize>,
|
||||||
pub chunk_compression_type: CompressionType,
|
pub chunk_compression_type: CompressionType,
|
||||||
pub chunk_compression_level: Option<u32>,
|
pub chunk_compression_level: Option<u32>,
|
||||||
pub thread_pool: RwLock<Option<ThreadPoolNoAbort>>,
|
pub thread_pool: Option<ThreadPoolNoAbort>,
|
||||||
pub max_positions_per_attributes: Option<u32>,
|
pub max_positions_per_attributes: Option<u32>,
|
||||||
pub skip_index_budget: bool,
|
pub skip_index_budget: bool,
|
||||||
}
|
}
|
||||||
@ -27,6 +25,20 @@ impl IndexerConfig {
|
|||||||
max_nb_chunks: self.max_nb_chunks,
|
max_nb_chunks: self.max_nb_chunks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn clone_no_threadpool(other: &IndexerConfig) -> Self {
|
||||||
|
Self {
|
||||||
|
log_every_n: other.log_every_n.clone(),
|
||||||
|
max_nb_chunks: other.max_nb_chunks.clone(),
|
||||||
|
documents_chunk_size: other.documents_chunk_size.clone(),
|
||||||
|
max_memory: other.max_memory.clone(),
|
||||||
|
chunk_compression_type: other.chunk_compression_type.clone(),
|
||||||
|
chunk_compression_level: other.chunk_compression_level.clone(),
|
||||||
|
max_positions_per_attributes: other.max_positions_per_attributes.clone(),
|
||||||
|
skip_index_budget: other.skip_index_budget.clone(),
|
||||||
|
thread_pool: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for IndexerConfig {
|
impl Default for IndexerConfig {
|
||||||
@ -38,7 +50,7 @@ impl Default for IndexerConfig {
|
|||||||
max_memory: None,
|
max_memory: None,
|
||||||
chunk_compression_type: CompressionType::None,
|
chunk_compression_type: CompressionType::None,
|
||||||
chunk_compression_level: None,
|
chunk_compression_level: None,
|
||||||
thread_pool: RwLock::new(None),
|
thread_pool: None,
|
||||||
max_positions_per_attributes: None,
|
max_positions_per_attributes: None,
|
||||||
skip_index_budget: false,
|
skip_index_budget: false,
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user