From a7ac590e9efdd052997d7df7785a8cfe8d00d7cd Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 20 Nov 2024 13:29:52 +0100 Subject: [PATCH] implements the reverse query parameter for the batches --- crates/index-scheduler/src/lib.rs | 76 +++++++++++++++------- crates/meilisearch/tests/batches/errors.rs | 27 ++++++++ crates/meilisearch/tests/batches/mod.rs | 38 +++++++++++ 3 files changed, 119 insertions(+), 22 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index b976d7342..083363c95 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -948,21 +948,44 @@ impl IndexScheduler { processing: &ProcessingTasks, query: &Query, ) -> Result { + let Query { + limit, + from, + reverse, + uids, + batch_uids, + statuses, + types, + index_uids, + canceled_by, + before_enqueued_at, + after_enqueued_at, + before_started_at, + after_started_at, + before_finished_at, + after_finished_at, + } = query; + let mut batches = self.all_batch_ids(rtxn)?; if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) { batches.insert(batch_id); } - if let Some(from) = &query.from { - batches.remove_range(from.saturating_add(1)..); + if let Some(from) = from { + let range = if reverse.unwrap_or_default() { + u32::MIN..*from + } else { + from.saturating_add(1)..u32::MAX + }; + batches.remove_range(range); } - if let Some(batch_uids) = &query.batch_uids { + if let Some(batch_uids) = &batch_uids { let batches_uids = RoaringBitmap::from_iter(batch_uids); batches &= batches_uids; } - if let Some(status) = &query.statuses { + if let Some(status) = &statuses { let mut status_batches = RoaringBitmap::new(); for status in status { match status { @@ -985,7 +1008,7 @@ impl IndexScheduler { batches &= status_batches; } - if let Some(task_uids) = &query.uids { + if let Some(task_uids) = &uids { let mut batches_by_task_uids = RoaringBitmap::new(); for task_uid in task_uids { if let Some(task) = self.get_task(rtxn, *task_uid)? { @@ -998,7 +1021,7 @@ impl IndexScheduler { } // There is no database for this query, we must retrieve the task queried by the client and ensure it's valid - if let Some(canceled_by) = &query.canceled_by { + if let Some(canceled_by) = &canceled_by { let mut all_canceled_batches = RoaringBitmap::new(); for cancel_uid in canceled_by { if let Some(task) = self.get_task(rtxn, *cancel_uid)? { @@ -1021,7 +1044,7 @@ impl IndexScheduler { } } - if let Some(kind) = &query.types { + if let Some(kind) = &types { let mut kind_batches = RoaringBitmap::new(); for kind in kind { kind_batches |= self.get_batch_kind(rtxn, *kind)?; @@ -1036,7 +1059,7 @@ impl IndexScheduler { batches &= &kind_batches; } - if let Some(index) = &query.index_uids { + if let Some(index) = &index_uids { let mut index_batches = RoaringBitmap::new(); for index in index { index_batches |= self.index_batches(rtxn, index)?; @@ -1077,17 +1100,17 @@ impl IndexScheduler { filtered_processing_batches.clear(); } }; - match (query.after_started_at, query.before_started_at) { + match (after_started_at, before_started_at) { (None, None) => (), (None, Some(before)) => { - clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(before)) + clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(*before)) } (Some(after), None) => { - clear_filtered_processing_batches(Bound::Excluded(after), Bound::Unbounded) + clear_filtered_processing_batches(Bound::Excluded(*after), Bound::Unbounded) } (Some(after), Some(before)) => clear_filtered_processing_batches( - Bound::Excluded(after), - Bound::Excluded(before), + Bound::Excluded(*after), + Bound::Excluded(*before), ), }; @@ -1095,8 +1118,8 @@ impl IndexScheduler { rtxn, &mut filtered_non_processing_batches, self.batch_started_at, - query.after_started_at, - query.before_started_at, + *after_started_at, + *before_started_at, )?; filtered_non_processing_batches | filtered_processing_batches }; @@ -1105,20 +1128,24 @@ impl IndexScheduler { rtxn, &mut batches, self.batch_enqueued_at, - query.after_enqueued_at, - query.before_enqueued_at, + *after_enqueued_at, + *before_enqueued_at, )?; keep_ids_within_datetimes( rtxn, &mut batches, self.batch_finished_at, - query.after_finished_at, - query.before_finished_at, + *after_finished_at, + *before_finished_at, )?; - if let Some(limit) = query.limit { - batches = batches.into_iter().rev().take(limit as usize).collect(); + if let Some(limit) = limit { + batches = if query.reverse.unwrap_or_default() { + batches.into_iter().take(*limit as usize).collect() + } else { + batches.into_iter().rev().take(*limit as usize).collect() + }; } Ok(batches) @@ -1372,11 +1399,16 @@ impl IndexScheduler { let (batches, total) = self.get_batch_ids_from_authorized_indexes(&rtxn, &processing, &query, filters)?; + let batches = if query.reverse.unwrap_or_default() { + Box::new(batches.into_iter()) as Box> + } else { + Box::new(batches.into_iter().rev()) as Box> + }; let batches = self.get_existing_batches( &rtxn, &processing, - batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), + batches.take(query.limit.unwrap_or(u32::MAX) as usize), )?; Ok((batches, total)) diff --git a/crates/meilisearch/tests/batches/errors.rs b/crates/meilisearch/tests/batches/errors.rs index 11f9e9b3c..2c3484bc1 100644 --- a/crates/meilisearch/tests/batches/errors.rs +++ b/crates/meilisearch/tests/batches/errors.rs @@ -114,6 +114,33 @@ async fn batch_bad_from() { "#); } +#[actix_rt::test] +async fn bask_bad_reverse() { + let server = Server::new_shared(); + + let (response, code) = server.batches_filter("reverse=doggo").await; + snapshot!(code, @"400 Bad Request"); + snapshot!(response, @r###" + { + "message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`", + "code": "invalid_task_reverse", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#invalid_task_reverse" + } + "###); + + let (response, code) = server.batches_filter("reverse=*").await; + snapshot!(code, @"400 Bad Request"); + snapshot!(response, @r###" + { + "message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`", + "code": "invalid_task_reverse", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#invalid_task_reverse" + } + "###); +} + #[actix_rt::test] async fn batch_bad_after_enqueued_at() { let server = Server::new_shared(); diff --git a/crates/meilisearch/tests/batches/mod.rs b/crates/meilisearch/tests/batches/mod.rs index d715c29af..81d254f9f 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -49,6 +49,44 @@ async fn list_batches() { assert_eq!(response["results"].as_array().unwrap().len(), 2); } +#[actix_rt::test] +async fn list_batches_pagination_and_reverse() { + let server = Server::new().await; + // First of all we want to create a lot of batches very quickly. The fastest way is to delete a lot of unexisting indexes + let mut last_batch = None; + for i in 0..10 { + let index = server.index(format!("test-{i}")); + last_batch = Some(index.create(None).await.0.uid()); + } + server.wait_task(last_batch.unwrap()).await; + + let (response, code) = server.batches_filter("limit=3").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{batch_ids:?}"), @"[9, 8, 7]"); + + let (response, code) = server.batches_filter("limit=3&from=1").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{batch_ids:?}"), @"[1, 0]"); + + // In reversed order + + let (response, code) = server.batches_filter("limit=3&reverse=true").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{batch_ids:?}"), @"[0, 1, 2]"); + + let (response, code) = server.batches_filter("limit=3&from=8&reverse=true").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{batch_ids:?}"), @"[8, 9]"); +} + #[actix_rt::test] async fn list_batches_with_star_filters() { let server = Server::new().await;