reverse the order of the task queue

This commit is contained in:
Tamo 2024-11-07 19:17:15 +01:00
parent a5d7ae23bd
commit 2eb1801e85
5 changed files with 115 additions and 133 deletions

View file

@ -84,6 +84,8 @@ pub struct Query {
pub limit: Option<u32>,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
pub from: Option<u32>,
/// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`.
pub reverse: Option<bool>,
/// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasls
pub statuses: Option<Vec<Status>>,
/// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks.
@ -126,6 +128,7 @@ impl Query {
Query {
limit: None,
from: None,
reverse: None,
statuses: None,
types: None,
index_uids: None,
@ -706,7 +709,12 @@ impl IndexScheduler {
let mut tasks = self.all_task_ids(rtxn)?;
if let Some(from) = &query.from {
tasks.remove_range(from.saturating_add(1)..);
let range = if query.reverse.unwrap_or_default() {
u32::MIN..*from
} else {
from.saturating_add(1)..u32::MAX
};
tasks.remove_range(range);
}
if let Some(status) = &query.statuses {
@ -826,7 +834,11 @@ impl IndexScheduler {
)?;
if let Some(limit) = query.limit {
tasks = tasks.into_iter().rev().take(limit as usize).collect();
tasks = if query.reverse.unwrap_or_default() {
tasks.into_iter().take(limit as usize).collect()
} else {
tasks.into_iter().rev().take(limit as usize).collect()
};
}
Ok(tasks)
@ -951,10 +963,13 @@ impl IndexScheduler {
let rtxn = self.env.read_txn()?;
let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?;
let tasks = self.get_existing_tasks(
&rtxn,
tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
)?;
let tasks = if query.reverse.unwrap_or_default() {
Box::new(tasks.into_iter()) as Box<dyn Iterator<Item = u32>>
} else {
Box::new(tasks.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let tasks =
self.get_existing_tasks(&rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?;
let ProcessingTasks { started_at, processing, .. } =
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();