From 1b90e6ce5fabcd9f777f9d28c6705c46366942d3 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 28 Aug 2024 14:29:25 +0200 Subject: [PATCH 1/3] explicitely drop the search permit --- .../src/routes/indexes/facet_search.rs | 3 ++- meilisearch/src/routes/indexes/search.rs | 6 ++++-- meilisearch/src/routes/multi_search.rs | 3 ++- meilisearch/src/search_queue.rs | 9 +++++++++ meilisearch/tests/search/search_queue.rs | 19 +++++++++++++++++++ 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/meilisearch/src/routes/indexes/facet_search.rs b/meilisearch/src/routes/indexes/facet_search.rs index a648987ca..b1e57b865 100644 --- a/meilisearch/src/routes/indexes/facet_search.rs +++ b/meilisearch/src/routes/indexes/facet_search.rs @@ -81,7 +81,7 @@ pub async fn search( let index = index_scheduler.index(&index_uid)?; let features = index_scheduler.features(); let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?; - let _permit = search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_facet_search( &index, @@ -94,6 +94,7 @@ pub async fn search( ) }) .await?; + permit.drop().await; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); diff --git a/meilisearch/src/routes/indexes/search.rs b/meilisearch/src/routes/indexes/search.rs index e60f95948..89647c243 100644 --- a/meilisearch/src/routes/indexes/search.rs +++ b/meilisearch/src/routes/indexes/search.rs @@ -233,11 +233,12 @@ pub async fn search_with_url_query( let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?; - let _permit = search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) }) .await?; + permit.drop().await; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); } @@ -276,11 +277,12 @@ pub async fn search_with_post( let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?; - let _permit = search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) }) .await?; + permit.drop().await; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); if search_result.degraded { diff --git a/meilisearch/src/routes/multi_search.rs b/meilisearch/src/routes/multi_search.rs index b8822488f..b2ce05298 100644 --- a/meilisearch/src/routes/multi_search.rs +++ b/meilisearch/src/routes/multi_search.rs @@ -39,7 +39,7 @@ pub async fn multi_search_with_post( ) -> Result { // Since we don't want to process half of the search requests and then get a permit refused // we're going to get one permit for the whole duration of the multi-search request. - let _permit = search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let federated_search = params.into_inner(); @@ -162,6 +162,7 @@ pub async fn multi_search_with_post( HttpResponse::Ok().json(SearchResults { results: search_results }) } }; + permit.drop().await; Ok(response) } diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 4f6dccc42..71833dd24 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -33,11 +33,20 @@ pub struct SearchQueue { /// You should only run search requests while holding this permit. /// Once it's dropped, a new search request will be able to process. +/// You should always try to drop the permit yourself calling the `drop` async method on it. #[derive(Debug)] pub struct Permit { sender: mpsc::Sender<()>, } +impl Permit { + /// Drop the permit giving back on permit to the search queue. + pub async fn drop(self) { + // if the channel is closed then the whole instance is down + let _ = self.sender.send(()).await; + } +} + impl Drop for Permit { fn drop(&mut self) { let sender = self.sender.clone(); diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 3b4fbf252..cbd16097f 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -37,6 +37,25 @@ async fn search_queue_register() { .unwrap(); } +#[actix_rt::test] +async fn search_queue_register_with_explicit_drop() { + let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap()); + + // First, use all the cores + let permit1 = queue.try_get_search_permit().await.unwrap(); + let _permit2 = queue.try_get_search_permit().await.unwrap(); + + // If we free one spot we should be able to register one new search + permit1.drop().await; + + let permit3 = queue.try_get_search_permit().await.unwrap(); + + // And again + permit3.drop().await; + + let _permit4 = queue.try_get_search_permit().await.unwrap(); +} + #[actix_rt::test] async fn wait_till_cores_are_available() { let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); From 241746e7f4056ae075eaf0424b2be8900f81a341 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 28 Aug 2024 14:37:55 +0200 Subject: [PATCH 2/3] add a warning to help us find when we forget to drop explicitely drop a permit --- meilisearch/src/search_queue.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 71833dd24..44a66ff82 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -49,6 +49,7 @@ impl Permit { impl Drop for Permit { fn drop(&mut self) { + tracing::warn!("Internal error, a search permit was lazily dropped. If you see this message, please open an issue on the meilisearch repository at "); let sender = self.sender.clone(); // if the channel is closed then the whole instance is down std::mem::drop(tokio::spawn(async move { sender.send(()).await })); From 69ab09c149fedb3f1b337c7419211f940f722c33 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 28 Aug 2024 15:17:10 +0200 Subject: [PATCH 3/3] ensure we never early exit when we have a permit and remove the warning when we implicitely drop a permit --- meilisearch/src/routes/indexes/facet_search.rs | 3 ++- meilisearch/src/routes/indexes/search.rs | 6 ++++-- meilisearch/src/routes/multi_search.rs | 3 ++- meilisearch/src/search_queue.rs | 4 +++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/meilisearch/src/routes/indexes/facet_search.rs b/meilisearch/src/routes/indexes/facet_search.rs index b1e57b865..1df80711d 100644 --- a/meilisearch/src/routes/indexes/facet_search.rs +++ b/meilisearch/src/routes/indexes/facet_search.rs @@ -93,8 +93,9 @@ pub async fn search( locales, ) }) - .await?; + .await; permit.drop().await; + let search_result = search_result?; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); diff --git a/meilisearch/src/routes/indexes/search.rs b/meilisearch/src/routes/indexes/search.rs index 89647c243..362bc9937 100644 --- a/meilisearch/src/routes/indexes/search.rs +++ b/meilisearch/src/routes/indexes/search.rs @@ -237,8 +237,9 @@ pub async fn search_with_url_query( let search_result = tokio::task::spawn_blocking(move || { perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) }) - .await?; + .await; permit.drop().await; + let search_result = search_result?; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); } @@ -281,8 +282,9 @@ pub async fn search_with_post( let search_result = tokio::task::spawn_blocking(move || { perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) }) - .await?; + .await; permit.drop().await; + let search_result = search_result?; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); if search_result.degraded { diff --git a/meilisearch/src/routes/multi_search.rs b/meilisearch/src/routes/multi_search.rs index b2ce05298..5fcb868c6 100644 --- a/meilisearch/src/routes/multi_search.rs +++ b/meilisearch/src/routes/multi_search.rs @@ -81,6 +81,7 @@ pub async fn multi_search_with_post( perform_federated_search(&index_scheduler, queries, federation, features) }) .await; + permit.drop().await; if let Ok(Ok(_)) = search_result { multi_aggregate.succeed(); @@ -143,6 +144,7 @@ pub async fn multi_search_with_post( Ok(search_results) } .await; + permit.drop().await; if search_results.is_ok() { multi_aggregate.succeed(); @@ -162,7 +164,6 @@ pub async fn multi_search_with_post( HttpResponse::Ok().json(SearchResults { results: search_results }) } }; - permit.drop().await; Ok(response) } diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 44a66ff82..ecdaaf3ff 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -48,8 +48,10 @@ impl Permit { } impl Drop for Permit { + /// The implicit drop implementation can still be called in multiple cases: + /// - We forgot to call the explicit one somewhere => this should be fixed on our side asap + /// - The future is cancelled while running and the permit dropped with it fn drop(&mut self) { - tracing::warn!("Internal error, a search permit was lazily dropped. If you see this message, please open an issue on the meilisearch repository at "); let sender = self.sender.clone(); // if the channel is closed then the whole instance is down std::mem::drop(tokio::spawn(async move { sender.send(()).await }));