move index_uid from task to task_content

This commit is contained in:
ad hoc 2022-05-31 17:18:40 +02:00
parent cf2d8de48a
commit 0c5352fc22
No known key found for this signature in database
GPG key ID: 4F00A782990CC643
12 changed files with 452 additions and 345 deletions

View file

@ -55,6 +55,7 @@ mod test {
task::{Task, TaskContent},
};
use crate::update_file_store::{Result as FileStoreResult, UpdateFileStore};
use crate::IndexUid;
use super::*;
use milli::update::IndexDocumentsMethod;
@ -103,13 +104,13 @@ mod test {
let task = Task {
id: 1,
index_uid: None,
content: TaskContent::DocumentAddition {
content_uuid,
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
primary_key: None,
documents_count: 100,
allow_index_creation: true,
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};
@ -130,7 +131,6 @@ mod test {
let task = Task {
id: 1,
index_uid: None,
content: TaskContent::Dump {
uid: String::from("hello"),
},

View file

@ -17,9 +17,9 @@ mod test {
TaskContent::DocumentAddition { .. } => {
BatchContent::DocumentsAdditionBatch(vec![task])
}
TaskContent::DocumentDeletion(_)
TaskContent::DocumentDeletion { .. }
| TaskContent::SettingsUpdate { .. }
| TaskContent::IndexDeletion
| TaskContent::IndexDeletion { .. }
| TaskContent::IndexCreation { .. }
| TaskContent::IndexUpdate { .. } => BatchContent::IndexUpdate(task),
TaskContent::Dump { .. } => BatchContent::Dump(task),

View file

@ -131,6 +131,22 @@ enum TaskListIdentifier {
Dump,
}
impl From<&Task> for TaskListIdentifier {
fn from(task: &Task) -> Self {
match &task.content {
TaskContent::DocumentAddition { index_uid, .. }
| TaskContent::DocumentDeletion { index_uid, .. }
| TaskContent::SettingsUpdate { index_uid, .. }
| TaskContent::IndexDeletion { index_uid }
| TaskContent::IndexCreation { index_uid, .. }
| TaskContent::IndexUpdate { index_uid, .. } => {
TaskListIdentifier::Index(index_uid.as_str().to_string())
}
TaskContent::Dump { .. } => TaskListIdentifier::Dump,
}
}
}
#[derive(Default)]
struct TaskQueue {
/// Maps index uids to their TaskList, for quick access
@ -142,11 +158,8 @@ struct TaskQueue {
impl TaskQueue {
fn insert(&mut self, task: Task) {
let id = task.id;
let uid = match task.index_uid {
Some(uid) => TaskListIdentifier::Index(uid.into_inner()),
None if matches!(task.content, TaskContent::Dump { .. }) => TaskListIdentifier::Dump,
None => unreachable!("invalid task state"),
};
let uid = TaskListIdentifier::from(&task);
let kind = match task.content {
TaskContent::DocumentAddition {
documents_count,
@ -163,9 +176,9 @@ impl TaskQueue {
number: documents_count,
},
TaskContent::Dump { .. } => TaskType::Dump,
TaskContent::DocumentDeletion(_)
TaskContent::DocumentDeletion { .. }
| TaskContent::SettingsUpdate { .. }
| TaskContent::IndexDeletion
| TaskContent::IndexDeletion { .. }
| TaskContent::IndexCreation { .. }
| TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
_ => unreachable!("unhandled task type"),
@ -528,25 +541,25 @@ mod test {
use super::*;
fn gen_task(id: TaskId, index_uid: Option<&str>, content: TaskContent) -> Task {
fn gen_task(id: TaskId, content: TaskContent) -> Task {
Task {
id,
index_uid: index_uid.map(IndexUid::new_unchecked),
content,
events: vec![],
}
}
#[test]
#[rustfmt::skip]
fn register_updates_multiples_indexes() {
let mut queue = TaskQueue::default();
queue.insert(gen_task(0, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(1, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(3, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(4, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(6, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(0, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") }));
queue.insert(gen_task(1, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
queue.insert(gen_task(3, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
queue.insert(gen_task(4, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") }));
queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") }));
queue.insert(gen_task(6, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
let test1_tasks = queue
.head_mut(|tasks| tasks.drain().map(|t| t.id).collect::<Vec<_>>())
@ -564,31 +577,30 @@ mod test {
assert!(queue.queue.is_empty());
}
#[test]
fn test_make_batch() {
let mut queue = TaskQueue::default();
let content = TaskContent::DocumentAddition {
fn gen_doc_addition_task_content(index_uid: &str) -> TaskContent {
TaskContent::DocumentAddition {
content_uuid: Uuid::new_v4(),
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
primary_key: Some("test".to_string()),
documents_count: 0,
allow_index_creation: true,
};
queue.insert(gen_task(0, Some("test1"), content.clone()));
queue.insert(gen_task(1, Some("test2"), content.clone()));
queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(3, Some("test2"), content.clone()));
queue.insert(gen_task(4, Some("test1"), content.clone()));
queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(6, Some("test2"), content.clone()));
queue.insert(gen_task(7, Some("test1"), content));
queue.insert(gen_task(
8,
None,
TaskContent::Dump {
uid: "adump".to_owned(),
},
));
index_uid: IndexUid::new_unchecked(index_uid),
}
}
#[test]
#[rustfmt::skip]
fn test_make_batch() {
let mut queue = TaskQueue::default();
queue.insert(gen_task(0, gen_doc_addition_task_content("test1")));
queue.insert(gen_task(1, gen_doc_addition_task_content("test2")));
queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2")}));
queue.insert(gen_task(3, gen_doc_addition_task_content("test2")));
queue.insert(gen_task(4, gen_doc_addition_task_content("test1")));
queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1")}));
queue.insert(gen_task(6, gen_doc_addition_task_content("test2")));
queue.insert(gen_task(7, gen_doc_addition_task_content("test1")));
queue.insert(gen_task(8, TaskContent::Dump { uid: "adump".to_owned() }));
let config = SchedulerConfig::default();

View file

@ -5,10 +5,8 @@ use time::OffsetDateTime;
use uuid::Uuid;
use super::batch::BatchId;
use crate::{
index::{Settings, Unchecked},
index_resolver::IndexUid,
};
use crate::index::{Settings, Unchecked};
use crate::index_resolver::IndexUid;
pub type TaskId = u32;
@ -90,13 +88,6 @@ pub struct Task {
/// then this is None
// TODO: when next forward breaking dumps, it would be a good idea to move this field inside of
// the TaskContent.
#[cfg_attr(
test,
proptest(
strategy = "proptest::option::weighted(proptest::option::Probability::new(0.99), IndexUid::arbitrary())"
)
)]
pub index_uid: Option<IndexUid>,
pub content: TaskContent,
pub events: Vec<TaskEvent>,
}
@ -123,6 +114,18 @@ impl Task {
_ => None,
}
}
pub fn index_uid(&self) -> Option<&str> {
match &self.content {
TaskContent::DocumentAddition { index_uid, .. }
| TaskContent::DocumentDeletion { index_uid, .. }
| TaskContent::SettingsUpdate { index_uid, .. }
| TaskContent::IndexDeletion { index_uid }
| TaskContent::IndexCreation { index_uid, .. }
| TaskContent::IndexUpdate { index_uid, .. } => Some(index_uid.as_str()),
TaskContent::Dump { .. } => None,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
@ -137,6 +140,7 @@ pub enum DocumentDeletion {
#[allow(clippy::large_enum_variant)]
pub enum TaskContent {
DocumentAddition {
index_uid: IndexUid,
#[cfg_attr(test, proptest(value = "Uuid::new_v4()"))]
content_uuid: Uuid,
#[cfg_attr(test, proptest(strategy = "test::index_document_method_strategy()"))]
@ -145,18 +149,26 @@ pub enum TaskContent {
documents_count: usize,
allow_index_creation: bool,
},
DocumentDeletion(DocumentDeletion),
DocumentDeletion {
index_uid: IndexUid,
deletion: DocumentDeletion,
},
SettingsUpdate {
index_uid: IndexUid,
settings: Settings<Unchecked>,
/// Indicates whether the task was a deletion
is_deletion: bool,
allow_index_creation: bool,
},
IndexDeletion,
IndexDeletion {
index_uid: IndexUid,
},
IndexCreation {
index_uid: IndexUid,
primary_key: Option<String>,
},
IndexUpdate {
index_uid: IndexUid,
primary_key: Option<String>,
},
Dump {

View file

@ -14,7 +14,6 @@ use super::error::TaskError;
use super::scheduler::Processing;
use super::task::{Task, TaskContent, TaskId};
use super::Result;
use crate::index_resolver::IndexUid;
use crate::tasks::task::TaskEvent;
use crate::update_file_store::UpdateFileStore;
@ -32,11 +31,11 @@ pub struct TaskFilter {
impl TaskFilter {
fn pass(&self, task: &Task) -> bool {
match task.index_uid {
Some(ref index_uid) => self
match task.index_uid() {
Some(index_uid) => self
.indexes
.as_ref()
.map_or(true, |indexes| indexes.contains(index_uid.as_str())),
.map_or(true, |indexes| indexes.contains(index_uid)),
None => false,
}
}
@ -75,11 +74,7 @@ impl TaskStore {
Ok(Self { store })
}
pub async fn register(
&self,
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
pub async fn register(&self, content: TaskContent) -> Result<Task> {
debug!("registering update: {:?}", content);
let store = self.store.clone();
let task = tokio::task::spawn_blocking(move || -> Result<Task> {
@ -88,7 +83,6 @@ impl TaskStore {
let created_at = TaskEvent::Created(OffsetDateTime::now_utc());
let task = Task {
id: next_task_id,
index_uid,
content,
events: vec![created_at],
};
@ -273,7 +267,10 @@ impl TaskStore {
#[cfg(test)]
pub mod test {
use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env};
use crate::{
tasks::{scheduler::Processing, task_store::store::test::tmp_env},
IndexUid,
};
use super::*;
@ -359,13 +356,9 @@ pub mod test {
}
}
pub async fn register(
&self,
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
pub async fn register(&self, content: TaskContent) -> Result<Task> {
match self {
Self::Real(s) => s.register(index_uid, content).await,
Self::Real(s) => s.register(content).await,
Self::Mock(_m) => todo!(),
}
}
@ -393,8 +386,10 @@ pub mod test {
let gen_task = |id: TaskId| Task {
id,
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexCreation { primary_key: None },
content: TaskContent::IndexCreation {
primary_key: None,
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};

View file

@ -77,7 +77,7 @@ impl Store {
pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
self.tasks.put(txn, &BEU32::new(task.id), task)?;
// only add the task to the indexes index if it has an index_uid
if let Some(index_uid) = &task.index_uid {
if let Some(index_uid) = task.index_uid() {
let mut tasks_set = self
.index_uid_task_ids
.get(txn, index_uid)?
@ -287,8 +287,9 @@ pub mod test {
let tasks = (0..100)
.map(|_| Task {
id: rand::random(),
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: vec![],
})
.collect::<Vec<_>>();
@ -318,15 +319,17 @@ pub mod test {
let task_1 = Task {
id: 1,
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: vec![],
};
let task_2 = Task {
id: 0,
index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test1"),
},
events: vec![],
};
@ -341,29 +344,21 @@ pub mod test {
txn.abort().unwrap();
assert_eq!(tasks.len(), 1);
assert_eq!(
tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
"test"
);
assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test");
// same thing but invert the ids
let task_1 = Task {
id: 0,
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: vec![],
};
let task_2 = Task {
id: 1,
index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test1"),
},
events: vec![],
};
@ -378,14 +373,7 @@ pub mod test {
assert_eq!(tasks.len(), 1);
assert_eq!(
&*tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
&*tasks.first().as_ref().unwrap().index_uid().unwrap(),
"test"
);
}