improve dump info reports

This commit is contained in:
Marin Postma 2021-05-31 15:34:03 +02:00
parent b3c8f0e1f6
commit 10fc870684
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30

View File

@ -1,9 +1,9 @@
use std::path::{Path, PathBuf}; use std::{collections::HashMap, path::{Path, PathBuf}};
use std::sync::Arc; use std::sync::Arc;
use async_stream::stream; use async_stream::stream;
use chrono::Utc; use chrono::Utc;
use futures::stream::StreamExt; use futures::{lock::Mutex, stream::StreamExt};
use log::{error, info}; use log::{error, info};
use update_actor::UpdateActorHandle; use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle; use uuid_resolver::UuidResolverHandle;
@ -19,7 +19,8 @@ pub struct DumpActor<UuidResolver, Update> {
uuid_resolver: UuidResolver, uuid_resolver: UuidResolver,
update: Update, update: Update,
dump_path: PathBuf, dump_path: PathBuf,
dump_info: Arc<RwLock<Option<DumpInfo>>>, lock: Arc<Mutex<()>>,
dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
update_db_size: u64, update_db_size: u64,
index_db_size: u64, index_db_size: u64,
} }
@ -42,12 +43,15 @@ where
index_db_size: u64, index_db_size: u64,
update_db_size: u64, update_db_size: u64,
) -> Self { ) -> Self {
let dump_infos = Arc::new(RwLock::new(HashMap::new()));
let lock = Arc::new(Mutex::new(()));
Self { Self {
inbox: Some(inbox), inbox: Some(inbox),
uuid_resolver, uuid_resolver,
update, update,
dump_path: dump_path.as_ref().into(), dump_path: dump_path.as_ref().into(),
dump_info: Arc::new(RwLock::new(None)), dump_infos,
lock,
index_db_size, index_db_size,
update_db_size, update_db_size,
} }
@ -91,21 +95,22 @@ where
} }
async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) { async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
if self.is_running().await { let uid = generate_uid();
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
let _lock = match self.lock.try_lock() {
Some(lock) => lock,
None => {
ret.send(Err(DumpError::DumpAlreadyRunning)) ret.send(Err(DumpError::DumpAlreadyRunning))
.expect("Dump actor is dead"); .expect("Dump actor is dead");
return; return;
} }
let uid = generate_uid(); };
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); self.dump_infos.write().await.insert(uid.clone(), info.clone());
*self.dump_info.write().await = Some(info.clone());
ret.send(Ok(info)).expect("Dump actor is dead"); ret.send(Ok(info)).expect("Dump actor is dead");
let dump_info = self.dump_info.clone();
let task = DumpTask { let task = DumpTask {
path: self.dump_path.clone(), path: self.dump_path.clone(),
uuid_resolver: self.uuid_resolver.clone(), uuid_resolver: self.uuid_resolver.clone(),
@ -117,45 +122,34 @@ where
let task_result = tokio::task::spawn(task.run()).await; let task_result = tokio::task::spawn(task.run()).await;
let mut dump_infos = self.dump_infos
.write()
.await;
let dump_infos =
dump_infos
.get_mut(&uid)
.expect("dump entry deleted while lock was acquired");
match task_result { match task_result {
Ok(Ok(())) => { Ok(Ok(())) => {
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done(); dump_infos.done();
info!("Dump succeed"); info!("Dump succeed");
} }
Ok(Err(e)) => { Ok(Err(e)) => {
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string()); dump_infos.with_error(e.to_string());
error!("Dump failed: {}", e); error!("Dump failed: {}", e);
} }
Err(_) => { Err(_) => {
dump_infos.with_error("Unexpected error while performing dump.".to_string());
error!("Dump panicked. Dump status set to failed"); error!("Dump panicked. Dump status set to failed");
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error("Unexpected error while performing dump.".to_string());
} }
}; };
} }
async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> { async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
match &*self.dump_info.read().await { match self.dump_infos.read().await.get(&uid) {
None => self.dump_from_fs(uid).await,
Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await,
Some(info) => Ok(info.clone()), Some(info) => Ok(info.clone()),
_ => Err(DumpError::DumpDoesNotExist(uid)),
} }
} }
async fn dump_from_fs(&self, uid: String) -> DumpResult<DumpInfo> {
self.dump_path
.join(format!("{}.dump", &uid))
.exists()
.then(|| DumpInfo::new(uid.clone(), DumpStatus::Done))
.ok_or(DumpError::DumpDoesNotExist(uid))
}
async fn is_running(&self) -> bool {
matches!(
*self.dump_info.read().await,
Some(DumpInfo {
status: DumpStatus::InProgress,
..
})
)
}
} }