remove the dump_inner trickery

tamo 2021-05-24 18:06:20 +02:00
parent dcf29e1081
commit 912f0286b3


@@ -17,16 +17,11 @@ pub const CONCURRENT_DUMP_MSG: usize = 10;
 pub struct DumpActor<UuidResolver, Index, Update> {
     inbox: Option<mpsc::Receiver<DumpMsg>>,
-    inner: InnerDump<UuidResolver, Index, Update>,
-}
-
-#[derive(Clone)]
-struct InnerDump<UuidResolver, Index, Update> {
-    pub uuid_resolver: UuidResolver,
-    pub index: Index,
-    pub update: Update,
-    pub dump_path: PathBuf,
-    pub dump_info: Arc<Mutex<Option<DumpInfo>>>,
+    uuid_resolver: UuidResolver,
+    index: Index,
+    update: Update,
+    dump_path: PathBuf,
+    dump_info: Arc<Mutex<Option<DumpInfo>>>,
 }

 /// Generate uid from creation date
@@ -49,13 +44,11 @@ where
     ) -> Self {
         Self {
             inbox: Some(inbox),
-            inner: InnerDump {
-                uuid_resolver,
-                index,
-                update,
-                dump_path: dump_path.as_ref().into(),
-                dump_info: Arc::new(Mutex::new(None)),
-            },
+            uuid_resolver,
+            index,
+            update,
+            dump_path: dump_path.as_ref().into(),
+            dump_info: Arc::new(Mutex::new(None)),
         }
     }
@@ -88,22 +81,15 @@ where
         match msg {
             CreateDump { ret } => {
-                let _ = self.inner.clone().handle_create_dump(ret).await;
+                let _ = self.handle_create_dump(ret).await;
             }
             DumpInfo { ret, uid } => {
-                let _ = ret.send(self.inner.handle_dump_info(uid).await);
+                let _ = ret.send(self.handle_dump_info(uid).await);
             }
         }
     }
-}

-impl<UuidResolver, Index, Update> InnerDump<UuidResolver, Index, Update>
-where
-    UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
-    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
-    Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
-{
-    async fn handle_create_dump(self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
+    async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
         if self.is_running().await {
             ret.send(Err(DumpError::DumpAlreadyRunning))
                 .expect("Dump actor is dead");
@@ -116,9 +102,15 @@ where
         ret.send(Ok(info)).expect("Dump actor is dead");

         let dump_info = self.dump_info.clone();
-        let cloned_uid = uid.clone();

-        let task_result = tokio::task::spawn(self.clone().perform_dump(cloned_uid)).await;
+        let task_result = tokio::task::spawn(perform_dump(
+            self.dump_path.clone(),
+            self.uuid_resolver.clone(),
+            self.index.clone(),
+            self.update.clone(),
+            uid.clone(),
+        ))
+        .await;

         match task_result {
             Ok(Ok(())) => {
@@ -144,70 +136,6 @@ where
         };
     }

-    async fn perform_dump(self, uid: String) -> anyhow::Result<()> {
-        info!("Performing dump.");
-
-        let dump_dir = self.dump_path.clone();
-        tokio::fs::create_dir_all(&dump_dir).await?;
-        let temp_dump_dir =
-            tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
-        let temp_dump_path = temp_dump_dir.path().to_owned();
-
-        let uuids = self.uuid_resolver.list().await?;
-        // maybe we could just keep the vec as-is
-        let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
-
-        if uuids.is_empty() {
-            return Ok(());
-        }
-
-        let indexes = self.list_indexes().await?;
-
-        // we create one directory by index
-        for meta in indexes.iter() {
-            tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
-        }
-
-        let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
-        metadata.to_path(&temp_dump_path).await?;
-
-        self.update.dump(uuids, temp_dump_path.clone()).await?;
-
-        let dump_dir = self.dump_path.clone();
-        let dump_path = self.dump_path.join(format!("{}.dump", uid));
-        let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
-            let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
-            let temp_dump_file_path = temp_dump_file.path().to_owned();
-            compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
-            temp_dump_file.persist(&dump_path)?;
-            Ok(dump_path)
-        })
-        .await??;
-
-        info!("Created dump in {:?}.", dump_path);
-
-        Ok(())
-    }
-
-    async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
-        let uuids = self.uuid_resolver.list().await?;
-
-        let mut ret = Vec::new();
-
-        for (uid, uuid) in uuids {
-            let meta = self.index.get_index_meta(uuid).await?;
-            let meta = IndexMetadata {
-                uuid,
-                name: uid.clone(),
-                uid,
-                meta,
-            };
-            ret.push(meta);
-        }
-
-        Ok(ret)
-    }
-
     async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
         match &*self.dump_info.lock().await {
             None => self.dump_from_fs(uid).await,
@@ -234,3 +162,85 @@ where
         )
     }
 }
+
+async fn perform_dump<UuidResolver, Index, Update>(
+    dump_path: PathBuf,
+    uuid_resolver: UuidResolver,
+    index: Index,
+    update: Update,
+    uid: String,
+) -> anyhow::Result<()>
+where
+    UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
+    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
+    Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
+{
+    info!("Performing dump.");
+
+    let dump_dir = dump_path.clone();
+    tokio::fs::create_dir_all(&dump_dir).await?;
+    let temp_dump_dir =
+        tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
+    let temp_dump_path = temp_dump_dir.path().to_owned();
+
+    let uuids = uuid_resolver.list().await?;
+    // maybe we could just keep the vec as-is
+    let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
+
+    if uuids.is_empty() {
+        return Ok(());
+    }
+
+    let indexes = list_indexes(&uuid_resolver, &index).await?;
+
+    // we create one directory by index
+    for meta in indexes.iter() {
+        tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
+    }
+
+    let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
+    metadata.to_path(&temp_dump_path).await?;
+
+    update.dump(uuids, temp_dump_path.clone()).await?;
+
+    let dump_dir = dump_path.clone();
+    let dump_path = dump_path.join(format!("{}.dump", uid));
+    let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
+        let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
+        let temp_dump_file_path = temp_dump_file.path().to_owned();
+        compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
+        temp_dump_file.persist(&dump_path)?;
+        Ok(dump_path)
+    })
+    .await??;
+
+    info!("Created dump in {:?}.", dump_path);
+
+    Ok(())
+}
+
+async fn list_indexes<UuidResolver, Index>(
+    uuid_resolver: &UuidResolver,
+    index: &Index,
+) -> anyhow::Result<Vec<IndexMetadata>>
+where
+    UuidResolver: uuid_resolver::UuidResolverHandle,
+    Index: index_actor::IndexActorHandle,
+{
+    let uuids = uuid_resolver.list().await?;
+
+    let mut ret = Vec::new();
+
+    for (uid, uuid) in uuids {
+        let meta = index.get_index_meta(uuid).await?;
+        let meta = IndexMetadata {
+            uuid,
+            name: uid.clone(),
+            uid,
+            meta,
+        };
+        ret.push(meta);
+    }
+
+    Ok(ret)
+}
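
The net effect of the diff above: DumpActor no longer wraps its handles in a clonable InnerDump just so a dump task can take ownership of them; instead each handle is cloned individually and moved into a free async function that tokio::task::spawn can own. Below is a minimal sketch of that pattern, assuming simplified stand-in types (Actor, FakeResolver, the hypothetical perform_dump here are illustrations, not the real Meilisearch handles):

// Sketch only: `Actor` and `FakeResolver` stand in for the real actor and its handles.
use std::path::PathBuf;

#[derive(Clone)]
struct FakeResolver;

impl FakeResolver {
    async fn list(&self) -> anyhow::Result<Vec<String>> {
        Ok(vec!["movies".into()])
    }
}

struct Actor {
    resolver: FakeResolver,
    dump_path: PathBuf,
}

impl Actor {
    // The actor itself never needs to be `Clone`: it clones only the pieces the
    // background task requires and moves them into the spawned future.
    async fn handle_create_dump(&self, uid: String) {
        let task = tokio::task::spawn(perform_dump(
            self.dump_path.clone(),
            self.resolver.clone(),
            uid,
        ));
        let _ = task.await;
    }
}

// Free function: it owns its arguments, so the future is 'static and can be
// handed to `tokio::task::spawn` without borrowing the actor.
async fn perform_dump(
    dump_path: PathBuf,
    resolver: FakeResolver,
    uid: String,
) -> anyhow::Result<()> {
    let indexes = resolver.list().await?;
    println!("dumping {:?} for {} into {:?}", indexes, uid, dump_path);
    Ok(())
}

#[tokio::main]
async fn main() {
    let actor = Actor {
        resolver: FakeResolver,
        dump_path: PathBuf::from("/tmp/dumps"),
    };
    actor.handle_create_dump("dump-uid".into()).await;
}

The design choice mirrors the commit: passing owned clones into a top-level function avoids both the extra wrapper struct and the `self.clone()` call on every dump request.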