make dumps reentrant

This commit is contained in:
mpostma 2020-12-15 13:05:01 +01:00
parent 5fe0e06342
commit 6d79107b14
3 changed files with 31 additions and 30 deletions

View File

@ -1,7 +1,7 @@
use std::error::Error;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use meilisearch_core::{Database, DatabaseOptions, Index};
use sha2::Digest;
@ -9,6 +9,7 @@ use sha2::Digest;
use crate::error::{Error as MSError, ResponseError};
use crate::index_update_callback;
use crate::option::Opt;
use crate::dump::DumpInfo;
#[derive(Clone)]
pub struct Data {
@ -32,6 +33,7 @@ pub struct DataInner {
pub api_keys: ApiKeys,
pub server_pid: u32,
pub http_payload_size_limit: usize,
pub current_dump: Arc<Mutex<Option<DumpInfo>>>,
}
#[derive(Clone)]
@ -82,6 +84,8 @@ impl Data {
api_keys.generate_missing_api_keys();
let current_dump = Arc::new(Mutex::new(None));
let inner_data = DataInner {
db: db.clone(),
db_path,
@ -90,6 +94,7 @@ impl Data {
api_keys,
server_pid,
http_payload_size_limit,
current_dump,
};
let data = Data {
@ -135,6 +140,14 @@ impl Data {
Ok(created_index)
}
pub fn get_current_dump_info(&self) -> Option<DumpInfo> {
self.current_dump.lock().unwrap().clone()
}
pub fn set_current_dump_info(&self, dump_info: DumpInfo) {
self.current_dump.lock().unwrap().replace(dump_info);
}
pub fn get_or_create_index<F, R>(&self, uid: &str, f: F) -> Result<R, ResponseError>
where
F: FnOnce(&Index) -> Result<R, ResponseError>,

View File

@ -1,7 +1,6 @@
use std::fs::{create_dir_all, File};
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::thread;
use actix_web::web;
@ -11,7 +10,6 @@ use log::{error, info};
use meilisearch_core::{MainWriter, MainReader, UpdateReader};
use meilisearch_core::settings::Settings;
use meilisearch_core::update::{apply_settings_update, apply_documents_addition};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tempfile::TempDir;
@ -22,9 +20,6 @@ use crate::helpers::compression;
use crate::routes::index;
use crate::routes::index::IndexResponse;
// Mutex to share dump progress.
static DUMP_INFO: Lazy<Mutex<Option<DumpInfo>>> = Lazy::new(Mutex::default);
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum DumpVersion {
V1,
@ -211,6 +206,7 @@ pub struct DumpInfo {
pub status: DumpStatus,
#[serde(skip_serializing_if = "Option::is_none", flatten)]
pub error: Option<serde_json::Value>,
}
impl DumpInfo {
@ -228,14 +224,6 @@ impl DumpInfo {
pub fn dump_already_in_progress(&self) -> bool {
self.status == DumpStatus::InProgress
}
pub fn get_current() -> Option<Self> {
DUMP_INFO.lock().unwrap().clone()
}
pub fn set_current(&self) {
*DUMP_INFO.lock().unwrap() = Some(self.clone());
}
}
/// Generate uid from creation date
@ -299,11 +287,11 @@ fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, dir_path: &
}
/// Write error with a context.
fn fail_dump_process<E: std::error::Error>(dump_info: DumpInfo, context: &str, error: E) {
fn fail_dump_process<E: std::error::Error>(data: &web::Data<Data>, dump_info: DumpInfo, context: &str, error: E) {
let error_message = format!("{}; {}", context, error);
error!("Something went wrong during dump process: {}", &error_message);
dump_info.with_error(Error::dump_failed(error_message).into()).set_current();
data.set_current_dump_info(dump_info.with_error(Error::dump_failed(error_message).into()))
}
/// Main function of dump.
@ -312,7 +300,7 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
let update_reader = match data.db.update_read_txn() {
Ok(r) => r,
Err(e) => {
fail_dump_process(dump_info, "creating RO transaction on updates", e);
fail_dump_process(&data, dump_info, "creating RO transaction on updates", e);
return ;
}
};
@ -321,7 +309,7 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
let main_reader = match data.db.main_read_txn() {
Ok(r) => r,
Err(e) => {
fail_dump_process(dump_info, "creating RO transaction on main", e);
fail_dump_process(&data, dump_info, "creating RO transaction on main", e);
return ;
}
};
@ -330,7 +318,7 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
let tmp_dir = match TempDir::new() {
Ok(tmp_dir) => tmp_dir,
Err(e) => {
fail_dump_process(dump_info, "creating temporary directory", e);
fail_dump_process(&data, dump_info, "creating temporary directory", e);
return ;
}
};
@ -340,14 +328,14 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
Ok(indexes) => indexes,
Err(e) => {
fail_dump_process(dump_info, "listing indexes", e);
fail_dump_process(&data, dump_info, "listing indexes", e);
return ;
}
};
// create metadata
if let Err(e) = dump_metadata(&data, &tmp_dir_path, indexes.clone()) {
fail_dump_process(dump_info, "generating metadata", e);
fail_dump_process(&data, dump_info, "generating metadata", e);
return ;
}
@ -357,32 +345,32 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
// create index sub-dircetory
if let Err(e) = create_dir_all(&index_path) {
fail_dump_process(dump_info, &format!("creating directory for index {}", &index.uid), e);
fail_dump_process(&data, dump_info, &format!("creating directory for index {}", &index.uid), e);
return ;
}
// export settings
if let Err(e) = dump_index_settings(&data, &main_reader, &index_path, &index.uid) {
fail_dump_process(dump_info, &format!("generating settings for index {}", &index.uid), e);
fail_dump_process(&data, dump_info, &format!("generating settings for index {}", &index.uid), e);
return ;
}
// export documents
if let Err(e) = dump_index_documents(&data, &main_reader, &index_path, &index.uid) {
fail_dump_process(dump_info, &format!("generating documents for index {}", &index.uid), e);
fail_dump_process(&data, dump_info, &format!("generating documents for index {}", &index.uid), e);
return ;
}
// export updates
if let Err(e) = dump_index_updates(&data, &update_reader, &index_path, &index.uid) {
fail_dump_process(dump_info, &format!("generating updates for index {}", &index.uid), e);
fail_dump_process(&data, dump_info, &format!("generating updates for index {}", &index.uid), e);
return ;
}
}
// compress dump in a file named `{dump_uid}.dump` in `dumps_dir`
if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_dir(&dumps_dir, &dump_info.uid)) {
fail_dump_process(dump_info, "compressing dump", e);
fail_dump_process(&data, dump_info, "compressing dump", e);
return ;
}
@ -392,14 +380,14 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
DumpStatus::Done
);
resume.set_current();
data.set_current_dump_info(resume);
}
pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<DumpInfo, Error> {
create_dir_all(dumps_dir).map_err(|e| Error::dump_failed(format!("creating temporary directory {}", e)))?;
// check if a dump is already in progress
if let Some(resume) = DumpInfo::get_current() {
if let Some(resume) = data.get_current_dump_info() {
if resume.dump_already_in_progress() {
return Err(Error::dump_conflict())
}
@ -411,7 +399,7 @@ pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<Dum
DumpStatus::InProgress
);
info.set_current();
data.set_current_dump_info(info.clone());
let data = data.clone();
let dumps_dir = dumps_dir.to_path_buf();

View File

@ -45,7 +45,7 @@ async fn get_dump_status(
let dumps_dir = Path::new(&data.dumps_dir);
let dump_uid = &path.dump_uid;
if let Some(resume) = DumpInfo::get_current() {
if let Some(resume) = data.get_current_dump_info() {
if &resume.uid == dump_uid {
return Ok(HttpResponse::Ok().json(resume));
}