From 5a5066023baa514a6779fc77a6330cfef28ac4af Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 16 May 2022 20:16:23 +0200 Subject: [PATCH] introduce TaskListIdentifier --- meilisearch-http/src/task.rs | 1 + meilisearch-lib/src/index_resolver/mod.rs | 7 ++- meilisearch-lib/src/tasks/scheduler.rs | 54 ++++++++++++++++------- meilisearch-lib/src/tasks/task.rs | 10 +++++ 4 files changed, 54 insertions(+), 18 deletions(-) diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index c8e269e56..56a181d29 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -216,6 +216,7 @@ impl From for TaskView { TaskType::IndexUpdate, Some(TaskDetails::IndexInfo { primary_key }), ), + TaskContent::Dump { path: _ } => todo!(), }; // An event always has at least one event: "Created" diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index 9db808d3f..3b8bdd631 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -351,6 +351,7 @@ where Ok(TaskResult::Other) } + TaskContent::Dump { path: _ } => Ok(TaskResult::Other), } } @@ -504,7 +505,7 @@ mod test { proptest! { #[test] fn test_process_task( - task in any::().prop_filter("uid must be Some", |t| t.index_uid.is_some()), + task in any::().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()), index_exists in any::(), index_op_fails in any::(), any_int in any::(), @@ -580,6 +581,7 @@ mod test { .then(move |_| result()); } } + TaskContent::Dump { path: _ } => { } } mocker.when::<(), IndexResult>("stats") @@ -608,6 +610,7 @@ mod test { } // if index already exists, create index will return an error TaskContent::IndexCreation { .. } if index_exists => (), + TaskContent::Dump { .. } => (), // The index exists and get should be called _ if index_exists => { index_store @@ -648,7 +651,7 @@ mod test { // Test for some expected output scenarios: // Index creation and deletion cannot fail because of a failed index op, since they // don't perform index ops. - if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None }) + if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. }) || (index_exists && matches!(task.content, TaskContent::IndexCreation { .. })) || (!index_exists && matches!(task.content, TaskContent::IndexDeletion | TaskContent::DocumentDeletion(_) diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 94de2a5fd..67aa6d8e5 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -21,8 +21,13 @@ use super::{TaskFilter, TaskPerformer, TaskStore}; #[derive(Eq, Debug, Clone, Copy)] enum TaskType { - DocumentAddition { number: usize }, - DocumentUpdate { number: usize }, + DocumentAddition { + number: usize, + }, + DocumentUpdate { + number: usize, + }, + /// Any other kind of task, including Dumps Other, } @@ -63,7 +68,7 @@ impl Ord for PendingTask { #[derive(Debug)] struct TaskList { - index: String, + id: TaskListIdentifier, tasks: BinaryHeap, } @@ -82,9 +87,9 @@ impl DerefMut for TaskList { } impl TaskList { - fn new(index: String) -> Self { + fn new(id: TaskListIdentifier) -> Self { Self { - index, + id, tasks: Default::default(), } } @@ -92,7 +97,7 @@ impl TaskList { impl PartialEq for TaskList { fn eq(&self, other: &Self) -> bool { - self.index == other.index + self.id == other.id } } @@ -100,11 +105,20 @@ impl Eq for TaskList {} impl Ord for TaskList { fn cmp(&self, other: &Self) -> Ordering { - match (self.peek(), other.peek()) { - (None, None) => Ordering::Equal, - (None, Some(_)) => Ordering::Less, - (Some(_), None) => Ordering::Greater, - (Some(lhs), Some(rhs)) => lhs.cmp(rhs), + match (&self.id, &other.id) { + (TaskListIdentifier::Index(_), TaskListIdentifier::Index(_)) => { + match (self.peek(), other.peek()) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => lhs.cmp(rhs), + } + } + (TaskListIdentifier::Index(_), TaskListIdentifier::Dump) => Ordering::Greater, + (TaskListIdentifier::Dump, TaskListIdentifier::Index(_)) => Ordering::Less, + (TaskListIdentifier::Dump, TaskListIdentifier::Dump) => { + unreachable!("There should be only one Dump task list") + } } } } @@ -115,19 +129,27 @@ impl PartialOrd for TaskList { } } +#[derive(PartialEq, Eq, Hash, Debug, Clone)] +enum TaskListIdentifier { + Index(String), + Dump, +} + #[derive(Default)] struct TaskQueue { /// Maps index uids to their TaskList, for quick access - index_tasks: HashMap>>, + index_tasks: HashMap>>, /// A queue that orders TaskList by the priority of their fist update queue: BinaryHeap>>, } impl TaskQueue { fn insert(&mut self, task: Task) { - // TODO(marin): The index uid should be remaped to a task queue identifier here - let uid = task.index_uid.unwrap().into_inner(); let id = task.id; + let uid = match task.index_uid { + Some(uid) => TaskListIdentifier::Index(uid.into_inner()), + None => unreachable!(), + }; let kind = match task.content { TaskContent::DocumentAddition { documents_count, @@ -161,7 +183,7 @@ impl TaskQueue { list.push(task); } Entry::Vacant(entry) => { - let mut task_list = TaskList::new(entry.key().to_owned()); + let mut task_list = TaskList::new(entry.key().clone()); task_list.push(task); let task_list = Arc::new(AtomicRefCell::new(task_list)); entry.insert(task_list.clone()); @@ -182,7 +204,7 @@ impl TaskQueue { // After being mutated, the head is reinserted to the correct position. self.queue.push(head); } else { - self.index_tasks.remove(&head.borrow().index); + self.index_tasks.remove(&head.borrow().id); } Some(result) diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index d7a73a2ae..c20d2151b 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -78,6 +78,12 @@ pub struct Task { /// then this is None // TODO: when next forward breaking dumps, it would be a good idea to move this field inside of // the TaskContent. + #[cfg_attr( + test, + proptest( + strategy = "proptest::option::weighted(proptest::option::Probability::new(0.99), IndexUid::arbitrary())" + ) + )] pub index_uid: Option, pub content: TaskContent, pub events: Vec, @@ -165,6 +171,10 @@ pub enum TaskContent { IndexUpdate { primary_key: Option, }, + Dump { + #[cfg_attr(test, proptest(value = "PathBuf::from(\".\")"))] + path: PathBuf, + }, } #[cfg(test)]