udpate actor error improvements

This commit is contained in:
Marin Postma 2021-05-25 09:46:11 +02:00
parent 4acbe8e473
commit 464639aa0f
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
3 changed files with 58 additions and 67 deletions

View File

@ -158,13 +158,7 @@ impl IndexController {
// prevent dead_locking between the update_handle::update that waits for the update to be // prevent dead_locking between the update_handle::update that waits for the update to be
// registered and the update_actor that waits for the the payload to be sent to it. // registered and the update_actor that waits for the the payload to be sent to it.
tokio::task::spawn_local(async move { tokio::task::spawn_local(async move {
payload payload.for_each(|r| async {
.map(|bytes| {
bytes.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
})
})
.for_each(|r| async {
let _ = sender.send(r).await; let _ = sender.send(r).await;
}) })
.await .await

View File

@ -11,7 +11,7 @@ use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
use crate::index_controller::index_actor::{IndexActorHandle}; use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, I> { pub struct UpdateActor<D, I> {
@ -42,7 +42,12 @@ where
let store = UpdateStore::open(options, &path, index_handle.clone())?; let store = UpdateStore::open(options, &path, index_handle.clone())?;
std::fs::create_dir_all(path.join("update_files"))?; std::fs::create_dir_all(path.join("update_files"))?;
assert!(path.exists()); assert!(path.exists());
Ok(Self { path, store, inbox, index_handle }) Ok(Self {
path,
store,
inbox,
index_handle,
})
} }
pub async fn run(mut self) { pub async fn run(mut self) {
@ -90,9 +95,7 @@ where
mut payload: mpsc::Receiver<PayloadData<D>>, mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> { ) -> Result<UpdateStatus> {
let file_path = match meta { let file_path = match meta {
UpdateMeta::DocumentsAddition { .. } UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => {
| UpdateMeta::DeleteDocuments => {
let update_file_id = uuid::Uuid::new_v4(); let update_file_id = uuid::Uuid::new_v4();
let path = self let path = self
.path .path
@ -102,39 +105,26 @@ where
.write(true) .write(true)
.create(true) .create(true)
.open(&path) .open(&path)
.await .await?;
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let mut file_len = 0; let mut file_len = 0;
while let Some(bytes) = payload.recv().await { while let Some(bytes) = payload.recv().await {
match bytes { let bytes = bytes?;
Ok(bytes) => {
file_len += bytes.as_ref().len(); file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref()) file.write_all(bytes.as_ref()).await?;
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
Err(e) => {
return Err(UpdateError::Error(e));
}
}
} }
if file_len != 0 { if file_len != 0 {
file.flush() file.flush().await?;
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let file = file.into_std().await; let file = file.into_std().await;
Some((file, path)) Some((file, path))
} else { } else {
// empty update, delete the empty file. // empty update, delete the empty file.
fs::remove_file(&path) fs::remove_file(&path).await?;
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
None None
} }
} }
_ => None _ => None,
}; };
let update_store = self.store.clone(); let update_store = self.store.clone();
@ -145,17 +135,15 @@ where
// If the payload is empty, ignore the check. // If the payload is empty, ignore the check.
let path = if let Some((mut file, path)) = file_path { let path = if let Some((mut file, path)) = file_path {
// set the file back to the beginning // set the file back to the beginning
file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?; file.seek(SeekFrom::Start(0))?;
// Check that the json payload is valid: // Check that the json payload is valid:
let reader = BufReader::new(&mut file); let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader); let mut checker = JsonChecker::new(reader);
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() { if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
// The json file is invalid, we use Serde to get a nice error message: // The json file is invalid, we use Serde to get a nice error message:
file.seek(SeekFrom::Start(0)) file.seek(SeekFrom::Start(0))?;
.map_err(|e| UpdateError::Error(Box::new(e)))?; let _: serde_json::Value = serde_json::from_reader(file)?;
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateError::Error(Box::new(e)))?;
} }
Some(path) Some(path)
} else { } else {
@ -163,32 +151,27 @@ where
}; };
// The payload is valid, we can register it to the update store. // The payload is valid, we can register it to the update store.
update_store let status = update_store
.register_update(meta, path, uuid) .register_update(meta, path, uuid)
.map(UpdateStatus::Enqueued) .map(UpdateStatus::Enqueued)?;
.map_err(|e| UpdateError::Error(Box::new(e))) Ok(status)
}) })
.await .await?
.map_err(|e| UpdateError::Error(Box::new(e)))?
} }
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.clone(); let update_store = self.store.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = update_store let result = update_store.list(uuid)?;
.list(uuid)
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(result) Ok(result)
}) })
.await .await?
.map_err(|e| UpdateError::Error(Box::new(e)))?
} }
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> { async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self.store.clone(); let store = self.store.clone();
let result = store let result = store
.meta(uuid, id) .meta(uuid, id)?
.map_err(|e| UpdateError::Error(Box::new(e)))?
.ok_or(UpdateError::UnexistingUpdate(id))?; .ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result) Ok(result)
} }
@ -196,10 +179,7 @@ where
async fn handle_delete(&self, uuid: Uuid) -> Result<()> { async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.clone(); let store = self.store.clone();
tokio::task::spawn_blocking(move || store.delete_all(uuid)) tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(()) Ok(())
} }
@ -209,9 +189,7 @@ where
let update_store = self.store.clone(); let update_store = self.store.clone();
tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
.await .await??;
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(()) Ok(())
} }
@ -223,9 +201,8 @@ where
update_store.dump(&uuids, path.to_path_buf(), index_handle)?; update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
Ok(()) Ok(())
}) })
.await .await??;
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(()) Ok(())
} }
@ -235,9 +212,7 @@ where
let info = update_store.get_info()?; let info = update_store.get_info()?;
Ok(info) Ok(info)
}) })
.await .await??;
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(info) Ok(info)
} }

View File

@ -5,6 +5,7 @@ mod update_store;
use std::{collections::HashSet, path::PathBuf}; use std::{collections::HashSet, path::PathBuf};
use actix_http::error::PayloadError;
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
@ -14,23 +15,44 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
use actor::UpdateActor; use actor::UpdateActor;
use message::UpdateMsg; use message::UpdateMsg;
pub use update_store::{UpdateStore, UpdateStoreInfo};
pub use handle_impl::UpdateActorHandleImpl; pub use handle_impl::UpdateActorHandleImpl;
pub use update_store::{UpdateStore, UpdateStoreInfo};
pub type Result<T> = std::result::Result<T, UpdateError>; pub type Result<T> = std::result::Result<T, UpdateError>;
type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>; type PayloadData<D> = std::result::Result<D, PayloadError>;
#[cfg(test)] #[cfg(test)]
use mockall::automock; use mockall::automock;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum UpdateError { pub enum UpdateError {
#[error("error with update: {0}")]
Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Update {0} doesn't exist.")] #[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64), UnexistingUpdate(u64),
#[error("Internal error processing update: {0}")]
Internal(String),
} }
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UpdateError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
heed::Error,
std::io::Error,
serde_json::Error,
PayloadError,
tokio::task::JoinError,
anyhow::Error
);
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, automock(type Data=Vec<u8>;))] #[cfg_attr(test, automock(type Data=Vec<u8>;))]
pub trait UpdateActorHandle { pub trait UpdateActorHandle {