2021-05-27 14:30:20 +02:00
use std ::fs ::File ;
use std ::path ::{ Path , PathBuf } ;
2022-05-19 14:59:59 +02:00
use std ::sync ::Arc ;
2021-05-10 20:25:09 +02:00
2022-01-20 16:00:14 +01:00
use anyhow ::bail ;
2022-05-19 14:44:24 +02:00
use log ::{ info , trace } ;
use meilisearch_auth ::AuthController ;
2021-05-10 20:23:12 +02:00
use serde ::{ Deserialize , Serialize } ;
2022-02-14 15:32:41 +01:00
use time ::OffsetDateTime ;
2021-04-28 16:43:49 +02:00
2022-01-20 16:00:14 +01:00
use tempfile ::TempDir ;
2022-05-19 14:44:24 +02:00
use tokio ::fs ::create_dir_all ;
2021-05-10 20:25:09 +02:00
2022-05-19 14:44:24 +02:00
use crate ::analytics ;
use crate ::compression ::{ from_tar_gz , to_tar_gz } ;
use crate ::dump ::error ::DumpError ;
2022-05-19 14:59:59 +02:00
use crate ::index_resolver ::index_store ::IndexStore ;
use crate ::index_resolver ::meta_store ::IndexMetaStore ;
use crate ::index_resolver ::IndexResolver ;
2021-09-21 13:23:22 +02:00
use crate ::options ::IndexerOpts ;
2021-12-02 16:03:26 +01:00
use crate ::update_file_store ::UpdateFileStore ;
2021-06-14 21:26:35 +02:00
use error ::Result ;
2021-05-26 22:52:06 +02:00
2022-05-19 12:43:46 +02:00
use self ::loaders ::{ v2 , v3 , v4 } ;
2022-05-19 14:44:24 +02:00
// mod actor;
2021-12-02 16:03:26 +01:00
mod compat ;
2021-06-15 17:39:07 +02:00
pub mod error ;
2022-05-19 14:44:24 +02:00
// mod handle_impl;
2021-05-26 22:52:06 +02:00
mod loaders ;
2022-05-19 14:44:24 +02:00
// mod message;
2021-05-26 22:52:06 +02:00
2021-05-31 16:03:39 +02:00
/// Name of the metadata file stored at the root of a dump archive.
const META_FILE_NAME: &str = "metadata.json";
2021-05-27 14:30:20 +02:00
2021-09-29 15:24:59 +02:00
#[ derive(Serialize, Deserialize, Debug) ]
#[ serde(rename_all = " camelCase " ) ]
pub struct Metadata {
db_version : String ,
index_db_size : usize ,
update_db_size : usize ,
2022-03-01 19:02:32 +01:00
#[ serde(with = " time::serde::rfc3339 " ) ]
2022-02-14 15:32:41 +01:00
dump_date : OffsetDateTime ,
2021-09-29 15:24:59 +02:00
}
impl Metadata {
pub fn new ( index_db_size : usize , update_db_size : usize ) -> Self {
Self {
db_version : env ! ( " CARGO_PKG_VERSION " ) . to_string ( ) ,
index_db_size ,
update_db_size ,
2022-02-14 15:32:41 +01:00
dump_date : OffsetDateTime ::now_utc ( ) ,
2021-09-29 15:24:59 +02:00
}
}
}
2021-12-07 10:36:27 +01:00
/// Metadata of a legacy v1 dump; only the database version is recorded.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
    pub db_version: String,
}
2021-04-28 16:43:49 +02:00
#[ derive(Debug, Serialize, Deserialize) ]
2021-05-31 10:42:31 +02:00
#[ serde(tag = " dumpVersion " ) ]
2021-09-29 15:24:59 +02:00
pub enum MetadataVersion {
2021-05-31 10:42:31 +02:00
V1 ( MetadataV1 ) ,
2021-09-29 15:24:59 +02:00
V2 ( Metadata ) ,
V3 ( Metadata ) ,
2021-12-02 16:03:26 +01:00
V4 ( Metadata ) ,
2021-04-28 16:43:49 +02:00
}
2021-09-29 15:24:59 +02:00
impl MetadataVersion {
2022-01-20 16:00:14 +01:00
pub fn load_dump (
self ,
src : impl AsRef < Path > ,
dst : impl AsRef < Path > ,
index_db_size : usize ,
meta_env_size : usize ,
indexing_options : & IndexerOpts ,
) -> anyhow ::Result < ( ) > {
match self {
MetadataVersion ::V1 ( _meta ) = > {
anyhow ::bail! ( " The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards. " )
}
MetadataVersion ::V2 ( meta ) = > v2 ::load_dump (
meta ,
src ,
dst ,
index_db_size ,
meta_env_size ,
indexing_options ,
) ? ,
MetadataVersion ::V3 ( meta ) = > v3 ::load_dump (
meta ,
src ,
dst ,
index_db_size ,
meta_env_size ,
indexing_options ,
) ? ,
MetadataVersion ::V4 ( meta ) = > v4 ::load_dump (
meta ,
src ,
dst ,
index_db_size ,
meta_env_size ,
indexing_options ,
) ? ,
}
Ok ( ( ) )
}
2021-12-02 16:03:26 +01:00
pub fn new_v4 ( index_db_size : usize , update_db_size : usize ) -> Self {
2021-09-29 15:24:59 +02:00
let meta = Metadata ::new ( index_db_size , update_db_size ) ;
2021-12-02 16:03:26 +01:00
Self ::V4 ( meta )
2021-05-27 10:51:19 +02:00
}
2021-09-29 15:41:25 +02:00
pub fn db_version ( & self ) -> & str {
match self {
Self ::V1 ( meta ) = > & meta . db_version ,
2021-12-02 16:03:26 +01:00
Self ::V2 ( meta ) | Self ::V3 ( meta ) | Self ::V4 ( meta ) = > & meta . db_version ,
2021-09-29 15:41:25 +02:00
}
}
pub fn version ( & self ) -> & str {
match self {
MetadataVersion ::V1 ( _ ) = > " V1 " ,
MetadataVersion ::V2 ( _ ) = > " V2 " ,
MetadataVersion ::V3 ( _ ) = > " V3 " ,
2021-12-02 16:03:26 +01:00
MetadataVersion ::V4 ( _ ) = > " V4 " ,
2021-09-29 15:41:25 +02:00
}
}
2022-02-14 15:32:41 +01:00
pub fn dump_date ( & self ) -> Option < & OffsetDateTime > {
2021-09-29 15:41:25 +02:00
match self {
MetadataVersion ::V1 ( _ ) = > None ,
2021-12-02 16:03:26 +01:00
MetadataVersion ::V2 ( meta ) | MetadataVersion ::V3 ( meta ) | MetadataVersion ::V4 ( meta ) = > {
Some ( & meta . dump_date )
}
2021-09-29 15:41:25 +02:00
}
}
2021-04-28 16:43:49 +02:00
}
2021-05-10 20:25:09 +02:00
#[ derive(Debug, Serialize, Deserialize, PartialEq, Clone) ]
#[ serde(rename_all = " snake_case " ) ]
pub enum DumpStatus {
Done ,
InProgress ,
Failed ,
2021-04-28 16:43:49 +02:00
}
2021-05-27 14:30:20 +02:00
pub fn load_dump (
2021-05-26 20:42:09 +02:00
dst_path : impl AsRef < Path > ,
src_path : impl AsRef < Path > ,
2022-01-20 16:00:14 +01:00
ignore_dump_if_db_exists : bool ,
ignore_missing_dump : bool ,
2021-05-31 16:40:59 +02:00
index_db_size : usize ,
update_db_size : usize ,
2021-05-26 22:52:06 +02:00
indexer_opts : & IndexerOpts ,
2021-06-15 17:39:07 +02:00
) -> anyhow ::Result < ( ) > {
2022-01-20 16:00:14 +01:00
let empty_db = crate ::is_empty_db ( & dst_path ) ;
let src_path_exists = src_path . as_ref ( ) . exists ( ) ;
if empty_db & & src_path_exists {
let ( tmp_src , tmp_dst , meta ) = extract_dump ( & dst_path , & src_path ) ? ;
meta . load_dump (
tmp_src . path ( ) ,
tmp_dst . path ( ) ,
index_db_size ,
update_db_size ,
indexer_opts ,
) ? ;
persist_dump ( & dst_path , tmp_dst ) ? ;
Ok ( ( ) )
} else if ! empty_db & & ! ignore_dump_if_db_exists {
bail! (
" database already exists at {:?}, try to delete it or rename it " ,
dst_path
. as_ref ( )
. canonicalize ( )
. unwrap_or_else ( | _ | dst_path . as_ref ( ) . to_owned ( ) )
)
} else if ! src_path_exists & & ! ignore_missing_dump {
bail! ( " dump doesn't exist at {:?} " , src_path . as_ref ( ) )
} else {
// there is nothing to do
Ok ( ( ) )
}
}
fn extract_dump (
dst_path : impl AsRef < Path > ,
src_path : impl AsRef < Path > ,
) -> anyhow ::Result < ( TempDir , TempDir , MetadataVersion ) > {
2021-09-29 12:34:39 +02:00
// Setup a temp directory path in the same path as the database, to prevent cross devices
// references.
2021-09-29 15:41:25 +02:00
let temp_path = dst_path
. as_ref ( )
. parent ( )
. map ( ToOwned ::to_owned )
. unwrap_or_else ( | | " . " . into ( ) ) ;
2021-09-29 12:34:39 +02:00
2022-04-28 10:48:57 +02:00
let tmp_src = tempfile ::tempdir_in ( temp_path ) ? ;
2021-05-27 14:30:20 +02:00
let tmp_src_path = tmp_src . path ( ) ;
2021-09-29 12:02:27 +02:00
from_tar_gz ( & src_path , tmp_src_path ) ? ;
2021-05-27 14:30:20 +02:00
let meta_path = tmp_src_path . join ( META_FILE_NAME ) ;
2021-05-26 20:42:09 +02:00
let mut meta_file = File ::open ( & meta_path ) ? ;
2021-09-29 15:24:59 +02:00
let meta : MetadataVersion = serde_json ::from_reader ( & mut meta_file ) ? ;
2021-04-28 16:43:49 +02:00
2022-01-13 12:30:58 +01:00
if ! dst_path . as_ref ( ) . exists ( ) {
std ::fs ::create_dir_all ( dst_path . as_ref ( ) ) ? ;
}
2022-01-05 18:59:58 +01:00
let tmp_dst = tempfile ::tempdir_in ( dst_path . as_ref ( ) ) ? ;
2021-05-31 10:42:31 +02:00
2021-09-29 15:41:25 +02:00
info! (
" Loading dump {}, dump database version: {}, dump version: {} " ,
meta . dump_date ( )
. map ( | t | format! ( " from {} " , t ) )
. unwrap_or_else ( String ::new ) ,
meta . db_version ( ) ,
meta . version ( )
) ;
2021-09-29 12:34:39 +02:00
2022-01-20 16:00:14 +01:00
Ok ( ( tmp_src , tmp_dst , meta ) )
}
/// Replaces the content of the database directory `dst_path` with the
/// content of `tmp_dst` (the freshly loaded dump).
fn persist_dump(dst_path: impl AsRef<Path>, tmp_dst: TempDir) -> anyhow::Result<()> {
    // Keep the tempdir on disk (`into_path` disables auto-deletion); its
    // content is moved out manually below.
    let persisted_dump = tmp_dst.into_path();

    // Delete everything in the `data.ms` except the tempdir.
    if dst_path.as_ref().exists() {
        for file in dst_path.as_ref().read_dir().unwrap() {
            let file = file.unwrap().path();
            // Skip the tempdir itself: it lives inside `dst_path`.
            if file.file_name() == persisted_dump.file_name() {
                continue;
            }

            if file.is_file() {
                std::fs::remove_file(&file)?;
            } else {
                std::fs::remove_dir_all(&file)?;
            }
        }
    }

    // Move the whole content of the tempdir into the `data.ms`.
    for file in persisted_dump.read_dir().unwrap() {
        let file = file.unwrap().path();

        std::fs::rename(&file, &dst_path.as_ref().join(file.file_name().unwrap()))?;
    }

    // Delete the empty tempdir.
    std::fs::remove_dir_all(&persisted_dump)?;

    Ok(())
}
2021-05-27 14:30:20 +02:00
2022-05-19 14:59:59 +02:00
/// All the data needed to perform one dump of the engine state.
///
/// `U`/`I` are the meta-store and index-store implementations used by the
/// index resolver.
pub struct DumpJob<U, I> {
    /// Directory where the final `<uid>.dump` archive is written.
    pub dump_path: PathBuf,
    /// Root of the live database (`data.ms`).
    pub db_path: PathBuf,
    pub update_file_store: UpdateFileStore,
    /// Identifier of the dump; used as the archive file stem.
    pub uid: String,
    pub update_db_size: usize,
    pub index_db_size: usize,
    pub index_resolver: Arc<IndexResolver<U, I>>,
}
2022-05-19 14:59:59 +02:00
impl < U , I > DumpJob < U , I >
where
U : IndexMetaStore ,
I : IndexStore ,
{
2022-05-19 14:44:24 +02:00
pub async fn run ( self ) -> Result < ( ) > {
trace! ( " Performing dump. " ) ;
create_dir_all ( & self . dump_path ) . await ? ;
let temp_dump_dir = tokio ::task ::spawn_blocking ( tempfile ::TempDir ::new ) . await ? ? ;
let temp_dump_path = temp_dump_dir . path ( ) . to_owned ( ) ;
let meta = MetadataVersion ::new_v4 ( self . index_db_size , self . update_db_size ) ;
let meta_path = temp_dump_path . join ( META_FILE_NAME ) ;
let mut meta_file = File ::create ( & meta_path ) ? ;
serde_json ::to_writer ( & mut meta_file , & meta ) ? ;
analytics ::copy_user_id ( & self . db_path , & temp_dump_path ) ;
create_dir_all ( & temp_dump_path . join ( " indexes " ) ) . await ? ;
2022-05-19 14:59:59 +02:00
// TODO: this is blocking!!
2022-05-19 14:44:24 +02:00
AuthController ::dump ( & self . db_path , & temp_dump_path ) ? ;
2022-05-19 14:59:59 +02:00
self . index_resolver . dump ( & self . dump_path ) . await ? ;
2022-05-19 14:44:24 +02:00
//TODO(marin): this is not right, the scheduler should dump itself, not do it here...
2022-05-19 12:43:46 +02:00
// self.scheduler
// .read()
// .await
// .dump(&temp_dump_path, self.update_file_store.clone())
// .await?;
2022-05-19 14:44:24 +02:00
let dump_path = tokio ::task ::spawn_blocking ( move | | -> Result < PathBuf > {
// for now we simply copy the updates/updates_files
// FIXME: We may copy more files than necessary, if new files are added while we are
// performing the dump. We need a way to filter them out.
let temp_dump_file = tempfile ::NamedTempFile ::new_in ( & self . dump_path ) ? ;
to_tar_gz ( temp_dump_path , temp_dump_file . path ( ) )
. map_err ( | e | DumpError ::Internal ( e . into ( ) ) ) ? ;
let dump_path = self . dump_path . join ( self . uid ) . with_extension ( " dump " ) ;
temp_dump_file . persist ( & dump_path ) ? ;
Ok ( dump_path )
} )
. await ? ? ;
info! ( " Created dump in {:?}. " , dump_path ) ;
2021-05-27 14:30:20 +02:00
Ok ( ( ) )
}
}
2021-10-05 13:53:22 +02:00
// NOTE(review): these tests are stale — they reference `Scheduler`, `Job`,
// `oneshot`, and a `scheduler` field on `DumpJob`, none of which exist in
// this module anymore (`DumpJob` now holds an `index_resolver`). They will
// not compile until they are ported to the `IndexResolver`-based `DumpJob`.
#[cfg(test)]
mod test {
    use nelson::Mocker;
    use once_cell::sync::Lazy;

