From 464639aa0f331e1720711358b1e8e9e055822eaa Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Tue, 25 May 2021 09:46:11 +0200 Subject: [PATCH] udpate actor error improvements --- meilisearch-http/src/index_controller/mod.rs | 8 +- .../index_controller/update_actor/actor.rs | 87 +++++++------------ .../src/index_controller/update_actor/mod.rs | 30 ++++++- 3 files changed, 58 insertions(+), 67 deletions(-) diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 900482257..61bc71114 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -158,13 +158,7 @@ impl IndexController { // prevent dead_locking between the update_handle::update that waits for the update to be // registered and the update_actor that waits for the the payload to be sent to it. tokio::task::spawn_local(async move { - payload - .map(|bytes| { - bytes.map_err(|e| { - Box::new(e) as Box - }) - }) - .for_each(|r| async { + payload.for_each(|r| async { let _ = sender.send(r).await; }) .await diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index f576ce7a8..27906a1a8 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc; use uuid::Uuid; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; -use crate::index_controller::index_actor::{IndexActorHandle}; +use crate::index_controller::index_actor::IndexActorHandle; use crate::index_controller::{UpdateMeta, UpdateStatus}; pub struct UpdateActor { @@ -42,7 +42,12 @@ where let store = UpdateStore::open(options, &path, index_handle.clone())?; std::fs::create_dir_all(path.join("update_files"))?; assert!(path.exists()); - Ok(Self { path, store, inbox, index_handle }) + Ok(Self { + path, + store, + inbox, + index_handle, + }) } pub async fn run(mut self) { @@ -90,9 +95,7 @@ where mut payload: mpsc::Receiver>, ) -> Result { let file_path = match meta { - UpdateMeta::DocumentsAddition { .. } - | UpdateMeta::DeleteDocuments => { - + UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => { let update_file_id = uuid::Uuid::new_v4(); let path = self .path @@ -102,39 +105,26 @@ where .write(true) .create(true) .open(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; + .await?; let mut file_len = 0; while let Some(bytes) = payload.recv().await { - match bytes { - Ok(bytes) => { - file_len += bytes.as_ref().len(); - file.write_all(bytes.as_ref()) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - Err(e) => { - return Err(UpdateError::Error(e)); - } - } + let bytes = bytes?; + file_len += bytes.as_ref().len(); + file.write_all(bytes.as_ref()).await?; } if file_len != 0 { - file.flush() - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; + file.flush().await?; let file = file.into_std().await; Some((file, path)) } else { // empty update, delete the empty file. - fs::remove_file(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; + fs::remove_file(&path).await?; None } } - _ => None + _ => None, }; let update_store = self.store.clone(); @@ -145,17 +135,15 @@ where // If the payload is empty, ignore the check. let path = if let Some((mut file, path)) = file_path { // set the file back to the beginning - file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?; + file.seek(SeekFrom::Start(0))?; // Check that the json payload is valid: let reader = BufReader::new(&mut file); let mut checker = JsonChecker::new(reader); if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() { // The json file is invalid, we use Serde to get a nice error message: - file.seek(SeekFrom::Start(0)) - .map_err(|e| UpdateError::Error(Box::new(e)))?; - let _: serde_json::Value = serde_json::from_reader(file) - .map_err(|e| UpdateError::Error(Box::new(e)))?; + file.seek(SeekFrom::Start(0))?; + let _: serde_json::Value = serde_json::from_reader(file)?; } Some(path) } else { @@ -163,32 +151,27 @@ where }; // The payload is valid, we can register it to the update store. - update_store + let status = update_store .register_update(meta, path, uuid) - .map(UpdateStatus::Enqueued) - .map_err(|e| UpdateError::Error(Box::new(e))) + .map(UpdateStatus::Enqueued)?; + Ok(status) }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? + .await? } async fn handle_list_updates(&self, uuid: Uuid) -> Result> { let update_store = self.store.clone(); tokio::task::spawn_blocking(move || { - let result = update_store - .list(uuid) - .map_err(|e| UpdateError::Error(e.into()))?; + let result = update_store.list(uuid)?; Ok(result) }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? + .await? } async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { let store = self.store.clone(); let result = store - .meta(uuid, id) - .map_err(|e| UpdateError::Error(Box::new(e)))? + .meta(uuid, id)? .ok_or(UpdateError::UnexistingUpdate(id))?; Ok(result) } @@ -196,10 +179,7 @@ where async fn handle_delete(&self, uuid: Uuid) -> Result<()> { let store = self.store.clone(); - tokio::task::spawn_blocking(move || store.delete_all(uuid)) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; + tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??; Ok(()) } @@ -208,10 +188,8 @@ where let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); - tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; + tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) + .await??; Ok(()) } @@ -223,9 +201,8 @@ where update_store.dump(&uuids, path.to_path_buf(), index_handle)?; Ok(()) }) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; + .await??; + Ok(()) } @@ -235,9 +212,7 @@ where let info = update_store.get_info()?; Ok(info) }) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; + .await??; Ok(info) } diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 05b793e45..a0c498e92 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -5,6 +5,7 @@ mod update_store; use std::{collections::HashSet, path::PathBuf}; +use actix_http::error::PayloadError; use thiserror::Error; use tokio::sync::mpsc; use uuid::Uuid; @@ -14,23 +15,44 @@ use crate::index_controller::{UpdateMeta, UpdateStatus}; use actor::UpdateActor; use message::UpdateMsg; -pub use update_store::{UpdateStore, UpdateStoreInfo}; pub use handle_impl::UpdateActorHandleImpl; +pub use update_store::{UpdateStore, UpdateStoreInfo}; pub type Result = std::result::Result; -type PayloadData = std::result::Result>; +type PayloadData = std::result::Result; #[cfg(test)] use mockall::automock; #[derive(Debug, Error)] pub enum UpdateError { - #[error("error with update: {0}")] - Error(Box), #[error("Update {0} doesn't exist.")] UnexistingUpdate(u64), + #[error("Internal error processing update: {0}")] + Internal(String), } +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for UpdateError { + fn from(other: $other) -> Self { + Self::Internal(other.to_string()) + } + } + )* + } +} + +internal_error!( + heed::Error, + std::io::Error, + serde_json::Error, + PayloadError, + tokio::task::JoinError, + anyhow::Error +); + #[async_trait::async_trait] #[cfg_attr(test, automock(type Data=Vec;))] pub trait UpdateActorHandle {