mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 05:54:30 +01:00
explicitely drop the search permit
This commit is contained in:
parent
42e7499260
commit
92b151607c
@ -81,7 +81,7 @@ pub async fn search(
|
|||||||
let index = index_scheduler.index(&index_uid)?;
|
let index = index_scheduler.index(&index_uid)?;
|
||||||
let features = index_scheduler.features();
|
let features = index_scheduler.features();
|
||||||
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?;
|
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?;
|
||||||
let _permit = search_queue.try_get_search_permit().await?;
|
let permit = search_queue.try_get_search_permit().await?;
|
||||||
let search_result = tokio::task::spawn_blocking(move || {
|
let search_result = tokio::task::spawn_blocking(move || {
|
||||||
perform_facet_search(
|
perform_facet_search(
|
||||||
&index,
|
&index,
|
||||||
@ -94,6 +94,7 @@ pub async fn search(
|
|||||||
)
|
)
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
permit.drop().await;
|
||||||
|
|
||||||
if let Ok(ref search_result) = search_result {
|
if let Ok(ref search_result) = search_result {
|
||||||
aggregate.succeed(search_result);
|
aggregate.succeed(search_result);
|
||||||
|
@ -233,11 +233,12 @@ pub async fn search_with_url_query(
|
|||||||
|
|
||||||
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
|
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
|
||||||
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
|
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
|
||||||
let _permit = search_queue.try_get_search_permit().await?;
|
let permit = search_queue.try_get_search_permit().await?;
|
||||||
let search_result = tokio::task::spawn_blocking(move || {
|
let search_result = tokio::task::spawn_blocking(move || {
|
||||||
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features())
|
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features())
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
permit.drop().await;
|
||||||
if let Ok(ref search_result) = search_result {
|
if let Ok(ref search_result) = search_result {
|
||||||
aggregate.succeed(search_result);
|
aggregate.succeed(search_result);
|
||||||
}
|
}
|
||||||
@ -276,11 +277,12 @@ pub async fn search_with_post(
|
|||||||
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
|
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
|
||||||
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
|
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
|
||||||
|
|
||||||
let _permit = search_queue.try_get_search_permit().await?;
|
let permit = search_queue.try_get_search_permit().await?;
|
||||||
let search_result = tokio::task::spawn_blocking(move || {
|
let search_result = tokio::task::spawn_blocking(move || {
|
||||||
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features())
|
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features())
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
permit.drop().await;
|
||||||
if let Ok(ref search_result) = search_result {
|
if let Ok(ref search_result) = search_result {
|
||||||
aggregate.succeed(search_result);
|
aggregate.succeed(search_result);
|
||||||
if search_result.degraded {
|
if search_result.degraded {
|
||||||
|
@ -39,7 +39,7 @@ pub async fn multi_search_with_post(
|
|||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
// Since we don't want to process half of the search requests and then get a permit refused
|
// Since we don't want to process half of the search requests and then get a permit refused
|
||||||
// we're going to get one permit for the whole duration of the multi-search request.
|
// we're going to get one permit for the whole duration of the multi-search request.
|
||||||
let _permit = search_queue.try_get_search_permit().await?;
|
let permit = search_queue.try_get_search_permit().await?;
|
||||||
|
|
||||||
let federated_search = params.into_inner();
|
let federated_search = params.into_inner();
|
||||||
|
|
||||||
@ -162,6 +162,7 @@ pub async fn multi_search_with_post(
|
|||||||
HttpResponse::Ok().json(SearchResults { results: search_results })
|
HttpResponse::Ok().json(SearchResults { results: search_results })
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
permit.drop().await;
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
@ -33,11 +33,20 @@ pub struct SearchQueue {
|
|||||||
|
|
||||||
/// You should only run search requests while holding this permit.
|
/// You should only run search requests while holding this permit.
|
||||||
/// Once it's dropped, a new search request will be able to process.
|
/// Once it's dropped, a new search request will be able to process.
|
||||||
|
/// You should always try to drop the permit yourself calling the `drop` async method on it.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Permit {
|
pub struct Permit {
|
||||||
sender: mpsc::Sender<()>,
|
sender: mpsc::Sender<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Permit {
|
||||||
|
/// Drop the permit giving back on permit to the search queue.
|
||||||
|
pub async fn drop(self) {
|
||||||
|
// if the channel is closed then the whole instance is down
|
||||||
|
let _ = self.sender.send(()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Drop for Permit {
|
impl Drop for Permit {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
|
@ -37,6 +37,25 @@ async fn search_queue_register() {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn search_queue_register_with_explicit_drop() {
|
||||||
|
let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap());
|
||||||
|
|
||||||
|
// First, use all the cores
|
||||||
|
let permit1 = queue.try_get_search_permit().await.unwrap();
|
||||||
|
let _permit2 = queue.try_get_search_permit().await.unwrap();
|
||||||
|
|
||||||
|
// If we free one spot we should be able to register one new search
|
||||||
|
permit1.drop().await;
|
||||||
|
|
||||||
|
let permit3 = queue.try_get_search_permit().await.unwrap();
|
||||||
|
|
||||||
|
// And again
|
||||||
|
permit3.drop().await;
|
||||||
|
|
||||||
|
let _permit4 = queue.try_get_search_permit().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn wait_till_cores_are_available() {
|
async fn wait_till_cores_are_available() {
|
||||||
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));
|
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user