2024-03-26 17:53:37 +01:00
|
|
|
use std::num::NonZeroUsize;
|
2024-03-26 15:56:43 +01:00
|
|
|
|
|
|
|
use rand::{rngs::StdRng, Rng, SeedableRng};
|
|
|
|
use tokio::sync::{mpsc, oneshot};
|
|
|
|
|
|
|
|
use crate::error::MeilisearchHttpError;
|
|
|
|
|
|
|
|
/// Handle used by request handlers to ask for a [`Permit`] to run a search.
///
/// The actual bookkeeping lives in a background task spawned by
/// [`SearchQueue::new`]; this struct only holds the channel used to reach it.
#[derive(Debug)]
pub struct SearchQueue {
    /// Sends a one-shot "reply channel" to the background task; the task
    /// answers with a [`Permit`] when the search is allowed to run.
    sender: mpsc::Sender<oneshot::Sender<Permit>>,
    /// Maximum number of searches allowed to wait in the queue; also reported
    /// back to callers in the `TooManySearchRequests` error.
    capacity: usize,
}
|
|
|
|
|
|
|
|
/// Authorization to run one search. Dropping the permit notifies the queue
/// that the search finished, freeing a slot for the next waiting request.
#[derive(Debug)]
pub struct Permit {
    /// Channel back to the queue-management task; a `()` is sent on drop to
    /// signal that this search has completed.
    sender: mpsc::Sender<()>,
}
|
|
|
|
|
|
|
|
impl Drop for Permit {
    /// Notifies the queue-management task that this search finished so that
    /// another queued search can be granted a permit.
    fn drop(&mut self) {
        // if the channel is closed then the whole instance is down
        // NOTE(review): `block_on` blocks the current thread until the send is
        // accepted. The receiving channel is sized to `parallelism`, so
        // presumably the send rarely waits — confirm this cannot stall an
        // async executor thread if `drop` runs inside one.
        let _ = futures::executor::block_on(self.sender.send(()));
    }
}
|
|
|
|
|
|
|
|
impl SearchQueue {
|
2024-03-26 17:28:03 +01:00
|
|
|
pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self {
|
2024-03-26 17:53:37 +01:00
|
|
|
// Search requests are going to wait until we're available anyway,
|
|
|
|
// so let's not allocate any RAM and keep a capacity of 1.
|
2024-03-26 15:56:43 +01:00
|
|
|
let (sender, receiver) = mpsc::channel(1);
|
2024-03-26 17:53:37 +01:00
|
|
|
|
2024-03-26 15:56:43 +01:00
|
|
|
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
|
|
|
|
Self { sender, capacity }
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn run(
|
|
|
|
capacity: usize,
|
2024-03-26 17:28:03 +01:00
|
|
|
parallelism: NonZeroUsize,
|
2024-03-26 15:56:43 +01:00
|
|
|
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
|
|
|
|
) {
|
|
|
|
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
|
|
|
|
let mut rng: StdRng = StdRng::from_entropy();
|
2024-03-26 17:28:03 +01:00
|
|
|
let mut searches_running: usize = 0;
|
2024-03-26 15:56:43 +01:00
|
|
|
// by having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap
|
2024-03-26 17:28:03 +01:00
|
|
|
let (sender, mut search_finished) = mpsc::channel(parallelism.into());
|
2024-03-26 15:56:43 +01:00
|
|
|
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
search_request = receive_new_searches.recv() => {
|
2024-03-26 19:04:39 +01:00
|
|
|
// this unwrap is safe because we're sure the `SearchQueue` still lives somewhere in actix-web
|
2024-03-26 15:56:43 +01:00
|
|
|
let search_request = search_request.unwrap();
|
2024-03-26 17:28:03 +01:00
|
|
|
if searches_running < usize::from(parallelism) && queue.is_empty() {
|
2024-03-26 15:56:43 +01:00
|
|
|
searches_running += 1;
|
|
|
|
// if the search requests die it's not a hard error on our side
|
|
|
|
let _ = search_request.send(Permit { sender: sender.clone() });
|
|
|
|
continue;
|
2024-03-26 19:04:39 +01:00
|
|
|
} else if capacity == 0 {
|
|
|
|
// in the very specific case where we have a capacity of zero
|
|
|
|
// we must refuse the request straight away without going through
|
|
|
|
// the queue stuff.
|
|
|
|
drop(search_request);
|
|
|
|
continue;
|
|
|
|
|
|
|
|
} else if queue.len() >= capacity {
|
2024-03-26 15:56:43 +01:00
|
|
|
let remove = rng.gen_range(0..queue.len());
|
|
|
|
let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed
|
|
|
|
drop(thing);
|
|
|
|
}
|
|
|
|
queue.push(search_request);
|
|
|
|
},
|
|
|
|
_ = search_finished.recv() => {
|
|
|
|
searches_running = searches_running.saturating_sub(1);
|
|
|
|
if !queue.is_empty() {
|
|
|
|
let remove = rng.gen_range(0..queue.len());
|
|
|
|
let channel = queue.swap_remove(remove);
|
|
|
|
let _ = channel.send(Permit { sender: sender.clone() });
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-03-26 17:28:03 +01:00
|
|
|
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
|
2024-03-26 15:56:43 +01:00
|
|
|
let (sender, receiver) = oneshot::channel();
|
|
|
|
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
|
2024-03-26 17:53:37 +01:00
|
|
|
receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))
|
2024-03-26 15:56:43 +01:00
|
|
|
}
|
|
|
|
}
|