From 64e3096790cfe9e5c06d00cc2add4aa07cc9e3ca Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 2 Jun 2022 15:04:33 +0200 Subject: [PATCH] process_task updates task events --- meilisearch-lib/src/index_resolver/mod.rs | 114 ++---------------- .../src/tasks/handlers/dump_handler.rs | 2 +- .../tasks/handlers/index_resolver_handler.rs | 11 +- meilisearch-lib/src/tasks/task.rs | 4 +- 4 files changed, 16 insertions(+), 115 deletions(-) diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index 19ba051fe..97265e509 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -265,7 +265,7 @@ mod real { Ok(()) } - pub async fn process_task(&self, task: &Task) -> Result { + async fn process_task_inner(&self, task: &Task) -> Result { match &task.content { TaskContent::DocumentAddition { .. } => { panic!("updates should be handled by batch") @@ -354,6 +354,13 @@ mod real { } } + pub async fn process_task(&self, task: &mut Task) { + match self.process_task_inner(&task).await { + Ok(res) => task.events.push(TaskEvent::succeeded(res)), + Err(e) => task.events.push(TaskEvent::failed(e)), + } + } + pub async fn dump(&self, path: impl AsRef) -> Result<()> { for (_, index) in self.list().await? { index.dump(&path)?; @@ -511,110 +518,7 @@ mod test { } } - pub async fn process_task(&self, task: &Task) -> Result { - match self { - IndexResolver::Real(r) => r.process_task(task).await, - IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) }, - } - } - - pub async fn dump(&self, path: impl AsRef) -> Result<()> { - match self { - IndexResolver::Real(r) => r.dump(path).await, - IndexResolver::Mock(_) => todo!(), - } - } - - /// Get or create an index with name `uid`. - pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { - match self { - IndexResolver::Real(r) => r.get_or_create_index(uid, task_id).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn list(&self) -> Result> { - match self { - IndexResolver::Real(r) => r.list().await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn delete_index(&self, uid: String) -> Result { - match self { - IndexResolver::Real(r) => r.delete_index(uid).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn get_index(&self, uid: String) -> Result { - match self { - IndexResolver::Real(r) => r.get_index(uid).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { - match self { - IndexResolver::Real(r) => r.get_index_creation_task_id(index_uid).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> { - match self { - IndexResolver::Real(r) => r.delete_content_file(content_uuid).await, - IndexResolver::Mock(m) => unsafe { - m.get("delete_content_file").call(content_uuid) - }, - } - } - } - - pub enum MockIndexResolver { - Real(super::real::IndexResolver), - Mock(Mocker), - } - - impl MockIndexResolver { - pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - env: Arc, - indexer_opts: &IndexerOpts, - ) -> anyhow::Result<()> { - super::real::IndexResolver::load_dump(src, dst, index_db_size, env, indexer_opts) - } - } - - impl MockIndexResolver - where - U: IndexMetaStore, - I: IndexStore, - { - pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { - Self::Real(super::real::IndexResolver { - index_uuid_store, - index_store, - file_store, - }) - } - - pub fn mock(mocker: Mocker) -> Self { - Self::Mock(mocker) - } - - pub async fn process_document_addition_batch(&self, tasks: Vec) -> Vec { - match self { - IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await, - IndexResolver::Mock(m) => unsafe { - m.get("process_document_addition_batch").call(tasks) - }, - } - } - - pub async fn process_task(&self, task: &Task) -> Result { + pub async fn process_task(&self, task: &mut Task) { match self { IndexResolver::Real(r) => r.process_task(task).await, IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) }, diff --git a/meilisearch-lib/src/tasks/handlers/dump_handler.rs b/meilisearch-lib/src/tasks/handlers/dump_handler.rs index c708dadcc..c0833e4c7 100644 --- a/meilisearch-lib/src/tasks/handlers/dump_handler.rs +++ b/meilisearch-lib/src/tasks/handlers/dump_handler.rs @@ -27,7 +27,7 @@ where .content .push_event(TaskEvent::succeeded(TaskResult::Other)); } - Err(e) => batch.content.push_event(TaskEvent::failed(e.into())), + Err(e) => batch.content.push_event(TaskEvent::failed(e)), } batch } diff --git a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs index de624106c..21751cd1c 100644 --- a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs +++ b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs @@ -1,7 +1,6 @@ use crate::index_resolver::IndexResolver; use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore}; use crate::tasks::batch::{Batch, BatchContent}; -use crate::tasks::task::TaskEvent; use crate::tasks::BatchHandler; #[async_trait::async_trait] @@ -24,10 +23,9 @@ where .process_document_addition_batch(std::mem::take(tasks)) .await; } - BatchContent::IndexUpdate(ref mut task) => match self.process_task(task).await { - Ok(success) => task.events.push(TaskEvent::succeeded(success)), - Err(err) => task.events.push(TaskEvent::failed(err.into())), - }, + BatchContent::IndexUpdate(ref mut task) => { + self.process_task(task).await; + } _ => unreachable!(), } @@ -54,7 +52,6 @@ mod test { use crate::index_resolver::{ error::Result as IndexResult, index_store::MockIndexStore, meta_store::MockIndexMetaStore, }; - use crate::tasks::task::TaskResult; use crate::tasks::{ handlers::test::task_to_batch, task::{Task, TaskContent}, @@ -181,7 +178,7 @@ mod test { } TaskContent::Dump { .. } => (), _ => { - mocker.when::<&Task, IndexResult>("process_task").then(|_| Ok(TaskResult::Other)); + mocker.when::<&mut Task, ()>("process_task").then(|_| ()); } } let index_resolver: IndexResolver = IndexResolver::mock(mocker); diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index f19f6cbfe..cf9ab0520 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -68,9 +68,9 @@ impl TaskEvent { } } - pub fn failed(error: ResponseError) -> Self { + pub fn failed(error: impl Into) -> Self { Self::Failed { - error, + error: error.into(), timestamp: OffsetDateTime::now_utc(), } }