Merge branch 'main' into indexer-edition-2024

This commit is contained in:
Louis Dureuil 2024-11-20 16:59:58 +01:00
commit 6e6acfcf1b
No known key found for this signature in database
330 changed files with 10063 additions and 1499 deletions

View file

@ -49,4 +49,18 @@ lazy_static! {
pub static ref MEILISEARCH_IS_INDEXING: IntGauge =
register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing"))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCH_QUEUE_SIZE: IntGauge = register_int_gauge!(opts!(
"meilisearch_search_queue_size",
"Meilisearch Search Queue Size"
))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCHES_RUNNING: IntGauge =
register_int_gauge!(opts!("meilisearch_searches_running", "Meilisearch Searches Running"))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED: IntGauge =
register_int_gauge!(opts!(
"meilisearch_searches_waiting_to_be_processed",
"Meilisearch Searches Being Processed"
))
.expect("Can't create a metric");
}

View file

@ -0,0 +1,80 @@
use actix_web::{
web::{self, Data},
HttpResponse,
};
use deserr::actix_web::AwebQueryParameter;
use index_scheduler::{IndexScheduler, Query};
use meilisearch_types::{
batch_view::BatchView, batches::BatchId, deserr::DeserrQueryParamError, error::ResponseError,
keys::actions,
};
use serde::Serialize;
use crate::extractors::{authentication::GuardedData, sequential_extractor::SeqHandler};
use super::{tasks::TasksFilterQuery, ActionPolicy};
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::get().to(SeqHandler(get_batches))))
.service(web::resource("/{batch_id}").route(web::get().to(SeqHandler(get_batch))));
}
async fn get_batch(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
batch_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
let batch_uid_string = batch_uid.into_inner();
let batch_uid: BatchId = match batch_uid_string.parse() {
Ok(id) => id,
Err(_e) => {
return Err(
index_scheduler::Error::InvalidBatchUid { batch_uid: batch_uid_string }.into()
)
}
};
let query = index_scheduler::Query { uids: Some(vec![batch_uid]), ..Query::default() };
let filters = index_scheduler.filters();
let (batches, _) = index_scheduler.get_batches_from_authorized_indexes(query, filters)?;
if let Some(batch) = batches.first() {
let task_view = BatchView::from_batch(batch);
Ok(HttpResponse::Ok().json(task_view))
} else {
Err(index_scheduler::Error::BatchNotFound(batch_uid).into())
}
}
#[derive(Debug, Serialize)]
pub struct AllBatches {
results: Vec<BatchView>,
total: u64,
limit: u32,
from: Option<u32>,
next: Option<u32>,
}
async fn get_batches(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
params: AwebQueryParameter<TasksFilterQuery, DeserrQueryParamError>,
) -> Result<HttpResponse, ResponseError> {
let mut params = params.into_inner();
// We +1 just to know if there is more after this "page" or not.
params.limit.0 = params.limit.0.saturating_add(1);
let limit = params.limit.0;
let query = params.into_query();
let filters = index_scheduler.filters();
let (tasks, total) = index_scheduler.get_batches_from_authorized_indexes(query, filters)?;
let mut results: Vec<_> = tasks.iter().map(BatchView::from_batch).collect();
// If we were able to fetch the number +1 tasks we asked
// it means that there is more to come.
let next = if results.len() == limit as usize { results.pop().map(|t| t.uid) } else { None };
let from = results.first().map(|t| t.uid);
let tasks = AllBatches { results, limit: limit.saturating_sub(1), total, from, next };
Ok(HttpResponse::Ok().json(tasks))
}

View file

