diff --git a/crates/meilisearch/src/metrics.rs b/crates/meilisearch/src/metrics.rs index d380e9b96..be9fbfc49 100644 --- a/crates/meilisearch/src/metrics.rs +++ b/crates/meilisearch/src/metrics.rs @@ -49,4 +49,18 @@ lazy_static! { pub static ref MEILISEARCH_IS_INDEXING: IntGauge = register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing")) .expect("Can't create a metric"); + pub static ref MEILISEARCH_SEARCH_QUEUE_SIZE: IntGauge = register_int_gauge!(opts!( + "meilisearch_search_queue_size", + "Meilisearch Search Queue Size" + )) + .expect("Can't create a metric"); + pub static ref MEILISEARCH_SEARCHES_RUNNING: IntGauge = + register_int_gauge!(opts!("meilisearch_searches_running", "Meilisearch Searches Running")) + .expect("Can't create a metric"); + pub static ref MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED: IntGauge = + register_int_gauge!(opts!( + "meilisearch_searches_waiting_to_be_processed", + "Meilisearch Searches Waiting To Be Processed" + )) + .expect("Can't create a metric"); } diff --git a/crates/meilisearch/src/routes/metrics.rs b/crates/meilisearch/src/routes/metrics.rs index 7a13a758f..48b5d09f5 100644 --- a/crates/meilisearch/src/routes/metrics.rs +++ b/crates/meilisearch/src/routes/metrics.rs @@ -10,6 +10,7 @@ use prometheus::{Encoder, TextEncoder}; use crate::extractors::authentication::policies::ActionPolicy; use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::routes::create_all_stats; +use crate::search_queue::SearchQueue; pub fn configure(config: &mut web::ServiceConfig) { config.service(web::resource("").route(web::get().to(get_metrics))); @@ -18,6 +19,7 @@ pub fn configure(config: &mut web::ServiceConfig) { pub async fn get_metrics( index_scheduler: GuardedData, Data>, auth_controller: Data, + search_queue: web::Data<SearchQueue>, ) -> Result { index_scheduler.features().check_metrics()?; let auth_filters = index_scheduler.filters(); @@ -35,6 +37,11 @@ pub async fn get_metrics(
crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64); crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64); + crate::metrics::MEILISEARCH_SEARCH_QUEUE_SIZE.set(search_queue.capacity() as i64); + crate::metrics::MEILISEARCH_SEARCHES_RUNNING.set(search_queue.searches_running() as i64); + crate::metrics::MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED + .set(search_queue.searches_waiting() as i64); + for (index, value) in response.indexes.iter() { crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT .with_label_values(&[index]) diff --git a/crates/meilisearch/src/search_queue.rs b/crates/meilisearch/src/search_queue.rs index 195fa1b6f..6ab910164 100644 --- a/crates/meilisearch/src/search_queue.rs +++ b/crates/meilisearch/src/search_queue.rs @@ -18,6 +18,8 @@ //! And should drop the Permit only once you have freed all the RAM consumed by the method. use std::num::NonZeroUsize; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; use rand::rngs::StdRng; @@ -33,6 +35,8 @@ pub struct SearchQueue { /// If we have waited longer than this to get a permit, we should abort the search request entirely. /// The client probably already closed the connection, but we have no way to find out. time_to_abort: Duration, + searches_running: Arc<AtomicUsize>, + searches_waiting_to_be_processed: Arc<AtomicUsize>, } /// You should only run search requests while holding this permit. @@ -68,14 +72,41 @@ impl SearchQueue { // so let's not allocate any RAM and keep a capacity of 1.
let (sender, receiver) = mpsc::channel(1); - tokio::task::spawn(Self::run(capacity, paralellism, receiver)); - Self { sender, capacity, time_to_abort: Duration::from_secs(60) } + let instance = Self { + sender, + capacity, + time_to_abort: Duration::from_secs(60), + searches_running: Default::default(), + searches_waiting_to_be_processed: Default::default(), + }; + + tokio::task::spawn(Self::run( + capacity, + paralellism, + receiver, + Arc::clone(&instance.searches_running), + Arc::clone(&instance.searches_waiting_to_be_processed), + )); + + instance } pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self { Self { time_to_abort, ..self } } + pub fn capacity(&self) -> usize { + self.capacity + } + + pub fn searches_running(&self) -> usize { + self.searches_running.load(Ordering::Relaxed) + } + + pub fn searches_waiting(&self) -> usize { + self.searches_waiting_to_be_processed.load(Ordering::Relaxed) + } + /// This function is the main loop, it's in charge on scheduling which search request should execute first and /// how many should executes at the same time. /// @@ -84,6 +115,8 @@ impl SearchQueue { capacity: usize, parallelism: NonZeroUsize, mut receive_new_searches: mpsc::Receiver>, + metric_searches_running: Arc<AtomicUsize>, + metric_searches_waiting: Arc<AtomicUsize>, ) { let mut queue: Vec> = Default::default(); let mut rng: StdRng = StdRng::from_entropy(); @@ -133,6 +166,9 @@ impl SearchQueue { queue.push(search_request); }, } + + metric_searches_running.store(searches_running, Ordering::Relaxed); + metric_searches_waiting.store(queue.len(), Ordering::Relaxed); } }