4898: Explicitely drop the search permits r=ManyTheFish a=irevoire

# Pull Request

## Related issue
May be related to #4654 and https://github.com/meilisearch/meilisearch-support/issues/350

## What does this PR do?
- Stop spawning a tokio task that is not immediately scheduled and instead explicitly drop the search permit

This should make new search requests to be scheduled quicker than before and reduce the general load on tokio

Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-08-28 13:33:38 +00:00 committed by GitHub
commit 541a23b17a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 46 additions and 7 deletions

View File

@ -81,7 +81,7 @@ pub async fn search(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?; let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?;
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_facet_search( perform_facet_search(
&index, &index,
@ -93,7 +93,9 @@ pub async fn search(
locales, locales,
) )
}) })
.await?; .await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result { if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result); aggregate.succeed(search_result);

View File

@ -233,11 +233,13 @@ pub async fn search_with_url_query(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features())
}) })
.await?; .await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result { if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result); aggregate.succeed(search_result);
} }
@ -276,11 +278,13 @@ pub async fn search_with_post(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features())
}) })
.await?; .await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result { if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result); aggregate.succeed(search_result);
if search_result.degraded { if search_result.degraded {

View File

@ -39,7 +39,7 @@ pub async fn multi_search_with_post(
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
// Since we don't want to process half of the search requests and then get a permit refused // Since we don't want to process half of the search requests and then get a permit refused
// we're going to get one permit for the whole duration of the multi-search request. // we're going to get one permit for the whole duration of the multi-search request.
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let federated_search = params.into_inner(); let federated_search = params.into_inner();
@ -81,6 +81,7 @@ pub async fn multi_search_with_post(
perform_federated_search(&index_scheduler, queries, federation, features) perform_federated_search(&index_scheduler, queries, federation, features)
}) })
.await; .await;
permit.drop().await;
if let Ok(Ok(_)) = search_result { if let Ok(Ok(_)) = search_result {
multi_aggregate.succeed(); multi_aggregate.succeed();
@ -143,6 +144,7 @@ pub async fn multi_search_with_post(
Ok(search_results) Ok(search_results)
} }
.await; .await;
permit.drop().await;
if search_results.is_ok() { if search_results.is_ok() {
multi_aggregate.succeed(); multi_aggregate.succeed();

View File

@ -33,12 +33,24 @@ pub struct SearchQueue {
/// You should only run search requests while holding this permit. /// You should only run search requests while holding this permit.
/// Once it's dropped, a new search request will be able to process. /// Once it's dropped, a new search request will be able to process.
/// You should always try to drop the permit yourself calling the `drop` async method on it.
#[derive(Debug)] #[derive(Debug)]
pub struct Permit { pub struct Permit {
sender: mpsc::Sender<()>, sender: mpsc::Sender<()>,
} }
impl Permit {
/// Drop the permit giving back on permit to the search queue.
pub async fn drop(self) {
// if the channel is closed then the whole instance is down
let _ = self.sender.send(()).await;
}
}
impl Drop for Permit { impl Drop for Permit {
/// The implicit drop implementation can still be called in multiple cases:
/// - We forgot to call the explicit one somewhere => this should be fixed on our side asap
/// - The future is cancelled while running and the permit dropped with it
fn drop(&mut self) { fn drop(&mut self) {
let sender = self.sender.clone(); let sender = self.sender.clone();
// if the channel is closed then the whole instance is down // if the channel is closed then the whole instance is down

View File

@ -37,6 +37,25 @@ async fn search_queue_register() {
.unwrap(); .unwrap();
} }
#[actix_rt::test]
async fn search_queue_register_with_explicit_drop() {
let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap());
// First, use all the cores
let permit1 = queue.try_get_search_permit().await.unwrap();
let _permit2 = queue.try_get_search_permit().await.unwrap();
// If we free one spot we should be able to register one new search
permit1.drop().await;
let permit3 = queue.try_get_search_permit().await.unwrap();
// And again
permit3.drop().await;
let _permit4 = queue.try_get_search_permit().await.unwrap();
}
#[actix_rt::test] #[actix_rt::test]
async fn wait_till_cores_are_available() { async fn wait_till_cores_are_available() {
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));