explicitely drop the search permit

This commit is contained in:
Tamo 2024-08-28 14:29:25 +02:00
parent 9eee467226
commit 1b90e6ce5f
5 changed files with 36 additions and 4 deletions

View File

@ -81,7 +81,7 @@ pub async fn search(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?; let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?;
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_facet_search( perform_facet_search(
&index, &index,
@ -94,6 +94,7 @@ pub async fn search(
) )
}) })
.await?; .await?;
permit.drop().await;
if let Ok(ref search_result) = search_result { if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result); aggregate.succeed(search_result);

View File

@ -233,11 +233,12 @@ pub async fn search_with_url_query(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features())
}) })
.await?; .await?;
permit.drop().await;
if let Ok(ref search_result) = search_result { if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result); aggregate.succeed(search_result);
} }
@ -276,11 +277,12 @@ pub async fn search_with_post(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features())
}) })
.await?; .await?;
permit.drop().await;
if let Ok(ref search_result) = search_result { if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result); aggregate.succeed(search_result);
if search_result.degraded { if search_result.degraded {

View File

@ -39,7 +39,7 @@ pub async fn multi_search_with_post(
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
// Since we don't want to process half of the search requests and then get a permit refused // Since we don't want to process half of the search requests and then get a permit refused
// we're going to get one permit for the whole duration of the multi-search request. // we're going to get one permit for the whole duration of the multi-search request.
let _permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let federated_search = params.into_inner(); let federated_search = params.into_inner();
@ -162,6 +162,7 @@ pub async fn multi_search_with_post(
HttpResponse::Ok().json(SearchResults { results: search_results }) HttpResponse::Ok().json(SearchResults { results: search_results })
} }
}; };
permit.drop().await;
Ok(response) Ok(response)
} }

View File

@ -33,11 +33,20 @@ pub struct SearchQueue {
/// You should only run search requests while holding this permit. /// You should only run search requests while holding this permit.
/// Once it's dropped, a new search request will be able to process. /// Once it's dropped, a new search request will be able to process.
/// You should always try to drop the permit yourself calling the `drop` async method on it.
#[derive(Debug)] #[derive(Debug)]
pub struct Permit { pub struct Permit {
sender: mpsc::Sender<()>, sender: mpsc::Sender<()>,
} }
impl Permit {
/// Drop the permit giving back on permit to the search queue.
pub async fn drop(self) {
// if the channel is closed then the whole instance is down
let _ = self.sender.send(()).await;
}
}
impl Drop for Permit { impl Drop for Permit {
fn drop(&mut self) { fn drop(&mut self) {
let sender = self.sender.clone(); let sender = self.sender.clone();

View File

@ -37,6 +37,25 @@ async fn search_queue_register() {
.unwrap(); .unwrap();
} }
#[actix_rt::test]
async fn search_queue_register_with_explicit_drop() {
let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap());
// First, use all the cores
let permit1 = queue.try_get_search_permit().await.unwrap();
let _permit2 = queue.try_get_search_permit().await.unwrap();
// If we free one spot we should be able to register one new search
permit1.drop().await;
let permit3 = queue.try_get_search_permit().await.unwrap();
// And again
permit3.drop().await;
let _permit4 = queue.try_get_search_permit().await.unwrap();
}
#[actix_rt::test] #[actix_rt::test]
async fn wait_till_cores_are_available() { async fn wait_till_cores_are_available() {
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));