diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 1b94201f2..eea012331 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -2,6 +2,7 @@ use std::{fmt, io}; use actix_web::http::StatusCode; use actix_web::{self as aweb, HttpResponseBuilder}; +use aweb::http::header; use aweb::rt::task::JoinError; use convert_case::Casing; use milli::heed::{Error as HeedError, MdbError}; @@ -56,7 +57,14 @@ where impl aweb::error::ResponseError for ResponseError { fn error_response(&self) -> aweb::HttpResponse { let json = serde_json::to_vec(self).unwrap(); - HttpResponseBuilder::new(self.status_code()).content_type("application/json").body(json) + let mut builder = HttpResponseBuilder::new(self.status_code()); + builder.content_type("application/json"); + + if self.code == StatusCode::SERVICE_UNAVAILABLE { + builder.insert_header((header::RETRY_AFTER, "10")); + } + + builder.body(json) } fn status_code(&self) -> StatusCode { @@ -305,6 +313,7 @@ MissingSwapIndexes , InvalidRequest , BAD_REQUEST ; MissingTaskFilters , InvalidRequest , BAD_REQUEST ; NoSpaceLeftOnDevice , System , UNPROCESSABLE_ENTITY; PayloadTooLarge , InvalidRequest , PAYLOAD_TOO_LARGE ; +TooManySearchRequests , System , SERVICE_UNAVAILABLE ; TaskNotFound , InvalidRequest , NOT_FOUND ; TooManyOpenFiles , System , UNPROCESSABLE_ENTITY ; TooManyVectors , InvalidRequest , BAD_REQUEST ; diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index 99298bd43..b334f651d 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -252,6 +252,7 @@ impl super::Analytics for SegmentAnalytics { struct Infos { env: String, experimental_enable_metrics: bool, + experimental_search_queue_size: usize, experimental_logs_mode: LogMode, experimental_replication_parameters: bool, experimental_enable_logs_route: bool, @@ -293,6 +294,7 @@ impl From for Infos { let Opt { db_path, experimental_enable_metrics, + experimental_search_queue_size, experimental_logs_mode, experimental_replication_parameters, experimental_enable_logs_route, @@ -342,6 +344,7 @@ impl From for Infos { Self { env, experimental_enable_metrics, + experimental_search_queue_size, experimental_logs_mode, experimental_replication_parameters, experimental_enable_logs_route, diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs index a8351fd1f..5a0b04020 100644 --- a/meilisearch/src/error.rs +++ b/meilisearch/src/error.rs @@ -29,6 +29,10 @@ pub enum MeilisearchHttpError { InvalidExpression(&'static [&'static str], Value), #[error("A {0} payload is missing.")] MissingPayload(PayloadType), + #[error("Too many search requests running at the same time: {0}. Retry after 10s.")] + TooManySearchRequests(usize), + #[error("Internal error: Search limiter is down.")] + SearchLimiterIsDown, #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))] PayloadTooLarge(usize), #[error("Two indexes must be given for each swap. The list `[{}]` contains {} indexes.", @@ -69,6 +73,8 @@ impl ErrorCode for MeilisearchHttpError { MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter, MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter, MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge, + MeilisearchHttpError::TooManySearchRequests(_) => Code::TooManySearchRequests, + MeilisearchHttpError::SearchLimiterIsDown => Code::Internal, MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes, MeilisearchHttpError::IndexUid(e) => e.error_code(), MeilisearchHttpError::SerdeJson(_) => Code::Internal, diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 820f1ae42..bb7562c85 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -9,12 +9,14 @@ pub mod middleware; pub mod option; pub mod routes; pub mod search; +pub mod search_queue; use std::fs::File; use std::io::{BufReader, BufWriter}; +use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; -use std::thread; +use std::thread::{self, available_parallelism}; use std::time::Duration; use actix_cors::Cors; @@ -38,6 +40,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file}; use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; pub use option::Opt; use option::ScheduleSnapshot; +use search_queue::SearchQueue; use tracing::{error, info_span}; use tracing_subscriber::filter::Targets; @@ -469,10 +472,15 @@ pub fn configure_data( (logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle), analytics: Arc, ) { + let search_queue = SearchQueue::new( + opt.experimental_search_queue_size, + available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()), + ); let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize; config .app_data(index_scheduler) .app_data(auth) + .app_data(web::Data::new(search_queue)) .app_data(web::Data::from(analytics)) .app_data(web::Data::new(logs_route)) .app_data(web::Data::new(logs_stderr)) diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 43bf2c62c..651af7336 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -54,6 +54,7 @@ const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE"; const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS"; const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE"; const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS"; +const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE"; const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str = "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE"; const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str = @@ -344,6 +345,15 @@ pub struct Opt { #[serde(default)] pub experimental_enable_metrics: bool, + /// Experimental search queue size. For more information, see: + /// + /// Lets you customize the size of the search queue. Meilisearch processes your search requests as fast as possible but once the + /// queue is full it starts returning HTTP 503, Service Unavailable. + /// The default value is 1000. + #[clap(long, env = MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, default_value_t = 1000)] + #[serde(default)] + pub experimental_search_queue_size: usize, + /// Experimental logs mode feature. For more information, see: /// /// Change the mode of the logs on the console. @@ -473,6 +483,7 @@ impl Opt { #[cfg(feature = "analytics")] no_analytics, experimental_enable_metrics, + experimental_search_queue_size, experimental_logs_mode, experimental_enable_logs_route, experimental_replication_parameters, @@ -532,6 +543,10 @@ impl Opt { MEILI_EXPERIMENTAL_ENABLE_METRICS, experimental_enable_metrics.to_string(), ); + export_to_env_if_not_present( + MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, + experimental_search_queue_size.to_string(), + ); export_to_env_if_not_present( MEILI_EXPERIMENTAL_LOGS_MODE, experimental_logs_mode.to_string(), diff --git a/meilisearch/src/routes/indexes/facet_search.rs b/meilisearch/src/routes/indexes/facet_search.rs index a980fb278..272b8156f 100644 --- a/meilisearch/src/routes/indexes/facet_search.rs +++ b/meilisearch/src/routes/indexes/facet_search.rs @@ -17,6 +17,7 @@ use crate::search::{ DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(search))); @@ -48,6 +49,7 @@ pub struct FacetSearchQuery { pub async fn search( index_scheduler: GuardedData, Data>, + search_queue: Data, index_uid: web::Path, params: AwebJson, req: HttpRequest, @@ -71,6 +73,7 @@ pub async fn search( let index = index_scheduler.index(&index_uid)?; let features = index_scheduler.features(); + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_facet_search(&index, search_query, facet_query, facet_name, features) }) diff --git a/meilisearch/src/routes/indexes/search.rs b/meilisearch/src/routes/indexes/search.rs index 8de2be13f..f16a6c4df 100644 --- a/meilisearch/src/routes/indexes/search.rs +++ b/meilisearch/src/routes/indexes/search.rs @@ -23,6 +23,7 @@ use crate::search::{ DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( @@ -182,6 +183,7 @@ fn fix_sort_query_parameters(sort_query: &str) -> Vec { pub async fn search_with_url_query( index_scheduler: GuardedData, Data>, + search_queue: web::Data, index_uid: web::Path, params: AwebQueryParameter, req: HttpRequest, @@ -204,6 +206,7 @@ pub async fn search_with_url_query( let distribution = embed(&mut query, index_scheduler.get_ref(), &index)?; + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution)) .await?; @@ -220,6 +223,7 @@ pub async fn search_with_url_query( pub async fn search_with_post( index_scheduler: GuardedData, Data>, + search_queue: web::Data, index_uid: web::Path, params: AwebJson, req: HttpRequest, @@ -243,6 +247,7 @@ pub async fn search_with_post( let distribution = embed(&mut query, index_scheduler.get_ref(), &index)?; + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution)) .await?; diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs index 1c1465582..7cf886017 100644 --- a/meilisearch/src/routes/mod.rs +++ b/meilisearch/src/routes/mod.rs @@ -15,6 +15,7 @@ use tracing::debug; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; +use crate::search_queue::SearchQueue; use crate::Opt; const PAGINATION_DEFAULT_LIMIT: usize = 20; @@ -385,10 +386,12 @@ pub async fn get_health( req: HttpRequest, index_scheduler: Data, auth_controller: Data, + search_queue: Data, analytics: web::Data, ) -> Result { analytics.health_seen(&req); + search_queue.health().unwrap(); index_scheduler.health().unwrap(); auth_controller.health().unwrap(); diff --git a/meilisearch/src/routes/multi_search.rs b/meilisearch/src/routes/multi_search.rs index f54b8ae8f..b2055fb07 100644 --- a/meilisearch/src/routes/multi_search.rs +++ b/meilisearch/src/routes/multi_search.rs @@ -17,6 +17,7 @@ use crate::routes::indexes::search::embed; use crate::search::{ add_search_rules, perform_search, SearchQueryWithIndex, SearchResultWithIndex, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(multi_search_with_post)))); @@ -35,6 +36,7 @@ pub struct SearchQueries { pub async fn multi_search_with_post( index_scheduler: GuardedData, Data>, + search_queue: Data, params: AwebJson, req: HttpRequest, analytics: web::Data, @@ -44,6 +46,10 @@ pub async fn multi_search_with_post( let mut multi_aggregate = MultiSearchAggregator::from_queries(&queries, &req); let features = index_scheduler.features(); + // Since we don't want to process half of the search requests and then get a permit refused + // we're going to get one permit for the whole duration of the multi-search request. + let _permit = search_queue.try_get_search_permit().await?; + // Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only, // so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code // changes. diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs new file mode 100644 index 000000000..6d5044d20 --- /dev/null +++ b/meilisearch/src/search_queue.rs @@ -0,0 +1,130 @@ +//! This file implements a queue of searches to process and the ability to control how many searches can be run in parallel. +//! We need this because we don't want to process more search requests than we have cores. +//! That slows down everything and consumes RAM for no reason. +//! The steps to do a search are to get the `SearchQueue` data structure and try to get a search permit. +//! This can fail if the queue is full, and we need to drop your search request to register a new one. +//! +//! ### How to do a search request +//! +//! In order to do a search request you should try to get a search permit. +//! Retrieve the `SearchQueue` structure from actix-web (`search_queue: Data`) +//! and right before processing the search, calls the `SearchQueue::try_get_search_permit` method: `search_queue.try_get_search_permit().await?;` +//! +//! What is going to happen at this point is that you're going to send a oneshot::Sender over an async mpsc channel. +//! Then, the queue/scheduler is going to either: +//! - Drop your oneshot channel => that means there are too many searches going on, and yours won't be executed. +//! You should exit and free all the RAM you use ASAP. +//! - Sends you a Permit => that will unlock the method, and you will be able to process your search. +//! And should drop the Permit only once you have freed all the RAM consumed by the method. + +use std::num::NonZeroUsize; + +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use tokio::sync::{mpsc, oneshot}; + +use crate::error::MeilisearchHttpError; + +#[derive(Debug)] +pub struct SearchQueue { + sender: mpsc::Sender>, + capacity: usize, +} + +/// You should only run search requests while holding this permit. +/// Once it's dropped, a new search request will be able to process. +#[derive(Debug)] +pub struct Permit { + sender: mpsc::Sender<()>, +} + +impl Drop for Permit { + fn drop(&mut self) { + // if the channel is closed then the whole instance is down + let _ = futures::executor::block_on(self.sender.send(())); + } +} + +impl SearchQueue { + pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self { + // Search requests are going to wait until we're available anyway, + // so let's not allocate any RAM and keep a capacity of 1. + let (sender, receiver) = mpsc::channel(1); + + tokio::task::spawn(Self::run(capacity, paralellism, receiver)); + Self { sender, capacity } + } + + /// This function is the main loop, it's in charge on scheduling which search request should execute first and + /// how many should executes at the same time. + /// + /// It **must never** panic or exit. + async fn run( + capacity: usize, + parallelism: NonZeroUsize, + mut receive_new_searches: mpsc::Receiver>, + ) { + let mut queue: Vec> = Default::default(); + let mut rng: StdRng = StdRng::from_entropy(); + let mut searches_running: usize = 0; + // By having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap + let (sender, mut search_finished) = mpsc::channel(parallelism.into()); + + loop { + tokio::select! { + // biased select because we wants to free up space before trying to register new tasks + biased; + _ = search_finished.recv() => { + searches_running = searches_running.saturating_sub(1); + if !queue.is_empty() { + // Can't panic: the queue wasn't empty thus the range isn't empty. + let remove = rng.gen_range(0..queue.len()); + let channel = queue.swap_remove(remove); + let _ = channel.send(Permit { sender: sender.clone() }); + } + }, + + search_request = receive_new_searches.recv() => { + // this unwrap is safe because we're sure the `SearchQueue` still lives somewhere in actix-web + let search_request = search_request.unwrap(); + if searches_running < usize::from(parallelism) && queue.is_empty() { + searches_running += 1; + // if the search requests die it's not a hard error on our side + let _ = search_request.send(Permit { sender: sender.clone() }); + continue; + } else if capacity == 0 { + // in the very specific case where we have a capacity of zero + // we must refuse the request straight away without going through + // the queue stuff. + drop(search_request); + continue; + + } else if queue.len() >= capacity { + let remove = rng.gen_range(0..queue.len()); + let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed + drop(thing); + } + queue.push(search_request); + }, + } + } + } + + /// Returns a search `Permit`. + /// It should be dropped as soon as you've freed all the RAM associated with the search request being processed. + pub async fn try_get_search_permit(&self) -> Result { + let (sender, receiver) = oneshot::channel(); + self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; + receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity)) + } + + /// Returns `Ok(())` if everything seems normal. + /// Returns `Err(MeilisearchHttpError::SearchLimiterIsDown)` if the search limiter seems down. + pub fn health(&self) -> Result<(), MeilisearchHttpError> { + if self.sender.is_closed() { + Err(MeilisearchHttpError::SearchLimiterIsDown) + } else { + Ok(()) + } + } +} diff --git a/meilisearch/tests/search/mod.rs b/meilisearch/tests/search/mod.rs index 88470187a..e5925d77e 100644 --- a/meilisearch/tests/search/mod.rs +++ b/meilisearch/tests/search/mod.rs @@ -10,6 +10,7 @@ mod hybrid; mod multi; mod pagination; mod restrict_searchable; +mod search_queue; use once_cell::sync::Lazy; diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs new file mode 100644 index 000000000..3b4fbf252 --- /dev/null +++ b/meilisearch/tests/search/search_queue.rs @@ -0,0 +1,184 @@ +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::time::Duration; + +use actix_web::ResponseError; +use meili_snap::snapshot; +use meilisearch::search_queue::SearchQueue; + +#[actix_rt::test] +async fn search_queue_register() { + let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap()); + + // First, use all the cores + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); + let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // If we free one spot we should be able to register one new search + drop(permit1); + + let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // And again + drop(permit3); + + let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); +} + +#[actix_rt::test] +async fn wait_till_cores_are_available() { + let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); + + // First, use all the cores + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + let ret = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()).await; + assert!(ret.is_err(), "The capacity is full, we should not get a permit"); + + let q = queue.clone(); + let task = tokio::task::spawn(async move { q.try_get_search_permit().await }); + + // after dropping a permit the previous task should be able to finish + drop(permit1); + let _permit2 = tokio::time::timeout(Duration::from_secs(1), task) + .await + .expect("I should get a permit straight away") + .unwrap(); +} + +#[actix_rt::test] +async fn refuse_search_requests_when_queue_is_full() { + let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap())); + + // First, use the whole capacity of the + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + let q = queue.clone(); + let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await }); + + // Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out + let q = queue.clone(); + let _permit3 = tokio::task::spawn(async move { q.try_get_search_permit().await }); + + let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2) + .await + .expect("I should get a result straight away") + .unwrap(); // task should end successfully + + let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err()); + let http_response = err.error_response(); + let mut headers: Vec<_> = http_response + .headers() + .iter() + .map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string())) + .collect(); + headers.sort(); + snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###); + + let err = serde_json::to_string_pretty(&err).unwrap(); + snapshot!(err, @r###" + { + "message": "Too many search requests running at the same time: 1. Retry after 10s.", + "code": "too_many_search_requests", + "type": "system", + "link": "https://docs.meilisearch.com/errors#too_many_search_requests" + } + "###); +} + +#[actix_rt::test] +async fn search_request_crashes_while_holding_permits() { + let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap())); + + let (send, recv) = tokio::sync::oneshot::channel(); + + // This first request take a cpu + let q = queue.clone(); + tokio::task::spawn(async move { + let _permit = q.try_get_search_permit().await.unwrap(); + recv.await.unwrap(); + panic!("oops an unexpected crash happened") + }); + + // This second request waits in the queue till the first request finishes + let q = queue.clone(); + let task = tokio::task::spawn(async move { + let _permit = q.try_get_search_permit().await.unwrap(); + }); + + // By sending something in the channel the request holding a CPU will panic and should lose its permit + send.send(()).unwrap(); + + // Then the second request should be able to process and finishes correctly without panic + tokio::time::timeout(Duration::from_secs(1), task) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // I should even be able to take second permit here + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); +} + +#[actix_rt::test] +async fn works_with_capacity_of_zero() { + let queue = Arc::new(SearchQueue::new(0, NonZeroUsize::new(1).unwrap())); + + // First, use the whole capacity of the + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // then we should get an error if we try to register a second search request. + let permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a result straight away"); + + let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err()); + let http_response = err.error_response(); + let mut headers: Vec<_> = http_response + .headers() + .iter() + .map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string())) + .collect(); + headers.sort(); + snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###); + + let err = serde_json::to_string_pretty(&err).unwrap(); + snapshot!(err, @r###" + { + "message": "Too many search requests running at the same time: 0. Retry after 10s.", + "code": "too_many_search_requests", + "type": "system", + "link": "https://docs.meilisearch.com/errors#too_many_search_requests" + } + "###); + + drop(permit1); + // After dropping the first permit we should be able to get a new permit + let _permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); +}