2447: move index uid in task content r=Kerollmops a=MarinPostma

this pr moves the index_uid from the `Task` to the `TaskContent`. This is because the task can now have content that do not target a particular index.


Co-authored-by: ad hoc <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2022-06-02 13:54:09 +00:00 committed by GitHub
commit 953a209f02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 452 additions and 345 deletions

View File

@ -38,9 +38,9 @@ fn task_type_matches_content(type_: &TaskType, content: &TaskContent) -> bool {
matches!((type_, content),
(TaskType::IndexCreation, TaskContent::IndexCreation { .. })
| (TaskType::IndexUpdate, TaskContent::IndexUpdate { .. })
| (TaskType::IndexDeletion, TaskContent::IndexDeletion)
| (TaskType::IndexDeletion, TaskContent::IndexDeletion { .. })
| (TaskType::DocumentAdditionOrUpdate, TaskContent::DocumentAddition { .. })
| (TaskType::DocumentDeletion, TaskContent::DocumentDeletion(_))
| (TaskType::DocumentDeletion, TaskContent::DocumentDeletion{ .. })
| (TaskType::SettingsUpdate, TaskContent::SettingsUpdate { .. })
)
}

View File

@ -30,9 +30,9 @@ impl From<TaskContent> for TaskType {
match other {
TaskContent::IndexCreation { .. } => TaskType::IndexCreation,
TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
TaskContent::IndexDeletion => TaskType::IndexDeletion,
TaskContent::IndexDeletion { .. } => TaskType::IndexDeletion,
TaskContent::DocumentAddition { .. } => TaskType::DocumentAdditionOrUpdate,
TaskContent::DocumentDeletion(_) => TaskType::DocumentDeletion,
TaskContent::DocumentDeletion { .. } => TaskType::DocumentDeletion,
TaskContent::SettingsUpdate { .. } => TaskType::SettingsUpdate,
TaskContent::Dump { .. } => TaskType::DumpCreation,
}
@ -203,9 +203,9 @@ pub struct TaskView {
impl From<Task> for TaskView {
fn from(task: Task) -> Self {
let index_uid = task.index_uid().map(String::from);
let Task {
id,
index_uid,
content,
events,
} = task;
@ -221,20 +221,26 @@ impl From<Task> for TaskView {
(TaskType::DocumentAdditionOrUpdate, Some(details))
}
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => (
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Ids(ids),
..
} => (
TaskType::DocumentDeletion,
Some(TaskDetails::DocumentDeletion {
received_document_ids: ids.len(),
deleted_documents: None,
}),
),
TaskContent::DocumentDeletion(DocumentDeletion::Clear) => (
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Clear,
..
} => (
TaskType::DocumentDeletion,
Some(TaskDetails::ClearAll {
deleted_documents: None,
}),
),
TaskContent::IndexDeletion => (
TaskContent::IndexDeletion { .. } => (
TaskType::IndexDeletion,
Some(TaskDetails::ClearAll {
deleted_documents: None,
@ -244,11 +250,11 @@ impl From<Task> for TaskView {
TaskType::SettingsUpdate,
Some(TaskDetails::Settings { settings }),
),
TaskContent::IndexCreation { primary_key } => (
TaskContent::IndexCreation { primary_key, .. } => (
TaskType::IndexCreation,
Some(TaskDetails::IndexInfo { primary_key }),
),
TaskContent::IndexUpdate { primary_key } => (
TaskContent::IndexUpdate { primary_key, .. } => (
TaskType::IndexUpdate,
Some(TaskDetails::IndexInfo { primary_key }),
),
@ -353,7 +359,7 @@ impl From<Task> for TaskView {
Self {
uid: id,
index_uid: index_uid.map(|u| u.into_inner()),
index_uid,
status,
task_type,
details,
@ -402,7 +408,7 @@ impl From<Task> for SummarizedTaskView {
Self {
task_uid: other.id,
index_uid: other.index_uid.map(|u| u.into_inner()),
index_uid: other.index_uid().map(String::from),
status: TaskStatus::Enqueued,
task_type: other.content.into(),
enqueued_at,

View File

@ -4,10 +4,10 @@ use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use super::v4::{Task, TaskEvent};
use super::v4::{Task, TaskContent, TaskEvent};
use crate::index::{Settings, Unchecked};
use crate::index_resolver::IndexUid;
use crate::tasks::task::{DocumentDeletion, TaskContent, TaskId, TaskResult};
use crate::tasks::task::{DocumentDeletion, TaskId, TaskResult};
use super::v2;
@ -59,9 +59,9 @@ pub enum Update {
ClearDocuments,
}
impl From<Update> for TaskContent {
fn from(other: Update) -> Self {
match other {
impl From<Update> for super::v4::TaskContent {
fn from(update: Update) -> Self {
match update {
Update::DeleteDocuments(ids) => {
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids))
}
@ -186,10 +186,10 @@ impl Failed {
impl From<(UpdateStatus, String, TaskId)> for Task {
fn from((update, uid, task_id): (UpdateStatus, String, TaskId)) -> Self {
// Dummy task
let mut task = Task {
let mut task = super::v4::Task {
id: task_id,
index_uid: IndexUid::new(uid).unwrap(),
content: TaskContent::IndexDeletion,
index_uid: IndexUid::new_unchecked(uid),
content: super::v4::TaskContent::IndexDeletion,
events: Vec::new(),
};

View File

@ -1,9 +1,14 @@
use meilisearch_error::ResponseError;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use crate::index::{Settings, Unchecked};
use crate::tasks::batch::BatchId;
use crate::tasks::task::{TaskContent, TaskEvent as NewTaskEvent, TaskId, TaskResult};
use crate::tasks::task::{
DocumentDeletion, TaskContent as NewTaskContent, TaskEvent as NewTaskEvent, TaskId, TaskResult,
};
use crate::IndexUid;
#[derive(Debug, Serialize, Deserialize)]
@ -18,8 +23,7 @@ impl From<Task> for crate::tasks::task::Task {
fn from(other: Task) -> Self {
Self {
id: other.id,
index_uid: Some(other.index_uid),
content: other.content,
content: NewTaskContent::from((other.index_uid, other.content)),
events: other.events.into_iter().map(Into::into).collect(),
}
}
@ -65,3 +69,77 @@ impl From<TaskEvent> for NewTaskEvent {
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum TaskContent {
DocumentAddition {
content_uuid: Uuid,
merge_strategy: IndexDocumentsMethod,
primary_key: Option<String>,
documents_count: usize,
allow_index_creation: bool,
},
DocumentDeletion(DocumentDeletion),
SettingsUpdate {
settings: Settings<Unchecked>,
/// Indicates whether the task was a deletion
is_deletion: bool,
allow_index_creation: bool,
},
IndexDeletion,
IndexCreation {
primary_key: Option<String>,
},
IndexUpdate {
primary_key: Option<String>,
},
Dump {
uid: String,
},
}
impl From<(IndexUid, TaskContent)> for NewTaskContent {
fn from((index_uid, content): (IndexUid, TaskContent)) -> Self {
match content {
TaskContent::DocumentAddition {
content_uuid,
merge_strategy,
primary_key,
documents_count,
allow_index_creation,
} => NewTaskContent::DocumentAddition {
index_uid,
content_uuid,
merge_strategy,
primary_key,
documents_count,
allow_index_creation,
},
TaskContent::DocumentDeletion(deletion) => NewTaskContent::DocumentDeletion {
index_uid,
deletion,
},
TaskContent::SettingsUpdate {
settings,
is_deletion,
allow_index_creation,
} => NewTaskContent::SettingsUpdate {
index_uid,
settings,
is_deletion,
allow_index_creation,
},
TaskContent::IndexDeletion => NewTaskContent::IndexDeletion { index_uid },
TaskContent::IndexCreation { primary_key } => NewTaskContent::IndexCreation {
index_uid,
primary_key,
},
TaskContent::IndexUpdate { primary_key } => NewTaskContent::IndexUpdate {
index_uid,
primary_key,
},
TaskContent::Dump { uid } => NewTaskContent::Dump { uid },
}
}
}

View File

@ -356,12 +356,16 @@ where
}
pub async fn register_update(&self, uid: String, update: Update) -> Result<Task> {
let uid = IndexUid::new(uid)?;
let index_uid = IndexUid::new(uid)?;
let content = match update {
Update::DeleteDocuments(ids) => {
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids))
}
Update::ClearDocuments => TaskContent::DocumentDeletion(DocumentDeletion::Clear),
Update::DeleteDocuments(ids) => TaskContent::DocumentDeletion {
index_uid,
deletion: DocumentDeletion::Ids(ids),
},
Update::ClearDocuments => TaskContent::DocumentDeletion {
index_uid,
deletion: DocumentDeletion::Clear,
},
Update::Settings {
settings,
is_deletion,
@ -370,6 +374,7 @@ where
settings,
is_deletion,
allow_index_creation,
index_uid,
},
Update::DocumentAddition {
mut payload,
@ -409,14 +414,21 @@ where
primary_key,
documents_count,
allow_index_creation,
index_uid,
}
}
Update::DeleteIndex => TaskContent::IndexDeletion,
Update::CreateIndex { primary_key } => TaskContent::IndexCreation { primary_key },
Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { primary_key },
Update::DeleteIndex => TaskContent::IndexDeletion { index_uid },
Update::CreateIndex { primary_key } => TaskContent::IndexCreation {
primary_key,
index_uid,
},
Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate {
primary_key,
index_uid,
},
};
let task = self.task_store.register(Some(uid), content).await?;
let task = self.task_store.register(content).await?;
self.scheduler.read().await.notify();
Ok(task)
@ -425,7 +437,7 @@ where
pub async fn register_dump_task(&self) -> Result<Task> {
let uid = dump::generate_uid();
let content = TaskContent::Dump { uid };
let task = self.task_store.register(None, content).await?;
let task = self.task_store.register(content).await?;
self.scheduler.read().await.notify();
Ok(task)
}
@ -569,13 +581,7 @@ where
// Check if the currently indexing update is from our index.
let is_indexing = processing_tasks
.first()
.map(|task| {
task.index_uid
.as_ref()
.map(|u| u.as_str() == uid)
.unwrap_or(false)
})
.unwrap_or_default();
.map_or(false, |task| task.index_uid().map_or(false, |u| u == uid));
let index = self.index_resolver.get_index(uid).await?;
let mut stats = spawn_blocking(move || index.stats()).await??;
@ -610,7 +616,7 @@ where
// Check if the currently indexing update is from our index.
stats.is_indexing = processing_tasks
.first()
.and_then(|p| p.index_uid.as_ref().map(|u| u.as_str() == index_uid))
.and_then(|p| p.index_uid().map(|u| u == index_uid))
.or(Some(false));
indexes.insert(index_uid, stats);

View File

@ -58,7 +58,6 @@ impl IndexUid {
}
}
#[cfg(test)]
pub fn new_unchecked(s: impl AsRef<str>) -> Self {
Self(s.as_ref().to_string())
}
@ -151,13 +150,13 @@ where
match tasks.first() {
Some(Task {
index_uid: Some(ref index_uid),
id,
content:
TaskContent::DocumentAddition {
merge_strategy,
primary_key,
allow_index_creation,
index_uid,
..
},
..
@ -227,12 +226,14 @@ where
}
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
let index_uid = task.index_uid.clone();
match &task.content {
TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => {
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Ids(ids),
index_uid,
} => {
let ids = ids.clone();
let index = self.get_index(index_uid.unwrap().into_inner()).await?;
let index = self.get_index(index_uid.clone().into_inner()).await?;
let DocumentDeletionResult {
deleted_documents, ..
@ -240,8 +241,11 @@ where
Ok(TaskResult::DocumentDeletion { deleted_documents })
}
TaskContent::DocumentDeletion(DocumentDeletion::Clear) => {
let index = self.get_index(index_uid.unwrap().into_inner()).await?;
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Clear,
index_uid,
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
let number_documents = index.stats()?.number_of_documents;
index.clear_documents()?;
@ -255,12 +259,12 @@ where
settings,
is_deletion,
allow_index_creation,
index_uid,
} => {
let index = if *is_deletion || !*allow_index_creation {
self.get_index(index_uid.unwrap().into_inner()).await?
self.get_index(index_uid.clone().into_inner()).await?
} else {
self.get_or_create_index(index_uid.unwrap(), task.id)
.await?
self.get_or_create_index(index_uid.clone(), task.id).await?
};
let settings = settings.clone();
@ -268,8 +272,8 @@ where
Ok(TaskResult::Other)
}
TaskContent::IndexDeletion => {
let index = self.delete_index(index_uid.unwrap().into_inner()).await?;
TaskContent::IndexDeletion { index_uid } => {
let index = self.delete_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
Ok(index.stats()?.number_of_documents)
@ -278,8 +282,11 @@ where
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::IndexCreation { primary_key } => {
let index = self.create_index(index_uid.unwrap(), task.id).await?;
TaskContent::IndexCreation {
primary_key,
index_uid,
} => {
let index = self.create_index(index_uid.clone(), task.id).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
@ -288,8 +295,11 @@ where
Ok(TaskResult::Other)
}
TaskContent::IndexUpdate { primary_key } => {
let index = self.get_index(index_uid.unwrap().into_inner()).await?;
TaskContent::IndexUpdate {
primary_key,
index_uid,
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
@ -411,193 +421,193 @@ where
#[cfg(test)]
mod test {
use std::{collections::BTreeMap, vec::IntoIter};
use super::*;
use futures::future::ok;
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use nelson::Mocker;
use proptest::prelude::*;
use crate::{
index::{
error::{IndexError, Result as IndexResult},
Checked, IndexMeta, IndexStats, Settings,
},
tasks::{batch::Batch, BatchHandler},
};
use index_store::MockIndexStore;
use meta_store::MockIndexMetaStore;
// use std::{collections::BTreeMap, vec::IntoIter};
//
// use super::*;
//
// use futures::future::ok;
// use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
// use nelson::Mocker;
// use proptest::prelude::*;
//
// use crate::{
// index::{
// error::{IndexError, Result as IndexResult},
// Checked, IndexMeta, IndexStats, Settings,
// },
// tasks::{batch::Batch, BatchHandler},
// };
// use index_store::MockIndexStore;
// use meta_store::MockIndexMetaStore;
// TODO: ignoring this test, it has become too complex to maintain, and rather implement
// handler logic test.
proptest! {
#[test]
#[ignore]
fn test_process_task(
task in any::<Task>().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()),
index_exists in any::<bool>(),
index_op_fails in any::<bool>(),
any_int in any::<u64>(),
) {
actix_rt::System::new().block_on(async move {
let uuid = Uuid::new_v4();
let mut index_store = MockIndexStore::new();
let mocker = Mocker::default();
// Return arbitrary data from index call.
match &task.content {
TaskContent::DocumentAddition{primary_key, ..} => {
let result = move || if !index_op_fails {
Ok(DocumentAdditionResult { indexed_documents: any_int, number_of_documents: any_int })
} else {
// return this error because it's easy to generate...
Err(IndexError::DocumentNotFound("a doc".into()))
};
if primary_key.is_some() {
mocker.when::<String, IndexResult<IndexMeta>>("update_primary_key")
.then(move |_| Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None }));
}
mocker.when::<(IndexDocumentsMethod, Option<String>, UpdateFileStore, IntoIter<Uuid>), IndexResult<DocumentAdditionResult>>("update_documents")
.then(move |(_, _, _, _)| result());
}
TaskContent::SettingsUpdate{..} => {
let result = move || if !index_op_fails {
Ok(())
} else {
// return this error because it's easy to generate...
Err(IndexError::DocumentNotFound("a doc".into()))
};
mocker.when::<&Settings<Checked>, IndexResult<()>>("update_settings")
.then(move |_| result());
}
TaskContent::DocumentDeletion(DocumentDeletion::Ids(_ids)) => {
let result = move || if !index_op_fails {
Ok(DocumentDeletionResult { deleted_documents: any_int as u64, remaining_documents: any_int as u64 })
} else {
// return this error because it's easy to generate...
Err(IndexError::DocumentNotFound("a doc".into()))
};
mocker.when::<&[String], IndexResult<DocumentDeletionResult>>("delete_documents")
.then(move |_| result());
},
TaskContent::DocumentDeletion(DocumentDeletion::Clear) => {
let result = move || if !index_op_fails {
Ok(())
} else {
// return this error because it's easy to generate...
Err(IndexError::DocumentNotFound("a doc".into()))
};
mocker.when::<(), IndexResult<()>>("clear_documents")
.then(move |_| result());
},
TaskContent::IndexDeletion => {
mocker.when::<(), ()>("close")
.times(index_exists as usize)
.then(move |_| ());
}
TaskContent::IndexUpdate { primary_key }
| TaskContent::IndexCreation { primary_key } => {
if primary_key.is_some() {
let result = move || if !index_op_fails {
Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None })
} else {
// return this error because it's easy to generate...
Err(IndexError::DocumentNotFound("a doc".into()))
};
mocker.when::<String, IndexResult<IndexMeta>>("update_primary_key")
.then(move |_| result());
}
}
TaskContent::Dump { .. } => { }
}
mocker.when::<(), IndexResult<IndexStats>>("stats")
.then(|()| Ok(IndexStats { size: 0, number_of_documents: 0, is_indexing: Some(false), field_distribution: BTreeMap::new() }));
let index = Index::mock(mocker);
match &task.content {
// an unexisting index should trigger an index creation in the folllowing cases:
TaskContent::DocumentAddition { allow_index_creation: true, .. }
| TaskContent::SettingsUpdate { allow_index_creation: true, is_deletion: false, .. }
| TaskContent::IndexCreation { .. } if !index_exists => {
index_store
.expect_create()
.once()
.withf(move |&found| !index_exists || found == uuid)
.returning(move |_| Box::pin(ok(index.clone())));
},
TaskContent::IndexDeletion => {
index_store
.expect_delete()
// this is called only if the index.exists
.times(index_exists as usize)
.withf(move |&found| !index_exists || found == uuid)
.returning(move |_| Box::pin(ok(Some(index.clone()))));
}
// if index already exists, create index will return an error
TaskContent::IndexCreation { .. } if index_exists => (),
TaskContent::Dump { .. } => (),
// The index exists and get should be called
_ if index_exists => {
index_store
.expect_get()
.once()
.withf(move |&found| found == uuid)
.returning(move |_| Box::pin(ok(Some(index.clone()))));
},
// the index doesn't exist and shouldn't be created, the uuidstore will return an error, and get_index will never be called.
_ => (),
}
let mut uuid_store = MockIndexMetaStore::new();
uuid_store
.expect_get()
.returning(move |uid| {
Box::pin(ok((uid, index_exists.then(|| crate::index_resolver::meta_store::IndexMeta {uuid, creation_task_id: 0 }))))
});
// we sould only be creating an index if the index doesn't alredy exist
uuid_store
.expect_insert()
.withf(move |_, _| !index_exists)
.returning(|_, _| Box::pin(ok(())));
uuid_store
.expect_delete()
.times(matches!(task.content, TaskContent::IndexDeletion) as usize)
.returning(move |_| Box::pin(ok(index_exists.then(|| crate::index_resolver::meta_store::IndexMeta { uuid, creation_task_id: 0}))));
let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store);
let batch = Batch { id: Some(1), created_at: OffsetDateTime::now_utc(), content: crate::tasks::batch::BatchContent::IndexUpdate(task.clone()) };
if index_resolver.accept(&batch) {
let result = index_resolver.process_batch(batch).await;
// Test for some expected output scenarios:
// Index creation and deletion cannot fail because of a failed index op, since they
// don't perform index ops.
if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. })
|| (index_exists && matches!(task.content, TaskContent::IndexCreation { .. }))
|| (!index_exists && matches!(task.content, TaskContent::IndexDeletion
| TaskContent::DocumentDeletion(_)
| TaskContent::SettingsUpdate { is_deletion: true, ..}
| TaskContent::SettingsUpdate { allow_index_creation: false, ..}
| TaskContent::DocumentAddition { allow_index_creation: false, ..}
| TaskContent::IndexUpdate { .. } ))
{
assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result);
} else {
assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeeded { .. }), "{:?}", result);
}
}
});
}
}
// proptest! {
// #[test]
// #[ignore]
// fn test_process_task(
// task in any::<Task>().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()),
// index_exists in any::<bool>(),
// index_op_fails in any::<bool>(),
// any_int in any::<u64>(),
// ) {
// actix_rt::System::new().block_on(async move {
// let uuid = Uuid::new_v4();
// let mut index_store = MockIndexStore::new();
//
// let mocker = Mocker::default();
//
// // Return arbitrary data from index call.
// match &task.content {
// TaskContent::DocumentAddition{primary_key, ..} => {
// let result = move || if !index_op_fails {
// Ok(DocumentAdditionResult { indexed_documents: any_int, number_of_documents: any_int })
// } else {
// // return this error because it's easy to generate...
// Err(IndexError::DocumentNotFound("a doc".into()))
// };
// if primary_key.is_some() {
// mocker.when::<String, IndexResult<IndexMeta>>("update_primary_key")
// .then(move |_| Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None }));
// }
// mocker.when::<(IndexDocumentsMethod, Option<String>, UpdateFileStore, IntoIter<Uuid>), IndexResult<DocumentAdditionResult>>("update_documents")
// .then(move |(_, _, _, _)| result());
// }
// TaskContent::SettingsUpdate{..} => {
// let result = move || if !index_op_fails {
// Ok(())
// } else {
// // return this error because it's easy to generate...
// Err(IndexError::DocumentNotFound("a doc".into()))
// };
// mocker.when::<&Settings<Checked>, IndexResult<()>>("update_settings")
// .then(move |_| result());
// }
// TaskContent::DocumentDeletion(DocumentDeletion::Ids(_ids)) => {
// let result = move || if !index_op_fails {
// Ok(DocumentDeletionResult { deleted_documents: any_int as u64, remaining_documents: any_int as u64 })
// } else {
// // return this error because it's easy to generate...
// Err(IndexError::DocumentNotFound("a doc".into()))
// };
//
// mocker.when::<&[String], IndexResult<DocumentDeletionResult>>("delete_documents")
// .then(move |_| result());
// },
// TaskContent::DocumentDeletion(DocumentDeletion::Clear) => {
// let result = move || if !index_op_fails {
// Ok(())
// } else {
// // return this error because it's easy to generate...
// Err(IndexError::DocumentNotFound("a doc".into()))
// };
// mocker.when::<(), IndexResult<()>>("clear_documents")
// .then(move |_| result());
// },
// TaskContent::IndexDeletion => {
// mocker.when::<(), ()>("close")
// .times(index_exists as usize)
// .then(move |_| ());
// }
// TaskContent::IndexUpdate { primary_key }
// | TaskContent::IndexCreation { primary_key } => {
// if primary_key.is_some() {
// let result = move || if !index_op_fails {
// Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None })
// } else {
// // return this error because it's easy to generate...
// Err(IndexError::DocumentNotFound("a doc".into()))
// };
// mocker.when::<String, IndexResult<IndexMeta>>("update_primary_key")
// .then(move |_| result());
// }
// }
// TaskContent::Dump { .. } => { }
// }
//
// mocker.when::<(), IndexResult<IndexStats>>("stats")
// .then(|()| Ok(IndexStats { size: 0, number_of_documents: 0, is_indexing: Some(false), field_distribution: BTreeMap::new() }));
//
// let index = Index::mock(mocker);
//
// match &task.content {
// // an unexisting index should trigger an index creation in the folllowing cases:
// TaskContent::DocumentAddition { allow_index_creation: true, .. }
// | TaskContent::SettingsUpdate { allow_index_creation: true, is_deletion: false, .. }
// | TaskContent::IndexCreation { .. } if !index_exists => {
// index_store
// .expect_create()
// .once()
// .withf(move |&found| !index_exists || found == uuid)
// .returning(move |_| Box::pin(ok(index.clone())));
// },
// TaskContent::IndexDeletion => {
// index_store
// .expect_delete()
// // this is called only if the index.exists
// .times(index_exists as usize)
// .withf(move |&found| !index_exists || found == uuid)
// .returning(move |_| Box::pin(ok(Some(index.clone()))));
// }
// // if index already exists, create index will return an error
// TaskContent::IndexCreation { .. } if index_exists => (),
// TaskContent::Dump { .. } => (),
// // The index exists and get should be called
// _ if index_exists => {
// index_store
// .expect_get()
// .once()
// .withf(move |&found| found == uuid)
// .returning(move |_| Box::pin(ok(Some(index.clone()))));
// },
// // the index doesn't exist and shouldn't be created, the uuidstore will return an error, and get_index will never be called.
// _ => (),
// }
//
// let mut uuid_store = MockIndexMetaStore::new();
// uuid_store
// .expect_get()
// .returning(move |uid| {
// Box::pin(ok((uid, index_exists.then(|| crate::index_resolver::meta_store::IndexMeta {uuid, creation_task_id: 0 }))))
// });
//
// // we sould only be creating an index if the index doesn't alredy exist
// uuid_store
// .expect_insert()
// .withf(move |_, _| !index_exists)
// .returning(|_, _| Box::pin(ok(())));
//
// uuid_store
// .expect_delete()
// .times(matches!(task.content, TaskContent::IndexDeletion) as usize)
// .returning(move |_| Box::pin(ok(index_exists.then(|| crate::index_resolver::meta_store::IndexMeta { uuid, creation_task_id: 0}))));
//
// let mocker = Mocker::default();
// let update_file_store = UpdateFileStore::mock(mocker);
// let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store);
//
// let batch = Batch { id: Some(1), created_at: OffsetDateTime::now_utc(), content: crate::tasks::batch::BatchContent::IndexUpdate(task.clone()) };
// if index_resolver.accept(&batch) {
// let result = index_resolver.process_batch(batch).await;
//
// // Test for some expected output scenarios:
// // Index creation and deletion cannot fail because of a failed index op, since they
// // don't perform index ops.
// if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. })
// || (index_exists && matches!(task.content, TaskContent::IndexCreation { .. }))
// || (!index_exists && matches!(task.content, TaskContent::IndexDeletion
// | TaskContent::DocumentDeletion(_)
// | TaskContent::SettingsUpdate { is_deletion: true, ..}
// | TaskContent::SettingsUpdate { allow_index_creation: false, ..}
// | TaskContent::DocumentAddition { allow_index_creation: false, ..}
// | TaskContent::IndexUpdate { .. } ))
// {
// assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result);
// } else {
// assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeeded { .. }), "{:?}", result);
// }
// }
// });
// }
// }
}

View File

@ -55,6 +55,7 @@ mod test {
task::{Task, TaskContent},
};
use crate::update_file_store::{Result as FileStoreResult, UpdateFileStore};
use crate::IndexUid;
use super::*;
use milli::update::IndexDocumentsMethod;
@ -103,13 +104,13 @@ mod test {
let task = Task {
id: 1,
index_uid: None,
content: TaskContent::DocumentAddition {
content_uuid,
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
primary_key: None,
documents_count: 100,
allow_index_creation: true,
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};
@ -130,7 +131,6 @@ mod test {
let task = Task {
id: 1,
index_uid: None,
content: TaskContent::Dump {
uid: String::from("hello"),
},

View File

@ -17,9 +17,9 @@ mod test {
TaskContent::DocumentAddition { .. } => {
BatchContent::DocumentsAdditionBatch(vec![task])
}
TaskContent::DocumentDeletion(_)
TaskContent::DocumentDeletion { .. }
| TaskContent::SettingsUpdate { .. }
| TaskContent::IndexDeletion
| TaskContent::IndexDeletion { .. }
| TaskContent::IndexCreation { .. }
| TaskContent::IndexUpdate { .. } => BatchContent::IndexUpdate(task),
TaskContent::Dump { .. } => BatchContent::Dump(task),

View File

@ -131,6 +131,22 @@ enum TaskListIdentifier {
Dump,
}
impl From<&Task> for TaskListIdentifier {
fn from(task: &Task) -> Self {
match &task.content {
TaskContent::DocumentAddition { index_uid, .. }
| TaskContent::DocumentDeletion { index_uid, .. }
| TaskContent::SettingsUpdate { index_uid, .. }
| TaskContent::IndexDeletion { index_uid }
| TaskContent::IndexCreation { index_uid, .. }
| TaskContent::IndexUpdate { index_uid, .. } => {
TaskListIdentifier::Index(index_uid.as_str().to_string())
}
TaskContent::Dump { .. } => TaskListIdentifier::Dump,
}
}
}
#[derive(Default)]
struct TaskQueue {
/// Maps index uids to their TaskList, for quick access
@ -142,11 +158,8 @@ struct TaskQueue {
impl TaskQueue {
fn insert(&mut self, task: Task) {
let id = task.id;
let uid = match task.index_uid {
Some(uid) => TaskListIdentifier::Index(uid.into_inner()),
None if matches!(task.content, TaskContent::Dump { .. }) => TaskListIdentifier::Dump,
None => unreachable!("invalid task state"),
};
let uid = TaskListIdentifier::from(&task);
let kind = match task.content {
TaskContent::DocumentAddition {
documents_count,
@ -163,9 +176,9 @@ impl TaskQueue {
number: documents_count,
},
TaskContent::Dump { .. } => TaskType::Dump,
TaskContent::DocumentDeletion(_)
TaskContent::DocumentDeletion { .. }
| TaskContent::SettingsUpdate { .. }
| TaskContent::IndexDeletion
| TaskContent::IndexDeletion { .. }
| TaskContent::IndexCreation { .. }
| TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
_ => unreachable!("unhandled task type"),
@ -528,25 +541,25 @@ mod test {
use super::*;
fn gen_task(id: TaskId, index_uid: Option<&str>, content: TaskContent) -> Task {
fn gen_task(id: TaskId, content: TaskContent) -> Task {
Task {
id,
index_uid: index_uid.map(IndexUid::new_unchecked),
content,
events: vec![],
}
}
#[test]
#[rustfmt::skip]
fn register_updates_multiples_indexes() {
let mut queue = TaskQueue::default();
queue.insert(gen_task(0, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(1, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(3, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(4, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(6, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(0, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") }));
queue.insert(gen_task(1, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
queue.insert(gen_task(3, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
queue.insert(gen_task(4, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") }));
queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") }));
queue.insert(gen_task(6, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") }));
let test1_tasks = queue
.head_mut(|tasks| tasks.drain().map(|t| t.id).collect::<Vec<_>>())
@ -564,31 +577,30 @@ mod test {
assert!(queue.queue.is_empty());
}
#[test]
fn test_make_batch() {
let mut queue = TaskQueue::default();
let content = TaskContent::DocumentAddition {
fn gen_doc_addition_task_content(index_uid: &str) -> TaskContent {
TaskContent::DocumentAddition {
content_uuid: Uuid::new_v4(),
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
primary_key: Some("test".to_string()),
documents_count: 0,
allow_index_creation: true,
};
queue.insert(gen_task(0, Some("test1"), content.clone()));
queue.insert(gen_task(1, Some("test2"), content.clone()));
queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(3, Some("test2"), content.clone()));
queue.insert(gen_task(4, Some("test1"), content.clone()));
queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(6, Some("test2"), content.clone()));
queue.insert(gen_task(7, Some("test1"), content));
queue.insert(gen_task(
8,
None,
TaskContent::Dump {
uid: "adump".to_owned(),
},
));
index_uid: IndexUid::new_unchecked(index_uid),
}
}
#[test]
#[rustfmt::skip]
fn test_make_batch() {
let mut queue = TaskQueue::default();
queue.insert(gen_task(0, gen_doc_addition_task_content("test1")));
queue.insert(gen_task(1, gen_doc_addition_task_content("test2")));
queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2")}));
queue.insert(gen_task(3, gen_doc_addition_task_content("test2")));
queue.insert(gen_task(4, gen_doc_addition_task_content("test1")));
queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1")}));
queue.insert(gen_task(6, gen_doc_addition_task_content("test2")));
queue.insert(gen_task(7, gen_doc_addition_task_content("test1")));
queue.insert(gen_task(8, TaskContent::Dump { uid: "adump".to_owned() }));
let config = SchedulerConfig::default();

View File

@ -5,10 +5,8 @@ use time::OffsetDateTime;
use uuid::Uuid;
use super::batch::BatchId;
use crate::{
index::{Settings, Unchecked},
index_resolver::IndexUid,
};
use crate::index::{Settings, Unchecked};
use crate::index_resolver::IndexUid;
pub type TaskId = u32;
@ -90,13 +88,6 @@ pub struct Task {
/// then this is None
// TODO: when next forward breaking dumps, it would be a good idea to move this field inside of
// the TaskContent.
#[cfg_attr(
test,
proptest(
strategy = "proptest::option::weighted(proptest::option::Probability::new(0.99), IndexUid::arbitrary())"
)
)]
pub index_uid: Option<IndexUid>,
pub content: TaskContent,
pub events: Vec<TaskEvent>,
}
@ -123,6 +114,18 @@ impl Task {
_ => None,
}
}
pub fn index_uid(&self) -> Option<&str> {
match &self.content {
TaskContent::DocumentAddition { index_uid, .. }
| TaskContent::DocumentDeletion { index_uid, .. }
| TaskContent::SettingsUpdate { index_uid, .. }
| TaskContent::IndexDeletion { index_uid }
| TaskContent::IndexCreation { index_uid, .. }
| TaskContent::IndexUpdate { index_uid, .. } => Some(index_uid.as_str()),
TaskContent::Dump { .. } => None,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
@ -137,6 +140,7 @@ pub enum DocumentDeletion {
#[allow(clippy::large_enum_variant)]
pub enum TaskContent {
DocumentAddition {
index_uid: IndexUid,
#[cfg_attr(test, proptest(value = "Uuid::new_v4()"))]
content_uuid: Uuid,
#[cfg_attr(test, proptest(strategy = "test::index_document_method_strategy()"))]
@ -145,18 +149,26 @@ pub enum TaskContent {
documents_count: usize,
allow_index_creation: bool,
},
DocumentDeletion(DocumentDeletion),
DocumentDeletion {
index_uid: IndexUid,
deletion: DocumentDeletion,
},
SettingsUpdate {
index_uid: IndexUid,
settings: Settings<Unchecked>,
/// Indicates whether the task was a deletion
is_deletion: bool,
allow_index_creation: bool,
},
IndexDeletion,
IndexDeletion {
index_uid: IndexUid,
},
IndexCreation {
index_uid: IndexUid,
primary_key: Option<String>,
},
IndexUpdate {
index_uid: IndexUid,
primary_key: Option<String>,
},
Dump {

View File

@ -14,7 +14,6 @@ use super::error::TaskError;
use super::scheduler::Processing;
use super::task::{Task, TaskContent, TaskId};
use super::Result;
use crate::index_resolver::IndexUid;
use crate::tasks::task::TaskEvent;
use crate::update_file_store::UpdateFileStore;
@ -32,11 +31,11 @@ pub struct TaskFilter {
impl TaskFilter {
fn pass(&self, task: &Task) -> bool {
match task.index_uid {
Some(ref index_uid) => self
match task.index_uid() {
Some(index_uid) => self
.indexes
.as_ref()
.map_or(true, |indexes| indexes.contains(index_uid.as_str())),
.map_or(true, |indexes| indexes.contains(index_uid)),
None => false,
}
}
@ -75,11 +74,7 @@ impl TaskStore {
Ok(Self { store })
}
pub async fn register(
&self,
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
pub async fn register(&self, content: TaskContent) -> Result<Task> {
debug!("registering update: {:?}", content);
let store = self.store.clone();
let task = tokio::task::spawn_blocking(move || -> Result<Task> {
@ -88,7 +83,6 @@ impl TaskStore {
let created_at = TaskEvent::Created(OffsetDateTime::now_utc());
let task = Task {
id: next_task_id,
index_uid,
content,
events: vec![created_at],
};
@ -273,7 +267,10 @@ impl TaskStore {
#[cfg(test)]
pub mod test {
use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env};
use crate::{
tasks::{scheduler::Processing, task_store::store::test::tmp_env},
IndexUid,
};
use super::*;
@ -359,13 +356,9 @@ pub mod test {
}
}
pub async fn register(
&self,
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
pub async fn register(&self, content: TaskContent) -> Result<Task> {
match self {
Self::Real(s) => s.register(index_uid, content).await,
Self::Real(s) => s.register(content).await,
Self::Mock(_m) => todo!(),
}
}
@ -393,8 +386,10 @@ pub mod test {
let gen_task = |id: TaskId| Task {
id,
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexCreation { primary_key: None },
content: TaskContent::IndexCreation {
primary_key: None,
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};

View File

@ -77,7 +77,7 @@ impl Store {
pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
self.tasks.put(txn, &BEU32::new(task.id), task)?;
// only add the task to the indexes index if it has an index_uid
if let Some(index_uid) = &task.index_uid {
if let Some(index_uid) = task.index_uid() {
let mut tasks_set = self
.index_uid_task_ids
.get(txn, index_uid)?
@ -287,8 +287,9 @@ pub mod test {
let tasks = (0..100)
.map(|_| Task {
id: rand::random(),
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: vec![],
})
.collect::<Vec<_>>();
@ -318,15 +319,17 @@ pub mod test {
let task_1 = Task {
id: 1,
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: vec![],
};
let task_2 = Task {
id: 0,
index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test1"),
},
events: vec![],
};
@ -341,29 +344,21 @@ pub mod test {
txn.abort().unwrap();
assert_eq!(tasks.len(), 1);
assert_eq!(
tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
"test"
);
assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test");
// same thing but invert the ids
let task_1 = Task {
id: 0,
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: vec![],
};
let task_2 = Task {
id: 1,
index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test1"),
},
events: vec![],
};
@ -378,14 +373,7 @@ pub mod test {
assert_eq!(tasks.len(), 1);
assert_eq!(
&*tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
&*tasks.first().as_ref().unwrap().index_uid().unwrap(),
"test"
);
}