diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index edbf2cae0..90ac4f8df 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -41,7 +41,7 @@ use uuid::Uuid; use crate::autobatcher::{self, BatchKind}; use crate::utils::{self, swap_index_uid_in_task}; -use crate::{Error, IndexScheduler, Query, Result, TaskId}; +use crate::{Error, IndexScheduler, Result, TaskId}; /// Represents a combination of tasks that can all be processed at the same time. /// @@ -854,12 +854,10 @@ impl IndexScheduler { return Err(Error::IndexNotFound(rhs.to_owned())); } - // 2. Get the task set for index = name. - let mut index_lhs_task_ids = - self.get_task_ids(&Query::default().with_index(lhs.to_owned()))?; + // 2. Get the task set for index = name that appeared before the index swap task + let mut index_lhs_task_ids = self.index_tasks(wtxn, lhs)?; index_lhs_task_ids.remove_range(task_id..); - let mut index_rhs_task_ids = - self.get_task_ids(&Query::default().with_index(rhs.to_owned()))?; + let mut index_rhs_task_ids = self.index_tasks(wtxn, rhs)?; index_rhs_task_ids.remove_range(task_id..); // 3. before_name -> new_name in the task's KindWithContent diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index b60ea8718..4d95aa8e5 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -41,7 +41,7 @@ pub use error::Error; use file_store::FileStore; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; -use meilisearch_types::heed::{self, Database, Env}; +use meilisearch_types::heed::{self, Database, Env, RoTxn}; use meilisearch_types::milli; use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::update::IndexerConfig; @@ -100,7 +100,7 @@ pub struct Query { } impl Query { - /// Return `true` iff every field of the query is set to `None`, such that the query + /// Return `true` if every field of the query is set to `None`, such that the query /// matches all tasks. pub fn is_empty(&self) -> bool { matches!( @@ -393,6 +393,10 @@ impl IndexScheduler { Ok(this) } + pub fn read_txn(&self) -> Result { + self.env.read_txn().map_err(|e| e.into()) + } + /// Start the run loop for the given index scheduler. /// /// This function will execute in a different thread and must be called @@ -442,14 +446,12 @@ impl IndexScheduler { self.index_mapper.indexes(&rtxn) } - /// Return the task ids matched by the given query. - pub fn get_task_ids(&self, query: &Query) -> Result { - let rtxn = self.env.read_txn()?; - + /// Return the task ids matched by the given query from the index scheduler's point of view. + pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result { let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } = self.processing_tasks.read().unwrap().clone(); - let mut tasks = self.all_task_ids(&rtxn)?; + let mut tasks = self.all_task_ids(rtxn)?; if let Some(from) = &query.from { tasks.remove_range(from.saturating_add(1)..); @@ -463,7 +465,7 @@ impl IndexScheduler { Status::Processing => { status_tasks |= &processing_tasks; } - status => status_tasks |= &self.get_status(&rtxn, *status)?, + status => status_tasks |= &self.get_status(rtxn, *status)?, }; } if !status.contains(&Status::Processing) { @@ -480,7 +482,7 @@ impl IndexScheduler { if let Some(kind) = &query.kind { let mut kind_tasks = RoaringBitmap::new(); for kind in kind { - kind_tasks |= self.get_kind(&rtxn, *kind)?; + kind_tasks |= self.get_kind(rtxn, *kind)?; } tasks &= &kind_tasks; } @@ -488,7 +490,7 @@ impl IndexScheduler { if let Some(index) = &query.index_uid { let mut index_tasks = RoaringBitmap::new(); for index in index { - index_tasks |= self.index_tasks(&rtxn, index)?; + index_tasks |= self.index_tasks(rtxn, index)?; } tasks &= &index_tasks; } @@ -529,7 +531,7 @@ impl IndexScheduler { }; keep_tasks_within_datetimes( - &rtxn, + rtxn, &mut filtered_non_processing_tasks, self.started_at, query.after_started_at, @@ -539,7 +541,7 @@ impl IndexScheduler { }; keep_tasks_within_datetimes( - &rtxn, + rtxn, &mut tasks, self.enqueued_at, query.after_enqueued_at, @@ -547,7 +549,7 @@ impl IndexScheduler { )?; keep_tasks_within_datetimes( - &rtxn, + rtxn, &mut tasks, self.finished_at, query.after_finished_at, @@ -561,10 +563,70 @@ impl IndexScheduler { Ok(tasks) } - /// Returns the tasks matched by the given query. - pub fn get_tasks(&self, query: Query) -> Result> { - let tasks = self.get_task_ids(&query)?; + /// Return true iff there is at least one task associated with this index + /// that is processing. + pub fn is_index_processing(&self, index: &str) -> Result { let rtxn = self.env.read_txn()?; + let processing_tasks = self.processing_tasks.read().unwrap().processing.clone(); + let index_tasks = self.index_tasks(&rtxn, index)?; + let nbr_index_processing_tasks = processing_tasks.intersection_len(&index_tasks); + Ok(nbr_index_processing_tasks > 0) + } + + /// Return the task ids matching the query from the user's point of view. + /// + /// There are two differences between an internal query and a query executed by + /// the user. + /// + /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated + /// with many indexes internally. + /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes. + pub fn get_task_ids_from_authorized_indexes( + &self, + rtxn: &RoTxn, + query: &Query, + authorized_indexes: &Option>, + ) -> Result { + let mut tasks = self.get_task_ids(rtxn, query)?; + + // If the query contains a list of index_uid, then we must exclude IndexSwap tasks + // from the result (because it is not publicly associated with any index) + if query.index_uid.is_some() { + tasks -= self.get_kind(rtxn, Kind::IndexSwap)? + } + + // Any task that is internally associated with a non-authorized index + // must be discarded. + if let Some(authorized_indexes) = authorized_indexes { + let all_indexes_iter = self.index_tasks.iter(rtxn)?; + for result in all_indexes_iter { + let (index, index_tasks) = result?; + if !authorized_indexes.contains(&index.to_owned()) { + tasks -= index_tasks; + } + } + } + + Ok(tasks) + } + + /// Return the tasks matching the query from the user's point of view. + /// + /// There are two differences between an internal query and a query executed by + /// the user. + /// + /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated + /// with many indexes internally. + /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes. + pub fn get_tasks_from_authorized_indexes( + &self, + query: Query, + authorized_indexes: Option>, + ) -> Result> { + let rtxn = self.env.read_txn()?; + + let tasks = + self.get_task_ids_from_authorized_indexes(&rtxn, &query, &authorized_indexes)?; let tasks = self.get_existing_tasks( &rtxn, @@ -1187,12 +1249,7 @@ mod tests { handle.wait_till(Breakpoint::AfterProcessing); index_scheduler.assert_internally_consistent(); - let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); - tasks.reverse(); - assert_eq!(tasks.len(), 3); - assert_eq!(tasks[0].status, Status::Succeeded); - assert_eq!(tasks[1].status, Status::Succeeded); - assert_eq!(tasks[2].status, Status::Succeeded); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed"); } #[test] @@ -1231,13 +1288,7 @@ mod tests { handle.wait_till(Breakpoint::AfterProcessing); index_scheduler.assert_internally_consistent(); - let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); - tasks.reverse(); - assert_eq!(tasks.len(), 4); - assert_eq!(tasks[0].status, Status::Succeeded); - assert_eq!(tasks[1].status, Status::Succeeded); - assert_eq!(tasks[2].status, Status::Succeeded); - assert_eq!(tasks[3].status, Status::Succeeded); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed"); } #[test] @@ -1493,15 +1544,7 @@ mod tests { index_scheduler.assert_internally_consistent(); } - let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); - tasks.reverse(); - assert_eq!(tasks.len(), 6); - assert_eq!(tasks[0].status, Status::Succeeded); - assert_eq!(tasks[1].status, Status::Succeeded); - assert_eq!(tasks[2].status, Status::Succeeded); - assert_eq!(tasks[3].status, Status::Succeeded); - assert_eq!(tasks[4].status, Status::Succeeded); - assert_eq!(tasks[5].status, Status::Succeeded); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed"); } #[test] @@ -2054,37 +2097,45 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "finished"); + let rtxn = index_scheduler.env.read_txn().unwrap(); let query = Query { limit: Some(0), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[]"); let query = Query { limit: Some(1), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[2,]"); let query = Query { limit: Some(2), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); let query = Query { from: Some(1), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,]"); let query = Query { from: Some(2), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); let query = Query { from: Some(1), limit: Some(1), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[1,]"); let query = Query { from: Some(1), limit: Some(2), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,]"); } #[test] - fn query_processing_tasks() { + fn query_tasks_simple() { let start_time = OffsetDateTime::now_utc(); let (index_scheduler, handle) = @@ -2101,19 +2152,24 @@ mod tests { handle.wait_till(Breakpoint::BatchCreated); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let query = Query { status: Some(vec![Status::Processing]), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick let query = Query { status: Some(vec![Status::Enqueued, Status::Processing]), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick let query = Query { @@ -2121,7 +2177,8 @@ mod tests { after_started_at: Some(start_time), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // both enqueued and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the test, which should excludes the enqueued tasks snapshot!(snapshot_bitmap(&tasks), @"[0,]"); @@ -2131,7 +2188,8 @@ mod tests { before_started_at: Some(start_time), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // both enqueued and processing tasks in the first tick, but limited to those with a started_at // that comes before the start of the test, which should excludes all of them snapshot!(snapshot_bitmap(&tasks), @"[]"); @@ -2142,7 +2200,8 @@ mod tests { before_started_at: Some(start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // both enqueued and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the test and before one minute after the start of the test, // which should exclude the enqueued tasks and include the only processing task @@ -2150,6 +2209,8 @@ mod tests { handle.wait_till(Breakpoint::BatchCreated); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let second_start_time = OffsetDateTime::now_utc(); let query = Query { @@ -2158,7 +2219,8 @@ mod tests { before_started_at: Some(start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // both succeeded and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the test and before one minute after the start of the test, // which should include all tasks @@ -2169,7 +2231,8 @@ mod tests { before_started_at: Some(start_time), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // both succeeded and processing tasks in the first tick, but limited to those with a started_at // that comes before the start of the test, which should exclude all tasks snapshot!(snapshot_bitmap(&tasks), @"[]"); @@ -2180,7 +2243,8 @@ mod tests { before_started_at: Some(second_start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // both succeeded and processing tasks in the first tick, but limited to those with a started_at // that comes after the start of the second part of the test and before one minute after the // second start of the test, which should exclude all tasks @@ -2188,7 +2252,11 @@ mod tests { // now we make one more batch, the started_at field of the new tasks will be past `second_start_time` handle.wait_till(Breakpoint::BatchCreated); - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + + let rtxn = index_scheduler.env.read_txn().unwrap(); + + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // we run the same query to verify that, and indeed find that the last task is matched snapshot!(snapshot_bitmap(&tasks), @"[2,]"); @@ -2198,15 +2266,19 @@ mod tests { before_started_at: Some(second_start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // enqueued, succeeded, or processing tasks started after the second part of the test, should // again only return the last task snapshot!(snapshot_bitmap(&tasks), @"[2,]"); handle.wait_till(Breakpoint::AfterProcessing); + let rtxn = index_scheduler.read_txn().unwrap(); + // now the last task should have failed snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end"); - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // so running the last query should return nothing snapshot!(snapshot_bitmap(&tasks), @"[]"); @@ -2216,7 +2288,8 @@ mod tests { before_started_at: Some(second_start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // but the same query on failed tasks should return the last task snapshot!(snapshot_bitmap(&tasks), @"[2,]"); @@ -2226,7 +2299,8 @@ mod tests { before_started_at: Some(second_start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // but the same query on failed tasks should return the last task snapshot!(snapshot_bitmap(&tasks), @"[2,]"); @@ -2237,7 +2311,8 @@ mod tests { before_started_at: Some(second_start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // same query but with an invalid uid snapshot!(snapshot_bitmap(&tasks), @"[]"); @@ -2248,11 +2323,77 @@ mod tests { before_started_at: Some(second_start_time + Duration::minutes(1)), ..Default::default() }; - let tasks = index_scheduler.get_task_ids(&query).unwrap(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); // same query but with a valid uid snapshot!(snapshot_bitmap(&tasks), @"[2,]"); } + #[test] + fn query_tasks_special_rules() { + let (index_scheduler, handle) = + IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); + + let kind = index_creation_task("catto", "mouse"); + let _task = index_scheduler.register(kind).unwrap(); + let kind = index_creation_task("doggo", "sheep"); + let _task = index_scheduler.register(kind).unwrap(); + let kind = KindWithContent::IndexSwap { + swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }], + }; + let _task = index_scheduler.register(kind).unwrap(); + let kind = KindWithContent::IndexSwap { + swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "whalo".to_owned()) }], + }; + let _task = index_scheduler.register(kind).unwrap(); + + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); + + handle.wait_till(Breakpoint::BatchCreated); + + let rtxn = index_scheduler.env.read_txn().unwrap(); + + let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() }; + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); + // only the first task associated with catto is returned, the indexSwap tasks are excluded! + snapshot!(snapshot_bitmap(&tasks), @"[0,]"); + + let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() }; + let tasks = index_scheduler + .get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()])) + .unwrap(); + // we have asked for only the tasks associated with catto, but are only authorized to retrieve the tasks + // associated with doggo -> empty result + snapshot!(snapshot_bitmap(&tasks), @"[]"); + + let query = Query::default(); + let tasks = index_scheduler + .get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()])) + .unwrap(); + // we asked for all the tasks, but we are only authorized to retrieve the doggo tasks + // -> only the index creation of doggo should be returned + snapshot!(snapshot_bitmap(&tasks), @"[1,]"); + + let query = Query::default(); + let tasks = index_scheduler + .get_task_ids_from_authorized_indexes( + &rtxn, + &query, + &Some(vec!["catto".to_owned(), "doggo".to_owned()]), + ) + .unwrap(); + // we asked for all the tasks, but we are only authorized to retrieve the doggo and catto tasks + // -> all tasks except the swap of catto with whalo are returned + snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); + + let query = Query::default(); + let tasks = + index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap(); + // we asked for all the tasks with all index authorized -> all tasks returned + snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,3,]"); + } + #[test] fn fail_in_create_batch_for_index_creation() { let (index_scheduler, handle) = diff --git a/index-scheduler/src/snapshots/lib.rs/do_not_batch_task_of_different_indexes/all_tasks_processed.snap b/index-scheduler/src/snapshots/lib.rs/do_not_batch_task_of_different_indexes/all_tasks_processed.snap new file mode 100644 index 000000000..8541c7c1b --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/do_not_batch_task_of_different_indexes/all_tasks_processed.snap @@ -0,0 +1,59 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }} +1 {uid: 1, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "cattos", primary_key: None }} +2 {uid: 2, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "girafos", primary_key: None }} +3 {uid: 3, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }} +4 {uid: 4, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "cattos" }} +5 {uid: 5, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "girafos" }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,3,4,5,] +---------------------------------------------------------------------- +### Kind: +"documentDeletion" [3,4,5,] +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Index Tasks: +cattos [1,4,] +doggos [0,3,] +girafos [2,5,] +---------------------------------------------------------------------- +### Index Mapper: +["cattos", "doggos", "girafos"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/process_tasks_inserted_without_new_signal/all_tasks_processed.snap b/index-scheduler/src/snapshots/lib.rs/process_tasks_inserted_without_new_signal/all_tasks_processed.snap new file mode 100644 index 000000000..c75964581 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/process_tasks_inserted_without_new_signal/all_tasks_processed.snap @@ -0,0 +1,46 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }} +1 {uid: 1, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "cattos", primary_key: None }} +2 {uid: 2, status: succeeded, details: { deleted_documents: Some(0) }, kind: IndexDeletion { index_uid: "doggos" }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,] +"indexDeletion" [2,] +---------------------------------------------------------------------- +### Index Tasks: +cattos [1,] +doggos [0,2,] +---------------------------------------------------------------------- +### Index Mapper: +["cattos"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/process_tasks_without_autobatching/all_tasks_processed.snap b/index-scheduler/src/snapshots/lib.rs/process_tasks_without_autobatching/all_tasks_processed.snap new file mode 100644 index 000000000..44ce75ebb --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/process_tasks_without_autobatching/all_tasks_processed.snap @@ -0,0 +1,49 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = false +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }} +1 {uid: 1, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }} +2 {uid: 2, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }} +3 {uid: 3, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,3,] +---------------------------------------------------------------------- +### Kind: +"documentDeletion" [1,2,3,] +"indexCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,2,3,] +---------------------------------------------------------------------- +### Index Mapper: +["doggos"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/end.snap b/index-scheduler/src/snapshots/lib.rs/query_tasks_simple/end.snap similarity index 100% rename from index-scheduler/src/snapshots/lib.rs/query_processing_tasks/end.snap rename to index-scheduler/src/snapshots/lib.rs/query_tasks_simple/end.snap diff --git a/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/start.snap b/index-scheduler/src/snapshots/lib.rs/query_tasks_simple/start.snap similarity index 100% rename from index-scheduler/src/snapshots/lib.rs/query_processing_tasks/start.snap rename to index-scheduler/src/snapshots/lib.rs/query_tasks_simple/start.snap diff --git a/index-scheduler/src/snapshots/lib.rs/query_tasks_special_rules/start.snap b/index-scheduler/src/snapshots/lib.rs/query_tasks_special_rules/start.snap new file mode 100644 index 000000000..caab362d7 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/query_tasks_special_rules/start.snap @@ -0,0 +1,42 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }} +2 {uid: 2, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }} +3 {uid: 3, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,2,3,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,] +"indexSwap" [2,3,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,2,3,] +doggo [1,2,] +whalo [3,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/meilisearch-auth/src/lib.rs b/meilisearch-auth/src/lib.rs index 12d810aec..020a2821c 100644 --- a/meilisearch-auth/src/lib.rs +++ b/meilisearch-auth/src/lib.rs @@ -215,6 +215,27 @@ impl SearchRules { } } } + + /// Return the list of indexes such that `self.is_index_authorized(index) == true`, + /// or `None` if all indexes satisfy this condition. + pub fn authorized_indexes(&self) -> Option> { + match self { + SearchRules::Set(set) => { + if set.contains("*") { + None + } else { + Some(set.iter().cloned().collect()) + } + } + SearchRules::Map(map) => { + if map.contains_key("*") { + None + } else { + Some(map.keys().cloned().collect()) + } + } + } + } } impl IntoIterator for SearchRules { diff --git a/meilisearch-http/src/routes/indexes/mod.rs b/meilisearch-http/src/routes/indexes/mod.rs index d370483c6..e8fca0cf8 100644 --- a/meilisearch-http/src/routes/indexes/mod.rs +++ b/meilisearch-http/src/routes/indexes/mod.rs @@ -1,11 +1,11 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; -use index_scheduler::{IndexScheduler, Query}; +use index_scheduler::IndexScheduler; use log::debug; use meilisearch_types::error::ResponseError; use meilisearch_types::index_uid::IndexUid; use meilisearch_types::milli::{self, FieldDistribution, Index}; -use meilisearch_types::tasks::{KindWithContent, Status}; +use meilisearch_types::tasks::KindWithContent; use serde::{Deserialize, Serialize}; use serde_json::json; use time::OffsetDateTime; @@ -202,14 +202,7 @@ impl IndexStats { index_uid: String, ) -> Result { // we check if there is currently a task processing associated with this index. - let processing_task = index_scheduler.get_tasks(Query { - status: Some(vec![Status::Processing]), - index_uid: Some(vec![index_uid.clone()]), - limit: Some(1), - ..Query::default() - })?; - let is_processing = !processing_task.is_empty(); - + let is_processing = index_scheduler.is_index_processing(&index_uid)?; let index = index_scheduler.index(&index_uid)?; let rtxn = index.read_txn()?; Ok(IndexStats { diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 4463aee5e..81e100214 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -270,11 +270,10 @@ pub fn create_all_stats( let mut last_task: Option = None; let mut indexes = BTreeMap::new(); let mut database_size = 0; - let processing_task = index_scheduler.get_tasks(Query { - status: Some(vec![Status::Processing]), - limit: Some(1), - ..Query::default() - })?; + let processing_task = index_scheduler.get_tasks_from_authorized_indexes( + Query { status: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() }, + search_rules.authorized_indexes(), + )?; let processing_index = processing_task.first().and_then(|task| task.index_uid()); for (name, index) in index_scheduler.indexes()? { if !search_rules.is_index_authorized(&name) { diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index 59f1a1f68..0c9a49a3c 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -291,8 +291,11 @@ async fn cancel_tasks( return Err(index_scheduler::Error::TaskCancelationWithEmptyQuery.into()); } - let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query); - let tasks = index_scheduler.get_task_ids(&filtered_query)?; + let tasks = index_scheduler.get_task_ids_from_authorized_indexes( + &index_scheduler.read_txn()?, + &query, + &index_scheduler.filters().search_rules.authorized_indexes(), + )?; let task_cancelation = KindWithContent::TaskCancelation { query: req.query_string().to_string(), tasks }; @@ -348,8 +351,11 @@ async fn delete_tasks( return Err(index_scheduler::Error::TaskDeletionWithEmptyQuery.into()); } - let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query); - let tasks = index_scheduler.get_task_ids(&filtered_query)?; + let tasks = index_scheduler.get_task_ids_from_authorized_indexes( + &index_scheduler.read_txn()?, + &query, + &index_scheduler.filters().search_rules.authorized_indexes(), + )?; let task_deletion = KindWithContent::TaskDeletion { query: req.query_string().to_string(), tasks }; @@ -425,10 +431,15 @@ async fn get_tasks( before_finished_at, after_finished_at, }; - let query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query); - let mut tasks_results: Vec = - index_scheduler.get_tasks(query)?.into_iter().map(|t| TaskView::from_task(&t)).collect(); + let mut tasks_results: Vec = index_scheduler + .get_tasks_from_authorized_indexes( + query, + index_scheduler.filters().search_rules.authorized_indexes(), + )? + .into_iter() + .map(|t| TaskView::from_task(&t)) + .collect(); // If we were able to fetch the number +1 tasks we asked // it means that there is more to come. @@ -454,17 +465,15 @@ async fn get_task( analytics.publish("Tasks Seen".to_string(), json!({ "per_task_uid": true }), Some(&req)); - let search_rules = &index_scheduler.filters().search_rules; - let mut filters = index_scheduler::Query::default(); - if !search_rules.is_index_authorized("*") { - for (index, _policy) in search_rules.clone() { - filters = filters.with_index(index); - } - } + let query = index_scheduler::Query { uid: Some(vec![task_id]), ..Query::default() }; - filters.uid = Some(vec![task_id]); - - if let Some(task) = index_scheduler.get_tasks(filters)?.first() { + if let Some(task) = index_scheduler + .get_tasks_from_authorized_indexes( + query, + index_scheduler.filters().search_rules.authorized_indexes(), + )? + .first() + { let task_view = TaskView::from_task(task); Ok(HttpResponse::Ok().json(task_view)) } else { @@ -472,39 +481,6 @@ async fn get_task( } } -fn filter_out_inaccessible_indexes_from_query( - index_scheduler: &GuardedData, Data>, - query: &Query, -) -> Query { - let mut query = query.clone(); - - // First remove all indexes from the query, we will add them back later - let indexes = query.index_uid.take(); - - let search_rules = &index_scheduler.filters().search_rules; - - // We filter on potential indexes and make sure that the search filter - // restrictions are also applied. - match indexes { - Some(indexes) => { - for name in indexes.iter() { - if search_rules.is_index_authorized(name) { - query = query.with_index(name.to_string()); - } - } - } - None => { - if !search_rules.is_index_authorized("*") { - for (index, _policy) in search_rules.clone() { - query = query.with_index(index.to_string()); - } - } - } - }; - - query -} - pub(crate) mod date_deserializer { use time::format_description::well_known::Rfc3339; use time::macros::format_description;