    use super::*;
    use crate::index_resolver::error::IndexResolverError;
    use crate::options::SchedulerConfig;
    use crate::tasks::error::Result as TaskResult;
    use crate::tasks::task::{Task, TaskId};
    use crate::tasks::{BatchHandler, TaskFilter, TaskStore};
    use crate::update_file_store::UpdateFileStore;

    /// Forces temp files into the current directory so the tests never
    /// cross filesystem boundaries.
    fn setup() {
        static SETUP: Lazy<()> = Lazy::new(|| {
            if cfg!(windows) {
                std::env::set_var("TMP", ".");
            } else {
                std::env::set_var("TMPDIR", ".");
            }
        });

        // just deref to make sure the env is setup
        *SETUP
    }

    #[actix_rt::test]
    async fn test_dump_normal() {
        setup();

        let tmp = tempfile::tempdir().unwrap();

        let mocker = Mocker::default();
        let update_file_store = UpdateFileStore::mock(mocker);

        let mut performer = BatchHandler::new();
        performer
            .expect_process_job()
            .once()
            .returning(|j| match j {
                Job::Dump { ret, .. } => {
                    let (sender, _receiver) = oneshot::channel();
                    ret.send(Ok(sender)).unwrap();
                }
                _ => unreachable!(),
            });
        let performer = Arc::new(performer);

        let mocker = Mocker::default();
        mocker
            .when::<(&Path, UpdateFileStore), TaskResult<()>>("dump")
            .then(|_| Ok(()));
        mocker
            .when::<(Option<TaskId>, Option<TaskFilter>, Option<usize>), TaskResult<Vec<Task>>>(
                "list_tasks",
            )
            .then(|_| Ok(Vec::new()));
        let store = TaskStore::mock(mocker);
        let config = SchedulerConfig::default();

        let scheduler = Scheduler::new(store, performer, config).unwrap();

        let task = DumpJob {
            dump_path: tmp.path().into(),
            // this should do nothing
            update_file_store,
            db_path: tmp.path().into(),
            uid: String::from("test"),
            update_db_size: 4096 * 10,
            index_db_size: 4096 * 10,
            scheduler,
        };

        task.run().await.unwrap();
    }

