mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 03:47:02 +02:00
Merge remote-tracking branch 'origin/main' into tmp-release-v1.15.1
This commit is contained in:
commit
9bda9a9a64
58 changed files with 2312 additions and 1756 deletions
|
@ -37,7 +37,9 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
|
|||
use meilisearch_auth::{open_auth_store_env, AuthController};
|
||||
use meilisearch_types::milli::constants::VERSION_MAJOR;
|
||||
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
|
||||
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
|
||||
use meilisearch_types::milli::update::{
|
||||
default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
|
||||
};
|
||||
use meilisearch_types::settings::apply_settings_to_builder;
|
||||
use meilisearch_types::tasks::KindWithContent;
|
||||
use meilisearch_types::versioning::{
|
||||
|
@ -501,7 +503,19 @@ fn import_dump(
|
|||
let network = dump_reader.network()?.cloned().unwrap_or_default();
|
||||
index_scheduler.put_network(network)?;
|
||||
|
||||
let indexer_config = index_scheduler.indexer_config();
|
||||
// 3.1 Use all cpus to process dump if `max_indexing_threads` not configured
|
||||
let backup_config;
|
||||
let base_config = index_scheduler.indexer_config();
|
||||
|
||||
let indexer_config = if base_config.max_threads.is_none() {
|
||||
let (thread_pool, _) = default_thread_pool_and_threads();
|
||||
|
||||
let _config = IndexerConfig { thread_pool, ..*base_config };
|
||||
backup_config = _config;
|
||||
&backup_config
|
||||
} else {
|
||||
base_config
|
||||
};
|
||||
|
||||
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
||||
// try to process tasks while we're trying to import the indexes.
|
||||
|
|
|
@ -761,10 +761,12 @@ impl IndexerOpts {
|
|||
max_indexing_memory.to_string(),
|
||||
);
|
||||
}
|
||||
export_to_env_if_not_present(
|
||||
MEILI_MAX_INDEXING_THREADS,
|
||||
max_indexing_threads.0.to_string(),
|
||||
);
|
||||
if let Some(max_indexing_threads) = max_indexing_threads.0 {
|
||||
export_to_env_if_not_present(
|
||||
MEILI_MAX_INDEXING_THREADS,
|
||||
max_indexing_threads.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -772,15 +774,15 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
|||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
||||
let thread_pool = ThreadPoolNoAbortBuilder::new()
|
||||
.thread_name(|index| format!("indexing-thread:{index}"))
|
||||
.num_threads(*other.max_indexing_threads)
|
||||
let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
|
||||
.num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2))
|
||||
.build()?;
|
||||
|
||||
Ok(Self {
|
||||
thread_pool,
|
||||
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
||||
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
|
||||
thread_pool: Some(thread_pool),
|
||||
max_threads: *other.max_indexing_threads,
|
||||
max_positions_per_attributes: None,
|
||||
skip_index_budget: other.skip_index_budget,
|
||||
..Default::default()
|
||||
|
@ -843,31 +845,31 @@ fn total_memory_bytes() -> Option<u64> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
|
||||
pub struct MaxThreads(usize);
|
||||
#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize)]
|
||||
pub struct MaxThreads(Option<usize>);
|
||||
|
||||
impl FromStr for MaxThreads {
|
||||
type Err = ParseIntError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
usize::from_str(s).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for MaxThreads {
|
||||
fn default() -> Self {
|
||||
MaxThreads(num_cpus::get() / 2)
|
||||
fn from_str(s: &str) -> Result<MaxThreads, Self::Err> {
|
||||
if s.is_empty() || s == "unlimited" {
|
||||
return Ok(MaxThreads::default());
|
||||
}
|
||||
usize::from_str(s).map(Some).map(MaxThreads)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for MaxThreads {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
match self.0 {
|
||||
Some(threads) => write!(f, "{}", threads),
|
||||
None => write!(f, "unlimited"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for MaxThreads {
|
||||
type Target = usize;
|
||||
type Target = Option<usize>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//! This file implements a queue of searches to process and the ability to control how many searches can be run in parallel.
|
||||
//! We need this because we don't want to process more search requests than we have cores.
|
||||
//! We need this because we don't want to process more search requests than the available CPU cores.
|
||||
//! That slows down everything and consumes RAM for no reason.
|
||||
//! The steps to do a search are to get the `SearchQueue` data structure and try to get a search permit.
|
||||
//! This can fail if the queue is full, and we need to drop your search request to register a new one.
|
||||
|
@ -8,7 +8,7 @@
|
|||
//!
|
||||
//! In order to do a search request you should try to get a search permit.
|
||||
//! Retrieve the `SearchQueue` structure from actix-web (`search_queue: Data<SearchQueue>`)
|
||||
//! and right before processing the search, calls the `SearchQueue::try_get_search_permit` method: `search_queue.try_get_search_permit().await?;`
|
||||
//! and right before processing the search, call the `SearchQueue::try_get_search_permit` method: `search_queue.try_get_search_permit().await?;`
|
||||
//!
|
||||
//! What is going to happen at this point is that you're going to send a oneshot::Sender over an async mpsc channel.
|
||||
//! Then, the queue/scheduler is going to either:
|
||||
|
@ -121,12 +121,12 @@ impl SearchQueue {
|
|||
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
|
||||
let mut rng: StdRng = StdRng::from_entropy();
|
||||
let mut searches_running: usize = 0;
|
||||
// By having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap
|
||||
// By having a capacity of parallelism we ensure that every time a search finish it can release its RAM asap
|
||||
let (sender, mut search_finished) = mpsc::channel(parallelism.into());
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// biased select because we wants to free up space before trying to register new tasks
|
||||
// biased select because we want to free up space before trying to register new tasks
|
||||
biased;
|
||||
_ = search_finished.recv() => {
|
||||
searches_running = searches_running.saturating_sub(1);
|
||||
|
@ -148,11 +148,11 @@ impl SearchQueue {
|
|||
|
||||
if searches_running < usize::from(parallelism) && queue.is_empty() {
|
||||
searches_running += 1;
|
||||
// if the search requests die it's not a hard error on our side
|
||||
// if the search requests die, it's not a hard error on our side
|
||||
let _ = search_request.send(Permit { sender: sender.clone() });
|
||||
continue;
|
||||
} else if capacity == 0 {
|
||||
// in the very specific case where we have a capacity of zero
|
||||
// in the very specific case where we have a capacity of zero,
|
||||
// we must refuse the request straight away without going through
|
||||
// the queue stuff.
|
||||
drop(search_request);
|
||||
|
@ -183,7 +183,7 @@ impl SearchQueue {
|
|||
.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?;
|
||||
|
||||
// If we've been for more than one minute to get a search permit, it's better to simply
|
||||
// abort the search request than spending time processing something were the client
|
||||
// abort the search request than spending time processing something where the client
|
||||
// most certainly exited or got a timeout a long time ago.
|
||||
// We may find a better solution in https://github.com/actix/actix-web/issues/3462.
|
||||
if now.elapsed() > self.time_to_abort {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue