2021-05-31 16:40:59 +02:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::path::{Path, PathBuf};
|
2021-05-27 14:30:20 +02:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2021-05-24 15:42:12 +02:00
|
|
|
use async_stream::stream;
|
2021-05-10 20:25:09 +02:00
|
|
|
use chrono::Utc;
|
2021-05-31 15:34:03 +02:00
|
|
|
use futures::{lock::Mutex, stream::StreamExt};
|
2021-06-23 10:41:55 +02:00
|
|
|
use log::{error, trace};
|
2021-05-31 16:03:39 +02:00
|
|
|
use tokio::sync::{mpsc, oneshot, RwLock};
|
2021-05-27 10:51:19 +02:00
|
|
|
use update_actor::UpdateActorHandle;
|
|
|
|
use uuid_resolver::UuidResolverHandle;
|
2021-05-27 14:30:20 +02:00
|
|
|
|
2021-06-15 17:39:07 +02:00
|
|
|
use super::error::{DumpActorError, Result};
|
2021-06-14 21:26:35 +02:00
|
|
|
use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
|
2021-05-27 14:30:20 +02:00
|
|
|
use crate::index_controller::{update_actor, uuid_resolver};
|
2021-05-10 20:25:09 +02:00
|
|
|
|
2021-05-24 15:42:12 +02:00
|
|
|
pub const CONCURRENT_DUMP_MSG: usize = 10;
|
|
|
|
|
2021-05-26 20:42:09 +02:00
|
|
|
pub struct DumpActor<UuidResolver, Update> {
|
2021-05-24 15:42:12 +02:00
|
|
|
inbox: Option<mpsc::Receiver<DumpMsg>>,
|
2021-05-24 18:06:20 +02:00
|
|
|
uuid_resolver: UuidResolver,
|
|
|
|
update: Update,
|
|
|
|
dump_path: PathBuf,
|
2021-05-31 15:34:03 +02:00
|
|
|
lock: Arc<Mutex<()>>,
|
|
|
|
dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
|
2021-05-31 16:40:59 +02:00
|
|
|
update_db_size: usize,
|
|
|
|
index_db_size: usize,
|
2021-05-10 20:25:09 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Generate uid from creation date
|
|
|
|
fn generate_uid() -> String {
|
|
|
|
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
|
|
|
|
}
|
|
|
|
|
2021-05-26 20:42:09 +02:00
|
|
|
impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
|
2021-05-10 20:25:09 +02:00
|
|
|
where
|
2021-05-27 10:51:19 +02:00
|
|
|
UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static,
|
|
|
|
Update: UpdateActorHandle + Send + Sync + Clone + 'static,
|
2021-05-10 20:25:09 +02:00
|
|
|
{
|
|
|
|
pub fn new(
|
|
|
|
inbox: mpsc::Receiver<DumpMsg>,
|
|
|
|
uuid_resolver: UuidResolver,
|
|
|
|
update: Update,
|
|
|
|
dump_path: impl AsRef<Path>,
|
2021-05-31 16:40:59 +02:00
|
|
|
index_db_size: usize,
|
|
|
|
update_db_size: usize,
|
2021-05-10 20:25:09 +02:00
|
|
|
) -> Self {
|
2021-05-31 15:34:03 +02:00
|
|
|
let dump_infos = Arc::new(RwLock::new(HashMap::new()));
|
|
|
|
let lock = Arc::new(Mutex::new(()));
|
2021-05-10 20:25:09 +02:00
|
|
|
Self {
|
2021-05-24 15:42:12 +02:00
|
|
|
inbox: Some(inbox),
|
2021-05-24 18:06:20 +02:00
|
|
|
uuid_resolver,
|
|
|
|
update,
|
|
|
|
dump_path: dump_path.as_ref().into(),
|
2021-05-31 15:34:03 +02:00
|
|
|
dump_infos,
|
|
|
|
lock,
|
2021-05-27 10:51:19 +02:00
|
|
|
index_db_size,
|
|
|
|
update_db_size,
|
2021-05-10 20:25:09 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn run(mut self) {
|
2021-06-23 10:41:55 +02:00
|
|
|
trace!("Started dump actor.");
|
2021-05-10 20:25:09 +02:00
|
|
|
|
2021-05-24 15:42:12 +02:00
|
|
|
let mut inbox = self
|
|
|
|
.inbox
|
|
|
|
.take()
|
|
|
|
.expect("Dump Actor must have a inbox at this point.");
|
|
|
|
|
|
|
|
let stream = stream! {
|
|
|
|
loop {
|
|
|
|
match inbox.recv().await {
|
|
|
|
Some(msg) => yield msg,
|
|
|
|
None => break,
|
2021-05-10 20:25:09 +02:00
|
|
|
}
|
|
|
|
}
|
2021-05-24 15:42:12 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
stream
|
|
|
|
.for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
|
|
|
|
.await;
|
2021-05-10 20:25:09 +02:00
|
|
|
|
|
|
|
error!("Dump actor stopped.");
|
|
|
|
}
|
2021-05-24 15:42:12 +02:00
|
|
|
|
|
|
|
async fn handle_message(&self, msg: DumpMsg) {
|
|
|
|
use DumpMsg::*;
|
|
|
|
|
|
|
|
match msg {
|
|
|
|
CreateDump { ret } => {
|
2021-05-24 18:06:20 +02:00
|
|
|
let _ = self.handle_create_dump(ret).await;
|
2021-05-24 15:42:12 +02:00
|
|
|
}
|
|
|
|
DumpInfo { ret, uid } => {
|
2021-05-24 18:06:20 +02:00
|
|
|
let _ = ret.send(self.handle_dump_info(uid).await);
|
2021-05-24 15:42:12 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-05-10 20:25:09 +02:00
|
|
|
|
2021-06-14 21:26:35 +02:00
|
|
|
async fn handle_create_dump(&self, ret: oneshot::Sender<Result<DumpInfo>>) {
|
2021-05-10 20:25:09 +02:00
|
|
|
let uid = generate_uid();
|
|
|
|
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
|
2021-05-30 15:55:17 +02:00
|
|
|
|
2021-05-31 15:34:03 +02:00
|
|
|
let _lock = match self.lock.try_lock() {
|
|
|
|
Some(lock) => lock,
|
|
|
|
None => {
|
2021-06-14 21:26:35 +02:00
|
|
|
ret.send(Err(DumpActorError::DumpAlreadyRunning))
|
2021-05-31 15:34:03 +02:00
|
|
|
.expect("Dump actor is dead");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-05-31 16:03:39 +02:00
|
|
|
self.dump_infos
|
|
|
|
.write()
|
|
|
|
.await
|
|
|
|
.insert(uid.clone(), info.clone());
|
2021-05-10 20:25:09 +02:00
|
|
|
|
2021-05-24 17:33:42 +02:00
|
|
|
ret.send(Ok(info)).expect("Dump actor is dead");
|
|
|
|
|
2021-05-27 10:51:19 +02:00
|
|
|
let task = DumpTask {
|
|
|
|
path: self.dump_path.clone(),
|
|
|
|
uuid_resolver: self.uuid_resolver.clone(),
|
|
|
|
update_handle: self.update.clone(),
|
|
|
|
uid: uid.clone(),
|
|
|
|
update_db_size: self.update_db_size,
|
|
|
|
index_db_size: self.index_db_size,
|
|
|
|
};
|
|
|
|
|
|
|
|
let task_result = tokio::task::spawn(task.run()).await;
|
2021-05-24 17:33:42 +02:00
|
|
|
|
2021-05-31 16:03:39 +02:00
|
|
|
let mut dump_infos = self.dump_infos.write().await;
|
|
|
|
let dump_infos = dump_infos
|
2021-05-31 15:34:03 +02:00
|
|
|
.get_mut(&uid)
|
|
|
|
.expect("dump entry deleted while lock was acquired");
|
|
|
|
|
2021-05-24 17:33:42 +02:00
|
|
|
match task_result {
|
|
|
|
Ok(Ok(())) => {
|
2021-05-31 15:34:03 +02:00
|
|
|
dump_infos.done();
|
2021-06-23 10:41:55 +02:00
|
|
|
trace!("Dump succeed");
|
2021-05-24 17:33:42 +02:00
|
|
|
}
|
|
|
|
Ok(Err(e)) => {
|
2021-05-31 15:34:03 +02:00
|
|
|
dump_infos.with_error(e.to_string());
|
2021-05-24 17:33:42 +02:00
|
|
|
error!("Dump failed: {}", e);
|
|
|
|
}
|
|
|
|
Err(_) => {
|
2021-05-31 15:34:03 +02:00
|
|
|
dump_infos.with_error("Unexpected error while performing dump.".to_string());
|
2021-05-24 17:33:42 +02:00
|
|
|
error!("Dump panicked. Dump status set to failed");
|
|
|
|
}
|
|
|
|
};
|
2021-05-10 20:25:09 +02:00
|
|
|
}
|
|
|
|
|
2021-06-14 21:26:35 +02:00
|
|
|
async fn handle_dump_info(&self, uid: String) -> Result<DumpInfo> {
|
2021-05-31 15:34:03 +02:00
|
|
|
match self.dump_infos.read().await.get(&uid) {
|
2021-05-10 20:25:09 +02:00
|
|
|
Some(info) => Ok(info.clone()),
|
2021-06-14 21:26:35 +02:00
|
|
|
_ => Err(DumpActorError::DumpDoesNotExist(uid)),
|
2021-05-10 20:25:09 +02:00
|
|
|
}
|
|
|
|
}
|
2021-05-24 18:06:20 +02:00
|
|
|
}
|