2024-03-27 15:38:36 +01:00
//! This file implements a queue of searches to process and the ability to control how many searches can be run in parallel.
//! We need this because we don't want to process more search requests than we have cores:
//! that slows everything down and consumes RAM for no reason.
//! To run a search, retrieve the `SearchQueue` data structure and try to get a search permit from it.
//! This can fail if the queue is full: to register a new search request, we may have to drop yours
//! (the dropped request is picked at random among the waiting ones).
//!
//! ### How to do a search request
//!
//! In order to do a search request you should first get a search permit.
//! Retrieve the `SearchQueue` structure from actix-web (`search_queue: Data<SearchQueue>`)
//! and, right before processing the search, call the `SearchQueue::try_get_search_permit` method: `search_queue.try_get_search_permit().await?;`
//!
//! What is going to happen at this point is that you're going to send a `oneshot::Sender` over an async mpsc channel.
//! Then, the queue/scheduler is going to either:
//! - Drop your oneshot channel => that means there are too many searches going on, and yours won't be executed.
//!   You should exit and free all the RAM you use ASAP.
//! - Send you a `Permit` => that will unlock the method, and you will be able to process your search.
//!   You should drop the `Permit` only once you have freed all the RAM consumed by the search.
2024-03-26 17:53:37 +01:00
use std ::num ::NonZeroUsize ;
2024-03-26 15:56:43 +01:00
2024-03-26 19:23:55 +01:00
use rand ::rngs ::StdRng ;
use rand ::{ Rng , SeedableRng } ;
2024-03-26 15:56:43 +01:00
use tokio ::sync ::{ mpsc , oneshot } ;
use crate ::error ::MeilisearchHttpError ;
/// Handle used to ask the search scheduler for a [`Permit`].
///
/// [`SearchQueue::new`] spawns the scheduler task; search requests then go through
/// [`SearchQueue::try_get_search_permit`] right before running.
#[derive(Debug)]
pub struct SearchQueue {
    /// Registers a search request with the scheduler, which answers on the provided oneshot
    /// channel: either with a [`Permit`], or by dropping the channel to refuse the request.
    sender: mpsc::Sender<oneshot::Sender<Permit>>,
    /// Maximum number of search requests allowed to wait in the queue; also reported back in
    /// the `TooManySearchRequests` error.
    capacity: usize,
}
2024-03-27 15:38:36 +01:00
/// You should only run search requests while holding this permit.
/// Once it's dropped, a new search request will be able to process.
2024-08-28 14:29:25 +02:00
/// You should always try to drop the permit yourself calling the `drop` async method on it.
2024-03-26 15:56:43 +01:00
#[ derive(Debug) ]
pub struct Permit {
sender : mpsc ::Sender < ( ) > ,
}
2024-08-28 14:29:25 +02:00
impl Permit {
/// Drop the permit giving back on permit to the search queue.
pub async fn drop ( self ) {
// if the channel is closed then the whole instance is down
let _ = self . sender . send ( ( ) ) . await ;
}
}
2024-03-26 15:56:43 +01:00
impl Drop for Permit {
fn drop ( & mut self ) {
2024-08-28 14:37:55 +02:00
tracing ::warn! ( " Internal error, a search permit was lazily dropped. If you see this message, please open an issue on the meilisearch repository at <https://github.com/meilisearch/meilisearch/issues/new?template=bug_report.md&title=[INTERNAL%20ERROR]%20Meilisearch%20lazily%20dropped%20a%20search%20permit> " ) ;
2024-06-05 18:26:36 +02:00
let sender = self . sender . clone ( ) ;
2024-03-26 15:56:43 +01:00
// if the channel is closed then the whole instance is down
2024-06-05 18:26:36 +02:00
std ::mem ::drop ( tokio ::spawn ( async move { sender . send ( ( ) ) . await } ) ) ;
2024-03-26 15:56:43 +01:00
}
}
impl SearchQueue {
2024-03-26 17:28:03 +01:00
pub fn new ( capacity : usize , paralellism : NonZeroUsize ) -> Self {
2024-03-26 17:53:37 +01:00
// Search requests are going to wait until we're available anyway,
// so let's not allocate any RAM and keep a capacity of 1.
2024-03-26 15:56:43 +01:00
let ( sender , receiver ) = mpsc ::channel ( 1 ) ;
2024-03-26 17:53:37 +01:00
2024-03-26 15:56:43 +01:00
tokio ::task ::spawn ( Self ::run ( capacity , paralellism , receiver ) ) ;
Self { sender , capacity }
}
2024-03-27 15:38:36 +01:00
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
/// how many should executes at the same time.
///
/// It **must never** panic or exit.
2024-03-26 15:56:43 +01:00
async fn run (
capacity : usize ,
2024-03-26 17:28:03 +01:00
parallelism : NonZeroUsize ,
2024-03-26 15:56:43 +01:00
mut receive_new_searches : mpsc ::Receiver < oneshot ::Sender < Permit > > ,
) {
let mut queue : Vec < oneshot ::Sender < Permit > > = Default ::default ( ) ;
let mut rng : StdRng = StdRng ::from_entropy ( ) ;
2024-03-26 17:28:03 +01:00
let mut searches_running : usize = 0 ;
2024-03-27 15:38:36 +01:00
// By having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap
2024-03-26 17:28:03 +01:00
let ( sender , mut search_finished ) = mpsc ::channel ( parallelism . into ( ) ) ;
2024-03-26 15:56:43 +01:00
loop {
tokio ::select! {
2024-03-27 11:05:37 +01:00
// biased select because we wants to free up space before trying to register new tasks
biased ;
_ = search_finished . recv ( ) = > {
searches_running = searches_running . saturating_sub ( 1 ) ;
if ! queue . is_empty ( ) {
// Can't panic: the queue wasn't empty thus the range isn't empty.
let remove = rng . gen_range ( 0 .. queue . len ( ) ) ;
let channel = queue . swap_remove ( remove ) ;
let _ = channel . send ( Permit { sender : sender . clone ( ) } ) ;
}
} ,
2024-03-26 15:56:43 +01:00
search_request = receive_new_searches . recv ( ) = > {
2024-05-15 15:02:26 +02:00
let search_request = match search_request {
Some ( search_request ) = > search_request ,
// This should never happen while actix-web is running, but it's not a reason to crash
// and it can generate a lot of noise in the tests.
None = > continue ,
} ;
2024-03-26 17:28:03 +01:00
if searches_running < usize ::from ( parallelism ) & & queue . is_empty ( ) {
2024-03-26 15:56:43 +01:00
searches_running + = 1 ;
// if the search requests die it's not a hard error on our side
let _ = search_request . send ( Permit { sender : sender . clone ( ) } ) ;
continue ;
2024-03-26 19:04:39 +01:00
} else if capacity = = 0 {
// in the very specific case where we have a capacity of zero
// we must refuse the request straight away without going through
// the queue stuff.
drop ( search_request ) ;
continue ;
} else if queue . len ( ) > = capacity {
2024-03-26 15:56:43 +01:00
let remove = rng . gen_range ( 0 .. queue . len ( ) ) ;
let thing = queue . swap_remove ( remove ) ; // this will drop the channel and notify the search that it won't be processed
drop ( thing ) ;
}
queue . push ( search_request ) ;
} ,
}
}
}
2024-03-27 15:38:36 +01:00
/// Returns a search `Permit`.
/// It should be dropped as soon as you've freed all the RAM associated with the search request being processed.
2024-03-26 17:28:03 +01:00
pub async fn try_get_search_permit ( & self ) -> Result < Permit , MeilisearchHttpError > {
2024-03-26 15:56:43 +01:00
let ( sender , receiver ) = oneshot ::channel ( ) ;
self . sender . send ( sender ) . await . map_err ( | _ | MeilisearchHttpError ::SearchLimiterIsDown ) ? ;
2024-03-26 17:53:37 +01:00
receiver . await . map_err ( | _ | MeilisearchHttpError ::TooManySearchRequests ( self . capacity ) )
2024-03-26 15:56:43 +01:00
}
2024-03-27 15:49:43 +01:00
/// Returns `Ok(())` if everything seems normal.
/// Returns `Err(MeilisearchHttpError::SearchLimiterIsDown)` if the search limiter seems down.
pub fn health ( & self ) -> Result < ( ) , MeilisearchHttpError > {
if self . sender . is_closed ( ) {
Err ( MeilisearchHttpError ::SearchLimiterIsDown )
} else {
Ok ( ( ) )
}
}
2024-03-26 15:56:43 +01:00
}