implement snapshot scheduler

This commit is contained in:
mpostma 2021-03-17 11:53:23 +01:00
parent 2f418ee767
commit ee838be41b
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
3 changed files with 67 additions and 9 deletions

View File

@ -4,6 +4,7 @@ mod update_handler;
mod update_store; mod update_store;
mod updates; mod updates;
mod uuid_resolver; mod uuid_resolver;
mod snapshot;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@ -19,7 +20,9 @@ use tokio::time::sleep;
use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Document, SearchQuery, SearchResult};
use crate::index::{Facets, Settings, UpdateResult}; use crate::index::{Facets, Settings, UpdateResult};
pub use updates::{Failed, Processed, Processing}; pub use updates::{Failed, Processed, Processing};
use snapshot::SnapshotService;
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>; pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
@ -65,12 +68,19 @@ impl IndexController {
update_store_size: usize, update_store_size: usize,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?;
let index_actor = index_actor::IndexActorHandle::new(&path, index_size)?; let index_handle = index_actor::IndexActorHandle::new(&path, index_size)?;
let update_handle = let update_handle =
update_actor::UpdateActorHandle::new(index_actor.clone(), &path, update_store_size)?; update_actor::UpdateActorHandle::new(index_handle.clone(), &path, update_store_size)?;
let snapshot_service = SnapshotService::new(
index_handle.clone(),
uuid_resolver.clone(),
update_handle.clone(),
Duration::from_millis(10000),
"/dev/toto".into());
tokio::task::spawn(snapshot_service.run());
Ok(Self { Ok(Self {
uuid_resolver, uuid_resolver,
index_handle: index_actor, index_handle,
update_handle, update_handle,
}) })
} }

View File

@ -0,0 +1,48 @@
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::interval;
use super::index_actor::IndexActorHandle;
use super::update_actor::UpdateActorHandle;
use super::uuid_resolver::UuidResolverHandle;
#[allow(dead_code)]
pub struct SnapshotService<B> {
index_handle: IndexActorHandle,
uuid_resolver_handle: UuidResolverHandle,
update_handle: UpdateActorHandle<B>,
snapshot_period: Duration,
snapshot_path: PathBuf,
}
impl<B> SnapshotService<B> {
pub fn new(
index_handle: IndexActorHandle,
uuid_resolver_handle: UuidResolverHandle,
update_handle: UpdateActorHandle<B>,
snapshot_period: Duration,
snapshot_path: PathBuf,
) -> Self {
Self {
index_handle,
uuid_resolver_handle,
update_handle,
snapshot_period,
snapshot_path,
}
}
pub async fn run(self) {
let mut interval = interval(self.snapshot_period);
loop {
interval.tick().await;
self.perform_snapshot().await;
}
}
async fn perform_snapshot(&self) {
println!("performing snapshot in {:?}", self.snapshot_path);
}
}

View File

@ -37,14 +37,14 @@ pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> {
} }
pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> { pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> {
if snapshot_dir.file_name().is_none() { if snapshot_dir.file_name().is_none() {
return Err(Error::Internal("invalid snapshot file path".to_string())); return Err(Error::Internal("invalid snapshot file path".to_string()));
} }
let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?; let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?;
create_dir_all(snapshot_dir)?; create_dir_all(snapshot_dir)?;
let snapshot_path = snapshot_dir.join(format!("{}.snapshot", db_name.to_str().unwrap_or("data.ms"))); let snapshot_path = snapshot_dir.join(format!("{}.snapshot", db_name.to_str().unwrap_or("data.ms")));
thread::spawn(move || loop { thread::spawn(move || loop {
if let Err(e) = create_snapshot(&data, &snapshot_path) { if let Err(e) = create_snapshot(&data, &snapshot_path) {
error!("Unsuccessful snapshot creation: {}", e); error!("Unsuccessful snapshot creation: {}", e);
} }
@ -72,12 +72,12 @@ mod tests {
let file_1_relative = Path::new("file1.txt"); let file_1_relative = Path::new("file1.txt");
let subdir_relative = Path::new("subdir/"); let subdir_relative = Path::new("subdir/");
let file_2_relative = Path::new("subdir/file2.txt"); let file_2_relative = Path::new("subdir/file2.txt");
create_dir_all(src_dir.join(subdir_relative)).unwrap(); create_dir_all(src_dir.join(subdir_relative)).unwrap();
fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap(); fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap(); fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
assert!(compression::to_tar_gz(&src_dir, &archive_path).is_ok()); assert!(compression::to_tar_gz(&src_dir, &archive_path).is_ok());
assert!(archive_path.exists()); assert!(archive_path.exists());
assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok()); assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok());
@ -89,7 +89,7 @@ mod tests {
let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap(); let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap();
assert_eq!(contents, "Hello_file_1"); assert_eq!(contents, "Hello_file_1");
let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap(); let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap();
assert_eq!(contents, "Hello_file_2"); assert_eq!(contents, "Hello_file_2");
} }