From 6d79107b14e451740005f08c510e2bd831a34b69 Mon Sep 17 00:00:00 2001
From: mpostma
Date: Tue, 15 Dec 2020 13:05:01 +0100
Subject: [PATCH] make dumps reentrant

---
 meilisearch-http/src/data.rs        | 15 +++++++++-
 meilisearch-http/src/dump.rs        | 44 +++++++++++------------------
 meilisearch-http/src/routes/dump.rs |  2 +-
 3 files changed, 31 insertions(+), 30 deletions(-)

diff --git a/meilisearch-http/src/data.rs b/meilisearch-http/src/data.rs
index 783c81fd8..2deeab693 100644
--- a/meilisearch-http/src/data.rs
+++ b/meilisearch-http/src/data.rs
@@ -1,7 +1,7 @@
 use std::error::Error;
 use std::ops::Deref;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use meilisearch_core::{Database, DatabaseOptions, Index};
 use sha2::Digest;
@@ -9,6 +9,7 @@ use sha2::Digest;
 use crate::error::{Error as MSError, ResponseError};
 use crate::index_update_callback;
 use crate::option::Opt;
+use crate::dump::DumpInfo;
 
 #[derive(Clone)]
 pub struct Data {
@@ -32,6 +33,7 @@ pub struct DataInner {
     pub api_keys: ApiKeys,
     pub server_pid: u32,
     pub http_payload_size_limit: usize,
+    pub current_dump: Arc<Mutex<Option<DumpInfo>>>,
 }
 
 #[derive(Clone)]
@@ -82,6 +84,8 @@ impl Data {
 
         api_keys.generate_missing_api_keys();
 
+        let current_dump = Arc::new(Mutex::new(None));
+
         let inner_data = DataInner {
             db: db.clone(),
             db_path,
@@ -90,6 +94,7 @@ impl Data {
             api_keys,
             server_pid,
             http_payload_size_limit,
+            current_dump,
         };
 
         let data = Data {
@@ -135,6 +140,14 @@ impl Data {
         Ok(created_index)
     }
 
+    pub fn get_current_dump_info(&self) -> Option<DumpInfo> {
+        self.current_dump.lock().unwrap().clone()
+    }
+
+    pub fn set_current_dump_info(&self, dump_info: DumpInfo) {
+        self.current_dump.lock().unwrap().replace(dump_info);
+    }
+
     pub fn get_or_create_index<F, R>(&self, uid: &str, f: F) -> Result<R, ResponseError>
     where
         F: FnOnce(&Index) -> Result<R, ResponseError>,
diff --git a/meilisearch-http/src/dump.rs b/meilisearch-http/src/dump.rs
index 468dbf640..c4513af6f 100644
--- a/meilisearch-http/src/dump.rs
+++ b/meilisearch-http/src/dump.rs
@@ -1,7 +1,6 @@
 use std::fs::{create_dir_all, File};
 use std::io::prelude::*;
 use std::path::{Path, PathBuf};
-use std::sync::Mutex;
 use std::thread;
 
 use actix_web::web;
@@ -11,7 +10,6 @@ use log::{error, info};
 use meilisearch_core::{MainWriter, MainReader, UpdateReader};
 use meilisearch_core::settings::Settings;
 use meilisearch_core::update::{apply_settings_update, apply_documents_addition};
-use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_json::json;
 use tempfile::TempDir;
@@ -22,9 +20,6 @@ use crate::helpers::compression;
 use crate::routes::index;
 use crate::routes::index::IndexResponse;
 
-// Mutex to share dump progress.
-static DUMP_INFO: Lazy<Mutex<Option<DumpInfo>>> = Lazy::new(Mutex::default);
-
 #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
 enum DumpVersion {
     V1,
@@ -211,6 +206,7 @@ pub struct DumpInfo {
     pub status: DumpStatus,
     #[serde(skip_serializing_if = "Option::is_none", flatten)]
     pub error: Option<serde_json::Value>,
+
 }
 
 impl DumpInfo {
@@ -228,14 +224,6 @@ impl DumpInfo {
     pub fn dump_already_in_progress(&self) -> bool {
         self.status == DumpStatus::InProgress
     }
-
-    pub fn get_current() -> Option<Self> {
-        DUMP_INFO.lock().unwrap().clone()
-    }
-
-    pub fn set_current(&self) {
-        *DUMP_INFO.lock().unwrap() = Some(self.clone());
-    }
 }
 
 /// Generate uid from creation date
@@ -299,11 +287,11 @@ fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, dir_path: &
 }
 
 /// Write error with a context.
-fn fail_dump_process<E: std::error::Error>(dump_info: DumpInfo, context: &str, error: E) {
+fn fail_dump_process<E: std::error::Error>(data: &web::Data<Data>, dump_info: DumpInfo, context: &str, error: E) {
     let error_message = format!("{}; {}", context, error);
 
     error!("Something went wrong during dump process: {}", &error_message);
-    dump_info.with_error(Error::dump_failed(error_message).into()).set_current();
+    data.set_current_dump_info(dump_info.with_error(Error::dump_failed(error_message).into()))
 }
 
 /// Main function of dump.
@@ -312,7 +300,7 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
     let update_reader = match data.db.update_read_txn() {
         Ok(r) => r,
         Err(e) => {
-            fail_dump_process(dump_info, "creating RO transaction on updates", e);
+            fail_dump_process(&data, dump_info, "creating RO transaction on updates", e);
             return ;
         }
     };
@@ -321,7 +309,7 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
     let main_reader = match data.db.main_read_txn() {
         Ok(r) => r,
         Err(e) => {
-            fail_dump_process(dump_info, "creating RO transaction on main", e);
+            fail_dump_process(&data, dump_info, "creating RO transaction on main", e);
             return ;
         }
     };
@@ -330,7 +318,7 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
     let tmp_dir = match TempDir::new() {
         Ok(tmp_dir) => tmp_dir,
         Err(e) => {
-            fail_dump_process(dump_info, "creating temporary directory", e);
+            fail_dump_process(&data, dump_info, "creating temporary directory", e);
            return ;
         }
     };
@@ -340,14 +328,14 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
     let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
         Ok(indexes) => indexes,
         Err(e) => {
-            fail_dump_process(dump_info, "listing indexes", e);
+            fail_dump_process(&data, dump_info, "listing indexes", e);
             return ;
         }
     };
 
     // create metadata
     if let Err(e) = dump_metadata(&data, &tmp_dir_path, indexes.clone()) {
-        fail_dump_process(dump_info, "generating metadata", e);
+        fail_dump_process(&data, dump_info, "generating metadata", e);
         return ;
     }
@@ -357,32 +345,32 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
 
         // create index sub-dircetory
         if let Err(e) = create_dir_all(&index_path) {
-            fail_dump_process(dump_info, &format!("creating directory for index {}", &index.uid), e);
+            fail_dump_process(&data, dump_info, &format!("creating directory for index {}", &index.uid), e);
             return ;
         }
 
         // export settings
         if let Err(e) = dump_index_settings(&data, &main_reader, &index_path, &index.uid) {
-            fail_dump_process(dump_info, &format!("generating settings for index {}", &index.uid), e);
+            fail_dump_process(&data, dump_info, &format!("generating settings for index {}", &index.uid), e);
             return ;
         }
 
         // export documents
         if let Err(e) = dump_index_documents(&data, &main_reader, &index_path, &index.uid) {
-            fail_dump_process(dump_info, &format!("generating documents for index {}", &index.uid), e);
+            fail_dump_process(&data, dump_info, &format!("generating documents for index {}", &index.uid), e);
             return ;
         }
 
         // export updates
         if let Err(e) = dump_index_updates(&data, &update_reader, &index_path, &index.uid) {
-            fail_dump_process(dump_info, &format!("generating updates for index {}", &index.uid), e);
+            fail_dump_process(&data, dump_info, &format!("generating updates for index {}", &index.uid), e);
             return ;
         }
     }
 
     // compress dump in a file named `{dump_uid}.dump` in `dumps_dir`
     if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_dir(&dumps_dir, &dump_info.uid)) {
-        fail_dump_process(dump_info, "compressing dump", e);
+        fail_dump_process(&data, dump_info, "compressing dump", e);
         return ;
     }
 
@@ -392,14 +380,14 @@ fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo)
         DumpStatus::Done
     );
 
-    resume.set_current();
+    data.set_current_dump_info(resume);
 }
 
 pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<DumpInfo, Error> {
     create_dir_all(dumps_dir).map_err(|e| Error::dump_failed(format!("creating temporary directory {}", e)))?;
 
     // check if a dump is already in progress
-    if let Some(resume) = DumpInfo::get_current() {
+    if let Some(resume) = data.get_current_dump_info() {
         if resume.dump_already_in_progress() {
             return Err(Error::dump_conflict())
         }
@@ -411,7 +399,7 @@ pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<Dum
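
A minimal, self-contained sketch of the pattern this patch introduces: dump progress moves from the process-global `DUMP_INFO` static into an `Arc<Mutex<Option<DumpInfo>>>` owned by `Data`, so every clone of `Data` observes and updates the same dump state. `DumpInfo` and `DumpStatus` below are simplified stand-ins for illustration, not the real meilisearch types.

use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, PartialEq)]
enum DumpStatus {
    Done,
    InProgress,
    Failed,
}

#[derive(Clone, Debug)]
struct DumpInfo {
    uid: String,
    status: DumpStatus,
}

#[derive(Clone)]
struct Data {
    // Shared by every clone of `Data`; replaces the old global static.
    current_dump: Arc<Mutex<Option<DumpInfo>>>,
}

impl Data {
    fn new() -> Self {
        Self { current_dump: Arc::new(Mutex::new(None)) }
    }

    // Mirrors `Data::get_current_dump_info`: clone the state out of the lock.
    fn get_current_dump_info(&self) -> Option<DumpInfo> {
        self.current_dump.lock().unwrap().clone()
    }

    // Mirrors `Data::set_current_dump_info`: replace the state under the lock.
    fn set_current_dump_info(&self, dump_info: DumpInfo) {
        self.current_dump.lock().unwrap().replace(dump_info);
    }
}

fn main() {
    let data = Data::new();
    let for_dump_thread = data.clone(); // same Arc, same dump state

    // The dump thread marks a dump as running...
    for_dump_thread.set_current_dump_info(DumpInfo {
        uid: "20201215-130501".into(),
        status: DumpStatus::InProgress,
    });

    // ...so a concurrent dump request sees it and must return a conflict.
    let conflict = data
        .get_current_dump_info()
        .map_or(false, |info| info.status == DumpStatus::InProgress);
    assert!(conflict);

    // After a failure the status is overwritten, so a new dump may start.
    for_dump_thread.set_current_dump_info(DumpInfo {
        uid: "20201215-130501".into(),
        status: DumpStatus::Failed,
    });
    assert!(data
        .get_current_dump_info()
        .map_or(true, |info| info.status != DumpStatus::InProgress));
}

Because `dump_already_in_progress()` only matches `InProgress`, a dump that ended in `Failed` or `Done` no longer blocks the next request, which is what makes the dump endpoint reentrant.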