mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-23 13:24:27 +01:00
Merge #982
982: fix backups r=MarinPostma a=LegendreM * pluralize variable `backup_folder` -> `backups_folder` * change env case `MEILI_backup_folder` -> `MEILI_BACKUPS_FOLDER` * add miliseconds to backup ID to reduce colisions Co-authored-by: many <maxime@meilisearch.com>
This commit is contained in:
commit
4398f2c023
4
.github/workflows/test.yml
vendored
4
.github/workflows/test.yml
vendored
@ -30,11 +30,11 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
args: --locked --release
|
args: --locked --release
|
||||||
- name: Run cargo test backup
|
- name: Run cargo test dump
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
args: backup --locked --release -- --ignored --test-threads 1
|
args: dump --locked --release -- --ignored --test-threads 1
|
||||||
- name: Run cargo clippy
|
- name: Run cargo clippy
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
## v0.15.0
|
## v0.15.0
|
||||||
|
|
||||||
- Backups (#887)
|
- Dumps (#887)
|
||||||
- Update actix-web dependency to 3.0.0 (#963)
|
- Update actix-web dependency to 3.0.0 (#963)
|
||||||
- Consider an empty query to be a placeholder search (#916)
|
- Consider an empty query to be a placeholder search (#916)
|
||||||
|
|
||||||
|
@ -77,8 +77,8 @@ pub enum Code {
|
|||||||
SearchDocuments,
|
SearchDocuments,
|
||||||
UnsupportedMediaType,
|
UnsupportedMediaType,
|
||||||
|
|
||||||
BackupAlreadyInProgress,
|
DumpAlreadyInProgress,
|
||||||
BackupProcessFailed,
|
DumpProcessFailed,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Code {
|
impl Code {
|
||||||
@ -126,9 +126,9 @@ impl Code {
|
|||||||
SearchDocuments => ErrCode::internal("search_error", StatusCode::BAD_REQUEST),
|
SearchDocuments => ErrCode::internal("search_error", StatusCode::BAD_REQUEST),
|
||||||
UnsupportedMediaType => ErrCode::invalid("unsupported_media_type", StatusCode::UNSUPPORTED_MEDIA_TYPE),
|
UnsupportedMediaType => ErrCode::invalid("unsupported_media_type", StatusCode::UNSUPPORTED_MEDIA_TYPE),
|
||||||
|
|
||||||
// error related to backup
|
// error related to dump
|
||||||
BackupAlreadyInProgress => ErrCode::invalid("backup_already_in_progress", StatusCode::CONFLICT),
|
DumpAlreadyInProgress => ErrCode::invalid("dump_already_in_progress", StatusCode::CONFLICT),
|
||||||
BackupProcessFailed => ErrCode::internal("backup_process_failed", StatusCode::INTERNAL_SERVER_ERROR),
|
DumpProcessFailed => ErrCode::internal("dump_process_failed", StatusCode::INTERNAL_SERVER_ERROR),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,8 +26,8 @@ impl Deref for Data {
|
|||||||
pub struct DataInner {
|
pub struct DataInner {
|
||||||
pub db: Arc<Database>,
|
pub db: Arc<Database>,
|
||||||
pub db_path: String,
|
pub db_path: String,
|
||||||
pub backup_folder: PathBuf,
|
pub dumps_folder: PathBuf,
|
||||||
pub backup_batch_size: usize,
|
pub dump_batch_size: usize,
|
||||||
pub api_keys: ApiKeys,
|
pub api_keys: ApiKeys,
|
||||||
pub server_pid: u32,
|
pub server_pid: u32,
|
||||||
pub http_payload_size_limit: usize,
|
pub http_payload_size_limit: usize,
|
||||||
@ -60,8 +60,8 @@ impl ApiKeys {
|
|||||||
impl Data {
|
impl Data {
|
||||||
pub fn new(opt: Opt) -> Result<Data, Box<dyn Error>> {
|
pub fn new(opt: Opt) -> Result<Data, Box<dyn Error>> {
|
||||||
let db_path = opt.db_path.clone();
|
let db_path = opt.db_path.clone();
|
||||||
let backup_folder = opt.backup_folder.clone();
|
let dumps_folder = opt.dumps_folder.clone();
|
||||||
let backup_batch_size = opt.backup_batch_size;
|
let dump_batch_size = opt.dump_batch_size;
|
||||||
let server_pid = std::process::id();
|
let server_pid = std::process::id();
|
||||||
|
|
||||||
let db_opt = DatabaseOptions {
|
let db_opt = DatabaseOptions {
|
||||||
@ -84,8 +84,8 @@ impl Data {
|
|||||||
let inner_data = DataInner {
|
let inner_data = DataInner {
|
||||||
db: db.clone(),
|
db: db.clone(),
|
||||||
db_path,
|
db_path,
|
||||||
backup_folder,
|
dumps_folder,
|
||||||
backup_batch_size,
|
dump_batch_size,
|
||||||
api_keys,
|
api_keys,
|
||||||
server_pid,
|
server_pid,
|
||||||
http_payload_size_limit,
|
http_payload_size_limit,
|
||||||
|
@ -7,7 +7,7 @@ use std::thread;
|
|||||||
use actix_web::web;
|
use actix_web::web;
|
||||||
use chrono::offset::Utc;
|
use chrono::offset::Utc;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use log::error;
|
use log::{error, info};
|
||||||
use meilisearch_core::{MainWriter, MainReader, UpdateReader};
|
use meilisearch_core::{MainWriter, MainReader, UpdateReader};
|
||||||
use meilisearch_core::settings::Settings;
|
use meilisearch_core::settings::Settings;
|
||||||
use meilisearch_core::update::{apply_settings_update, apply_documents_addition};
|
use meilisearch_core::update::{apply_settings_update, apply_documents_addition};
|
||||||
@ -21,37 +21,37 @@ use crate::helpers::compression;
|
|||||||
use crate::routes::index;
|
use crate::routes::index;
|
||||||
use crate::routes::index::IndexResponse;
|
use crate::routes::index::IndexResponse;
|
||||||
|
|
||||||
// Mutex to share backup progress.
|
// Mutex to share dump progress.
|
||||||
static BACKUP_INFO: Lazy<Mutex<Option<BackupInfo>>> = Lazy::new(Mutex::default);
|
static DUMP_INFO: Lazy<Mutex<Option<DumpInfo>>> = Lazy::new(Mutex::default);
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||||
enum BackupVersion {
|
enum DumpVersion {
|
||||||
V1,
|
V1,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackupVersion {
|
impl DumpVersion {
|
||||||
const CURRENT: Self = Self::V1;
|
const CURRENT: Self = Self::V1;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct BackupMetadata {
|
pub struct DumpMetadata {
|
||||||
indexes: Vec<crate::routes::index::IndexResponse>,
|
indexes: Vec<crate::routes::index::IndexResponse>,
|
||||||
db_version: String,
|
db_version: String,
|
||||||
backup_version: BackupVersion,
|
dump_version: DumpVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackupMetadata {
|
impl DumpMetadata {
|
||||||
/// Create a BackupMetadata with the current backup version of meilisearch.
|
/// Create a DumpMetadata with the current dump version of meilisearch.
|
||||||
pub fn new(indexes: Vec<crate::routes::index::IndexResponse>, db_version: String) -> Self {
|
pub fn new(indexes: Vec<crate::routes::index::IndexResponse>, db_version: String) -> Self {
|
||||||
BackupMetadata {
|
DumpMetadata {
|
||||||
indexes,
|
indexes,
|
||||||
db_version,
|
db_version,
|
||||||
backup_version: BackupVersion::CURRENT,
|
dump_version: DumpVersion::CURRENT,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract BackupMetadata from `metadata.json` file present at provided `folder_path`
|
/// Extract DumpMetadata from `metadata.json` file present at provided `folder_path`
|
||||||
fn from_path(folder_path: &Path) -> Result<Self, Error> {
|
fn from_path(folder_path: &Path) -> Result<Self, Error> {
|
||||||
let path = folder_path.join("metadata.json");
|
let path = folder_path.join("metadata.json");
|
||||||
let file = File::open(path)?;
|
let file = File::open(path)?;
|
||||||
@ -61,7 +61,7 @@ impl BackupMetadata {
|
|||||||
Ok(metadata)
|
Ok(metadata)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write BackupMetadata in `metadata.json` file at provided `folder_path`
|
/// Write DumpMetadata in `metadata.json` file at provided `folder_path`
|
||||||
fn to_path(&self, folder_path: &Path) -> Result<(), Error> {
|
fn to_path(&self, folder_path: &Path) -> Result<(), Error> {
|
||||||
let path = folder_path.join("metadata.json");
|
let path = folder_path.join("metadata.json");
|
||||||
let file = File::create(path)?;
|
let file = File::create(path)?;
|
||||||
@ -92,10 +92,10 @@ fn settings_to_path(settings: &Settings, folder_path: &Path) -> Result<(), Error
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Import settings and documents of a backup with version `BackupVersion::V1` in specified index.
|
/// Import settings and documents of a dump with version `DumpVersion::V1` in specified index.
|
||||||
fn import_index_v1(
|
fn import_index_v1(
|
||||||
data: &Data,
|
data: &Data,
|
||||||
backup_folder: &Path,
|
dumps_folder: &Path,
|
||||||
index_uid: &str,
|
index_uid: &str,
|
||||||
document_batch_size: usize,
|
document_batch_size: usize,
|
||||||
write_txn: &mut MainWriter,
|
write_txn: &mut MainWriter,
|
||||||
@ -107,12 +107,12 @@ fn import_index_v1(
|
|||||||
.open_index(index_uid)
|
.open_index(index_uid)
|
||||||
.ok_or(Error::index_not_found(index_uid))?;
|
.ok_or(Error::index_not_found(index_uid))?;
|
||||||
|
|
||||||
// index folder path in backup folder
|
// index folder path in dump folder
|
||||||
let index_path = &backup_folder.join(index_uid);
|
let index_path = &dumps_folder.join(index_uid);
|
||||||
|
|
||||||
// extract `settings.json` file and import content
|
// extract `settings.json` file and import content
|
||||||
let settings = settings_from_path(&index_path)?;
|
let settings = settings_from_path(&index_path)?;
|
||||||
let settings = settings.to_update().or_else(|_e| Err(Error::backup_failed()))?;
|
let settings = settings.to_update().or_else(|_e| Err(Error::dump_failed()))?;
|
||||||
apply_settings_update(write_txn, &index, settings)?;
|
apply_settings_update(write_txn, &index, settings)?;
|
||||||
|
|
||||||
// create iterator over documents in `documents.jsonl` to make batch importation
|
// create iterator over documents in `documents.jsonl` to make batch importation
|
||||||
@ -143,28 +143,35 @@ fn import_index_v1(
|
|||||||
apply_documents_addition(write_txn, &index, values)?;
|
apply_documents_addition(write_txn, &index, values)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sync index information: stats, updated_at, last_update
|
||||||
|
if let Err(e) = crate::index_update_callback_txn(index, index_uid, data, write_txn) {
|
||||||
|
return Err(Error::Internal(e));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Import backup from `backup_folder` in database.
|
/// Import dump from `dump_path` in database.
|
||||||
pub fn import_backup(
|
pub fn import_dump(
|
||||||
data: &Data,
|
data: &Data,
|
||||||
backup_folder: &Path,
|
dump_path: &Path,
|
||||||
document_batch_size: usize,
|
document_batch_size: usize,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
info!("Importing dump from {:?}...", dump_path);
|
||||||
|
|
||||||
// create a temporary directory
|
// create a temporary directory
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let tmp_dir_path = tmp_dir.path();
|
let tmp_dir_path = tmp_dir.path();
|
||||||
|
|
||||||
// extract backup in temporary directory
|
// extract dump in temporary directory
|
||||||
compression::from_tar_gz(backup_folder, tmp_dir_path)?;
|
compression::from_tar_gz(dump_path, tmp_dir_path)?;
|
||||||
|
|
||||||
// read backup metadata
|
// read dump metadata
|
||||||
let metadata = BackupMetadata::from_path(&tmp_dir_path)?;
|
let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
|
||||||
|
|
||||||
// choose importation function from BackupVersion of metadata
|
// choose importation function from DumpVersion of metadata
|
||||||
let import_index = match metadata.backup_version {
|
let import_index = match metadata.dump_version {
|
||||||
BackupVersion::V1 => import_index_v1,
|
DumpVersion::V1 => import_index_v1,
|
||||||
};
|
};
|
||||||
|
|
||||||
// remove indexes which have same `uid` than indexes to import and create empty indexes
|
// remove indexes which have same `uid` than indexes to import and create empty indexes
|
||||||
@ -184,78 +191,79 @@ pub fn import_backup(
|
|||||||
Ok(())
|
Ok(())
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
info!("Dump importation from {:?} succeed", dump_path);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
|
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum BackupStatus {
|
pub enum DumpStatus {
|
||||||
Done,
|
Done,
|
||||||
Processing,
|
Processing,
|
||||||
BackupProcessFailed,
|
DumpProcessFailed,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct BackupInfo {
|
pub struct DumpInfo {
|
||||||
pub uid: String,
|
pub uid: String,
|
||||||
pub status: BackupStatus,
|
pub status: DumpStatus,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub error: Option<String>,
|
pub error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackupInfo {
|
impl DumpInfo {
|
||||||
pub fn new(uid: String, status: BackupStatus) -> Self {
|
pub fn new(uid: String, status: DumpStatus) -> Self {
|
||||||
Self { uid, status, error: None }
|
Self { uid, status, error: None }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_error(mut self, error: String) -> Self {
|
pub fn with_error(mut self, error: String) -> Self {
|
||||||
self.status = BackupStatus::BackupProcessFailed;
|
self.status = DumpStatus::DumpProcessFailed;
|
||||||
self.error = Some(error);
|
self.error = Some(error);
|
||||||
|
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn backup_already_in_progress(&self) -> bool {
|
pub fn dump_already_in_progress(&self) -> bool {
|
||||||
self.status == BackupStatus::Processing
|
self.status == DumpStatus::Processing
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_current() -> Option<Self> {
|
pub fn get_current() -> Option<Self> {
|
||||||
BACKUP_INFO.lock().unwrap().clone()
|
DUMP_INFO.lock().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_current(&self) {
|
pub fn set_current(&self) {
|
||||||
*BACKUP_INFO.lock().unwrap() = Some(self.clone());
|
*DUMP_INFO.lock().unwrap() = Some(self.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate uid from creation date
|
/// Generate uid from creation date
|
||||||
fn generate_uid() -> String {
|
fn generate_uid() -> String {
|
||||||
Utc::now().format("%Y%m%d-%H%M%S").to_string()
|
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Infer backup_folder from backup_uid
|
/// Infer dumps_folder from dump_uid
|
||||||
pub fn compressed_backup_folder(backup_folder: &Path, backup_uid: &str) -> PathBuf {
|
pub fn compressed_dumps_folder(dumps_folder: &Path, dump_uid: &str) -> PathBuf {
|
||||||
backup_folder.join(format!("{}.tar.gz", backup_uid))
|
dumps_folder.join(format!("{}.tar.gz", dump_uid))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write metadata in backup
|
/// Write metadata in dump
|
||||||
fn backup_metadata(data: &web::Data<Data>, folder_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
|
fn dump_metadata(data: &web::Data<Data>, folder_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
|
||||||
let (db_major, db_minor, db_patch) = data.db.version();
|
let (db_major, db_minor, db_patch) = data.db.version();
|
||||||
let metadata = BackupMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch));
|
let metadata = DumpMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch));
|
||||||
|
|
||||||
metadata.to_path(folder_path)
|
metadata.to_path(folder_path)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Export settings of provided index in backup
|
/// Export settings of provided index in dump
|
||||||
fn backup_index_settings(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
fn dump_index_settings(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||||
let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?;
|
let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?;
|
||||||
|
|
||||||
settings_to_path(&settings, folder_path)
|
settings_to_path(&settings, folder_path)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Export updates of provided index in backup
|
/// Export updates of provided index in dump
|
||||||
fn backup_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
fn dump_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||||
let updates_path = folder_path.join("updates.jsonl");
|
let updates_path = folder_path.join("updates.jsonl");
|
||||||
let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?;
|
let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?;
|
||||||
|
|
||||||
@ -269,16 +277,16 @@ fn backup_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_pa
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Export documents of provided index in backup
|
/// Export documents of provided index in dump
|
||||||
fn backup_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||||
let documents_path = folder_path.join("documents.jsonl");
|
let documents_path = folder_path.join("documents.jsonl");
|
||||||
let file = File::create(documents_path)?;
|
let file = File::create(documents_path)?;
|
||||||
let backup_batch_size = data.backup_batch_size;
|
let dump_batch_size = data.dump_batch_size;
|
||||||
|
|
||||||
let mut offset = 0;
|
let mut offset = 0;
|
||||||
loop {
|
loop {
|
||||||
let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, backup_batch_size, None)?;
|
let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, dump_batch_size, None)?;
|
||||||
if documents.len() == 0 { break; } else { offset += backup_batch_size; }
|
if documents.len() == 0 { break; } else { offset += dump_batch_size; }
|
||||||
|
|
||||||
for document in documents {
|
for document in documents {
|
||||||
serde_json::to_writer(&file, &document)?;
|
serde_json::to_writer(&file, &document)?;
|
||||||
@ -290,20 +298,20 @@ fn backup_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_pa
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Write error with a context.
|
/// Write error with a context.
|
||||||
fn fail_backup_process<E: std::error::Error>(backup_info: BackupInfo, context: &str, error: E) {
|
fn fail_dump_process<E: std::error::Error>(dump_info: DumpInfo, context: &str, error: E) {
|
||||||
let error = format!("Something went wrong during backup process: {}; {}", context, error);
|
let error = format!("Something went wrong during dump process: {}; {}", context, error);
|
||||||
|
|
||||||
error!("{}", &error);
|
error!("{}", &error);
|
||||||
backup_info.with_error(error).set_current();
|
dump_info.with_error(error).set_current();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Main function of backup.
|
/// Main function of dump.
|
||||||
fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: BackupInfo) {
|
fn dump_process(data: web::Data<Data>, dumps_folder: PathBuf, dump_info: DumpInfo) {
|
||||||
// open read transaction on Update
|
// open read transaction on Update
|
||||||
let update_reader = match data.db.update_read_txn() {
|
let update_reader = match data.db.update_read_txn() {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
fail_backup_process(backup_info, "creating RO transaction on updates", e);
|
fail_dump_process(dump_info, "creating RO transaction on updates", e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -312,7 +320,7 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba
|
|||||||
let main_reader = match data.db.main_read_txn() {
|
let main_reader = match data.db.main_read_txn() {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
fail_backup_process(backup_info, "creating RO transaction on main", e);
|
fail_dump_process(dump_info, "creating RO transaction on main", e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -321,7 +329,7 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba
|
|||||||
let tmp_dir = match TempDir::new() {
|
let tmp_dir = match TempDir::new() {
|
||||||
Ok(tmp_dir) => tmp_dir,
|
Ok(tmp_dir) => tmp_dir,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
fail_backup_process(backup_info, "creating temporary directory", e);
|
fail_dump_process(dump_info, "creating temporary directory", e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -331,14 +339,14 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba
|
|||||||
let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
|
let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
|
||||||
Ok(indexes) => indexes,
|
Ok(indexes) => indexes,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
fail_backup_process(backup_info, "listing indexes", e);
|
fail_dump_process(dump_info, "listing indexes", e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// create metadata
|
// create metadata
|
||||||
if let Err(e) = backup_metadata(&data, &tmp_dir_path, indexes.clone()) {
|
if let Err(e) = dump_metadata(&data, &tmp_dir_path, indexes.clone()) {
|
||||||
fail_backup_process(backup_info, "generating metadata", e);
|
fail_dump_process(dump_info, "generating metadata", e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -348,68 +356,68 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba
|
|||||||
|
|
||||||
// create index sub-dircetory
|
// create index sub-dircetory
|
||||||
if let Err(e) = create_dir_all(&index_path) {
|
if let Err(e) = create_dir_all(&index_path) {
|
||||||
fail_backup_process(backup_info, &format!("creating directory for index {}", &index.uid), e);
|
fail_dump_process(dump_info, &format!("creating directory for index {}", &index.uid), e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
// export settings
|
// export settings
|
||||||
if let Err(e) = backup_index_settings(&data, &main_reader, &index_path, &index.uid) {
|
if let Err(e) = dump_index_settings(&data, &main_reader, &index_path, &index.uid) {
|
||||||
fail_backup_process(backup_info, &format!("generating settings for index {}", &index.uid), e);
|
fail_dump_process(dump_info, &format!("generating settings for index {}", &index.uid), e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
// export documents
|
// export documents
|
||||||
if let Err(e) = backup_index_documents(&data, &main_reader, &index_path, &index.uid) {
|
if let Err(e) = dump_index_documents(&data, &main_reader, &index_path, &index.uid) {
|
||||||
fail_backup_process(backup_info, &format!("generating documents for index {}", &index.uid), e);
|
fail_dump_process(dump_info, &format!("generating documents for index {}", &index.uid), e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
// export updates
|
// export updates
|
||||||
if let Err(e) = backup_index_updates(&data, &update_reader, &index_path, &index.uid) {
|
if let Err(e) = dump_index_updates(&data, &update_reader, &index_path, &index.uid) {
|
||||||
fail_backup_process(backup_info, &format!("generating updates for index {}", &index.uid), e);
|
fail_dump_process(dump_info, &format!("generating updates for index {}", &index.uid), e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// compress backup in a file named `{backup_uid}.tar.gz` in `backup_folder`
|
// compress dump in a file named `{dump_uid}.tar.gz` in `dumps_folder`
|
||||||
if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_backup_folder(&backup_folder, &backup_info.uid)) {
|
if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_folder(&dumps_folder, &dump_info.uid)) {
|
||||||
fail_backup_process(backup_info, "compressing backup", e);
|
fail_dump_process(dump_info, "compressing dump", e);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update backup info to `done`
|
// update dump info to `done`
|
||||||
let resume = BackupInfo::new(
|
let resume = DumpInfo::new(
|
||||||
backup_info.uid,
|
dump_info.uid,
|
||||||
BackupStatus::Done
|
DumpStatus::Done
|
||||||
);
|
);
|
||||||
|
|
||||||
resume.set_current();
|
resume.set_current();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn init_backup_process(data: &web::Data<Data>, backup_folder: &Path) -> Result<BackupInfo, Error> {
|
pub fn init_dump_process(data: &web::Data<Data>, dumps_folder: &Path) -> Result<DumpInfo, Error> {
|
||||||
create_dir_all(backup_folder).or(Err(Error::backup_failed()))?;
|
create_dir_all(dumps_folder).or(Err(Error::dump_failed()))?;
|
||||||
|
|
||||||
// check if a backup is already in progress
|
// check if a dump is already in progress
|
||||||
if let Some(resume) = BackupInfo::get_current() {
|
if let Some(resume) = DumpInfo::get_current() {
|
||||||
if resume.backup_already_in_progress() {
|
if resume.dump_already_in_progress() {
|
||||||
return Err(Error::backup_conflict())
|
return Err(Error::dump_conflict())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// generate a new backup info
|
// generate a new dump info
|
||||||
let info = BackupInfo::new(
|
let info = DumpInfo::new(
|
||||||
generate_uid(),
|
generate_uid(),
|
||||||
BackupStatus::Processing
|
DumpStatus::Processing
|
||||||
);
|
);
|
||||||
|
|
||||||
info.set_current();
|
info.set_current();
|
||||||
|
|
||||||
let data = data.clone();
|
let data = data.clone();
|
||||||
let backup_folder = backup_folder.to_path_buf();
|
let dumps_folder = dumps_folder.to_path_buf();
|
||||||
let info_cloned = info.clone();
|
let info_cloned = info.clone();
|
||||||
// run backup process in a new thread
|
// run dump process in a new thread
|
||||||
thread::spawn(move ||
|
thread::spawn(move ||
|
||||||
backup_process(data, backup_folder, info_cloned)
|
dump_process(data, dumps_folder, info_cloned)
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(info)
|
Ok(info)
|
@ -53,8 +53,8 @@ pub enum Error {
|
|||||||
SearchDocuments(String),
|
SearchDocuments(String),
|
||||||
PayloadTooLarge,
|
PayloadTooLarge,
|
||||||
UnsupportedMediaType,
|
UnsupportedMediaType,
|
||||||
BackupAlreadyInProgress,
|
DumpAlreadyInProgress,
|
||||||
BackupProcessFailed,
|
DumpProcessFailed,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl error::Error for Error {}
|
impl error::Error for Error {}
|
||||||
@ -80,8 +80,8 @@ impl ErrorCode for Error {
|
|||||||
SearchDocuments(_) => Code::SearchDocuments,
|
SearchDocuments(_) => Code::SearchDocuments,
|
||||||
PayloadTooLarge => Code::PayloadTooLarge,
|
PayloadTooLarge => Code::PayloadTooLarge,
|
||||||
UnsupportedMediaType => Code::UnsupportedMediaType,
|
UnsupportedMediaType => Code::UnsupportedMediaType,
|
||||||
BackupAlreadyInProgress => Code::BackupAlreadyInProgress,
|
DumpAlreadyInProgress => Code::DumpAlreadyInProgress,
|
||||||
BackupProcessFailed => Code::BackupProcessFailed,
|
DumpProcessFailed => Code::DumpProcessFailed,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -185,12 +185,12 @@ impl Error {
|
|||||||
Error::SearchDocuments(err.to_string())
|
Error::SearchDocuments(err.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn backup_conflict() -> Error {
|
pub fn dump_conflict() -> Error {
|
||||||
Error::BackupAlreadyInProgress
|
Error::DumpAlreadyInProgress
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn backup_failed() -> Error {
|
pub fn dump_failed() -> Error {
|
||||||
Error::BackupProcessFailed
|
Error::DumpProcessFailed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,8 +214,8 @@ impl fmt::Display for Error {
|
|||||||
Self::SearchDocuments(err) => write!(f, "Impossible to search documents; {}", err),
|
Self::SearchDocuments(err) => write!(f, "Impossible to search documents; {}", err),
|
||||||
Self::PayloadTooLarge => f.write_str("Payload too large"),
|
Self::PayloadTooLarge => f.write_str("Payload too large"),
|
||||||
Self::UnsupportedMediaType => f.write_str("Unsupported media type"),
|
Self::UnsupportedMediaType => f.write_str("Unsupported media type"),
|
||||||
Self::BackupAlreadyInProgress => f.write_str("Another backup is already in progress"),
|
Self::DumpAlreadyInProgress => f.write_str("Another dump is already in progress"),
|
||||||
Self::BackupProcessFailed => f.write_str("Backup process failed"),
|
Self::DumpProcessFailed => f.write_str("Dump process failed"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ pub mod option;
|
|||||||
pub mod routes;
|
pub mod routes;
|
||||||
pub mod analytics;
|
pub mod analytics;
|
||||||
pub mod snapshot;
|
pub mod snapshot;
|
||||||
pub mod backup;
|
pub mod dump;
|
||||||
|
|
||||||
use actix_http::Error;
|
use actix_http::Error;
|
||||||
use actix_service::ServiceFactory;
|
use actix_service::ServiceFactory;
|
||||||
@ -16,7 +16,7 @@ use actix_web::{dev, web, App};
|
|||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use log::error;
|
use log::error;
|
||||||
|
|
||||||
use meilisearch_core::ProcessedUpdateResult;
|
use meilisearch_core::{Index, MainWriter, ProcessedUpdateResult};
|
||||||
|
|
||||||
pub use option::Opt;
|
pub use option::Opt;
|
||||||
pub use self::data::Data;
|
pub use self::data::Data;
|
||||||
@ -57,7 +57,23 @@ pub fn create_app(
|
|||||||
.configure(routes::health::services)
|
.configure(routes::health::services)
|
||||||
.configure(routes::stats::services)
|
.configure(routes::stats::services)
|
||||||
.configure(routes::key::services)
|
.configure(routes::key::services)
|
||||||
.configure(routes::backup::services)
|
.configure(routes::dump::services)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn index_update_callback_txn(index: Index, index_uid: &str, data: &Data, mut writer: &mut MainWriter) -> Result<(), String> {
|
||||||
|
if let Err(e) = data.db.compute_stats(&mut writer, index_uid) {
|
||||||
|
return Err(format!("Impossible to compute stats; {}", e));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = data.db.set_last_update(&mut writer, &Utc::now()) {
|
||||||
|
return Err(format!("Impossible to update last_update; {}", e));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = index.main.put_updated_at(&mut writer) {
|
||||||
|
return Err(format!("Impossible to update updated_at; {}", e));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpdateResult) {
|
pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpdateResult) {
|
||||||
@ -65,20 +81,13 @@ pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpda
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(index) = data.db.open_index(&index_uid) {
|
if let Some(index) = data.db.open_index(index_uid) {
|
||||||
let db = &data.db;
|
let db = &data.db;
|
||||||
let res = db.main_write::<_, _, ResponseError>(|mut writer| {
|
let res = db.main_write::<_, _, ResponseError>(|mut writer| {
|
||||||
if let Err(e) = data.db.compute_stats(&mut writer, &index_uid) {
|
if let Err(e) = index_update_callback_txn(index, index_uid, data, &mut writer) {
|
||||||
error!("Impossible to compute stats; {}", e)
|
error!("{}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = data.db.set_last_update(&mut writer, &Utc::now()) {
|
|
||||||
error!("Impossible to update last_update; {}", e)
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(e) = index.main.put_updated_at(&mut writer) {
|
|
||||||
error!("Impossible to update updated_at; {}", e)
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
match res {
|
match res {
|
||||||
|
@ -6,7 +6,7 @@ use main_error::MainError;
|
|||||||
use meilisearch_http::helpers::NormalizePath;
|
use meilisearch_http::helpers::NormalizePath;
|
||||||
use meilisearch_http::{create_app, index_update_callback, Data, Opt};
|
use meilisearch_http::{create_app, index_update_callback, Data, Opt};
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
use meilisearch_http::{snapshot, backup};
|
use meilisearch_http::{snapshot, dump};
|
||||||
|
|
||||||
mod analytics;
|
mod analytics;
|
||||||
|
|
||||||
@ -70,8 +70,8 @@ async fn main() -> Result<(), MainError> {
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
|
|
||||||
if let Some(path) = &opt.import_backup {
|
if let Some(path) = &opt.import_dump {
|
||||||
backup::import_backup(&data, path, opt.backup_batch_size)?;
|
dump::import_dump(&data, path, opt.dump_batch_size)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(path) = &opt.snapshot_path {
|
if let Some(path) = &opt.snapshot_path {
|
||||||
|
@ -116,17 +116,17 @@ pub struct Opt {
|
|||||||
#[structopt(long, requires = "snapshot-path", env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
|
#[structopt(long, requires = "snapshot-path", env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
|
||||||
pub snapshot_interval_sec: Option<u64>,
|
pub snapshot_interval_sec: Option<u64>,
|
||||||
|
|
||||||
/// Folder where backups are created when the backup route is called.
|
/// Folder where dumps are created when the dump route is called.
|
||||||
#[structopt(long, env = "MEILI_backup_folder", default_value = "backups/")]
|
#[structopt(long, env = "MEILI_DUMPS_FOLDER", default_value = "dumps/")]
|
||||||
pub backup_folder: PathBuf,
|
pub dumps_folder: PathBuf,
|
||||||
|
|
||||||
/// Import a backup from the specified path, must be a `.tar.gz` file.
|
/// Import a dump from the specified path, must be a `.tar.gz` file.
|
||||||
#[structopt(long, env = "MEILI_IMPORT_BACKUP", conflicts_with = "load-from-snapshot")]
|
#[structopt(long, env = "MEILI_IMPORT_DUMP", conflicts_with = "load-from-snapshot")]
|
||||||
pub import_backup: Option<PathBuf>,
|
pub import_dump: Option<PathBuf>,
|
||||||
|
|
||||||
/// The batch size used in the importation process, the bigger it is the faster the backup is created.
|
/// The batch size used in the importation process, the bigger it is the faster the dump is created.
|
||||||
#[structopt(long, env = "MEILI_BACKUP_BATCH_SIZE", default_value = "1024")]
|
#[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")]
|
||||||
pub backup_batch_size: usize,
|
pub dump_batch_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Opt {
|
impl Opt {
|
||||||
|
@ -1,64 +0,0 @@
|
|||||||
use std::fs::File;
|
|
||||||
use std::path::Path;
|
|
||||||
|
|
||||||
use actix_web::{get, post};
|
|
||||||
use actix_web::{HttpResponse, web};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
use crate::backup::{BackupInfo, BackupStatus, compressed_backup_folder, init_backup_process};
|
|
||||||
use crate::Data;
|
|
||||||
use crate::error::{Error, ResponseError};
|
|
||||||
use crate::helpers::Authentication;
|
|
||||||
|
|
||||||
pub fn services(cfg: &mut web::ServiceConfig) {
|
|
||||||
cfg.service(trigger_backup)
|
|
||||||
.service(get_backup_status);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[post("/backups", wrap = "Authentication::Private")]
|
|
||||||
async fn trigger_backup(
|
|
||||||
data: web::Data<Data>,
|
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
|
||||||
let backup_folder = Path::new(&data.backup_folder);
|
|
||||||
match init_backup_process(&data, &backup_folder) {
|
|
||||||
Ok(resume) => Ok(HttpResponse::Accepted().json(resume)),
|
|
||||||
Err(e) => Err(e.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct BackupStatusResponse {
|
|
||||||
status: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct BackupParam {
|
|
||||||
backup_uid: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[get("/backups/{backup_uid}/status", wrap = "Authentication::Private")]
|
|
||||||
async fn get_backup_status(
|
|
||||||
data: web::Data<Data>,
|
|
||||||
path: web::Path<BackupParam>,
|
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
|
||||||
let backup_folder = Path::new(&data.backup_folder);
|
|
||||||
let backup_uid = &path.backup_uid;
|
|
||||||
|
|
||||||
if let Some(resume) = BackupInfo::get_current() {
|
|
||||||
if &resume.uid == backup_uid {
|
|
||||||
return Ok(HttpResponse::Ok().json(resume));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if File::open(compressed_backup_folder(Path::new(backup_folder), backup_uid)).is_ok() {
|
|
||||||
let resume = BackupInfo::new(
|
|
||||||
backup_uid.into(),
|
|
||||||
BackupStatus::Done
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(resume))
|
|
||||||
} else {
|
|
||||||
Err(Error::not_found("backup does not exist").into())
|
|
||||||
}
|
|
||||||
}
|
|
64
meilisearch-http/src/routes/dump.rs
Normal file
64
meilisearch-http/src/routes/dump.rs
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
use std::fs::File;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
use actix_web::{get, post};
|
||||||
|
use actix_web::{HttpResponse, web};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::dump::{DumpInfo, DumpStatus, compressed_dumps_folder, init_dump_process};
|
||||||
|
use crate::Data;
|
||||||
|
use crate::error::{Error, ResponseError};
|
||||||
|
use crate::helpers::Authentication;
|
||||||
|
|
||||||
|
pub fn services(cfg: &mut web::ServiceConfig) {
|
||||||
|
cfg.service(trigger_dump)
|
||||||
|
.service(get_dump_status);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[post("/dumps", wrap = "Authentication::Private")]
|
||||||
|
async fn trigger_dump(
|
||||||
|
data: web::Data<Data>,
|
||||||
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
|
let dumps_folder = Path::new(&data.dumps_folder);
|
||||||
|
match init_dump_process(&data, &dumps_folder) {
|
||||||
|
Ok(resume) => Ok(HttpResponse::Accepted().json(resume)),
|
||||||
|
Err(e) => Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct DumpStatusResponse {
|
||||||
|
status: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct DumpParam {
|
||||||
|
dump_uid: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[get("/dumps/{dump_uid}/status", wrap = "Authentication::Private")]
|
||||||
|
async fn get_dump_status(
|
||||||
|
data: web::Data<Data>,
|
||||||
|
path: web::Path<DumpParam>,
|
||||||
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
|
let dumps_folder = Path::new(&data.dumps_folder);
|
||||||
|
let dump_uid = &path.dump_uid;
|
||||||
|
|
||||||
|
if let Some(resume) = DumpInfo::get_current() {
|
||||||
|
if &resume.uid == dump_uid {
|
||||||
|
return Ok(HttpResponse::Ok().json(resume));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if File::open(compressed_dumps_folder(Path::new(dumps_folder), dump_uid)).is_ok() {
|
||||||
|
let resume = DumpInfo::new(
|
||||||
|
dump_uid.into(),
|
||||||
|
DumpStatus::Done
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(HttpResponse::Ok().json(resume))
|
||||||
|
} else {
|
||||||
|
Err(Error::not_found("dump does not exist").into())
|
||||||
|
}
|
||||||
|
}
|
@ -10,7 +10,7 @@ pub mod setting;
|
|||||||
pub mod stats;
|
pub mod stats;
|
||||||
pub mod stop_words;
|
pub mod stop_words;
|
||||||
pub mod synonym;
|
pub mod synonym;
|
||||||
pub mod backup;
|
pub mod dump;
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct IndexParam {
|
pub struct IndexParam {
|
||||||
|
@ -8,5 +8,5 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"dbVersion": "0.13.0",
|
"dbVersion": "0.13.0",
|
||||||
"backupVersion": "1"
|
"dumpVersion": "1"
|
||||||
}
|
}
|
@ -40,8 +40,8 @@ impl Server {
|
|||||||
|
|
||||||
let opt = Opt {
|
let opt = Opt {
|
||||||
db_path: tmp_dir.path().join("db").to_str().unwrap().to_string(),
|
db_path: tmp_dir.path().join("db").to_str().unwrap().to_string(),
|
||||||
backup_folder: tmp_dir.path().join("backup"),
|
dumps_folder: tmp_dir.path().join("dump"),
|
||||||
backup_batch_size: 16,
|
dump_batch_size: 16,
|
||||||
http_addr: "127.0.0.1:7700".to_owned(),
|
http_addr: "127.0.0.1:7700".to_owned(),
|
||||||
master_key: None,
|
master_key: None,
|
||||||
env: "development".to_owned(),
|
env: "development".to_owned(),
|
||||||
@ -489,17 +489,17 @@ impl Server {
|
|||||||
self.get_request("/sys-info/pretty").await
|
self.get_request("/sys-info/pretty").await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn trigger_backup(&self) -> (Value, StatusCode) {
|
pub async fn trigger_dump(&self) -> (Value, StatusCode) {
|
||||||
self.post_request("/backups", Value::Null).await
|
self.post_request("/dumps", Value::Null).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_backup_status(&mut self, backup_uid: &str) -> (Value, StatusCode) {
|
pub async fn get_dump_status(&mut self, dump_uid: &str) -> (Value, StatusCode) {
|
||||||
let url = format!("/backups/{}/status", backup_uid);
|
let url = format!("/dumps/{}/status", dump_uid);
|
||||||
self.get_request(&url).await
|
self.get_request(&url).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn trigger_backup_importation(&mut self, backup_uid: &str) -> (Value, StatusCode) {
|
pub async fn trigger_dump_importation(&mut self, dump_uid: &str) -> (Value, StatusCode) {
|
||||||
let url = format!("/backups/{}/import", backup_uid);
|
let url = format!("/dumps/{}/import", dump_uid);
|
||||||
self.get_request(&url).await
|
self.get_request(&url).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,24 +9,24 @@ use tempfile::TempDir;
|
|||||||
|
|
||||||
#[macro_use] mod common;
|
#[macro_use] mod common;
|
||||||
|
|
||||||
async fn trigger_and_wait_backup(server: &mut common::Server) -> String {
|
async fn trigger_and_wait_dump(server: &mut common::Server) -> String {
|
||||||
let (value, status_code) = server.trigger_backup().await;
|
let (value, status_code) = server.trigger_dump().await;
|
||||||
|
|
||||||
assert_eq!(status_code, 202);
|
assert_eq!(status_code, 202);
|
||||||
|
|
||||||
let backup_uid = value["uid"].as_str().unwrap().to_string();
|
let dump_uid = value["uid"].as_str().unwrap().to_string();
|
||||||
|
|
||||||
for _ in 0..20 as u8 {
|
for _ in 0..20 as u8 {
|
||||||
let (value, status_code) = server.get_backup_status(&backup_uid).await;
|
let (value, status_code) = server.get_dump_status(&dump_uid).await;
|
||||||
|
|
||||||
assert_eq!(status_code, 200);
|
assert_eq!(status_code, 200);
|
||||||
assert_ne!(value["status"].as_str(), Some("backup_process_failed"));
|
assert_ne!(value["status"].as_str(), Some("dump_process_failed"));
|
||||||
|
|
||||||
if value["status"].as_str() == Some("done") { return backup_uid }
|
if value["status"].as_str() == Some("done") { return dump_uid }
|
||||||
thread::sleep(Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
unreachable!("backup creation runned out of time")
|
unreachable!("dump creation runned out of time")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current_db_version() -> (String, String, String) {
|
fn current_db_version() -> (String, String, String) {
|
||||||
@ -37,7 +37,7 @@ fn current_db_version() -> (String, String, String) {
|
|||||||
(current_version_major, current_version_minor, current_version_patch)
|
(current_version_major, current_version_minor, current_version_patch)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current_backup_version() -> String {
|
fn current_dump_version() -> String {
|
||||||
"V1".into()
|
"V1".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,31 +50,31 @@ fn read_all_jsonline<R: std::io::Read>(r: R) -> Value {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn trigger_backup_should_return_ok() {
|
async fn trigger_dump_should_return_ok() {
|
||||||
let server = common::Server::test_server().await;
|
let server = common::Server::test_server().await;
|
||||||
|
|
||||||
let (_, status_code) = server.trigger_backup().await;
|
let (_, status_code) = server.trigger_dump().await;
|
||||||
|
|
||||||
assert_eq!(status_code, 202);
|
assert_eq!(status_code, 202);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn trigger_backup_twice_should_return_conflict() {
|
async fn trigger_dump_twice_should_return_conflict() {
|
||||||
let server = common::Server::test_server().await;
|
let server = common::Server::test_server().await;
|
||||||
|
|
||||||
let expected = json!({
|
let expected = json!({
|
||||||
"message": "Another backup is already in progress",
|
"message": "Another dump is already in progress",
|
||||||
"errorCode": "backup_already_in_progress",
|
"errorCode": "dump_already_in_progress",
|
||||||
"errorType": "invalid_request_error",
|
"errorType": "invalid_request_error",
|
||||||
"errorLink": "https://docs.meilisearch.com/errors#backup_already_in_progress"
|
"errorLink": "https://docs.meilisearch.com/errors#dump_already_in_progress"
|
||||||
});
|
});
|
||||||
|
|
||||||
let (_, status_code) = server.trigger_backup().await;
|
let (_, status_code) = server.trigger_dump().await;
|
||||||
|
|
||||||
assert_eq!(status_code, 202);
|
assert_eq!(status_code, 202);
|
||||||
|
|
||||||
let (value, status_code) = server.trigger_backup().await;
|
let (value, status_code) = server.trigger_dump().await;
|
||||||
|
|
||||||
|
|
||||||
assert_json_eq!(expected.clone(), value.clone(), ordered: false);
|
assert_json_eq!(expected.clone(), value.clone(), ordered: false);
|
||||||
@ -83,17 +83,17 @@ async fn trigger_backup_twice_should_return_conflict() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn trigger_backup_concurently_should_return_conflict() {
|
async fn trigger_dump_concurently_should_return_conflict() {
|
||||||
let server = common::Server::test_server().await;
|
let server = common::Server::test_server().await;
|
||||||
|
|
||||||
let expected = json!({
|
let expected = json!({
|
||||||
"message": "Another backup is already in progress",
|
"message": "Another dump is already in progress",
|
||||||
"errorCode": "backup_already_in_progress",
|
"errorCode": "dump_already_in_progress",
|
||||||
"errorType": "invalid_request_error",
|
"errorType": "invalid_request_error",
|
||||||
"errorLink": "https://docs.meilisearch.com/errors#backup_already_in_progress"
|
"errorLink": "https://docs.meilisearch.com/errors#dump_already_in_progress"
|
||||||
});
|
});
|
||||||
|
|
||||||
let ((_value_1, _status_code_1), (value_2, status_code_2)) = futures::join!(server.trigger_backup(), server.trigger_backup());
|
let ((_value_1, _status_code_1), (value_2, status_code_2)) = futures::join!(server.trigger_dump(), server.trigger_dump());
|
||||||
|
|
||||||
assert_json_eq!(expected.clone(), value_2.clone(), ordered: false);
|
assert_json_eq!(expected.clone(), value_2.clone(), ordered: false);
|
||||||
assert_eq!(status_code_2, 409);
|
assert_eq!(status_code_2, 409);
|
||||||
@ -101,21 +101,21 @@ async fn trigger_backup_concurently_should_return_conflict() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn get_backup_status_early_should_return_processing() {
|
async fn get_dump_status_early_should_return_processing() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
let (value, status_code) = server.trigger_backup().await;
|
let (value, status_code) = server.trigger_dump().await;
|
||||||
|
|
||||||
assert_eq!(status_code, 202);
|
assert_eq!(status_code, 202);
|
||||||
|
|
||||||
let backup_uid = value["uid"].as_str().unwrap().to_string();
|
let dump_uid = value["uid"].as_str().unwrap().to_string();
|
||||||
|
|
||||||
let (value, status_code) = server.get_backup_status(&backup_uid).await;
|
let (value, status_code) = server.get_dump_status(&dump_uid).await;
|
||||||
|
|
||||||
let expected = json!({
|
let expected = json!({
|
||||||
"uid": backup_uid,
|
"uid": dump_uid,
|
||||||
"status": "processing"
|
"status": "processing"
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -126,25 +126,24 @@ async fn get_backup_status_early_should_return_processing() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn get_backup_status_should_return_done() {
|
async fn get_dump_status_should_return_done() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
|
|
||||||
let (value, status_code) = server.trigger_backup().await;
|
let (value, status_code) = server.trigger_dump().await;
|
||||||
|
|
||||||
assert_eq!(status_code, 202);
|
assert_eq!(status_code, 202);
|
||||||
|
|
||||||
println!("{:?}", value);
|
let dump_uid = value["uid"].as_str().unwrap().to_string();
|
||||||
let backup_uid = value["uid"].as_str().unwrap().to_string();
|
|
||||||
|
|
||||||
let expected = json!({
|
let expected = json!({
|
||||||
"uid": backup_uid.clone(),
|
"uid": dump_uid.clone(),
|
||||||
"status": "done"
|
"status": "done"
|
||||||
});
|
});
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(1)); // wait backup until process end
|
thread::sleep(Duration::from_secs(1)); // wait dump until process end
|
||||||
|
|
||||||
let (value, status_code) = server.get_backup_status(&backup_uid).await;
|
let (value, status_code) = server.get_dump_status(&dump_uid).await;
|
||||||
|
|
||||||
assert_eq!(status_code, 200);
|
assert_eq!(status_code, 200);
|
||||||
|
|
||||||
@ -153,7 +152,7 @@ async fn get_backup_status_should_return_done() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn backup_metadata_should_be_valid() {
|
async fn dump_metadata_should_be_valid() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
let body = json!({
|
let body = json!({
|
||||||
@ -163,13 +162,13 @@ async fn backup_metadata_should_be_valid() {
|
|||||||
|
|
||||||
server.create_index(body).await;
|
server.create_index(body).await;
|
||||||
|
|
||||||
let uid = trigger_and_wait_backup(&mut server).await;
|
let uid = trigger_and_wait_dump(&mut server).await;
|
||||||
|
|
||||||
let backup_folder = Path::new(&server.data().backup_folder);
|
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||||
let tmp_dir = TempDir::new().unwrap();
|
let tmp_dir = TempDir::new().unwrap();
|
||||||
let tmp_dir_path = tmp_dir.path();
|
let tmp_dir_path = tmp_dir.path();
|
||||||
|
|
||||||
compression::from_tar_gz(&backup_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||||
|
|
||||||
let file = File::open(tmp_dir_path.join("metadata.json")).unwrap();
|
let file = File::open(tmp_dir_path.join("metadata.json")).unwrap();
|
||||||
let mut metadata: serde_json::Value = serde_json::from_reader(file).unwrap();
|
let mut metadata: serde_json::Value = serde_json::from_reader(file).unwrap();
|
||||||
@ -193,7 +192,7 @@ async fn backup_metadata_should_be_valid() {
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"dbVersion": format!("{}.{}.{}", major, minor, patch),
|
"dbVersion": format!("{}.{}.{}", major, minor, patch),
|
||||||
"backupVersion": current_backup_version()
|
"dumpVersion": current_dump_version()
|
||||||
});
|
});
|
||||||
|
|
||||||
assert_json_include!(expected: expected.clone(), actual: metadata.clone());
|
assert_json_include!(expected: expected.clone(), actual: metadata.clone());
|
||||||
@ -201,20 +200,20 @@ async fn backup_metadata_should_be_valid() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn backup_gzip_should_have_been_created() {
|
async fn dump_gzip_should_have_been_created() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
|
|
||||||
let backup_uid = trigger_and_wait_backup(&mut server).await;
|
let dump_uid = trigger_and_wait_dump(&mut server).await;
|
||||||
let backup_folder = Path::new(&server.data().backup_folder);
|
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||||
|
|
||||||
let compressed_path = backup_folder.join(format!("{}.tar.gz", backup_uid));
|
let compressed_path = dumps_folder.join(format!("{}.tar.gz", dump_uid));
|
||||||
assert!(File::open(compressed_path).is_ok());
|
assert!(File::open(compressed_path).is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn backup_index_settings_should_be_valid() {
|
async fn dump_index_settings_should_be_valid() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
let expected = json!({
|
let expected = json!({
|
||||||
@ -278,13 +277,13 @@ async fn backup_index_settings_should_be_valid() {
|
|||||||
|
|
||||||
server.update_all_settings(expected.clone()).await;
|
server.update_all_settings(expected.clone()).await;
|
||||||
|
|
||||||
let uid = trigger_and_wait_backup(&mut server).await;
|
let uid = trigger_and_wait_dump(&mut server).await;
|
||||||
|
|
||||||
let backup_folder = Path::new(&server.data().backup_folder);
|
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||||
let tmp_dir = TempDir::new().unwrap();
|
let tmp_dir = TempDir::new().unwrap();
|
||||||
let tmp_dir_path = tmp_dir.path();
|
let tmp_dir_path = tmp_dir.path();
|
||||||
|
|
||||||
compression::from_tar_gz(&backup_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||||
|
|
||||||
let file = File::open(tmp_dir_path.join("test").join("settings.json")).unwrap();
|
let file = File::open(tmp_dir_path.join("test").join("settings.json")).unwrap();
|
||||||
let settings: serde_json::Value = serde_json::from_reader(file).unwrap();
|
let settings: serde_json::Value = serde_json::from_reader(file).unwrap();
|
||||||
@ -294,21 +293,21 @@ async fn backup_index_settings_should_be_valid() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn backup_index_documents_should_be_valid() {
|
async fn dump_index_documents_should_be_valid() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
let dataset = include_bytes!("assets/backups/v1/test/documents.jsonl");
|
let dataset = include_bytes!("assets/dumps/v1/test/documents.jsonl");
|
||||||
let mut slice: &[u8] = dataset;
|
let mut slice: &[u8] = dataset;
|
||||||
|
|
||||||
let expected: Value = read_all_jsonline(&mut slice);
|
let expected: Value = read_all_jsonline(&mut slice);
|
||||||
|
|
||||||
let uid = trigger_and_wait_backup(&mut server).await;
|
let uid = trigger_and_wait_dump(&mut server).await;
|
||||||
|
|
||||||
let backup_folder = Path::new(&server.data().backup_folder);
|
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||||
let tmp_dir = TempDir::new().unwrap();
|
let tmp_dir = TempDir::new().unwrap();
|
||||||
let tmp_dir_path = tmp_dir.path();
|
let tmp_dir_path = tmp_dir.path();
|
||||||
|
|
||||||
compression::from_tar_gz(&backup_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||||
|
|
||||||
let file = File::open(tmp_dir_path.join("test").join("documents.jsonl")).unwrap();
|
let file = File::open(tmp_dir_path.join("test").join("documents.jsonl")).unwrap();
|
||||||
let documents = read_all_jsonline(file);
|
let documents = read_all_jsonline(file);
|
||||||
@ -318,21 +317,21 @@ async fn backup_index_documents_should_be_valid() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn backup_index_updates_should_be_valid() {
|
async fn dump_index_updates_should_be_valid() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
let dataset = include_bytes!("assets/backups/v1/test/updates.jsonl");
|
let dataset = include_bytes!("assets/dumps/v1/test/updates.jsonl");
|
||||||
let mut slice: &[u8] = dataset;
|
let mut slice: &[u8] = dataset;
|
||||||
|
|
||||||
let expected: Value = read_all_jsonline(&mut slice);
|
let expected: Value = read_all_jsonline(&mut slice);
|
||||||
|
|
||||||
let uid = trigger_and_wait_backup(&mut server).await;
|
let uid = trigger_and_wait_dump(&mut server).await;
|
||||||
|
|
||||||
let backup_folder = Path::new(&server.data().backup_folder);
|
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||||
let tmp_dir = TempDir::new().unwrap();
|
let tmp_dir = TempDir::new().unwrap();
|
||||||
let tmp_dir_path = tmp_dir.path();
|
let tmp_dir_path = tmp_dir.path();
|
||||||
|
|
||||||
compression::from_tar_gz(&backup_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||||
|
|
||||||
let file = File::open(tmp_dir_path.join("test").join("updates.jsonl")).unwrap();
|
let file = File::open(tmp_dir_path.join("test").join("updates.jsonl")).unwrap();
|
||||||
let mut updates = read_all_jsonline(file);
|
let mut updates = read_all_jsonline(file);
|
||||||
@ -354,10 +353,10 @@ async fn backup_index_updates_should_be_valid() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
async fn get_unexisting_backup_status_should_return_not_found() {
|
async fn get_unexisting_dump_status_should_return_not_found() {
|
||||||
let mut server = common::Server::test_server().await;
|
let mut server = common::Server::test_server().await;
|
||||||
|
|
||||||
let (_, status_code) = server.get_backup_status("4242").await;
|
let (_, status_code) = server.get_dump_status("4242").await;
|
||||||
|
|
||||||
assert_eq!(status_code, 404);
|
assert_eq!(status_code, 404);
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user