stop trying to process searches after one minute

This commit is contained in:
Tamo 2024-08-28 19:01:54 +02:00
parent 541a23b17a
commit 11bee34bf0
2 changed files with 42 additions and 2 deletions

View File

@ -18,6 +18,7 @@
//! And should drop the Permit only once you have freed all the RAM consumed by the method. //! And should drop the Permit only once you have freed all the RAM consumed by the method.
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::time::Duration;
use rand::rngs::StdRng; use rand::rngs::StdRng;
use rand::{Rng, SeedableRng}; use rand::{Rng, SeedableRng};
@ -29,6 +30,9 @@ use crate::error::MeilisearchHttpError;
pub struct SearchQueue { pub struct SearchQueue {
sender: mpsc::Sender<oneshot::Sender<Permit>>, sender: mpsc::Sender<oneshot::Sender<Permit>>,
capacity: usize, capacity: usize,
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
/// The client probably already closed the connection, but we have no way to find out.
time_to_abort: Duration,
} }
/// You should only run search requests while holding this permit. /// You should only run search requests while holding this permit.
@ -65,7 +69,11 @@ impl SearchQueue {
let (sender, receiver) = mpsc::channel(1); let (sender, receiver) = mpsc::channel(1);
tokio::task::spawn(Self::run(capacity, paralellism, receiver)); tokio::task::spawn(Self::run(capacity, paralellism, receiver));
Self { sender, capacity } Self { sender, capacity, time_to_abort: Duration::from_secs(60) }
}
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
Self { time_to_abort, ..self }
} }
/// This function is the main loop, it's in charge on scheduling which search request should execute first and /// This function is the main loop, it's in charge on scheduling which search request should execute first and
@ -131,9 +139,23 @@ impl SearchQueue {
/// Returns a search `Permit`. /// Returns a search `Permit`.
/// It should be dropped as soon as you've freed all the RAM associated with the search request being processed. /// It should be dropped as soon as you've freed all the RAM associated with the search request being processed.
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> { pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
let now = std::time::Instant::now();
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity)) let permit = receiver
.await
.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?;
// If we've been for more than one minute to get a search permit, it's better to simply
// abort the search request than spending time processing something were the client
// most certainly exited or got a timeout a long time ago.
// We may find a better solution in https://github.com/actix/actix-web/issues/3462.
if now.elapsed() > self.time_to_abort {
permit.drop().await;
Err(MeilisearchHttpError::TooManySearchRequests(self.capacity))
} else {
Ok(permit)
}
} }
/// Returns `Ok(())` if everything seems normal. /// Returns `Ok(())` if everything seems normal.

View File

@ -56,6 +56,24 @@ async fn search_queue_register_with_explicit_drop() {
let _permit4 = queue.try_get_search_permit().await.unwrap(); let _permit4 = queue.try_get_search_permit().await.unwrap();
} }
#[actix_rt::test]
async fn search_queue_register_with_time_to_abort() {
let queue = Arc::new(
SearchQueue::new(1, NonZeroUsize::new(1).unwrap())
.with_time_to_abort(Duration::from_secs(1)),
);
// First, use all the cores
let permit1 = queue.try_get_search_permit().await.unwrap();
let q = queue.clone();
let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await });
tokio::time::sleep(Duration::from_secs(1)).await;
permit1.drop().await;
let ret = permit2.await.unwrap();
snapshot!(ret.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s.");
}
#[actix_rt::test] #[actix_rt::test]
async fn wait_till_cores_are_available() { async fn wait_till_cores_are_available() {
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));