integrate in dump actor

This commit is contained in:
Marin Postma 2021-05-25 18:14:11 +02:00
parent 3593ebb8aa
commit 9278a6fe59
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30

View File

@ -106,7 +106,6 @@ where
let task_result = tokio::task::spawn(perform_dump( let task_result = tokio::task::spawn(perform_dump(
self.dump_path.clone(), self.dump_path.clone(),
self.uuid_resolver.clone(), self.uuid_resolver.clone(),
self.index.clone(),
self.update.clone(), self.update.clone(),
uid.clone(), uid.clone(),
)) ))
@ -155,50 +154,28 @@ where
} }
} }
async fn perform_dump<UuidResolver, Index, Update>( async fn perform_dump<UuidResolver, Update>(
dump_path: PathBuf, dump_path: PathBuf,
uuid_resolver: UuidResolver, uuid_resolver: UuidResolver,
index: Index, update_handle: Update,
update: Update,
uid: String, uid: String,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where where
UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
{ {
info!("Performing dump."); info!("Performing dump.");
let dump_dir = dump_path.clone(); let dump_path_clone = dump_path.clone();
tokio::fs::create_dir_all(&dump_dir).await?; let temp_dump_path = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(dump_path_clone)).await??;
let temp_dump_dir =
tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let uuids = uuid_resolver.list().await?; let uuids = uuid_resolver.dump(temp_dump_path.path().to_owned()).await?;
// maybe we could just keep the vec as-is
let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
if uuids.is_empty() { update_handle.dump(uuids, temp_dump_path.path().to_owned()).await?;
return Ok(());
}
let indexes = list_indexes(&uuid_resolver, &index).await?;
// we create one directory by index
for meta in indexes.iter() {
tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
}
let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
metadata.to_path(&temp_dump_path).await?;
update.dump(uuids, temp_dump_path.clone()).await?;
let dump_dir = dump_path.clone();
let dump_path = dump_path.join(format!("{}.dump", uid)); let dump_path = dump_path.join(format!("{}.dump", uid));
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> { let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?;
let temp_dump_file_path = temp_dump_file.path().to_owned(); let temp_dump_file_path = temp_dump_file.path().to_owned();
compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
temp_dump_file.persist(&dump_path)?; temp_dump_file.persist(&dump_path)?;