@ -10,6 +10,7 @@ use prometheus::{Encoder, TextEncoder};
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::routes::create_all_stats;
use crate::search_queue::SearchQueue;
pub fn configure(config: &mut web::ServiceConfig) {
config.service(web::resource("").route(web::get().to(get_metrics)));
@ -18,6 +19,7 @@ pub fn configure(config: &mut web::ServiceConfig) {
pub async fn get_metrics(
index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
auth_controller: Data<AuthController>,
search_queue: web::Data<SearchQueue>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_metrics()?;
let auth_filters = index_scheduler.filters();
@ -35,6 +37,11 @@ pub async fn get_metrics(
crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64);
crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64);
crate::metrics::MEILISEARCH_SEARCH_QUEUE_SIZE.set(search_queue.capacity() as i64);
crate::metrics::MEILISEARCH_SEARCHES_RUNNING.set(search_queue.searches_running() as i64);
crate::metrics::MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED
.set(search_queue.searches_waiting() as i64);
for (index, value) in response.indexes.iter() {
crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT
.with_label_values(&[index])

View file

@ -19,6 +19,7 @@ use crate::Opt;
const PAGINATION_DEFAULT_LIMIT: usize = 20;
mod api_key;
pub mod batches;
mod dump;
pub mod features;
pub mod indexes;
@ -32,6 +33,7 @@ pub mod tasks;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::scope("/tasks").configure(tasks::configure))
.service(web::scope("/batches").configure(batches::configure))
.service(web::resource("/health").route(web::get().to(get_health)))
.service(web::scope("/logs").configure(logs::configure))
.service(web::scope("/keys").configure(api_key::configure))

View file

@ -3,6 +3,7 @@ use actix_web::{web, HttpRequest, HttpResponse};
use deserr::actix_web::AwebQueryParameter;
use deserr::Deserr;
use index_scheduler::{IndexScheduler, Query, TaskId};
use meilisearch_types::batches::BatchId;
use meilisearch_types::deserr::query_params::Param;
use meilisearch_types::deserr::DeserrQueryParamError;
use meilisearch_types::error::deserr_codes::*;
@ -17,15 +18,13 @@ use time::macros::format_description;
use time::{Date, Duration, OffsetDateTime, Time};
use tokio::task;
use super::{get_task_id, is_dry_run, SummarizedTaskView};
use super::{get_task_id, is_dry_run, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::analytics::{Aggregate, AggregateMethod, Analytics};
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::{aggregate_methods, Opt};
const DEFAULT_LIMIT: u32 = 20;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
@ -35,13 +34,19 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks))))
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
}
#[derive(Debug, Deserr)]
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
pub struct TasksFilterQuery {
#[deserr(default = Param(DEFAULT_LIMIT), error = DeserrQueryParamError<InvalidTaskLimit>)]
#[deserr(default = Param(PAGINATION_DEFAULT_LIMIT as u32), error = DeserrQueryParamError<InvalidTaskLimit>)]
pub limit: Param<u32>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)]
pub from: Option<Param<TaskId>>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskReverse>)]
pub reverse: Option<Param<bool>>,
#[deserr(default, error = DeserrQueryParamError<InvalidBatchUids>)]
pub batch_uids: OptionStarOrList<BatchId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)]
pub uids: OptionStarOrList<u32>,
@ -69,10 +74,12 @@ pub struct TasksFilterQuery {
}
impl TasksFilterQuery {
fn into_query(self) -> Query {
pub(crate) fn into_query(self) -> Query {
Query {
limit: Some(self.limit.0),
from: self.from.as_deref().copied(),
reverse: self.reverse.as_deref().copied(),
batch_uids: self.batch_uids.merge_star_and_none(),
statuses: self.statuses.merge_star_and_none(),
types: self.types.merge_star_and_none(),
index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(),
@ -94,6 +101,7 @@ impl TaskDeletionOrCancelationQuery {
self,
TaskDeletionOrCancelationQuery {
uids: OptionStarOrList::None,
batch_uids: OptionStarOrList::None,
canceled_by: OptionStarOrList::None,
types: OptionStarOrList::None,
statuses: OptionStarOrList::None,
@ -113,9 +121,11 @@ impl TaskDeletionOrCancelationQuery {
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
pub struct TaskDeletionOrCancelationQuery {
#[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)]
pub uids: OptionStarOrList<u32>,
pub uids: OptionStarOrList<TaskId>,
#[deserr(default, error = DeserrQueryParamError<InvalidBatchUids>)]
pub batch_uids: OptionStarOrList<BatchId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskCanceledBy>)]
pub canceled_by: OptionStarOrList<u32>,
pub canceled_by: OptionStarOrList<TaskId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskTypes>)]
pub types: OptionStarOrList<Kind>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskStatuses>)]
@ -142,6 +152,8 @@ impl TaskDeletionOrCancelationQuery {
Query {
limit: None,
from: None,
reverse: None,
batch_uids: self.batch_uids.merge_star_and_none(),
statuses: self.statuses.merge_star_and_none(),
types: self.types.merge_star_and_none(),
index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(),
@ -359,7 +371,7 @@ async fn get_task(
let task_uid: TaskId = match task_uid_string.parse() {
Ok(id) => id,
Err(_e) => {
return Err(index_scheduler::Error::InvalidTaskUids { task_uid: task_uid_string }.into())
return Err(index_scheduler::Error::InvalidTaskUid { task_uid: task_uid_string }.into())
}
};
@ -481,7 +493,7 @@ mod tests {
// Stars are allowed in date fields as well
let params = "afterEnqueuedAt=*&beforeStartedAt=*&afterFinishedAt=*&beforeFinishedAt=*&afterStartedAt=*&beforeEnqueuedAt=*";
let query = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @"TaskDeletionOrCancelationQuery { uids: None, canceled_by: None, types: None, statuses: None, index_uids: None, after_enqueued_at: Star, before_enqueued_at: Star, after_started_at: Star, before_started_at: Star, after_finished_at: Star, before_finished_at: Star }");
snapshot!(format!("{:?}", query), @"TaskDeletionOrCancelationQuery { uids: None, batch_uids: None, canceled_by: None, types: None, statuses: None, index_uids: None, after_enqueued_at: Star, before_enqueued_at: Star, after_started_at: Star, before_started_at: Star, after_finished_at: Star, before_finished_at: Star }");
}
{
let params = "afterFinishedAt=2021";
@ -701,20 +713,20 @@ mod tests {
{
let params = "from=12&limit=15&indexUids=toto,tata-78&statuses=succeeded,enqueued&afterEnqueuedAt=2012-04-23&uids=1,2,3";
let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###);
snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), reverse: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###);
}
{
// Stars should translate to `None` in the query
// Verify value of the default limit
let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3";
let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, reverse: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
}
{
// Stars should also translate to `None` in task deletion/cancelation queries
let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3";
let query = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @"TaskDeletionOrCancelationQuery { uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
snapshot!(format!("{:?}", query), @"TaskDeletionOrCancelationQuery { uids: List([1, 2, 3]), batch_uids: None, canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
}
{
// Star in from not allowed
@ -735,7 +747,7 @@ mod tests {
let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err();
snapshot!(meili_snap::json_string!(err), @r###"
{
"message": "Unknown parameter `from`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"message": "Unknown parameter `from`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
@ -748,7 +760,7 @@ mod tests {
let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err();
snapshot!(meili_snap::json_string!(err), @r###"
{
"message": "Unknown parameter `limit`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"message": "Unknown parameter `limit`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
@ -768,7 +780,7 @@ mod tests {
let params = "statuses=*";
let query = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap();
assert!(!query.is_empty());
snapshot!(format!("{query:?}"), @"TaskDeletionOrCancelationQuery { uids: None, canceled_by: None, types: None, statuses: Star, index_uids: None, after_enqueued_at: None, before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
snapshot!(format!("{query:?}"), @"TaskDeletionOrCancelationQuery { uids: None, batch_uids: None, canceled_by: None, types: None, statuses: Star, index_uids: None, after_enqueued_at: None, before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
}
}
}

View file

@ -1735,46 +1735,51 @@ fn format_fields(
// select the attributes to retrieve
let displayable_names =
displayable_ids.iter().map(|&fid| field_ids_map.name(fid).expect("Missing field name"));
permissive_json_pointer::map_leaf_values(&mut document, displayable_names, |key, value| {
// To get the formatting option of each key we need to see all the rules that applies
// to the value and merge them together. eg. If a user said he wanted to highlight `doggo`
// and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only
// highlighted.
// Warn: The time to compute the format list scales with the number of fields to format;
// cumulated with map_leaf_values that iterates over all the nested fields, it gives a quadratic complexity:
// d*f where d is the total number of fields to display and f is the total number of fields to format.
let format = formatting_fields_options
.iter()
.filter(|(name, _option)| {
milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name)
})
.map(|(_, option)| **option)
.reduce(|acc, option| acc.merge(option));
let mut infos = Vec::new();
// if no locales has been provided, we try to find the locales in the localized_attributes.
let locales = locales.or_else(|| {
localized_attributes
permissive_json_pointer::map_leaf_values(
&mut document,
displayable_names,
|key, array_indices, value| {
// To get the formatting option of each key we need to see all the rules that applies
// to the value and merge them together. eg. If a user said he wanted to highlight `doggo`
// and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only
// highlighted.
// Warn: The time to compute the format list scales with the number of fields to format;
// cumulated with map_leaf_values that iterates over all the nested fields, it gives a quadratic complexity:
// d*f where d is the total number of fields to display and f is the total number of fields to format.
let format = formatting_fields_options
.iter()
.find(|rule| rule.match_str(key))
.map(LocalizedAttributesRule::locales)
});
.filter(|(name, _option)| {
milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name)
})
.map(|(_, option)| **option)
.reduce(|acc, option| acc.merge(option));
let mut infos = Vec::new();
*value = format_value(
std::mem::take(value),
builder,
format,
&mut infos,
compute_matches,
locales,
);
// if no locales has been provided, we try to find the locales in the localized_attributes.
let locales = locales.or_else(|| {
localized_attributes
.iter()
.find(|rule| rule.match_str(key))
.map(LocalizedAttributesRule::locales)
});
if let Some(matches) = matches_position.as_mut() {
if !infos.is_empty() {
matches.insert(key.to_owned(), infos);
*value = format_value(
std::mem::take(value),
builder,
format,
&mut infos,
compute_matches,
array_indices,
locales,
);
if let Some(matches) = matches_position.as_mut() {
if !infos.is_empty() {
matches.insert(key.to_owned(), infos);
}
}
}
});
},
);
let selectors = formatted_options
.keys()
@ -1792,13 +1797,14 @@ fn format_value(
format_options: Option<FormatOptions>,
infos: &mut Vec<MatchBounds>,
compute_matches: bool,
array_indices: &[usize],
locales: Option<&[Language]>,
) -> Value {
match value {
Value::String(old_string) => {
let mut matcher = builder.build(&old_string, locales);
if compute_matches {
let matches = matcher.matches();
let matches = matcher.matches(array_indices);
infos.extend_from_slice(&matches[..]);
}
@ -1810,51 +1816,15 @@ fn format_value(
None => Value::String(old_string),
}
}
Value::Array(values) => Value::Array(
values
.into_iter()
.map(|v| {
format_value(
v,
builder,
format_options.map(|format_options| FormatOptions {
highlight: format_options.highlight,
crop: None,
}),
infos,
compute_matches,
locales,
)
})
.collect(),
),
Value::Object(object) => Value::Object(
object
.into_iter()
.map(|(k, v)| {
(
k,
format_value(
v,
builder,
format_options.map(|format_options| FormatOptions {
highlight: format_options.highlight,
crop: None,
}),
infos,
compute_matches,
locales,
),
)
})
.collect(),
),
// `map_leaf_values` makes sure this is only called for leaf fields
Value::Array(_) => unreachable!(),
Value::Object(_) => unreachable!(),
Value::Number(number) => {
let s = number.to_string();
let mut matcher = builder.build(&s, locales);
if compute_matches {
let matches = matcher.matches();
let matches = matcher.matches(array_indices);
infos.extend_from_slice(&matches[..]);
}

View file

@ -18,6 +18,8 @@
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use rand::rngs::StdRng;
@ -33,6 +35,8 @@ pub struct SearchQueue {
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
/// The client probably already closed the connection, but we have no way to find out.
time_to_abort: Duration,
searches_running: Arc<AtomicUsize>,
searches_waiting_to_be_processed: Arc<AtomicUsize>,
}
/// You should only run search requests while holding this permit.
@ -68,14 +72,41 @@ impl SearchQueue {
// so let's not allocate any RAM and keep a capacity of 1.
let (sender, receiver) = mpsc::channel(1);
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
Self { sender, capacity, time_to_abort: Duration::from_secs(60) }
let instance = Self {
sender,
capacity,
time_to_abort: Duration::from_secs(60),
searches_running: Default::default(),
searches_waiting_to_be_processed: Default::default(),
};
tokio::task::spawn(Self::run(
capacity,
paralellism,
receiver,
Arc::clone(&instance.searches_running),
Arc::clone(&instance.searches_waiting_to_be_processed),
));
instance
}
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
Self { time_to_abort, ..self }
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn searches_running(&self) -> usize {
self.searches_running.load(Ordering::Relaxed)
}
pub fn searches_waiting(&self) -> usize {
self.searches_waiting_to_be_processed.load(Ordering::Relaxed)
}
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
/// how many should executes at the same time.
///
@ -84,6 +115,8 @@ impl SearchQueue {
capacity: usize,
parallelism: NonZeroUsize,
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
metric_searches_running: Arc<AtomicUsize>,
metric_searches_waiting: Arc<AtomicUsize>,
) {
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
let mut rng: StdRng = StdRng::from_entropy();
@ -133,6 +166,9 @@ impl SearchQueue {
queue.push(search_request);
},
}
metric_searches_running.store(searches_running, Ordering::Relaxed);
metric_searches_waiting.store(queue.len(), Ordering::Relaxed);
}
}