From 0f94ef8abc5fa3e994afb19bccab8f5f7f571757 Mon Sep 17 00:00:00 2001 From: tamo Date: Thu, 29 Apr 2021 14:45:08 +0200 Subject: [PATCH] WIP: dump --- meilisearch-http/src/data/mod.rs | 4 +- .../src/index_controller/dump/mod.rs | 34 ++++++++++------- .../src/index_controller/dump/v1.rs | 13 +++---- .../src/index_controller/index_actor/mod.rs | 4 ++ meilisearch-http/src/index_controller/mod.rs | 2 +- .../index_controller/update_actor/actor.rs | 1 - .../src/index_controller/update_actor/mod.rs | 3 +- .../update_actor/update_store.rs | 37 ++++++++++++++----- meilisearch-http/src/main.rs | 2 +- .../tests/settings/get_settings.rs | 4 +- 10 files changed, 64 insertions(+), 40 deletions(-) diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index e2bb7fbfb..c7979210e 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -55,10 +55,10 @@ impl ApiKeys { } impl Data { - pub async fn new(options: Opt) -> anyhow::Result { + pub fn new(options: Opt) -> anyhow::Result { let path = options.db_path.clone(); - let index_controller = IndexController::new(&path, &options).await?; + let index_controller = IndexController::new(&path, &options)?; let mut api_keys = ApiKeys { master: options.clone().master_key, diff --git a/meilisearch-http/src/index_controller/dump/mod.rs b/meilisearch-http/src/index_controller/dump/mod.rs index c12041e9d..2bd5f167e 100644 --- a/meilisearch-http/src/index_controller/dump/mod.rs +++ b/meilisearch-http/src/index_controller/dump/mod.rs @@ -1,7 +1,7 @@ mod v1; mod v2; -use std::{fs::File, path::{Path, PathBuf}, sync::Arc}; +use std::{fs::File, path::{Path}, sync::Arc}; use anyhow::bail; use heed::EnvOpenOptions; @@ -10,12 +10,10 @@ use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use serde::{Deserialize, Serialize}; use tempfile::TempDir; -use super::update_actor::UpdateActorHandle; -use super::uuid_resolver::UuidResolverHandle; use super::IndexMetadata; use crate::index::Index; use crate::index_controller::uuid_resolver; -use crate::{helpers::compression, index::Settings}; +use crate::helpers::compression; #[derive(Debug, Serialize, Deserialize, Copy, Clone)] enum DumpVersion { @@ -24,7 +22,7 @@ enum DumpVersion { } impl DumpVersion { - const CURRENT: Self = Self::V2; + // const CURRENT: Self = Self::V2; /// Select the good importation function from the `DumpVersion` of metadata pub fn import_index(self, size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> { @@ -37,23 +35,25 @@ impl DumpVersion { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct DumpMetadata { +pub struct Metadata { indexes: Vec, db_version: String, dump_version: DumpVersion, } -impl DumpMetadata { - /// Create a DumpMetadata with the current dump version of meilisearch. +impl Metadata { + /* + /// Create a Metadata with the current dump version of meilisearch. pub fn new(indexes: Vec, db_version: String) -> Self { - DumpMetadata { + Metadata { indexes, db_version, dump_version: DumpVersion::CURRENT, } } + */ - /// Extract DumpMetadata from `metadata.json` file present at provided `dir_path` + /// Extract Metadata from `metadata.json` file present at provided `dir_path` fn from_path(dir_path: &Path) -> anyhow::Result { let path = dir_path.join("metadata.json"); let file = File::open(path)?; @@ -63,7 +63,8 @@ impl DumpMetadata { Ok(metadata) } - /// Write DumpMetadata in `metadata.json` file at provided `dir_path` + /* + /// Write Metadata in `metadata.json` file at provided `dir_path` fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> { let path = dir_path.join("metadata.json"); let file = File::create(path)?; @@ -72,8 +73,10 @@ impl DumpMetadata { Ok(()) } + */ } +/* pub struct DumpService { uuid_resolver_handle: R, update_handle: U, @@ -148,7 +151,9 @@ where Ok(()) } } +*/ +/* /// Write Settings in `settings.json` file at provided `dir_path` fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> { let path = dir_path.join("settings.json"); @@ -158,6 +163,7 @@ fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> Ok(()) } +*/ pub fn load_dump( db_path: impl AsRef, @@ -170,12 +176,12 @@ pub fn load_dump( let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?; // extract the dump in a temporary directory - let tmp_dir = TempDir::new()?; + let tmp_dir = TempDir::new_in(db_path)?; let tmp_dir_path = tmp_dir.path(); compression::from_tar_gz(dump_path, tmp_dir_path)?; // read dump metadata - let metadata = DumpMetadata::from_path(&tmp_dir_path)?; + let metadata = Metadata::from_path(&tmp_dir_path)?; // remove indexes which have same `uuid` than indexes to import and create empty indexes let existing_index_uids = uuid_resolver.list()?; @@ -207,7 +213,7 @@ pub fn load_dump( // this cannot fail since we created all the missing uuid in the previous loop let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap(); let index_path = db_path.join(&format!("indexes/index-{}", uuid)); - let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db + // let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db info!("Importing dump from {} into {}...", dump_path.display(), index_path.display()); metadata.dump_version.import_index(size, &dump_path, &index_path).unwrap(); diff --git a/meilisearch-http/src/index_controller/dump/v1.rs b/meilisearch-http/src/index_controller/dump/v1.rs index 89b55c51e..3a20299f3 100644 --- a/meilisearch-http/src/index_controller/dump/v1.rs +++ b/meilisearch-http/src/index_controller/dump/v1.rs @@ -84,19 +84,13 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow: std::fs::create_dir_all(&index_path)?; let mut options = EnvOpenOptions::new(); options.map_size(size); - let index = milli::Index::new(options, index_path)?; + let index = milli::Index::new(options.clone(), index_path)?; let index = Index(Arc::new(index)); // extract `settings.json` file and import content let settings = import_settings(&dump_path)?; dbg!(&settings); - let mut settings: index_controller::Settings = settings.into(); - if settings.displayed_attributes.as_ref().map_or(false, |o| o.as_ref().map_or(false, |v| v.contains(&String::from("*")))) { - settings.displayed_attributes = None; - } - if settings.searchable_attributes.as_ref().map_or(false, |o| o.as_ref().map_or(false, |v| v.contains(&String::from("*")))) { - settings.searchable_attributes = None; - } + let settings: index_controller::Settings = settings.into(); let update_builder = UpdateBuilder::new(0); index.update_settings(&settings, update_builder)?; @@ -112,6 +106,9 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow: None, )?; + // at this point we should handle the updates, but since the update logic is not handled in + // meilisearch we are just going to ignore this part + // the last step: we extract the original milli::Index and close it Arc::try_unwrap(index.0) .map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index") diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index 7522acc67..e06658ff8 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -178,6 +178,10 @@ mod test { self.as_ref().snapshot(uuid, path).await } + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + self.as_ref().dump(uuid, path).await + } + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { self.as_ref().get_index_stats(uuid).await } diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 9ed6f10e4..ebcc9ed76 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -78,7 +78,7 @@ pub struct Stats { } impl IndexController { - pub async fn new(path: impl AsRef, options: &Opt) -> anyhow::Result { + pub fn new(path: impl AsRef, options: &Opt) -> anyhow::Result { let index_size = options.max_mdb_size.get_bytes() as usize; let update_store_size = options.max_udb_size.get_bytes() as usize; diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 6789bc6ce..fee907001 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -91,7 +91,6 @@ where meta: UpdateMeta, mut payload: mpsc::Receiver>, ) -> Result { - let file_path = match meta { UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => { diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 36390c290..eeca6629f 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -13,9 +13,8 @@ use crate::index_controller::{UpdateMeta, UpdateStatus}; use actor::UpdateActor; use message::UpdateMsg; -use update_store::UpdateStore; -pub use update_store::UpdateStoreInfo; +pub use update_store::{UpdateStore, UpdateStoreInfo}; pub use handle_impl::UpdateActorHandleImpl; pub type Result = std::result::Result; diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index 6f698e693..27018764f 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -177,11 +177,7 @@ pub struct UpdateStore { } impl UpdateStore { - pub fn open( - mut options: EnvOpenOptions, - path: impl AsRef, - index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, - ) -> anyhow::Result> { + pub fn create(mut options: EnvOpenOptions, path: impl AsRef) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { options.max_dbs(5); let env = options.open(path)?; @@ -189,21 +185,30 @@ impl UpdateStore { let next_update_id = env.create_database(Some("next-update-id"))?; let updates = env.create_database(Some("updates"))?; - let (notification_sender, mut notification_receiver) = mpsc::channel(10); + let state = Arc::new(StateLock::from_state(State::Idle)); + + let (notification_sender, notification_receiver) = mpsc::channel(10); // Send a first notification to trigger the process. let _ = notification_sender.send(()); - let state = Arc::new(StateLock::from_state(State::Idle)); + Ok((Self { env, pending_queue, next_update_id, updates, state, notification_sender }, notification_receiver)) + } + + pub fn open( + options: EnvOpenOptions, + path: impl AsRef, + index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, + ) -> anyhow::Result> { + let (update_store, mut notification_receiver) = Self::create(options, path)?; + let update_store = Arc::new(update_store); // Init update loop to perform any pending updates at launch. // Since we just launched the update store, and we still own the receiving end of the // channel, this call is guaranteed to succeed. - notification_sender + update_store.notification_sender .try_send(()) .expect("Failed to init update store"); - let update_store = Arc::new(UpdateStore { env, pending_queue, next_update_id, updates, state, notification_sender }); - // We need a weak reference so we can take ownership on the arc later when we // want to close the index. let update_store_weak = Arc::downgrade(&update_store); @@ -283,6 +288,18 @@ impl UpdateStore { Ok(meta) } + /// Push already processed updates in the UpdateStore. This is useful for the dumps + pub fn register_already_processed_update ( + &self, + result: UpdateStatus, + index_uuid: Uuid, + ) -> heed::Result<()> { + let mut wtxn = self.env.write_txn()?; + let (_global_id, update_id) = self.next_update_id(&mut wtxn, index_uuid)?; + self.updates.remap_key_type::().put(&mut wtxn, &(index_uuid, update_id), &result)?; + wtxn.commit() + } + /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs index 592b70d30..b16f3c0e1 100644 --- a/meilisearch-http/src/main.rs +++ b/meilisearch-http/src/main.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), MainError> { //snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?; //} - let data = Data::new(opt.clone()).await?; + let data = Data::new(opt.clone())?; //if !opt.no_analytics { //let analytics_data = data.clone(); diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs index e5f51d7f0..a39dd54e9 100644 --- a/meilisearch-http/tests/settings/get_settings.rs +++ b/meilisearch-http/tests/settings/get_settings.rs @@ -19,7 +19,7 @@ async fn get_settings() { assert_eq!(settings.keys().len(), 6); assert_eq!(settings["displayedAttributes"], json!(["*"])); assert_eq!(settings["searchableAttributes"], json!(["*"])); - assert_eq!(settings["attributesForFaceting"], json!(null)); + assert_eq!(settings["attributesForFaceting"], json!({})); assert_eq!(settings["distinctAttribute"], json!(null)); assert_eq!( settings["rankingRules"], @@ -82,7 +82,9 @@ async fn reset_all_settings() { assert_eq!(response["searchableAttributes"], json!(["bar"])); assert_eq!(response["stopWords"], json!(["the"])); + eprintln!("BEFORE"); index.delete_settings().await; + eprintln!("AFTER"); index.wait_update_id(1).await; let (response, code) = index.settings().await;