diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 651af7336..d3d9150d6 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -6,6 +6,7 @@ use std::num::ParseIntError; use std::ops::Deref; use std::path::PathBuf; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{env, fmt, fs}; @@ -666,15 +667,23 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { type Error = anyhow::Error; fn try_from(other: &IndexerOpts) -> Result { + let pool_panic_catched = Arc::new(AtomicBool::new(false)); let thread_pool = rayon::ThreadPoolBuilder::new() .thread_name(|index| format!("indexing-thread:{index}")) .num_threads(*other.max_indexing_threads) + .panic_handler({ + // TODO What should we do with this Box. + // So, let's just set a value to true to cancel the task with a message for now. + let panic_cathed = pool_panic_catched.clone(); + move |_result| panic_cathed.store(true, Ordering::SeqCst) + }) .build()?; Ok(Self { log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize), thread_pool: Some(thread_pool), + pool_panic_catched, max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index aa9789a1a..3b27f96f4 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -8,6 +8,7 @@ use std::collections::{HashMap, HashSet}; use std::io::{Read, Seek}; use std::num::NonZeroU32; use std::result::Result as StdResult; +use std::sync::atomic::Ordering; use std::sync::Arc; use crossbeam_channel::{Receiver, Sender}; @@ -296,20 +297,24 @@ where let settings_diff = Arc::new(settings_diff); let backup_pool; + let pool_catched_panic = self.indexer_config.pool_panic_catched.clone(); let pool = match self.indexer_config.thread_pool { Some(ref pool) => pool, - #[cfg(not(test))] None => { - // We initialize a bakcup pool with the default + // We initialize a backup pool with the default // settings if none have already been set. - backup_pool = rayon::ThreadPoolBuilder::new().build()?; - &backup_pool - } - #[cfg(test)] - None => { - // We initialize a bakcup pool with the default - // settings if none have already been set. - backup_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?; + let mut pool_builder = rayon::ThreadPoolBuilder::new(); + pool_builder = pool_builder.panic_handler({ + let catched_panic = pool_catched_panic.clone(); + move |_result| catched_panic.store(true, Ordering::SeqCst) + }); + + #[cfg(test)] + { + pool_builder = pool_builder.num_threads(1); + } + + backup_pool = pool_builder.build()?; &backup_pool } }; diff --git a/milli/src/update/indexer_config.rs b/milli/src/update/indexer_config.rs index ff7942fdb..b23d8e700 100644 --- a/milli/src/update/indexer_config.rs +++ b/milli/src/update/indexer_config.rs @@ -1,3 +1,6 @@ +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + use grenad::CompressionType; use rayon::ThreadPool; @@ -10,6 +13,9 @@ pub struct IndexerConfig { pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, pub thread_pool: Option, + /// Set to true if the thread pool catched a panic + /// and we must abort the task + pub pool_panic_catched: Arc, pub max_positions_per_attributes: Option, pub skip_index_budget: bool, } @@ -24,6 +30,7 @@ impl Default for IndexerConfig { chunk_compression_type: CompressionType::None, chunk_compression_level: None, thread_pool: None, + pool_panic_catched: Arc::default(), max_positions_per_attributes: None, skip_index_budget: false, }