Compress the snapshot in a tarball

This commit is contained in:
Kerollmops 2022-10-25 15:51:15 +02:00 committed by Clément Renault
parent 4cafc63561
commit 942b7c338b
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
8 changed files with 65 additions and 7 deletions

2
Cargo.lock generated
View File

@ -2341,6 +2341,7 @@ dependencies = [
"csv", "csv",
"either", "either",
"enum-iterator", "enum-iterator",
"flate2",
"fst", "fst",
"insta", "insta",
"meili-snap", "meili-snap",
@ -2350,6 +2351,7 @@ dependencies = [
"roaring", "roaring",
"serde", "serde",
"serde_json", "serde_json",
"tar",
"thiserror", "thiserror",
"time", "time",
"tokio", "tokio",

View File

@ -18,6 +18,7 @@ one indexing operation.
*/ */
use std::collections::HashSet; use std::collections::HashSet;
use std::ffi::OsStr;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::BufWriter; use std::io::BufWriter;
@ -33,7 +34,7 @@ use meilisearch_types::milli::update::{
use meilisearch_types::milli::{self, BEU32}; use meilisearch_types::milli::{self, BEU32};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use meilisearch_types::{Index, VERSION_FILE_NAME}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
@ -621,7 +622,23 @@ impl IndexScheduler {
let auth = milli::heed::EnvOpenOptions::new().open(src)?; let auth = milli::heed::EnvOpenOptions::new().open(src)?;
auth.copy_to_path(dst, CompactionOption::Enabled)?; auth.copy_to_path(dst, CompactionOption::Enabled)?;
todo!("tar-gz and append .snapshot at the end of the file"); // 5. Copy and tarball the flat snapshot
// 5.1 Find the original name of the database
// TODO find a better way to get this path
let mut base_path = self.env.path().to_owned();
base_path.pop();
let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");
// 5.2 Tarball the content of the snapshot in a tempfile with a .snapshot extension
let snapshot_path = self.snapshots_path.join(db_name).with_extension("snapshot");
let temp_snapshot_file = tempfile::NamedTempFile::new_in(&self.snapshots_path)?;
compression::to_tar_gz(temp_snapshot_dir.path(), temp_snapshot_file.path())?;
let file = temp_snapshot_file.persist(&snapshot_path)?;
// 5.3 Change the permission to make the snapshot readonly
let mut permissions = file.metadata()?.permissions();
permissions.set_readonly(true);
file.set_permissions(permissions)?;
for task in &mut tasks { for task in &mut tasks {
task.status = Status::Succeeded; task.status = Status::Succeeded;

View File

@ -32,6 +32,8 @@ pub enum Error {
FileStore(#[from] file_store::Error), FileStore(#[from] file_store::Error),
#[error(transparent)] #[error(transparent)]
IoError(#[from] std::io::Error), IoError(#[from] std::io::Error),
#[error(transparent)]
Persist(#[from] tempfile::PersistError),
#[error(transparent)] #[error(transparent)]
Anyhow(#[from] anyhow::Error), Anyhow(#[from] anyhow::Error),
@ -59,10 +61,11 @@ impl ErrorCode for Error {
Error::Dump(e) => e.error_code(), Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(), Error::Milli(e) => e.error_code(),
Error::ProcessBatchPanicked => Code::Internal, Error::ProcessBatchPanicked => Code::Internal,
// TODO: TAMO: are all these errors really internal? // TODO: TAMO: are all these errors really internal?
Error::Heed(_) => Code::Internal, Error::Heed(_) => Code::Internal,
Error::FileStore(_) => Code::Internal, Error::FileStore(_) => Code::Internal,
Error::IoError(_) => Code::Internal, Error::IoError(_) => Code::Internal,
Error::Persist(_) => Code::Internal,
Error::Anyhow(_) => Code::Internal, Error::Anyhow(_) => Code::Internal,
Error::CorruptedTaskQueue => Code::Internal, Error::CorruptedTaskQueue => Code::Internal,
Error::CorruptedDump => Code::Internal, Error::CorruptedDump => Code::Internal,

View File

@ -955,6 +955,7 @@ mod tests {
use meilisearch_types::milli::update::IndexDocumentsMethod::{ use meilisearch_types::milli::update::IndexDocumentsMethod::{
ReplaceDocuments, UpdateDocuments, ReplaceDocuments, UpdateDocuments,
}; };
use meilisearch_types::VERSION_FILE_NAME;
use tempfile::TempDir; use tempfile::TempDir;
use time::Duration; use time::Duration;
use uuid::Uuid; use uuid::Uuid;

View File

@ -35,6 +35,7 @@ use meilisearch_auth::AuthController;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::versioning::{check_version_file, create_version_file};
use meilisearch_types::{milli, VERSION_FILE_NAME}; use meilisearch_types::{milli, VERSION_FILE_NAME};
pub use option::Opt; pub use option::Opt;
@ -128,23 +129,23 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthContr
match ( match (
index_scheduler_builder().map_err(anyhow::Error::from), index_scheduler_builder().map_err(anyhow::Error::from),
auth_controller_builder().map_err(anyhow::Error::from), auth_controller_builder().map_err(anyhow::Error::from),
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
) { ) {
(Ok(i), Ok(a)) => Ok((i, a)), (Ok(i), Ok(a), Ok(())) => Ok((i, a)),
(Err(e), _) | (_, Err(e)) => { (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
std::fs::remove_dir_all(&opt.db_path)?; std::fs::remove_dir_all(&opt.db_path)?;
Err(e) Err(e)
} }
} }
}; };
let empty_db = is_empty_db(&opt.db_path);
let (index_scheduler, auth_controller) = if let Some(ref _path) = opt.import_snapshot { let (index_scheduler, auth_controller) = if let Some(ref _path) = opt.import_snapshot {
// handle the snapshot with something akin to the dumps // handle the snapshot with something akin to the dumps
// + the snapshot interval / spawning a thread // + the snapshot interval / spawning a thread
todo!(); todo!();
} else if let Some(ref path) = opt.import_dump { } else if let Some(ref path) = opt.import_dump {
let empty_db = is_empty_db(&opt.db_path);
let src_path_exists = path.exists(); let src_path_exists = path.exists();
if empty_db && src_path_exists { if empty_db && src_path_exists {
let (mut index_scheduler, mut auth_controller) = meilisearch_builder()?; let (mut index_scheduler, mut auth_controller) = meilisearch_builder()?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) { match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
@ -172,6 +173,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthContr
} }
} }
} else { } else {
if !empty_db {
check_version_file(&opt.db_path)?;
}
meilisearch_builder()? meilisearch_builder()?
}; };

View File

@ -10,6 +10,7 @@ anyhow = "1.0.65"
csv = "1.1.6" csv = "1.1.6"
either = { version = "1.6.1", features = ["serde"] } either = { version = "1.6.1", features = ["serde"] }
enum-iterator = "1.1.3" enum-iterator = "1.1.3"
flate2 = "1.0.24"
fst = "0.4.7" fst = "0.4.7"
milli = { git = "https://github.com/meilisearch/milli.git", branch = "indexation-abortion", default-features = false } milli = { git = "https://github.com/meilisearch/milli.git", branch = "indexation-abortion", default-features = false }
proptest = { version = "1.0.0", optional = true } proptest = { version = "1.0.0", optional = true }
@ -17,6 +18,7 @@ proptest-derive = { version = "0.3.0", optional = true }
roaring = { version = "0.10.0", features = ["serde"] } roaring = { version = "0.10.0", features = ["serde"] }
serde = { version = "1.0.145", features = ["derive"] } serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.85" serde_json = "1.0.85"
tar = "0.4.38"
thiserror = "1.0.30" thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] }
tokio = "1.0" tokio = "1.0"

View File

@ -0,0 +1,28 @@
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use tar::{Archive, Builder};
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?;
let gz_encoder = GzEncoder::new(&mut f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
f.flush()?;
Ok(())
}
pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let f = File::open(&src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(&dest)?;
ar.unpack(&dest)?;
Ok(())
}

View File

@ -1,3 +1,4 @@
pub mod compression;
pub mod document_formats; pub mod document_formats;
pub mod error; pub mod error;
pub mod index_uid; pub mod index_uid;