MeiliSearch/meilisearch-http/src/index_controller/snapshot.rs

250 lines
7.7 KiB
Rust
Raw Normal View History

2021-03-23 16:37:46 +01:00
use std::path::{Path, PathBuf};
2021-03-17 11:53:23 +01:00
use std::time::Duration;
2021-03-22 16:51:53 +01:00
use anyhow::bail;
2021-03-22 19:19:37 +01:00
use log::{error, info};
2021-03-22 16:51:53 +01:00
use tokio::task::spawn_blocking;
2021-03-22 19:19:37 +01:00
use tokio::time::sleep;
2021-03-25 10:23:31 +01:00
use tokio::fs;
2021-03-17 11:53:23 +01:00
use super::update_actor::UpdateActorHandle;
use super::uuid_resolver::UuidResolverHandle;
2021-03-23 16:19:01 +01:00
use crate::helpers::compression;
2021-03-17 11:53:23 +01:00
/// Background service that periodically writes a compressed snapshot of the database.
///
/// `R` is the UUID-resolver handle and `U` the update-actor handle; both are asked to
/// snapshot their state into a temporary directory, which is then archived as
/// `<snapshot_path>/<db_name>.snapshot`.
// NOTE(review): the reason for allowing dead code here is not visible in this file —
// confirm whether it is still needed.
#[allow(dead_code)]
pub struct SnapshotService<U, R> {
    /// Handle queried first; the UUIDs it returns decide which indexes are snapshotted.
    uuid_resolver_handle: R,
    /// Handle asked to snapshot the update store of each returned UUID.
    update_handle: U,
    /// Pause between the end of one snapshot attempt and the start of the next.
    snapshot_period: Duration,
    /// Directory in which the final archive is written.
    snapshot_path: PathBuf,
    /// Base name of the archive file (`<db_name>.snapshot`).
    db_name: String,
}
2021-03-23 11:00:50 +01:00
impl<U, R> SnapshotService<U, R>
where
U: UpdateActorHandle,
2021-03-23 16:19:01 +01:00
R: UuidResolverHandle,
2021-03-23 11:00:50 +01:00
{
2021-03-17 11:53:23 +01:00
pub fn new(
2021-03-23 11:00:50 +01:00
uuid_resolver_handle: R,
update_handle: U,
2021-03-17 11:53:23 +01:00
snapshot_period: Duration,
snapshot_path: PathBuf,
2021-03-25 10:23:31 +01:00
db_name: String,
2021-03-17 11:53:23 +01:00
) -> Self {
Self {
uuid_resolver_handle,
update_handle,
snapshot_period,
snapshot_path,
2021-03-25 10:23:31 +01:00
db_name,
2021-03-17 11:53:23 +01:00
}
}
pub async fn run(self) {
2021-03-24 11:29:11 +01:00
info!(
2021-03-25 09:34:29 +01:00
"Snapshot scheduled every {}s.",
2021-03-24 11:29:11 +01:00
self.snapshot_period.as_secs()
);
2021-03-17 11:53:23 +01:00
loop {
2021-03-22 19:19:37 +01:00
if let Err(e) = self.perform_snapshot().await {
error!("{}", e);
}
2021-03-25 09:34:29 +01:00
sleep(self.snapshot_period).await;
2021-03-17 11:53:23 +01:00
}
}
2021-03-19 20:08:00 +01:00
async fn perform_snapshot(&self) -> anyhow::Result<()> {
2021-03-24 11:03:01 +01:00
info!("Performing snapshot.");
2021-03-25 10:23:31 +01:00
fs::create_dir_all(&self.snapshot_path).await?;
2021-03-22 16:51:53 +01:00
let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(".")).await??;
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
2021-03-23 16:19:01 +01:00
let uuids = self
.uuid_resolver_handle
.snapshot(temp_snapshot_path.clone())
.await?;
2021-03-22 19:19:37 +01:00
if uuids.is_empty() {
2021-03-23 16:19:01 +01:00
return Ok(());
2021-03-22 19:19:37 +01:00
}
2021-03-22 19:59:19 +01:00
let tasks = uuids
.iter()
2021-03-23 16:19:01 +01:00
.map(|&uuid| {
self.update_handle
.snapshot(uuid, temp_snapshot_path.clone())
})
2021-03-22 19:59:19 +01:00
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await?;
2021-03-22 16:51:53 +01:00
let temp_snapshot_path_clone = temp_snapshot_path.clone();
2021-03-25 10:23:31 +01:00
let snapshot_dir = self.snapshot_path.clone();
let snapshot_path = self.snapshot_path.join(format!("{}.snapshot", self.db_name));
let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
compression::to_tar_gz(temp_snapshot_path_clone, temp_snapshot_file_path)?;
temp_snapshot_file.persist(&snapshot_path)?;
Ok(snapshot_path)
2021-03-23 16:19:01 +01:00
})
.await??;
2021-03-22 16:51:53 +01:00
2021-03-25 10:23:31 +01:00
info!("Created snapshot in {:?}.", snapshot_path);
2021-03-22 19:19:37 +01:00
2021-03-19 20:08:00 +01:00
Ok(())
2021-03-17 11:53:23 +01:00
}
}
2021-03-23 16:19:01 +01:00
2021-03-23 16:37:46 +01:00
/// Restores the database at `db_path` from the archive at `snapshot_path` when appropriate.
///
/// The checks are applied in this order:
/// 1. If no database exists at `db_path` and the snapshot exists, the snapshot is
///    extracted into `db_path`. If extraction fails, anything it partially created at
///    `db_path` is removed, so a later start does not mistake it for a valid database.
/// 2. Otherwise, if the database exists, this fails unless `ignore_snapshot_if_db_exists`
///    is set.
/// 3. Otherwise, if the snapshot does not exist, this fails unless
///    `ignore_missing_snapshot` is set.
/// 4. Otherwise nothing is done.
///
/// # Errors
///
/// Returns the extraction error (mentioning any cleanup failure as well) in case 1, or a
/// descriptive error in cases 2 and 3. Paths in those messages are canonicalized when
/// possible.
pub fn load_snapshot(
    db_path: impl AsRef<Path>,
    snapshot_path: impl AsRef<Path>,
    ignore_snapshot_if_db_exists: bool,
    ignore_missing_snapshot: bool,
) -> anyhow::Result<()> {
    let db_path = db_path.as_ref();
    let snapshot_path = snapshot_path.as_ref();
    // Probe the filesystem once so every branch below reasons about the same state.
    let db_exists = db_path.exists();
    let snapshot_exists = snapshot_path.exists();

    if !db_exists && snapshot_exists {
        if let Err(e) = compression::from_tar_gz(snapshot_path, db_path) {
            // `db_path` did not exist before extraction, so whatever is there now is a
            // partial extraction. Remove it; otherwise the next start would treat it as an
            // existing (and possibly corrupt) database.
            if db_path.exists() {
                if let Err(cleanup_err) = std::fs::remove_dir_all(db_path) {
                    bail!(
                        "{:#} (additionally, failed to remove the partially extracted database at {:?}: {})",
                        e,
                        db_path,
                        cleanup_err
                    );
                }
            }
            return Err(e);
        }
        Ok(())
    } else if db_exists && !ignore_snapshot_if_db_exists {
        bail!(
            "database already exists at {:?}, try to delete it or rename it",
            db_path
                .canonicalize()
                .unwrap_or_else(|_| db_path.to_owned())
        )
    } else if !snapshot_exists && !ignore_missing_snapshot {
        bail!(
            "snapshot doesn't exist at {:?}",
            snapshot_path
                .canonicalize()
                .unwrap_or_else(|_| snapshot_path.to_owned())
        )
    } else {
        Ok(())
    }
}
2021-03-23 16:19:01 +01:00
#[cfg(test)]
mod test {
    use futures::future::{err, ok};
    use rand::Rng;
    use tokio::time::timeout;
    use uuid::Uuid;

