implements the reverse query parameter for the batches

This commit is contained in:
Tamo 2024-11-20 13:29:52 +01:00
parent 8ad68dd708
commit a7ac590e9e
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
3 changed files with 119 additions and 22 deletions

View File

@ -948,21 +948,44 @@ impl IndexScheduler {
processing: &ProcessingTasks, processing: &ProcessingTasks,
query: &Query, query: &Query,
) -> Result<RoaringBitmap> { ) -> Result<RoaringBitmap> {
let Query {
limit,
from,
reverse,
uids,
batch_uids,
statuses,
types,
index_uids,
canceled_by,
before_enqueued_at,
after_enqueued_at,
before_started_at,
after_started_at,
before_finished_at,
after_finished_at,
} = query;
let mut batches = self.all_batch_ids(rtxn)?; let mut batches = self.all_batch_ids(rtxn)?;
if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) { if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) {
batches.insert(batch_id); batches.insert(batch_id);
} }
if let Some(from) = &query.from { if let Some(from) = from {
batches.remove_range(from.saturating_add(1)..); let range = if reverse.unwrap_or_default() {
u32::MIN..*from
} else {
from.saturating_add(1)..u32::MAX
};
batches.remove_range(range);
} }
if let Some(batch_uids) = &query.batch_uids { if let Some(batch_uids) = &batch_uids {
let batches_uids = RoaringBitmap::from_iter(batch_uids); let batches_uids = RoaringBitmap::from_iter(batch_uids);
batches &= batches_uids; batches &= batches_uids;
} }
if let Some(status) = &query.statuses { if let Some(status) = &statuses {
let mut status_batches = RoaringBitmap::new(); let mut status_batches = RoaringBitmap::new();
for status in status { for status in status {
match status { match status {
@ -985,7 +1008,7 @@ impl IndexScheduler {
batches &= status_batches; batches &= status_batches;
} }
if let Some(task_uids) = &query.uids { if let Some(task_uids) = &uids {
let mut batches_by_task_uids = RoaringBitmap::new(); let mut batches_by_task_uids = RoaringBitmap::new();
for task_uid in task_uids { for task_uid in task_uids {
if let Some(task) = self.get_task(rtxn, *task_uid)? { if let Some(task) = self.get_task(rtxn, *task_uid)? {
@ -998,7 +1021,7 @@ impl IndexScheduler {
} }
// There is no database for this query, we must retrieve the task queried by the client and ensure it's valid // There is no database for this query, we must retrieve the task queried by the client and ensure it's valid
if let Some(canceled_by) = &query.canceled_by { if let Some(canceled_by) = &canceled_by {
let mut all_canceled_batches = RoaringBitmap::new(); let mut all_canceled_batches = RoaringBitmap::new();
for cancel_uid in canceled_by { for cancel_uid in canceled_by {
if let Some(task) = self.get_task(rtxn, *cancel_uid)? { if let Some(task) = self.get_task(rtxn, *cancel_uid)? {
@ -1021,7 +1044,7 @@ impl IndexScheduler {
} }
} }
if let Some(kind) = &query.types { if let Some(kind) = &types {
let mut kind_batches = RoaringBitmap::new(); let mut kind_batches = RoaringBitmap::new();
for kind in kind { for kind in kind {
kind_batches |= self.get_batch_kind(rtxn, *kind)?; kind_batches |= self.get_batch_kind(rtxn, *kind)?;
@ -1036,7 +1059,7 @@ impl IndexScheduler {
batches &= &kind_batches; batches &= &kind_batches;
} }
if let Some(index) = &query.index_uids { if let Some(index) = &index_uids {
let mut index_batches = RoaringBitmap::new(); let mut index_batches = RoaringBitmap::new();
for index in index { for index in index {
index_batches |= self.index_batches(rtxn, index)?; index_batches |= self.index_batches(rtxn, index)?;
@ -1077,17 +1100,17 @@ impl IndexScheduler {
filtered_processing_batches.clear(); filtered_processing_batches.clear();
} }
}; };
match (query.after_started_at, query.before_started_at) { match (after_started_at, before_started_at) {
(None, None) => (), (None, None) => (),
(None, Some(before)) => { (None, Some(before)) => {
clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(before)) clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(*before))
} }
(Some(after), None) => { (Some(after), None) => {
clear_filtered_processing_batches(Bound::Excluded(after), Bound::Unbounded) clear_filtered_processing_batches(Bound::Excluded(*after), Bound::Unbounded)
} }
(Some(after), Some(before)) => clear_filtered_processing_batches( (Some(after), Some(before)) => clear_filtered_processing_batches(
Bound::Excluded(after), Bound::Excluded(*after),
Bound::Excluded(before), Bound::Excluded(*before),
), ),
}; };
@ -1095,8 +1118,8 @@ impl IndexScheduler {
rtxn, rtxn,
&mut filtered_non_processing_batches, &mut filtered_non_processing_batches,
self.batch_started_at, self.batch_started_at,
query.after_started_at, *after_started_at,
query.before_started_at, *before_started_at,
)?; )?;
filtered_non_processing_batches | filtered_processing_batches filtered_non_processing_batches | filtered_processing_batches
}; };
@ -1105,20 +1128,24 @@ impl IndexScheduler {
rtxn, rtxn,
&mut batches, &mut batches,
self.batch_enqueued_at, self.batch_enqueued_at,
query.after_enqueued_at, *after_enqueued_at,
query.before_enqueued_at, *before_enqueued_at,
)?; )?;
keep_ids_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut batches, &mut batches,
self.batch_finished_at, self.batch_finished_at,
query.after_finished_at, *after_finished_at,
query.before_finished_at, *before_finished_at,
)?; )?;
if let Some(limit) = query.limit { if let Some(limit) = limit {
batches = batches.into_iter().rev().take(limit as usize).collect(); batches = if query.reverse.unwrap_or_default() {
batches.into_iter().take(*limit as usize).collect()
} else {
batches.into_iter().rev().take(*limit as usize).collect()
};
} }
Ok(batches) Ok(batches)
@ -1372,11 +1399,16 @@ impl IndexScheduler {
let (batches, total) = let (batches, total) =
self.get_batch_ids_from_authorized_indexes(&rtxn, &processing, &query, filters)?; self.get_batch_ids_from_authorized_indexes(&rtxn, &processing, &query, filters)?;
let batches = if query.reverse.unwrap_or_default() {
Box::new(batches.into_iter()) as Box<dyn Iterator<Item = u32>>
} else {
Box::new(batches.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let batches = self.get_existing_batches( let batches = self.get_existing_batches(
&rtxn, &rtxn,
&processing, &processing,
batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), batches.take(query.limit.unwrap_or(u32::MAX) as usize),
)?; )?;
Ok((batches, total)) Ok((batches, total))

View File

@ -114,6 +114,33 @@ async fn batch_bad_from() {
"#); "#);
} }
#[actix_rt::test]
async fn bask_bad_reverse() {
let server = Server::new_shared();
let (response, code) = server.batches_filter("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
let (response, code) = server.batches_filter("reverse=*").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
}
#[actix_rt::test] #[actix_rt::test]
async fn batch_bad_after_enqueued_at() { async fn batch_bad_after_enqueued_at() {
let server = Server::new_shared(); let server = Server::new_shared();

View File

@ -49,6 +49,44 @@ async fn list_batches() {
assert_eq!(response["results"].as_array().unwrap().len(), 2); assert_eq!(response["results"].as_array().unwrap().len(), 2);
} }
#[actix_rt::test]
async fn list_batches_pagination_and_reverse() {
let server = Server::new().await;
// First of all we want to create a lot of batches very quickly. The fastest way is to delete a lot of unexisting indexes
let mut last_batch = None;
for i in 0..10 {
let index = server.index(format!("test-{i}"));
last_batch = Some(index.create(None).await.0.uid());
}
server.wait_task(last_batch.unwrap()).await;
let (response, code) = server.batches_filter("limit=3").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[9, 8, 7]");
let (response, code) = server.batches_filter("limit=3&from=1").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[1, 0]");
// In reversed order
let (response, code) = server.batches_filter("limit=3&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[0, 1, 2]");
let (response, code) = server.batches_filter("limit=3&from=8&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[8, 9]");
}
#[actix_rt::test] #[actix_rt::test]
async fn list_batches_with_star_filters() { async fn list_batches_with_star_filters() {
let server = Server::new().await; let server = Server::new().await;