From 0c5352fc222b303645938dfd8bb49db394add12e Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 31 May 2022 17:18:40 +0200 Subject: [PATCH] move index_uid from task to task_content --- meilisearch-http/src/routes/tasks.rs | 4 +- meilisearch-http/src/task.rs | 26 +- meilisearch-lib/src/dump/compat/v3.rs | 16 +- meilisearch-lib/src/dump/compat/v4.rs | 84 +++- meilisearch-lib/src/index_controller/mod.rs | 42 +- meilisearch-lib/src/index_resolver/mod.rs | 414 +++++++++--------- .../tasks/handlers/index_resolver_handler.rs | 4 +- meilisearch-lib/src/tasks/handlers/mod.rs | 4 +- meilisearch-lib/src/tasks/scheduler.rs | 84 ++-- meilisearch-lib/src/tasks/task.rs | 38 +- meilisearch-lib/src/tasks/task_store/mod.rs | 33 +- meilisearch-lib/src/tasks/task_store/store.rs | 48 +- 12 files changed, 452 insertions(+), 345 deletions(-) diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index 2f62615fd..0ab4678b7 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -38,9 +38,9 @@ fn task_type_matches_content(type_: &TaskType, content: &TaskContent) -> bool { matches!((type_, content), (TaskType::IndexCreation, TaskContent::IndexCreation { .. }) | (TaskType::IndexUpdate, TaskContent::IndexUpdate { .. }) - | (TaskType::IndexDeletion, TaskContent::IndexDeletion) + | (TaskType::IndexDeletion, TaskContent::IndexDeletion { .. }) | (TaskType::DocumentAdditionOrUpdate, TaskContent::DocumentAddition { .. }) - | (TaskType::DocumentDeletion, TaskContent::DocumentDeletion(_)) + | (TaskType::DocumentDeletion, TaskContent::DocumentDeletion{ .. }) | (TaskType::SettingsUpdate, TaskContent::SettingsUpdate { .. }) ) } diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index f8ba941d8..d9360039d 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -30,9 +30,9 @@ impl From for TaskType { match other { TaskContent::IndexCreation { .. } => TaskType::IndexCreation, TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, - TaskContent::IndexDeletion => TaskType::IndexDeletion, + TaskContent::IndexDeletion { .. } => TaskType::IndexDeletion, TaskContent::DocumentAddition { .. } => TaskType::DocumentAdditionOrUpdate, - TaskContent::DocumentDeletion(_) => TaskType::DocumentDeletion, + TaskContent::DocumentDeletion { .. } => TaskType::DocumentDeletion, TaskContent::SettingsUpdate { .. } => TaskType::SettingsUpdate, TaskContent::Dump { .. } => TaskType::DumpCreation, } @@ -203,9 +203,9 @@ pub struct TaskView { impl From for TaskView { fn from(task: Task) -> Self { + let index_uid = task.index_uid().map(String::from); let Task { id, - index_uid, content, events, } = task; @@ -221,20 +221,26 @@ impl From for TaskView { (TaskType::DocumentAdditionOrUpdate, Some(details)) } - TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => ( + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Ids(ids), + .. + } => ( TaskType::DocumentDeletion, Some(TaskDetails::DocumentDeletion { received_document_ids: ids.len(), deleted_documents: None, }), ), - TaskContent::DocumentDeletion(DocumentDeletion::Clear) => ( + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Clear, + .. + } => ( TaskType::DocumentDeletion, Some(TaskDetails::ClearAll { deleted_documents: None, }), ), - TaskContent::IndexDeletion => ( + TaskContent::IndexDeletion { .. } => ( TaskType::IndexDeletion, Some(TaskDetails::ClearAll { deleted_documents: None, @@ -244,11 +250,11 @@ impl From for TaskView { TaskType::SettingsUpdate, Some(TaskDetails::Settings { settings }), ), - TaskContent::IndexCreation { primary_key } => ( + TaskContent::IndexCreation { primary_key, .. } => ( TaskType::IndexCreation, Some(TaskDetails::IndexInfo { primary_key }), ), - TaskContent::IndexUpdate { primary_key } => ( + TaskContent::IndexUpdate { primary_key, .. } => ( TaskType::IndexUpdate, Some(TaskDetails::IndexInfo { primary_key }), ), @@ -353,7 +359,7 @@ impl From for TaskView { Self { uid: id, - index_uid: index_uid.map(|u| u.into_inner()), + index_uid, status, task_type, details, @@ -402,7 +408,7 @@ impl From for SummarizedTaskView { Self { task_uid: other.id, - index_uid: other.index_uid.map(|u| u.into_inner()), + index_uid: other.index_uid().map(String::from), status: TaskStatus::Enqueued, task_type: other.content.into(), enqueued_at, diff --git a/meilisearch-lib/src/dump/compat/v3.rs b/meilisearch-lib/src/dump/compat/v3.rs index 164b7153d..2044e3b60 100644 --- a/meilisearch-lib/src/dump/compat/v3.rs +++ b/meilisearch-lib/src/dump/compat/v3.rs @@ -4,10 +4,10 @@ use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use uuid::Uuid; -use super::v4::{Task, TaskEvent}; +use super::v4::{Task, TaskContent, TaskEvent}; use crate::index::{Settings, Unchecked}; use crate::index_resolver::IndexUid; -use crate::tasks::task::{DocumentDeletion, TaskContent, TaskId, TaskResult}; +use crate::tasks::task::{DocumentDeletion, TaskId, TaskResult}; use super::v2; @@ -59,9 +59,9 @@ pub enum Update { ClearDocuments, } -impl From for TaskContent { - fn from(other: Update) -> Self { - match other { +impl From for super::v4::TaskContent { + fn from(update: Update) -> Self { + match update { Update::DeleteDocuments(ids) => { TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) } @@ -186,10 +186,10 @@ impl Failed { impl From<(UpdateStatus, String, TaskId)> for Task { fn from((update, uid, task_id): (UpdateStatus, String, TaskId)) -> Self { // Dummy task - let mut task = Task { + let mut task = super::v4::Task { id: task_id, - index_uid: IndexUid::new(uid).unwrap(), - content: TaskContent::IndexDeletion, + index_uid: IndexUid::new_unchecked(uid), + content: super::v4::TaskContent::IndexDeletion, events: Vec::new(), }; diff --git a/meilisearch-lib/src/dump/compat/v4.rs b/meilisearch-lib/src/dump/compat/v4.rs index 6fa0e582a..867bc7b63 100644 --- a/meilisearch-lib/src/dump/compat/v4.rs +++ b/meilisearch-lib/src/dump/compat/v4.rs @@ -1,9 +1,14 @@ use meilisearch_error::ResponseError; +use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; +use uuid::Uuid; +use crate::index::{Settings, Unchecked}; use crate::tasks::batch::BatchId; -use crate::tasks::task::{TaskContent, TaskEvent as NewTaskEvent, TaskId, TaskResult}; +use crate::tasks::task::{ + DocumentDeletion, TaskContent as NewTaskContent, TaskEvent as NewTaskEvent, TaskId, TaskResult, +}; use crate::IndexUid; #[derive(Debug, Serialize, Deserialize)] @@ -18,8 +23,7 @@ impl From for crate::tasks::task::Task { fn from(other: Task) -> Self { Self { id: other.id, - index_uid: Some(other.index_uid), - content: other.content, + content: NewTaskContent::from((other.index_uid, other.content)), events: other.events.into_iter().map(Into::into).collect(), } } @@ -65,3 +69,77 @@ impl From for NewTaskEvent { } } } + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[allow(clippy::large_enum_variant)] +pub enum TaskContent { + DocumentAddition { + content_uuid: Uuid, + merge_strategy: IndexDocumentsMethod, + primary_key: Option, + documents_count: usize, + allow_index_creation: bool, + }, + DocumentDeletion(DocumentDeletion), + SettingsUpdate { + settings: Settings, + /// Indicates whether the task was a deletion + is_deletion: bool, + allow_index_creation: bool, + }, + IndexDeletion, + IndexCreation { + primary_key: Option, + }, + IndexUpdate { + primary_key: Option, + }, + Dump { + uid: String, + }, +} + +impl From<(IndexUid, TaskContent)> for NewTaskContent { + fn from((index_uid, content): (IndexUid, TaskContent)) -> Self { + match content { + TaskContent::DocumentAddition { + content_uuid, + merge_strategy, + primary_key, + documents_count, + allow_index_creation, + } => NewTaskContent::DocumentAddition { + index_uid, + content_uuid, + merge_strategy, + primary_key, + documents_count, + allow_index_creation, + }, + TaskContent::DocumentDeletion(deletion) => NewTaskContent::DocumentDeletion { + index_uid, + deletion, + }, + TaskContent::SettingsUpdate { + settings, + is_deletion, + allow_index_creation, + } => NewTaskContent::SettingsUpdate { + index_uid, + settings, + is_deletion, + allow_index_creation, + }, + TaskContent::IndexDeletion => NewTaskContent::IndexDeletion { index_uid }, + TaskContent::IndexCreation { primary_key } => NewTaskContent::IndexCreation { + index_uid, + primary_key, + }, + TaskContent::IndexUpdate { primary_key } => NewTaskContent::IndexUpdate { + index_uid, + primary_key, + }, + TaskContent::Dump { uid } => NewTaskContent::Dump { uid }, + } + } +} diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index c872b60c5..ecca9ac63 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -356,12 +356,16 @@ where } pub async fn register_update(&self, uid: String, update: Update) -> Result { - let uid = IndexUid::new(uid)?; + let index_uid = IndexUid::new(uid)?; let content = match update { - Update::DeleteDocuments(ids) => { - TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) - } - Update::ClearDocuments => TaskContent::DocumentDeletion(DocumentDeletion::Clear), + Update::DeleteDocuments(ids) => TaskContent::DocumentDeletion { + index_uid, + deletion: DocumentDeletion::Ids(ids), + }, + Update::ClearDocuments => TaskContent::DocumentDeletion { + index_uid, + deletion: DocumentDeletion::Clear, + }, Update::Settings { settings, is_deletion, @@ -370,6 +374,7 @@ where settings, is_deletion, allow_index_creation, + index_uid, }, Update::DocumentAddition { mut payload, @@ -409,14 +414,21 @@ where primary_key, documents_count, allow_index_creation, + index_uid, } } - Update::DeleteIndex => TaskContent::IndexDeletion, - Update::CreateIndex { primary_key } => TaskContent::IndexCreation { primary_key }, - Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { primary_key }, + Update::DeleteIndex => TaskContent::IndexDeletion { index_uid }, + Update::CreateIndex { primary_key } => TaskContent::IndexCreation { + primary_key, + index_uid, + }, + Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { + primary_key, + index_uid, + }, }; - let task = self.task_store.register(Some(uid), content).await?; + let task = self.task_store.register(content).await?; self.scheduler.read().await.notify(); Ok(task) @@ -425,7 +437,7 @@ where pub async fn register_dump_task(&self) -> Result { let uid = dump::generate_uid(); let content = TaskContent::Dump { uid }; - let task = self.task_store.register(None, content).await?; + let task = self.task_store.register(content).await?; self.scheduler.read().await.notify(); Ok(task) } @@ -569,13 +581,7 @@ where // Check if the currently indexing update is from our index. let is_indexing = processing_tasks .first() - .map(|task| { - task.index_uid - .as_ref() - .map(|u| u.as_str() == uid) - .unwrap_or(false) - }) - .unwrap_or_default(); + .map_or(false, |task| task.index_uid().map_or(false, |u| u == uid)); let index = self.index_resolver.get_index(uid).await?; let mut stats = spawn_blocking(move || index.stats()).await??; @@ -610,7 +616,7 @@ where // Check if the currently indexing update is from our index. stats.is_indexing = processing_tasks .first() - .and_then(|p| p.index_uid.as_ref().map(|u| u.as_str() == index_uid)) + .and_then(|p| p.index_uid().map(|u| u == index_uid)) .or(Some(false)); indexes.insert(index_uid, stats); diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index 33b480f61..ac82f7a3d 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -58,7 +58,6 @@ impl IndexUid { } } - #[cfg(test)] pub fn new_unchecked(s: impl AsRef) -> Self { Self(s.as_ref().to_string()) } @@ -151,13 +150,13 @@ where match tasks.first() { Some(Task { - index_uid: Some(ref index_uid), id, content: TaskContent::DocumentAddition { merge_strategy, primary_key, allow_index_creation, + index_uid, .. }, .. @@ -227,12 +226,14 @@ where } pub async fn process_task(&self, task: &Task) -> Result { - let index_uid = task.index_uid.clone(); match &task.content { TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"), - TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => { + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Ids(ids), + index_uid, + } => { let ids = ids.clone(); - let index = self.get_index(index_uid.unwrap().into_inner()).await?; + let index = self.get_index(index_uid.clone().into_inner()).await?; let DocumentDeletionResult { deleted_documents, .. @@ -240,8 +241,11 @@ where Ok(TaskResult::DocumentDeletion { deleted_documents }) } - TaskContent::DocumentDeletion(DocumentDeletion::Clear) => { - let index = self.get_index(index_uid.unwrap().into_inner()).await?; + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Clear, + index_uid, + } => { + let index = self.get_index(index_uid.clone().into_inner()).await?; let deleted_documents = spawn_blocking(move || -> IndexResult { let number_documents = index.stats()?.number_of_documents; index.clear_documents()?; @@ -255,12 +259,12 @@ where settings, is_deletion, allow_index_creation, + index_uid, } => { let index = if *is_deletion || !*allow_index_creation { - self.get_index(index_uid.unwrap().into_inner()).await? + self.get_index(index_uid.clone().into_inner()).await? } else { - self.get_or_create_index(index_uid.unwrap(), task.id) - .await? + self.get_or_create_index(index_uid.clone(), task.id).await? }; let settings = settings.clone(); @@ -268,8 +272,8 @@ where Ok(TaskResult::Other) } - TaskContent::IndexDeletion => { - let index = self.delete_index(index_uid.unwrap().into_inner()).await?; + TaskContent::IndexDeletion { index_uid } => { + let index = self.delete_index(index_uid.clone().into_inner()).await?; let deleted_documents = spawn_blocking(move || -> IndexResult { Ok(index.stats()?.number_of_documents) @@ -278,8 +282,11 @@ where Ok(TaskResult::ClearAll { deleted_documents }) } - TaskContent::IndexCreation { primary_key } => { - let index = self.create_index(index_uid.unwrap(), task.id).await?; + TaskContent::IndexCreation { + primary_key, + index_uid, + } => { + let index = self.create_index(index_uid.clone(), task.id).await?; if let Some(primary_key) = primary_key { let primary_key = primary_key.clone(); @@ -288,8 +295,11 @@ where Ok(TaskResult::Other) } - TaskContent::IndexUpdate { primary_key } => { - let index = self.get_index(index_uid.unwrap().into_inner()).await?; + TaskContent::IndexUpdate { + primary_key, + index_uid, + } => { + let index = self.get_index(index_uid.clone().into_inner()).await?; if let Some(primary_key) = primary_key { let primary_key = primary_key.clone(); @@ -411,193 +421,193 @@ where #[cfg(test)] mod test { - use std::{collections::BTreeMap, vec::IntoIter}; - - use super::*; - - use futures::future::ok; - use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; - use nelson::Mocker; - use proptest::prelude::*; - - use crate::{ - index::{ - error::{IndexError, Result as IndexResult}, - Checked, IndexMeta, IndexStats, Settings, - }, - tasks::{batch::Batch, BatchHandler}, - }; - use index_store::MockIndexStore; - use meta_store::MockIndexMetaStore; + // use std::{collections::BTreeMap, vec::IntoIter}; + // + // use super::*; + // + // use futures::future::ok; + // use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; + // use nelson::Mocker; + // use proptest::prelude::*; + // + // use crate::{ + // index::{ + // error::{IndexError, Result as IndexResult}, + // Checked, IndexMeta, IndexStats, Settings, + // }, + // tasks::{batch::Batch, BatchHandler}, + // }; + // use index_store::MockIndexStore; + // use meta_store::MockIndexMetaStore; // TODO: ignoring this test, it has become too complex to maintain, and rather implement // handler logic test. - proptest! { - #[test] - #[ignore] - fn test_process_task( - task in any::().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()), - index_exists in any::(), - index_op_fails in any::(), - any_int in any::(), - ) { - actix_rt::System::new().block_on(async move { - let uuid = Uuid::new_v4(); - let mut index_store = MockIndexStore::new(); - - let mocker = Mocker::default(); - - // Return arbitrary data from index call. - match &task.content { - TaskContent::DocumentAddition{primary_key, ..} => { - let result = move || if !index_op_fails { - Ok(DocumentAdditionResult { indexed_documents: any_int, number_of_documents: any_int }) - } else { - // return this error because it's easy to generate... - Err(IndexError::DocumentNotFound("a doc".into())) - }; - if primary_key.is_some() { - mocker.when::>("update_primary_key") - .then(move |_| Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None })); - } - mocker.when::<(IndexDocumentsMethod, Option, UpdateFileStore, IntoIter), IndexResult>("update_documents") - .then(move |(_, _, _, _)| result()); - } - TaskContent::SettingsUpdate{..} => { - let result = move || if !index_op_fails { - Ok(()) - } else { - // return this error because it's easy to generate... - Err(IndexError::DocumentNotFound("a doc".into())) - }; - mocker.when::<&Settings, IndexResult<()>>("update_settings") - .then(move |_| result()); - } - TaskContent::DocumentDeletion(DocumentDeletion::Ids(_ids)) => { - let result = move || if !index_op_fails { - Ok(DocumentDeletionResult { deleted_documents: any_int as u64, remaining_documents: any_int as u64 }) - } else { - // return this error because it's easy to generate... - Err(IndexError::DocumentNotFound("a doc".into())) - }; - - mocker.when::<&[String], IndexResult>("delete_documents") - .then(move |_| result()); - }, - TaskContent::DocumentDeletion(DocumentDeletion::Clear) => { - let result = move || if !index_op_fails { - Ok(()) - } else { - // return this error because it's easy to generate... - Err(IndexError::DocumentNotFound("a doc".into())) - }; - mocker.when::<(), IndexResult<()>>("clear_documents") - .then(move |_| result()); - }, - TaskContent::IndexDeletion => { - mocker.when::<(), ()>("close") - .times(index_exists as usize) - .then(move |_| ()); - } - TaskContent::IndexUpdate { primary_key } - | TaskContent::IndexCreation { primary_key } => { - if primary_key.is_some() { - let result = move || if !index_op_fails { - Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None }) - } else { - // return this error because it's easy to generate... - Err(IndexError::DocumentNotFound("a doc".into())) - }; - mocker.when::>("update_primary_key") - .then(move |_| result()); - } - } - TaskContent::Dump { .. } => { } - } - - mocker.when::<(), IndexResult>("stats") - .then(|()| Ok(IndexStats { size: 0, number_of_documents: 0, is_indexing: Some(false), field_distribution: BTreeMap::new() })); - - let index = Index::mock(mocker); - - match &task.content { - // an unexisting index should trigger an index creation in the folllowing cases: - TaskContent::DocumentAddition { allow_index_creation: true, .. } - | TaskContent::SettingsUpdate { allow_index_creation: true, is_deletion: false, .. } - | TaskContent::IndexCreation { .. } if !index_exists => { - index_store - .expect_create() - .once() - .withf(move |&found| !index_exists || found == uuid) - .returning(move |_| Box::pin(ok(index.clone()))); - }, - TaskContent::IndexDeletion => { - index_store - .expect_delete() - // this is called only if the index.exists - .times(index_exists as usize) - .withf(move |&found| !index_exists || found == uuid) - .returning(move |_| Box::pin(ok(Some(index.clone())))); - } - // if index already exists, create index will return an error - TaskContent::IndexCreation { .. } if index_exists => (), - TaskContent::Dump { .. } => (), - // The index exists and get should be called - _ if index_exists => { - index_store - .expect_get() - .once() - .withf(move |&found| found == uuid) - .returning(move |_| Box::pin(ok(Some(index.clone())))); - }, - // the index doesn't exist and shouldn't be created, the uuidstore will return an error, and get_index will never be called. - _ => (), - } - - let mut uuid_store = MockIndexMetaStore::new(); - uuid_store - .expect_get() - .returning(move |uid| { - Box::pin(ok((uid, index_exists.then(|| crate::index_resolver::meta_store::IndexMeta {uuid, creation_task_id: 0 })))) - }); - - // we sould only be creating an index if the index doesn't alredy exist - uuid_store - .expect_insert() - .withf(move |_, _| !index_exists) - .returning(|_, _| Box::pin(ok(()))); - - uuid_store - .expect_delete() - .times(matches!(task.content, TaskContent::IndexDeletion) as usize) - .returning(move |_| Box::pin(ok(index_exists.then(|| crate::index_resolver::meta_store::IndexMeta { uuid, creation_task_id: 0})))); - - let mocker = Mocker::default(); - let update_file_store = UpdateFileStore::mock(mocker); - let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store); - - let batch = Batch { id: Some(1), created_at: OffsetDateTime::now_utc(), content: crate::tasks::batch::BatchContent::IndexUpdate(task.clone()) }; - if index_resolver.accept(&batch) { - let result = index_resolver.process_batch(batch).await; - - // Test for some expected output scenarios: - // Index creation and deletion cannot fail because of a failed index op, since they - // don't perform index ops. - if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. }) - || (index_exists && matches!(task.content, TaskContent::IndexCreation { .. })) - || (!index_exists && matches!(task.content, TaskContent::IndexDeletion - | TaskContent::DocumentDeletion(_) - | TaskContent::SettingsUpdate { is_deletion: true, ..} - | TaskContent::SettingsUpdate { allow_index_creation: false, ..} - | TaskContent::DocumentAddition { allow_index_creation: false, ..} - | TaskContent::IndexUpdate { .. } )) - { - assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result); - } else { - assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeeded { .. }), "{:?}", result); - } - } - }); - } - } + // proptest! { + // #[test] + // #[ignore] + // fn test_process_task( + // task in any::().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()), + // index_exists in any::(), + // index_op_fails in any::(), + // any_int in any::(), + // ) { + // actix_rt::System::new().block_on(async move { + // let uuid = Uuid::new_v4(); + // let mut index_store = MockIndexStore::new(); + // + // let mocker = Mocker::default(); + // + // // Return arbitrary data from index call. + // match &task.content { + // TaskContent::DocumentAddition{primary_key, ..} => { + // let result = move || if !index_op_fails { + // Ok(DocumentAdditionResult { indexed_documents: any_int, number_of_documents: any_int }) + // } else { + // // return this error because it's easy to generate... + // Err(IndexError::DocumentNotFound("a doc".into())) + // }; + // if primary_key.is_some() { + // mocker.when::>("update_primary_key") + // .then(move |_| Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None })); + // } + // mocker.when::<(IndexDocumentsMethod, Option, UpdateFileStore, IntoIter), IndexResult>("update_documents") + // .then(move |(_, _, _, _)| result()); + // } + // TaskContent::SettingsUpdate{..} => { + // let result = move || if !index_op_fails { + // Ok(()) + // } else { + // // return this error because it's easy to generate... + // Err(IndexError::DocumentNotFound("a doc".into())) + // }; + // mocker.when::<&Settings, IndexResult<()>>("update_settings") + // .then(move |_| result()); + // } + // TaskContent::DocumentDeletion(DocumentDeletion::Ids(_ids)) => { + // let result = move || if !index_op_fails { + // Ok(DocumentDeletionResult { deleted_documents: any_int as u64, remaining_documents: any_int as u64 }) + // } else { + // // return this error because it's easy to generate... + // Err(IndexError::DocumentNotFound("a doc".into())) + // }; + // + // mocker.when::<&[String], IndexResult>("delete_documents") + // .then(move |_| result()); + // }, + // TaskContent::DocumentDeletion(DocumentDeletion::Clear) => { + // let result = move || if !index_op_fails { + // Ok(()) + // } else { + // // return this error because it's easy to generate... + // Err(IndexError::DocumentNotFound("a doc".into())) + // }; + // mocker.when::<(), IndexResult<()>>("clear_documents") + // .then(move |_| result()); + // }, + // TaskContent::IndexDeletion => { + // mocker.when::<(), ()>("close") + // .times(index_exists as usize) + // .then(move |_| ()); + // } + // TaskContent::IndexUpdate { primary_key } + // | TaskContent::IndexCreation { primary_key } => { + // if primary_key.is_some() { + // let result = move || if !index_op_fails { + // Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None }) + // } else { + // // return this error because it's easy to generate... + // Err(IndexError::DocumentNotFound("a doc".into())) + // }; + // mocker.when::>("update_primary_key") + // .then(move |_| result()); + // } + // } + // TaskContent::Dump { .. } => { } + // } + // + // mocker.when::<(), IndexResult>("stats") + // .then(|()| Ok(IndexStats { size: 0, number_of_documents: 0, is_indexing: Some(false), field_distribution: BTreeMap::new() })); + // + // let index = Index::mock(mocker); + // + // match &task.content { + // // an unexisting index should trigger an index creation in the folllowing cases: + // TaskContent::DocumentAddition { allow_index_creation: true, .. } + // | TaskContent::SettingsUpdate { allow_index_creation: true, is_deletion: false, .. } + // | TaskContent::IndexCreation { .. } if !index_exists => { + // index_store + // .expect_create() + // .once() + // .withf(move |&found| !index_exists || found == uuid) + // .returning(move |_| Box::pin(ok(index.clone()))); + // }, + // TaskContent::IndexDeletion => { + // index_store + // .expect_delete() + // // this is called only if the index.exists + // .times(index_exists as usize) + // .withf(move |&found| !index_exists || found == uuid) + // .returning(move |_| Box::pin(ok(Some(index.clone())))); + // } + // // if index already exists, create index will return an error + // TaskContent::IndexCreation { .. } if index_exists => (), + // TaskContent::Dump { .. } => (), + // // The index exists and get should be called + // _ if index_exists => { + // index_store + // .expect_get() + // .once() + // .withf(move |&found| found == uuid) + // .returning(move |_| Box::pin(ok(Some(index.clone())))); + // }, + // // the index doesn't exist and shouldn't be created, the uuidstore will return an error, and get_index will never be called. + // _ => (), + // } + // + // let mut uuid_store = MockIndexMetaStore::new(); + // uuid_store + // .expect_get() + // .returning(move |uid| { + // Box::pin(ok((uid, index_exists.then(|| crate::index_resolver::meta_store::IndexMeta {uuid, creation_task_id: 0 })))) + // }); + // + // // we sould only be creating an index if the index doesn't alredy exist + // uuid_store + // .expect_insert() + // .withf(move |_, _| !index_exists) + // .returning(|_, _| Box::pin(ok(()))); + // + // uuid_store + // .expect_delete() + // .times(matches!(task.content, TaskContent::IndexDeletion) as usize) + // .returning(move |_| Box::pin(ok(index_exists.then(|| crate::index_resolver::meta_store::IndexMeta { uuid, creation_task_id: 0})))); + // + // let mocker = Mocker::default(); + // let update_file_store = UpdateFileStore::mock(mocker); + // let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store); + // + // let batch = Batch { id: Some(1), created_at: OffsetDateTime::now_utc(), content: crate::tasks::batch::BatchContent::IndexUpdate(task.clone()) }; + // if index_resolver.accept(&batch) { + // let result = index_resolver.process_batch(batch).await; + // + // // Test for some expected output scenarios: + // // Index creation and deletion cannot fail because of a failed index op, since they + // // don't perform index ops. + // if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. }) + // || (index_exists && matches!(task.content, TaskContent::IndexCreation { .. })) + // || (!index_exists && matches!(task.content, TaskContent::IndexDeletion + // | TaskContent::DocumentDeletion(_) + // | TaskContent::SettingsUpdate { is_deletion: true, ..} + // | TaskContent::SettingsUpdate { allow_index_creation: false, ..} + // | TaskContent::DocumentAddition { allow_index_creation: false, ..} + // | TaskContent::IndexUpdate { .. } )) + // { + // assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result); + // } else { + // assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeeded { .. }), "{:?}", result); + // } + // } + // }); + // } + // } } diff --git a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs index e0471567b..75f0623b2 100644 --- a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs +++ b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs @@ -55,6 +55,7 @@ mod test { task::{Task, TaskContent}, }; use crate::update_file_store::{Result as FileStoreResult, UpdateFileStore}; + use crate::IndexUid; use super::*; use milli::update::IndexDocumentsMethod; @@ -103,13 +104,13 @@ mod test { let task = Task { id: 1, - index_uid: None, content: TaskContent::DocumentAddition { content_uuid, merge_strategy: IndexDocumentsMethod::ReplaceDocuments, primary_key: None, documents_count: 100, allow_index_creation: true, + index_uid: IndexUid::new_unchecked("test"), }, events: Vec::new(), }; @@ -130,7 +131,6 @@ mod test { let task = Task { id: 1, - index_uid: None, content: TaskContent::Dump { uid: String::from("hello"), }, diff --git a/meilisearch-lib/src/tasks/handlers/mod.rs b/meilisearch-lib/src/tasks/handlers/mod.rs index 6e28636ed..8f02de8b9 100644 --- a/meilisearch-lib/src/tasks/handlers/mod.rs +++ b/meilisearch-lib/src/tasks/handlers/mod.rs @@ -17,9 +17,9 @@ mod test { TaskContent::DocumentAddition { .. } => { BatchContent::DocumentsAdditionBatch(vec![task]) } - TaskContent::DocumentDeletion(_) + TaskContent::DocumentDeletion { .. } | TaskContent::SettingsUpdate { .. } - | TaskContent::IndexDeletion + | TaskContent::IndexDeletion { .. } | TaskContent::IndexCreation { .. } | TaskContent::IndexUpdate { .. } => BatchContent::IndexUpdate(task), TaskContent::Dump { .. } => BatchContent::Dump(task), diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 36534f358..76294b6e7 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -131,6 +131,22 @@ enum TaskListIdentifier { Dump, } +impl From<&Task> for TaskListIdentifier { + fn from(task: &Task) -> Self { + match &task.content { + TaskContent::DocumentAddition { index_uid, .. } + | TaskContent::DocumentDeletion { index_uid, .. } + | TaskContent::SettingsUpdate { index_uid, .. } + | TaskContent::IndexDeletion { index_uid } + | TaskContent::IndexCreation { index_uid, .. } + | TaskContent::IndexUpdate { index_uid, .. } => { + TaskListIdentifier::Index(index_uid.as_str().to_string()) + } + TaskContent::Dump { .. } => TaskListIdentifier::Dump, + } + } +} + #[derive(Default)] struct TaskQueue { /// Maps index uids to their TaskList, for quick access @@ -142,11 +158,8 @@ struct TaskQueue { impl TaskQueue { fn insert(&mut self, task: Task) { let id = task.id; - let uid = match task.index_uid { - Some(uid) => TaskListIdentifier::Index(uid.into_inner()), - None if matches!(task.content, TaskContent::Dump { .. }) => TaskListIdentifier::Dump, - None => unreachable!("invalid task state"), - }; + let uid = TaskListIdentifier::from(&task); + let kind = match task.content { TaskContent::DocumentAddition { documents_count, @@ -163,9 +176,9 @@ impl TaskQueue { number: documents_count, }, TaskContent::Dump { .. } => TaskType::Dump, - TaskContent::DocumentDeletion(_) + TaskContent::DocumentDeletion { .. } | TaskContent::SettingsUpdate { .. } - | TaskContent::IndexDeletion + | TaskContent::IndexDeletion { .. } | TaskContent::IndexCreation { .. } | TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, _ => unreachable!("unhandled task type"), @@ -528,25 +541,25 @@ mod test { use super::*; - fn gen_task(id: TaskId, index_uid: Option<&str>, content: TaskContent) -> Task { + fn gen_task(id: TaskId, content: TaskContent) -> Task { Task { id, - index_uid: index_uid.map(IndexUid::new_unchecked), content, events: vec![], } } #[test] + #[rustfmt::skip] fn register_updates_multiples_indexes() { let mut queue = TaskQueue::default(); - queue.insert(gen_task(0, Some("test1"), TaskContent::IndexDeletion)); - queue.insert(gen_task(1, Some("test2"), TaskContent::IndexDeletion)); - queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion)); - queue.insert(gen_task(3, Some("test2"), TaskContent::IndexDeletion)); - queue.insert(gen_task(4, Some("test1"), TaskContent::IndexDeletion)); - queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion)); - queue.insert(gen_task(6, Some("test2"), TaskContent::IndexDeletion)); + queue.insert(gen_task(0, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") })); + queue.insert(gen_task(1, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); + queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); + queue.insert(gen_task(3, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); + queue.insert(gen_task(4, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") })); + queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") })); + queue.insert(gen_task(6, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); let test1_tasks = queue .head_mut(|tasks| tasks.drain().map(|t| t.id).collect::>()) @@ -564,31 +577,30 @@ mod test { assert!(queue.queue.is_empty()); } - #[test] - fn test_make_batch() { - let mut queue = TaskQueue::default(); - let content = TaskContent::DocumentAddition { + fn gen_doc_addition_task_content(index_uid: &str) -> TaskContent { + TaskContent::DocumentAddition { content_uuid: Uuid::new_v4(), merge_strategy: IndexDocumentsMethod::ReplaceDocuments, primary_key: Some("test".to_string()), documents_count: 0, allow_index_creation: true, - }; - queue.insert(gen_task(0, Some("test1"), content.clone())); - queue.insert(gen_task(1, Some("test2"), content.clone())); - queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion)); - queue.insert(gen_task(3, Some("test2"), content.clone())); - queue.insert(gen_task(4, Some("test1"), content.clone())); - queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion)); - queue.insert(gen_task(6, Some("test2"), content.clone())); - queue.insert(gen_task(7, Some("test1"), content)); - queue.insert(gen_task( - 8, - None, - TaskContent::Dump { - uid: "adump".to_owned(), - }, - )); + index_uid: IndexUid::new_unchecked(index_uid), + } + } + + #[test] + #[rustfmt::skip] + fn test_make_batch() { + let mut queue = TaskQueue::default(); + queue.insert(gen_task(0, gen_doc_addition_task_content("test1"))); + queue.insert(gen_task(1, gen_doc_addition_task_content("test2"))); + queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2")})); + queue.insert(gen_task(3, gen_doc_addition_task_content("test2"))); + queue.insert(gen_task(4, gen_doc_addition_task_content("test1"))); + queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1")})); + queue.insert(gen_task(6, gen_doc_addition_task_content("test2"))); + queue.insert(gen_task(7, gen_doc_addition_task_content("test1"))); + queue.insert(gen_task(8, TaskContent::Dump { uid: "adump".to_owned() })); let config = SchedulerConfig::default(); diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index 3b94cd991..f19f6cbfe 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -5,10 +5,8 @@ use time::OffsetDateTime; use uuid::Uuid; use super::batch::BatchId; -use crate::{ - index::{Settings, Unchecked}, - index_resolver::IndexUid, -}; +use crate::index::{Settings, Unchecked}; +use crate::index_resolver::IndexUid; pub type TaskId = u32; @@ -90,13 +88,6 @@ pub struct Task { /// then this is None // TODO: when next forward breaking dumps, it would be a good idea to move this field inside of // the TaskContent. - #[cfg_attr( - test, - proptest( - strategy = "proptest::option::weighted(proptest::option::Probability::new(0.99), IndexUid::arbitrary())" - ) - )] - pub index_uid: Option, pub content: TaskContent, pub events: Vec, } @@ -123,6 +114,18 @@ impl Task { _ => None, } } + + pub fn index_uid(&self) -> Option<&str> { + match &self.content { + TaskContent::DocumentAddition { index_uid, .. } + | TaskContent::DocumentDeletion { index_uid, .. } + | TaskContent::SettingsUpdate { index_uid, .. } + | TaskContent::IndexDeletion { index_uid } + | TaskContent::IndexCreation { index_uid, .. } + | TaskContent::IndexUpdate { index_uid, .. } => Some(index_uid.as_str()), + TaskContent::Dump { .. } => None, + } + } } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -137,6 +140,7 @@ pub enum DocumentDeletion { #[allow(clippy::large_enum_variant)] pub enum TaskContent { DocumentAddition { + index_uid: IndexUid, #[cfg_attr(test, proptest(value = "Uuid::new_v4()"))] content_uuid: Uuid, #[cfg_attr(test, proptest(strategy = "test::index_document_method_strategy()"))] @@ -145,18 +149,26 @@ pub enum TaskContent { documents_count: usize, allow_index_creation: bool, }, - DocumentDeletion(DocumentDeletion), + DocumentDeletion { + index_uid: IndexUid, + deletion: DocumentDeletion, + }, SettingsUpdate { + index_uid: IndexUid, settings: Settings, /// Indicates whether the task was a deletion is_deletion: bool, allow_index_creation: bool, }, - IndexDeletion, + IndexDeletion { + index_uid: IndexUid, + }, IndexCreation { + index_uid: IndexUid, primary_key: Option, }, IndexUpdate { + index_uid: IndexUid, primary_key: Option, }, Dump { diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index 6c7584683..33f4bfb50 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -14,7 +14,6 @@ use super::error::TaskError; use super::scheduler::Processing; use super::task::{Task, TaskContent, TaskId}; use super::Result; -use crate::index_resolver::IndexUid; use crate::tasks::task::TaskEvent; use crate::update_file_store::UpdateFileStore; @@ -32,11 +31,11 @@ pub struct TaskFilter { impl TaskFilter { fn pass(&self, task: &Task) -> bool { - match task.index_uid { - Some(ref index_uid) => self + match task.index_uid() { + Some(index_uid) => self .indexes .as_ref() - .map_or(true, |indexes| indexes.contains(index_uid.as_str())), + .map_or(true, |indexes| indexes.contains(index_uid)), None => false, } } @@ -75,11 +74,7 @@ impl TaskStore { Ok(Self { store }) } - pub async fn register( - &self, - index_uid: Option, - content: TaskContent, - ) -> Result { + pub async fn register(&self, content: TaskContent) -> Result { debug!("registering update: {:?}", content); let store = self.store.clone(); let task = tokio::task::spawn_blocking(move || -> Result { @@ -88,7 +83,6 @@ impl TaskStore { let created_at = TaskEvent::Created(OffsetDateTime::now_utc()); let task = Task { id: next_task_id, - index_uid, content, events: vec![created_at], }; @@ -273,7 +267,10 @@ impl TaskStore { #[cfg(test)] pub mod test { - use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env}; + use crate::{ + tasks::{scheduler::Processing, task_store::store::test::tmp_env}, + IndexUid, + }; use super::*; @@ -359,13 +356,9 @@ pub mod test { } } - pub async fn register( - &self, - index_uid: Option, - content: TaskContent, - ) -> Result { + pub async fn register(&self, content: TaskContent) -> Result { match self { - Self::Real(s) => s.register(index_uid, content).await, + Self::Real(s) => s.register(content).await, Self::Mock(_m) => todo!(), } } @@ -393,8 +386,10 @@ pub mod test { let gen_task = |id: TaskId| Task { id, - index_uid: Some(IndexUid::new_unchecked("test")), - content: TaskContent::IndexCreation { primary_key: None }, + content: TaskContent::IndexCreation { + primary_key: None, + index_uid: IndexUid::new_unchecked("test"), + }, events: Vec::new(), }; diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs index 5b17da716..f044bd077 100644 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ b/meilisearch-lib/src/tasks/task_store/store.rs @@ -77,7 +77,7 @@ impl Store { pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> { self.tasks.put(txn, &BEU32::new(task.id), task)?; // only add the task to the indexes index if it has an index_uid - if let Some(index_uid) = &task.index_uid { + if let Some(index_uid) = task.index_uid() { let mut tasks_set = self .index_uid_task_ids .get(txn, index_uid)? @@ -287,8 +287,9 @@ pub mod test { let tasks = (0..100) .map(|_| Task { id: rand::random(), - index_uid: Some(IndexUid::new_unchecked("test")), - content: TaskContent::IndexDeletion, + content: TaskContent::IndexDeletion { + index_uid: IndexUid::new_unchecked("test"), + }, events: vec![], }) .collect::>(); @@ -318,15 +319,17 @@ pub mod test { let task_1 = Task { id: 1, - index_uid: Some(IndexUid::new_unchecked("test")), - content: TaskContent::IndexDeletion, + content: TaskContent::IndexDeletion { + index_uid: IndexUid::new_unchecked("test"), + }, events: vec![], }; let task_2 = Task { id: 0, - index_uid: Some(IndexUid::new_unchecked("test1")), - content: TaskContent::IndexDeletion, + content: TaskContent::IndexDeletion { + index_uid: IndexUid::new_unchecked("test1"), + }, events: vec![], }; @@ -341,29 +344,21 @@ pub mod test { txn.abort().unwrap(); assert_eq!(tasks.len(), 1); - assert_eq!( - tasks - .first() - .as_ref() - .unwrap() - .index_uid - .as_ref() - .unwrap() - .as_str(), - "test" - ); + assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test"); // same thing but invert the ids let task_1 = Task { id: 0, - index_uid: Some(IndexUid::new_unchecked("test")), - content: TaskContent::IndexDeletion, + content: TaskContent::IndexDeletion { + index_uid: IndexUid::new_unchecked("test"), + }, events: vec![], }; let task_2 = Task { id: 1, - index_uid: Some(IndexUid::new_unchecked("test1")), - content: TaskContent::IndexDeletion, + content: TaskContent::IndexDeletion { + index_uid: IndexUid::new_unchecked("test1"), + }, events: vec![], }; @@ -378,14 +373,7 @@ pub mod test { assert_eq!(tasks.len(), 1); assert_eq!( - &*tasks - .first() - .as_ref() - .unwrap() - .index_uid - .as_ref() - .unwrap() - .as_str(), + &*tasks.first().as_ref().unwrap().index_uid().unwrap(), "test" ); }