diff --git a/src/index_controller/actor_index_controller/mod.rs b/src/index_controller/actor_index_controller/mod.rs index 34fcf55f0..be63b42d4 100644 --- a/src/index_controller/actor_index_controller/mod.rs +++ b/src/index_controller/actor_index_controller/mod.rs @@ -55,9 +55,16 @@ impl IndexController { // registered and the update_actor that waits for the the payload to be sent to it. tokio::task::spawn_local(async move { while let Some(bytes) = payload.next().await { - sender.send(bytes.unwrap()).await; + match bytes { + Ok(bytes) => { sender.send(Ok(bytes)).await; }, + Err(e) => { + let error: Box = Box::new(e); + sender.send(Err(error)).await; }, + } } }); + + // This must be done *AFTER* spawning the task. let status = self.update_handle.update(meta, receiver, uuid).await?; Ok(status) } diff --git a/src/index_controller/actor_index_controller/update_actor.rs b/src/index_controller/actor_index_controller/update_actor.rs index 6fc873715..1541cd4f7 100644 --- a/src/index_controller/actor_index_controller/update_actor.rs +++ b/src/index_controller/actor_index_controller/update_actor.rs @@ -10,13 +10,17 @@ use uuid::Uuid; use tokio::fs::File; use tokio::io::AsyncWriteExt; -use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult, updates::Pending}; +use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult}; pub type Result = std::result::Result; type UpdateStore = super::update_store::UpdateStore; +type PayloadData = std::result::Result>; #[derive(Debug, Error)] -pub enum UpdateError {} +pub enum UpdateError { + #[error("error with update: {0}")] + Error(Box), +} enum UpdateMsg { CreateIndex{ @@ -26,7 +30,7 @@ enum UpdateMsg { Update { uuid: Uuid, meta: UpdateMeta, - data: mpsc::Receiver, + data: mpsc::Receiver>, ret: oneshot::Sender> } } @@ -64,24 +68,35 @@ where D: AsRef<[u8]> + Sized + 'static, } } - async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver, ret: oneshot::Sender>) { + async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver>, ret: oneshot::Sender>) { let store = self.store.clone(); let update_file_id = uuid::Uuid::new_v4(); let path = self.path.join(format!("update_{}", update_file_id)); let mut file = File::create(&path).await.unwrap(); while let Some(bytes) = payload.recv().await { - file.write_all(bytes.as_ref()).await; + match bytes { + Ok(bytes) => { + file.write_all(bytes.as_ref()).await; + } + Err(e) => { + ret.send(Err(UpdateError::Error(e))); + return + } + } } file.flush().await; let file = file.into_std().await; - let result = tokio::task::spawn_blocking(move || -> anyhow::Result> { - Ok(store.register_update(meta, path, uuid)?) - }).await.unwrap().unwrap(); - let _ = ret.send(Ok(UpdateStatus::Pending(result))); + let result = tokio::task::spawn_blocking(move || { + let result = store + .register_update(meta, path, uuid) + .map(|pending| UpdateStatus::Pending(pending)) + .map_err(|e| UpdateError::Error(Box::new(e))); + let _ = ret.send(result); + }).await; } } @@ -110,7 +125,7 @@ where D: AsRef<[u8]> + Sized + 'static, Self { sender } } - pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver, uuid: Uuid) -> Result { + pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver>, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Update { uuid,