    #[actix_rt::test]
    async fn error_performing_dump() {
        let tmp = tempfile::tempdir().unwrap();

        let mocker = Mocker::default();
        let file_store = UpdateFileStore::mock(mocker);

        let mocker = Mocker::default();
        mocker
            .when::<(Option<TaskId>, Option<TaskFilter>, Option<usize>), TaskResult<Vec<Task>>>(
                "list_tasks",
            )
            .then(|_| Ok(Vec::new()));
        let task_store = TaskStore::mock(mocker);

        let mut performer = BatchHandler::new();
        performer
            .expect_process_job()
            .once()
            .returning(|job| match job {
                Job::Dump { ret, .. } => drop(ret.send(Err(IndexResolverError::BadlyFormatted(
                    "blabla".to_string(),
                )))),
                _ => unreachable!(),
            });
        let performer = Arc::new(performer);

        let scheduler = Scheduler::new(task_store, performer, SchedulerConfig::default()).unwrap();

        let task = DumpJob {
            dump_path: tmp.path().into(),
            // this should do nothing
            db_path: tmp.path().into(),
            update_file_store: file_store,
            uid: String::from("test"),
            update_db_size: 4096 * 10,
            index_db_size: 4096 * 10,
            scheduler,
        };

        assert!(task.run().await.is_err());
    }
}