From b258f4f394270ba3bc998a7f13d42312cae4675b Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Thu, 27 May 2021 14:30:20 +0200 Subject: [PATCH] fix dump import --- meilisearch-http/src/index/dump.rs | 12 +- meilisearch-http/src/index/updates.rs | 22 ++++ .../src/index_controller/dump_actor/actor.rs | 63 ++--------- .../index_controller/dump_actor/loaders/v2.rs | 17 ++- .../src/index_controller/dump_actor/mod.rs | 105 +++++++++++++----- meilisearch-http/src/index_controller/mod.rs | 12 +- .../index_controller/uuid_resolver/store.rs | 1 + 7 files changed, 133 insertions(+), 99 deletions(-) diff --git a/meilisearch-http/src/index/dump.rs b/meilisearch-http/src/index/dump.rs index 35f5159e5..9dbb14fbd 100644 --- a/meilisearch-http/src/index/dump.rs +++ b/meilisearch-http/src/index/dump.rs @@ -9,12 +9,11 @@ use anyhow::bail; use crate::option::IndexerOpts; -use super::update_handler::UpdateHandler; -use super::{Checked, Index, Settings}; +use super::{Unchecked, Index, Settings, update_handler::UpdateHandler}; #[derive(Serialize, Deserialize)] struct DumpMeta { - settings: Settings, + settings: Settings, primary_key: Option, } @@ -33,7 +32,6 @@ impl Index { } fn dump_documents(&self, txn: &RoTxn, path: impl AsRef) -> anyhow::Result<()> { - println!("dumping documents"); let document_file_path = path.as_ref().join(DATA_FILE_NAME); let mut document_file = File::create(&document_file_path)?; @@ -61,11 +59,10 @@ impl Index { } fn dump_meta(&self, txn: &RoTxn, path: impl AsRef) -> anyhow::Result<()> { - println!("dumping settings"); let meta_file_path = path.as_ref().join(META_FILE_NAME); let mut meta_file = File::create(&meta_file_path)?; - let settings = self.settings_txn(txn)?; + let settings = self.settings_txn(txn)?.into_unchecked(); let primary_key = self.primary_key(txn)?.map(String::from); let meta = DumpMeta { settings, primary_key }; @@ -84,12 +81,13 @@ impl Index { .as_ref() .file_name() .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; - let dst_dir_path = dst.as_ref().join(dir_name); + let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); create_dir_all(&dst_dir_path)?; let meta_path = src.as_ref().join(META_FILE_NAME); let mut meta_file = File::open(meta_path)?; let DumpMeta { settings, primary_key } = serde_json::from_reader(&mut meta_file)?; + let settings = settings.check(); let index = Self::open(&dst_dir_path, size as usize)?; let mut txn = index.write_txn()?; diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 2b489451b..053ca6a60 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -87,6 +87,28 @@ impl Settings { _kind: PhantomData, } } + + pub fn into_unchecked(self) -> Settings { + let Self { + displayed_attributes, + searchable_attributes, + attributes_for_faceting, + ranking_rules, + stop_words, + distinct_attribute, + .. + } = self; + + Settings { + displayed_attributes, + searchable_attributes, + attributes_for_faceting, + ranking_rules, + stop_words, + distinct_attribute, + _kind: PhantomData, + } + } } impl Settings { diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 1abceef47..b93d6f42d 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -1,17 +1,18 @@ -use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; -use crate::{helpers::compression, index_controller::dump_actor::Metadata}; -use crate::index_controller::{update_actor, uuid_resolver}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + use async_stream::stream; use chrono::Utc; use futures::stream::StreamExt; use log::{error, info}; use update_actor::UpdateActorHandle; use uuid_resolver::UuidResolverHandle; -use std::{fs::File, path::{Path, PathBuf}, sync::Arc}; -use tokio::{fs::create_dir_all, sync::{mpsc, oneshot, RwLock}}; +use tokio::sync::{mpsc, oneshot, RwLock}; + +use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask}; +use crate::index_controller::{update_actor, uuid_resolver}; pub const CONCURRENT_DUMP_MSG: usize = 10; -const META_FILE_NAME: &'static str = "metadata.json"; pub struct DumpActor { inbox: Option>, @@ -155,54 +156,4 @@ where }) ) } - -} - -struct DumpTask { - path: PathBuf, - uuid_resolver: U, - update_handle: P, - uid: String, - update_db_size: u64, - index_db_size: u64, -} - -impl DumpTask -where - U: UuidResolverHandle + Send + Sync + Clone + 'static, - P: UpdateActorHandle + Send + Sync + Clone + 'static, -{ - async fn run(self) -> anyhow::Result<()> { - info!("Performing dump."); - - create_dir_all(&self.path).await?; - - let path_clone = self.path.clone(); - let temp_dump_dir = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); - - let meta = Metadata::new_v2(self.index_db_size, self.update_db_size); - let meta_path = temp_dump_path.join(META_FILE_NAME); - let mut meta_file = File::create(&meta_path)?; - serde_json::to_writer(&mut meta_file, &meta)?; - - let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?; - - self.update_handle.dump(uuids, temp_dump_path.clone()).await?; - - let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { - let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; - compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?; - - let dump_path = self.path.join(format!("{}.dump", self.uid)); - temp_dump_file.persist(&dump_path)?; - - Ok(dump_path) - }) - .await??; - - info!("Created dump in {:?}.", dump_path); - - Ok(()) - } } diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs index b9f89ebbf..def47fecb 100644 --- a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs @@ -2,7 +2,7 @@ use std::path::Path; use anyhow::Context; use chrono::{DateTime, Utc}; -use log::info; +use log::{info, warn}; use serde::{Deserialize, Serialize}; use crate::{index::Index, index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}, option::IndexerOpts}; @@ -29,6 +29,8 @@ impl MetadataV2 { self, src: impl AsRef, dst: impl AsRef, + _index_db_size: u64, + _update_db_size: u64, indexing_options: &IndexerOpts, ) -> anyhow::Result<()> { info!( @@ -44,23 +46,26 @@ impl MetadataV2 { let tmp_dst = tempfile::tempdir_in(dst_dir)?; info!("Loading index database."); - let uuid_resolver_path = dst.as_ref().join("uuid_resolver/"); - std::fs::create_dir_all(&uuid_resolver_path)?; - HeedUuidStore::load_dump(src.as_ref(), tmp_dst.as_ref())?; + HeedUuidStore::load_dump(src.as_ref(), &tmp_dst)?; info!("Loading updates."); - UpdateStore::load_dump(&src, &tmp_dst.as_ref(), self.update_db_size)?; + UpdateStore::load_dump(&src, &tmp_dst, self.update_db_size)?; info!("Loading indexes"); let indexes_path = src.as_ref().join("indexes"); let indexes = indexes_path.read_dir()?; for index in indexes { let index = index?; - Index::load_dump(&index.path(), &dst, self.index_db_size, indexing_options)?; + Index::load_dump(&index.path(), &tmp_dst, self.index_db_size, indexing_options)?; } // Persist and atomically rename the db let persisted_dump = tmp_dst.into_path(); + if dst.as_ref().exists() { + warn!("Overwriting database at {}", dst.as_ref().display()); + std::fs::remove_dir_all(&dst)?; + } + std::fs::rename(&persisted_dump, &dst)?; Ok(()) diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index b54783f75..2b7d8a3e0 100644 --- a/meilisearch-http/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -1,6 +1,7 @@ -use std::{fs::File, path::Path}; +use std::fs::File; +use std::path::{Path, PathBuf}; -use log::error; +use log::{error, info}; #[cfg(test)] use mockall::automock; use serde::{Deserialize, Serialize}; @@ -12,16 +13,18 @@ use loaders::v2::MetadataV2; pub use actor::DumpActor; pub use handle_impl::*; pub use message::DumpMsg; +use tokio::fs::create_dir_all; -use crate::option::IndexerOpts; - -use super::uuid_resolver::store::UuidStore; +use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle}; +use crate::{helpers::compression, option::IndexerOpts}; mod actor; mod handle_impl; mod loaders; mod message; +const META_FILE_NAME: &'static str = "metadata.json"; + pub type DumpResult = std::result::Result; #[derive(Error, Debug)] @@ -66,23 +69,6 @@ impl Metadata { let meta = MetadataV2::new(index_db_size, update_db_size); Self::V2 { meta } } - /// Extract Metadata from `metadata.json` file present at provided `dir_path` - fn from_path(dir_path: &Path) -> anyhow::Result { - let path = dir_path.join("metadata.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata = serde_json::from_reader(reader)?; - - Ok(metadata) - } - - /// Write Metadata in `metadata.json` file at provided `dir_path` - pub async fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> { - let path = dir_path.join("metadata.json"); - tokio::fs::write(path, serde_json::to_string(self)?).await?; - - Ok(()) - } } #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] @@ -125,21 +111,84 @@ impl DumpInfo { } } -pub fn load_dump( +pub fn load_dump( dst_path: impl AsRef, src_path: impl AsRef, - _index_db_size: u64, - _update_db_size: u64, + index_db_size: u64, + update_db_size: u64, indexer_opts: &IndexerOpts, ) -> anyhow::Result<()> { - let meta_path = src_path.as_ref().join("metadat.json"); + let tmp_src = tempfile::tempdir_in(".")?; + let tmp_src_path = tmp_src.path(); + + compression::from_tar_gz(&src_path, tmp_src_path)?; + + let meta_path = tmp_src_path.join(META_FILE_NAME); let mut meta_file = File::open(&meta_path)?; let meta: Metadata = serde_json::from_reader(&mut meta_file)?; match meta { - Metadata::V1 { meta } => meta.load_dump(src_path, dst_path)?, - Metadata::V2 { meta } => meta.load_dump(src_path.as_ref(), dst_path.as_ref(), indexer_opts)?, + Metadata::V1 { meta } => meta.load_dump(&tmp_src_path, dst_path)?, + Metadata::V2 { meta } => meta.load_dump( + &tmp_src_path, + dst_path.as_ref(), + index_db_size, + update_db_size, + indexer_opts, + )?, } Ok(()) } + +struct DumpTask { + path: PathBuf, + uuid_resolver: U, + update_handle: P, + uid: String, + update_db_size: u64, + index_db_size: u64, +} + +impl DumpTask +where + U: UuidResolverHandle + Send + Sync + Clone + 'static, + P: UpdateActorHandle + Send + Sync + Clone + 'static, +{ + async fn run(self) -> anyhow::Result<()> { + info!("Performing dump."); + + create_dir_all(&self.path).await?; + + let path_clone = self.path.clone(); + let temp_dump_dir = + tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let meta = Metadata::new_v2(self.index_db_size, self.update_db_size); + let meta_path = temp_dump_path.join(META_FILE_NAME); + let mut meta_file = File::create(&meta_path)?; + serde_json::to_writer(&mut meta_file, &meta)?; + + let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?; + + self.update_handle + .dump(uuids, temp_dump_path.clone()) + .await?; + + let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { + let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; + compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?; + + let dump_path = self.path.join(format!("{}.dump", self.uid)); + temp_dump_file.persist(&dump_path)?; + + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 69415a1cd..18ba6dee3 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -25,6 +25,8 @@ use uuid_resolver::{UuidResolverError, UuidResolverHandle}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::option::Opt; +use self::dump_actor::load_dump; + mod dump_actor; mod index_actor; mod snapshot; @@ -91,8 +93,14 @@ impl IndexController { options.ignore_snapshot_if_db_exists, options.ignore_missing_snapshot, )?; - } else if let Some(ref _path) = options.import_dump { - todo!("implement load dump") + } else if let Some(ref src_path) = options.import_dump { + load_dump( + &options.db_path, + src_path, + options.max_mdb_size.get_bytes(), + options.max_udb_size.get_bytes(), + &options.indexer_options, + )?; } std::fs::create_dir_all(&path)?; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index 876c2454c..2fd9ff301 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -178,6 +178,7 @@ impl HeedUuidStore { Ok(0) => break, Ok(_) => { let DumpEntry { uuid, uid } = serde_json::from_str(&line)?; + println!("importing {} {}", uid, uuid); db.db.put(&mut txn, &uid, uuid.as_bytes())?; } Err(e) => Err(e)?,