mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-23 21:20:24 +01:00
integration test snapshot
This commit is contained in:
parent
06f9dae0f3
commit
1f16c8d224
@ -39,6 +39,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(self) {
|
pub async fn run(self) {
|
||||||
|
info!("Snashot scheduled every {}s.", self.snapshot_period.as_secs());
|
||||||
loop {
|
loop {
|
||||||
sleep(self.snapshot_period).await;
|
sleep(self.snapshot_period).await;
|
||||||
if let Err(e) = self.perform_snapshot().await {
|
if let Err(e) = self.perform_snapshot().await {
|
||||||
@ -52,6 +53,8 @@ where
|
|||||||
bail!("Invalid snapshot file path.");
|
bail!("Invalid snapshot file path.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("Performing snapshot.");
|
||||||
|
|
||||||
let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(".")).await??;
|
let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(".")).await??;
|
||||||
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
|
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
|
||||||
|
|
||||||
|
@ -1,96 +0,0 @@
|
|||||||
use crate::Data;
|
|
||||||
use crate::error::Error;
|
|
||||||
use crate::helpers::compression;
|
|
||||||
|
|
||||||
use log::error;
|
|
||||||
use std::fs::create_dir_all;
|
|
||||||
use std::path::Path;
|
|
||||||
use std::thread;
|
|
||||||
use std::time::{Duration};
|
|
||||||
use tempfile::TempDir;
|
|
||||||
|
|
||||||
pub fn load_snapshot(
|
|
||||||
db_path: &str,
|
|
||||||
snapshot_path: &Path,
|
|
||||||
ignore_snapshot_if_db_exists: bool,
|
|
||||||
ignore_missing_snapshot: bool
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let db_path = Path::new(db_path);
|
|
||||||
|
|
||||||
if !db_path.exists() && snapshot_path.exists() {
|
|
||||||
compression::from_tar_gz(snapshot_path, db_path)
|
|
||||||
} else if db_path.exists() && !ignore_snapshot_if_db_exists {
|
|
||||||
Err(Error::Internal(format!("database already exists at {:?}, try to delete it or rename it", db_path.canonicalize().unwrap_or(db_path.into()))))
|
|
||||||
} else if !snapshot_path.exists() && !ignore_missing_snapshot {
|
|
||||||
Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path.canonicalize().unwrap_or(snapshot_path.into()))))
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> {
|
|
||||||
let tmp_dir = TempDir::new()?;
|
|
||||||
|
|
||||||
data.db.copy_and_compact_to_path(tmp_dir.path())?;
|
|
||||||
|
|
||||||
compression::to_tar_gz(tmp_dir.path(), snapshot_path).map_err(|e| Error::Internal(format!("something went wrong during snapshot compression: {}", e)))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> {
|
|
||||||
if snapshot_dir.file_name().is_none() {
|
|
||||||
return Err(Error::Internal("invalid snapshot file path".to_string()));
|
|
||||||
}
|
|
||||||
let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?;
|
|
||||||
create_dir_all(snapshot_dir)?;
|
|
||||||
let snapshot_path = snapshot_dir.join(format!("{}.snapshot", db_name.to_str().unwrap_or("data.ms")));
|
|
||||||
|
|
||||||
thread::spawn(move || loop {
|
|
||||||
if let Err(e) = create_snapshot(&data, &snapshot_path) {
|
|
||||||
error!("Unsuccessful snapshot creation: {}", e);
|
|
||||||
}
|
|
||||||
thread::sleep(Duration::from_secs(time_gap_s));
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use std::io::prelude::*;
|
|
||||||
use std::fs;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_pack_unpack() {
|
|
||||||
let tempdir = TempDir::new().unwrap();
|
|
||||||
|
|
||||||
let test_dir = tempdir.path();
|
|
||||||
let src_dir = test_dir.join("src");
|
|
||||||
let dest_dir = test_dir.join("complex/destination/path/");
|
|
||||||
let archive_path = test_dir.join("archive.snapshot");
|
|
||||||
|
|
||||||
let file_1_relative = Path::new("file1.txt");
|
|
||||||
let subdir_relative = Path::new("subdir/");
|
|
||||||
let file_2_relative = Path::new("subdir/file2.txt");
|
|
||||||
|
|
||||||
create_dir_all(src_dir.join(subdir_relative)).unwrap();
|
|
||||||
fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
|
|
||||||
fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
|
|
||||||
|
|
||||||
|
|
||||||
assert!(compression::to_tar_gz(&src_dir, &archive_path).is_ok());
|
|
||||||
assert!(archive_path.exists());
|
|
||||||
assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok());
|
|
||||||
|
|
||||||
assert!(dest_dir.exists());
|
|
||||||
assert!(dest_dir.join(file_1_relative).exists());
|
|
||||||
assert!(dest_dir.join(subdir_relative).exists());
|
|
||||||
assert!(dest_dir.join(file_2_relative).exists());
|
|
||||||
|
|
||||||
let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap();
|
|
||||||
assert_eq!(contents, "Hello_file_1");
|
|
||||||
|
|
||||||
let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap();
|
|
||||||
assert_eq!(contents, "Hello_file_2");
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +1,6 @@
|
|||||||
mod index;
|
pub mod index;
|
||||||
mod server;
|
pub mod server;
|
||||||
mod service;
|
pub mod service;
|
||||||
|
|
||||||
pub use index::{GetAllDocumentsOptions, GetDocumentOptions};
|
pub use index::{GetAllDocumentsOptions, GetDocumentOptions};
|
||||||
pub use server::Server;
|
pub use server::Server;
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
use std::path::Path;
|
||||||
|
|
||||||
use actix_web::http::StatusCode;
|
use actix_web::http::StatusCode;
|
||||||
use byte_unit::{Byte, ByteUnit};
|
use byte_unit::{Byte, ByteUnit};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
@ -12,17 +14,50 @@ use super::service::Service;
|
|||||||
|
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
pub service: Service,
|
pub service: Service,
|
||||||
// hod ownership to the tempdir while we use the server instance.
|
// hold ownership to the tempdir while we use the server instance.
|
||||||
_dir: tempdir::TempDir,
|
_dir: Option<tempdir::TempDir>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
pub async fn new() -> Self {
|
pub async fn new() -> Self {
|
||||||
let dir = TempDir::new("meilisearch").unwrap();
|
let dir = TempDir::new("meilisearch").unwrap();
|
||||||
|
|
||||||
let opt = Opt {
|
let opt = default_settings(dir.path());
|
||||||
db_path: dir.path().join("db"),
|
|
||||||
dumps_dir: dir.path().join("dump"),
|
let data = Data::new(opt).unwrap();
|
||||||
|
let service = Service(data);
|
||||||
|
|
||||||
|
Server { service, _dir: Some(dir) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn new_with_options(opt: Opt) -> Self {
|
||||||
|
let data = Data::new(opt).unwrap();
|
||||||
|
let service = Service(data);
|
||||||
|
|
||||||
|
Server { service, _dir: None }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a view to an index. There is no guarantee that the index exists.
|
||||||
|
pub fn index<'a>(&'a self, uid: impl AsRef<str>) -> Index<'a> {
|
||||||
|
Index {
|
||||||
|
uid: encode(uid.as_ref()),
|
||||||
|
service: &self.service,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_indexes(&self) -> (Value, StatusCode) {
|
||||||
|
self.service.get("/indexes").await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn version(&self) -> (Value, StatusCode) {
|
||||||
|
self.service.get("/version").await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
|
||||||
|
Opt {
|
||||||
|
db_path: dir.as_ref().join("db"),
|
||||||
|
dumps_dir: dir.as_ref().join("dump"),
|
||||||
dump_batch_size: 16,
|
dump_batch_size: 16,
|
||||||
http_addr: "127.0.0.1:7700".to_owned(),
|
http_addr: "127.0.0.1:7700".to_owned(),
|
||||||
master_key: None,
|
master_key: None,
|
||||||
@ -50,27 +85,5 @@ impl Server {
|
|||||||
sentry_dsn: String::from(""),
|
sentry_dsn: String::from(""),
|
||||||
#[cfg(all(not(debug_assertions), feature = "sentry"))]
|
#[cfg(all(not(debug_assertions), feature = "sentry"))]
|
||||||
no_sentry: true,
|
no_sentry: true,
|
||||||
};
|
|
||||||
|
|
||||||
let data = Data::new(opt).unwrap();
|
|
||||||
let service = Service(data);
|
|
||||||
|
|
||||||
Server { service, _dir: dir }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a view to an index. There is no guarantee that the index exists.
|
|
||||||
pub fn index<'a>(&'a self, uid: impl AsRef<str>) -> Index<'a> {
|
|
||||||
Index {
|
|
||||||
uid: encode(uid.as_ref()),
|
|
||||||
service: &self.service,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn list_indexes(&self) -> (Value, StatusCode) {
|
|
||||||
self.service.get("/indexes").await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn version(&self) -> (Value, StatusCode) {
|
|
||||||
self.service.get("/version").await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,8 +3,9 @@ mod documents;
|
|||||||
mod index;
|
mod index;
|
||||||
mod search;
|
mod search;
|
||||||
mod settings;
|
mod settings;
|
||||||
mod updates;
|
mod snapshot;
|
||||||
mod stats;
|
mod stats;
|
||||||
|
mod updates;
|
||||||
|
|
||||||
// Tests are isolated by features in different modules to allow better readability, test
|
// Tests are isolated by features in different modules to allow better readability, test
|
||||||
// targetability, and improved incremental compilation times.
|
// targetability, and improved incremental compilation times.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user