    use super::*;
    use crate::index_controller::update_actor::{MockUpdateActorHandle, UpdateError};
    use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};

    /// Database name given to every service under test.
    const DB_NAME: &str = "data.ms";

    /// Builds a service with a 100ms period that writes its archive into `snapshot_dir`.
    ///
    /// `perform_snapshot` creates and writes into `snapshot_path`, so callers must pass a
    /// directory (not a file).
    fn new_service(
        uuid_resolver: MockUuidResolverHandle,
        update_handle: MockUpdateActorHandle,
        snapshot_dir: &Path,
    ) -> SnapshotService<MockUpdateActorHandle, MockUuidResolverHandle> {
        SnapshotService::new(
            uuid_resolver,
            update_handle,
            Duration::from_millis(100),
            snapshot_dir.to_owned(),
            DB_NAME.to_string(),
        )
    }

    /// Path of the archive a successful snapshot writes into `dir`.
    fn snapshot_file(dir: &Path) -> PathBuf {
        dir.join(format!("{}.snapshot", DB_NAME))
    }

    /// Asserts that `dir` has no entries: no archive and no leftover staging files.
    fn assert_dir_empty(dir: &Path) {
        let leftovers: Vec<_> = std::fs::read_dir(dir).unwrap().collect();
        assert!(
            leftovers.is_empty(),
            "unexpected entries in {:?}: {:?}",
            dir,
            leftovers
        );
    }

    #[actix_rt::test]
    async fn test_normal() {
        let mut rng = rand::thread_rng();
        let uuids_num: usize = rng.gen_range(5, 10);
        let uuids = (0..uuids_num).map(|_| Uuid::new_v4()).collect::<Vec<_>>();

        let mut uuid_resolver = MockUuidResolverHandle::new();
        let uuids_clone = uuids.clone();
        uuid_resolver
            .expect_snapshot()
            .times(1)
            .returning(move |_| Box::pin(ok(uuids_clone.clone())));

        let mut update_handle = MockUpdateActorHandle::new();
        let uuids_clone = uuids.clone();
        update_handle
            .expect_snapshot()
            .withf(move |uuid, _path| uuids_clone.contains(uuid))
            .times(uuids_num)
            .returning(move |_, _| Box::pin(ok(())));

        let snapshot_dir = tempfile::tempdir_in(".").unwrap();
        let snapshot_service = new_service(uuid_resolver, update_handle, snapshot_dir.path());

        snapshot_service.perform_snapshot().await.unwrap();
        assert!(snapshot_file(snapshot_dir.path()).exists());
    }

    #[actix_rt::test]
    async fn error_performing_uuid_snapshot() {
        let mut uuid_resolver = MockUuidResolverHandle::new();
        uuid_resolver
            .expect_snapshot()
            .times(1)
            // arbitrary error
            .returning(|_| Box::pin(err(UuidError::NameAlreadyExist)));

        let update_handle = MockUpdateActorHandle::new();

        let snapshot_dir = tempfile::tempdir_in(".").unwrap();
        let snapshot_service = new_service(uuid_resolver, update_handle, snapshot_dir.path());

        assert!(snapshot_service.perform_snapshot().await.is_err());
        // Nothing was written to the snapshot directory.
        assert_dir_empty(snapshot_dir.path());
    }

    #[actix_rt::test]
    async fn error_performing_index_snapshot() {
        let uuid = Uuid::new_v4();
        let mut uuid_resolver = MockUuidResolverHandle::new();
        uuid_resolver
            .expect_snapshot()
            .times(1)
            .returning(move |_| Box::pin(ok(vec![uuid])));

        let mut update_handle = MockUpdateActorHandle::new();
        update_handle
            .expect_snapshot()
            // arbitrary error
            .returning(|_, _| Box::pin(err(UpdateError::UnexistingUpdate(0))));

        let snapshot_dir = tempfile::tempdir_in(".").unwrap();
        let snapshot_service = new_service(uuid_resolver, update_handle, snapshot_dir.path());

        assert!(snapshot_service.perform_snapshot().await.is_err());
        // Nothing was written to the snapshot directory.
        assert_dir_empty(snapshot_dir.path());
    }

    #[actix_rt::test]
    async fn test_loop() {
        let mut uuid_resolver = MockUuidResolverHandle::new();
        uuid_resolver
            .expect_snapshot()
            // With a 100ms period and a 300ms timeout, the function is expected to be
            // called 2 or 3 times.
            .times(2..4)
            // arbitrary error, to short-circuit the function
            .returning(move |_| Box::pin(err(UuidError::NameAlreadyExist)));

        let update_handle = MockUpdateActorHandle::new();

        let snapshot_dir = tempfile::tempdir_in(".").unwrap();
        let snapshot_service = new_service(uuid_resolver, update_handle, snapshot_dir.path());

        let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await;
    }
}