introduce TaskListIdentifier

This commit is contained in:
ad hoc 2022-05-16 20:16:23 +02:00
parent aa50acb031
commit 5a5066023b
No known key found for this signature in database
GPG Key ID: 4F00A782990CC643
4 changed files with 54 additions and 18 deletions

View File

@ -216,6 +216,7 @@ impl From<Task> for TaskView {
TaskType::IndexUpdate,
Some(TaskDetails::IndexInfo { primary_key }),
),
TaskContent::Dump { path: _ } => todo!(),
};
// An event always has at least one event: "Created"

View File

@ -351,6 +351,7 @@ where
Ok(TaskResult::Other)
}
TaskContent::Dump { path: _ } => Ok(TaskResult::Other),
}
}
@ -504,7 +505,7 @@ mod test {
proptest! {
#[test]
fn test_process_task(
task in any::<Task>().prop_filter("uid must be Some", |t| t.index_uid.is_some()),
task in any::<Task>().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()),
index_exists in any::<bool>(),
index_op_fails in any::<bool>(),
any_int in any::<u64>(),
@ -580,6 +581,7 @@ mod test {
.then(move |_| result());
}
}
TaskContent::Dump { path: _ } => { }
}
mocker.when::<(), IndexResult<IndexStats>>("stats")
@ -608,6 +610,7 @@ mod test {
}
// if index already exists, create index will return an error
TaskContent::IndexCreation { .. } if index_exists => (),
TaskContent::Dump { .. } => (),
// The index exists and get should be called
_ if index_exists => {
index_store
@ -648,7 +651,7 @@ mod test {
// Test for some expected output scenarios:
// Index creation and deletion cannot fail because of a failed index op, since they
// don't perform index ops.
if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None })
if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. })
|| (index_exists && matches!(task.content, TaskContent::IndexCreation { .. }))
|| (!index_exists && matches!(task.content, TaskContent::IndexDeletion
| TaskContent::DocumentDeletion(_)

View File

@ -21,8 +21,13 @@ use super::{TaskFilter, TaskPerformer, TaskStore};
#[derive(Eq, Debug, Clone, Copy)]
enum TaskType {
DocumentAddition { number: usize },
DocumentUpdate { number: usize },
DocumentAddition {
number: usize,
},
DocumentUpdate {
number: usize,
},
/// Any other kind of task, including Dumps
Other,
}
@ -63,7 +68,7 @@ impl Ord for PendingTask {
#[derive(Debug)]
struct TaskList {
index: String,
id: TaskListIdentifier,
tasks: BinaryHeap<PendingTask>,
}
@ -82,9 +87,9 @@ impl DerefMut for TaskList {
}
impl TaskList {
fn new(index: String) -> Self {
fn new(id: TaskListIdentifier) -> Self {
Self {
index,
id,
tasks: Default::default(),
}
}
@ -92,7 +97,7 @@ impl TaskList {
impl PartialEq for TaskList {
fn eq(&self, other: &Self) -> bool {
self.index == other.index
self.id == other.id
}
}
@ -100,6 +105,8 @@ impl Eq for TaskList {}
impl Ord for TaskList {
fn cmp(&self, other: &Self) -> Ordering {
match (&self.id, &other.id) {
(TaskListIdentifier::Index(_), TaskListIdentifier::Index(_)) => {
match (self.peek(), other.peek()) {
(None, None) => Ordering::Equal,
(None, Some(_)) => Ordering::Less,
@ -107,6 +114,13 @@ impl Ord for TaskList {
(Some(lhs), Some(rhs)) => lhs.cmp(rhs),
}
}
(TaskListIdentifier::Index(_), TaskListIdentifier::Dump) => Ordering::Greater,
(TaskListIdentifier::Dump, TaskListIdentifier::Index(_)) => Ordering::Less,
(TaskListIdentifier::Dump, TaskListIdentifier::Dump) => {
unreachable!("There should be only one Dump task list")
}
}
}
}
impl PartialOrd for TaskList {
@ -115,19 +129,27 @@ impl PartialOrd for TaskList {
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
enum TaskListIdentifier {
Index(String),
Dump,
}
#[derive(Default)]
struct TaskQueue {
/// Maps index uids to their TaskList, for quick access
index_tasks: HashMap<String, Arc<AtomicRefCell<TaskList>>>,
index_tasks: HashMap<TaskListIdentifier, Arc<AtomicRefCell<TaskList>>>,
/// A queue that orders TaskList by the priority of their fist update
queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
}
impl TaskQueue {
fn insert(&mut self, task: Task) {
// TODO(marin): The index uid should be remaped to a task queue identifier here
let uid = task.index_uid.unwrap().into_inner();
let id = task.id;
let uid = match task.index_uid {
Some(uid) => TaskListIdentifier::Index(uid.into_inner()),
None => unreachable!(),
};
let kind = match task.content {
TaskContent::DocumentAddition {
documents_count,
@ -161,7 +183,7 @@ impl TaskQueue {
list.push(task);
}
Entry::Vacant(entry) => {
let mut task_list = TaskList::new(entry.key().to_owned());
let mut task_list = TaskList::new(entry.key().clone());
task_list.push(task);
let task_list = Arc::new(AtomicRefCell::new(task_list));
entry.insert(task_list.clone());
@ -182,7 +204,7 @@ impl TaskQueue {
// After being mutated, the head is reinserted to the correct position.
self.queue.push(head);
} else {
self.index_tasks.remove(&head.borrow().index);
self.index_tasks.remove(&head.borrow().id);
}
Some(result)

View File

@ -78,6 +78,12 @@ pub struct Task {
/// then this is None
// TODO: when next forward breaking dumps, it would be a good idea to move this field inside of
// the TaskContent.
#[cfg_attr(
test,
proptest(
strategy = "proptest::option::weighted(proptest::option::Probability::new(0.99), IndexUid::arbitrary())"
)
)]
pub index_uid: Option<IndexUid>,
pub content: TaskContent,
pub events: Vec<TaskEvent>,
@ -165,6 +171,10 @@ pub enum TaskContent {
IndexUpdate {
primary_key: Option<String>,
},
Dump {
#[cfg_attr(test, proptest(value = "PathBuf::from(\".\")"))]
path: PathBuf,
},
}
#[cfg(test)]