From e818c33fec0cb8e8af281335dd40150bca1ad1ce Mon Sep 17 00:00:00 2001
From: Marin Postma
Date: Wed, 26 May 2021 20:42:09 +0200
Subject: [PATCH] implement load uuid_resolver

---
 .../src/index_controller/dump_actor/actor.rs  |  68 +++----
 .../dump_actor/handle_impl.rs                 |   6 +-
 .../dump_actor/loaders/mod.rs                 |   2 +
 .../index_controller/dump_actor/loaders/v1.rs | 137 ++++++++++++++
 .../index_controller/dump_actor/loaders/v2.rs | 179 ++++++++++++++++++
 .../src/index_controller/dump_actor/mod.rs    | 148 +++------------
 .../src/index_controller/dump_actor/v1.rs     | 122 ------------
 .../src/index_controller/dump_actor/v2.rs     |  89 ---------
 meilisearch-http/src/index_controller/mod.rs  |  30 +--
 .../src/index_controller/update_actor/mod.rs  |   2 +-
 .../update_actor/store/dump.rs                |   2 +-
 .../update_actor/store/mod.rs                 |   6 +-
 .../src/index_controller/uuid_resolver/mod.rs |   2 +-
 .../index_controller/uuid_resolver/store.rs   |  56 +++++-
 14 files changed, 438 insertions(+), 411 deletions(-)
 create mode 100644 meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs
 create mode 100644 meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs
 create mode 100644 meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
 delete mode 100644 meilisearch-http/src/index_controller/dump_actor/v1.rs
 delete mode 100644 meilisearch-http/src/index_controller/dump_actor/v2.rs

diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs
index 2d931dcbd..31378f89c 100644
--- a/meilisearch-http/src/index_controller/dump_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs
@@ -1,27 +1,26 @@
 use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
 use crate::helpers::compression;
-use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata};
+use crate::index_controller::{update_actor, uuid_resolver};
 use async_stream::stream;
 use chrono::Utc;
 use futures::stream::StreamExt;
 use log::{error, info};
 use std::{
-    collections::HashSet,
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio::sync::{mpsc, oneshot, RwLock};
-use uuid::Uuid;
+use tokio::{fs::create_dir_all, sync::{mpsc, oneshot, RwLock}};
 
 pub const CONCURRENT_DUMP_MSG: usize = 10;
 
-pub struct DumpActor<UuidResolver, Index, Update> {
+pub struct DumpActor<UuidResolver, Update> {
     inbox: Option<mpsc::Receiver<DumpMsg>>,
     uuid_resolver: UuidResolver,
-    index: Index,
     update: Update,
     dump_path: PathBuf,
     dump_info: Arc<RwLock<Option<DumpInfo>>>,
+    _update_db_size: u64,
+    _index_db_size: u64,
 }
 
 /// Generate uid from creation date
@@ -29,26 +28,27 @@ fn generate_uid() -> String {
     Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
 }
 
-impl<UuidResolver, Index, Update> DumpActor<UuidResolver, Index, Update>
+impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
 where
     UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
-    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
     Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
 {
     pub fn new(
         inbox: mpsc::Receiver<DumpMsg>,
         uuid_resolver: UuidResolver,
-        index: Index,
         update: Update,
         dump_path: impl AsRef<Path>,
+        _index_db_size: u64,
+        _update_db_size: u64,
     ) -> Self {
         Self {
             inbox: Some(inbox),
             uuid_resolver,
-            index,
             update,
             dump_path: dump_path.as_ref().into(),
             dump_info: Arc::new(RwLock::new(None)),
+            _index_db_size,
+            _update_db_size,
         }
     }
 
@@ -155,7 +155,7 @@
     }
 
     async fn perform_dump(
-        dump_path: PathBuf,
+        path: PathBuf,
         uuid_resolver: UuidResolver,
         update_handle: Update,
         uid: String,
@@ -166,19 +166,23 @@
     {
         info!("Performing dump.");
 
-        let dump_path_clone = dump_path.clone();
-        let temp_dump_path = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(dump_path_clone)).await??;
+        create_dir_all(&path).await?;
 
-        let uuids = uuid_resolver.dump(temp_dump_path.path().to_owned()).await?;
+        let path_clone = path.clone();
+        let temp_dump_dir = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
+        let temp_dump_path = temp_dump_dir.path().to_owned();
 
-        update_handle.dump(uuids, temp_dump_path.path().to_owned()).await?;
+        let uuids = uuid_resolver.dump(temp_dump_path.clone()).await?;
+
+        update_handle.dump(uuids, temp_dump_path.clone()).await?;
 
-        let dump_path = dump_path.join(format!("{}.dump", uid));
         let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
-            let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?;
-            let temp_dump_file_path = temp_dump_file.path().to_owned();
-            compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
+            let temp_dump_file = tempfile::NamedTempFile::new_in(&path)?;
+            compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;
+
+            let dump_path = path.join(format!("{}.dump", uid));
             temp_dump_file.persist(&dump_path)?;
+
             Ok(dump_path)
         })
         .await??;
@@ -187,29 +191,3 @@
 
     Ok(())
 }
-
-async fn list_indexes<UuidResolver, Index>(
-    uuid_resolver: &UuidResolver,
-    index: &Index,
-) -> anyhow::Result<Vec<IndexMetadata>>
-where
-    UuidResolver: uuid_resolver::UuidResolverHandle,
-    Index: index_actor::IndexActorHandle,
-{
-    let uuids = uuid_resolver.list().await?;
-
-    let mut ret = Vec::new();
-
-    for (uid, uuid) in uuids {
-        let meta = index.get_index_meta(uuid).await?;
-        let meta = IndexMetadata {
-            uuid,
-            name: uid.clone(),
-            uid,
-            meta,
-        };
-        ret.push(meta);
-    }
-
-    Ok(ret)
-}
diff --git a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs
index 575119410..ff663798f 100644
--- a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs
@@ -29,13 +29,15 @@ impl DumpActorHandleImpl {
     pub fn new(
         path: impl AsRef<Path>,
         uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
-        index: crate::index_controller::index_actor::IndexActorHandleImpl,
         update: crate::index_controller::update_actor::UpdateActorHandleImpl,
+        index_db_size: u64,
+        update_db_size: u64,
     ) -> anyhow::Result<Self> {
         let (sender, receiver) = mpsc::channel(10);
-        let actor = DumpActor::new(receiver, uuid_resolver, index, update, path);
+        let actor = DumpActor::new(receiver, uuid_resolver, update, path, index_db_size, update_db_size);
 
         tokio::task::spawn(actor.run());
+
         Ok(Self { sender })
     }
 }
diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs
new file mode 100644
index 000000000..ae6adc7cf
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs
@@ -0,0 +1,2 @@
+pub mod v1;
+pub mod v2;
diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs
new file mode 100644
index 000000000..76207ff7b
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs
@@ -0,0 +1,137 @@
+use std::path::Path;
+
+use serde::{Deserialize, Serialize};
+
+use crate::index_controller::IndexMetadata;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct MetadataV1 {
+    db_version: String,
+    indexes: Vec<IndexMetadata>,
+}
+
+impl MetadataV1 {
+    pub fn load_dump(self, _src: impl AsRef<Path>, _dst: impl AsRef<Path>) -> anyhow::Result<()> {
+        todo!("implement load v1")
+    }
+}
+
+
+// This is the settings used in the last version of meilisearch exporting dump in V1
+//#[derive(Default, Clone, Serialize, Deserialize, Debug)]
+//#[serde(rename_all = "camelCase", deny_unknown_fields)]
+//struct Settings {
+    //#[serde(default, deserialize_with = "deserialize_some")]
+    //pub ranking_rules: Option<Option<Vec<String>>>,
+    //#[serde(default, deserialize_with = "deserialize_some")]
+    //pub distinct_attribute: Option<Option<String>>,
+    //#[serde(default, deserialize_with = "deserialize_some")]
+    //pub searchable_attributes: Option<Option<Vec<String>>>,
+    //#[serde(default, deserialize_with = "deserialize_some")]
+    //pub displayed_attributes: Option<Option<Vec<String>>>,
+    //#[serde(default, deserialize_with = "deserialize_some")]
+    //pub stop_words: Option<Option<BTreeSet<String>>>,
+    //#[serde(default, deserialize_with = "deserialize_some")]
+    //pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
+    //#[serde(default, deserialize_with = "deserialize_some")]
+    //pub attributes_for_faceting: Option<Option<Vec<String>>>,
+//}
+
+///// we need to **always** be able to convert the old settings to the settings currently being used
+//impl From<Settings> for index_controller::Settings<Unchecked> {
+    //fn from(settings: Settings) -> Self {
+        //if settings.synonyms.flatten().is_some() {
+            //error!("`synonyms` are not yet implemented and thus will be ignored");
+        //}
+        //Self {
+            //distinct_attribute: settings.distinct_attribute,
+            //// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
+            //displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
+            //searchable_attributes: settings.searchable_attributes,
+            //// we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
+            //// representing the name of the faceted field + the type of the field. Since the type
+            //// was not known in the V1 of the dump we are just going to assume everything is a
+            //// String
+            //attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
+            //// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
+            //ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
+                //match criterion.as_str() {
+                    //"words" | "typo" | "proximity" | "attribute" => Some(criterion),
+                    //s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
+                    //"wordsPosition" => {
+                        //warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPosition` will be ignored");
+                        //Some(String::from("words"))
+                    //}
+                    //"exactness" => {
+                        //error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
+                        //None
+                    //}
+                    //s => {
+                        //error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
+                        //None
+                    //}
+                //}
+            //}).collect())),
+            //// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
+            //stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
+            //_kind: PhantomData,
+        //}
+    //}
+//}
+
+///// Extract Settings from `settings.json` file present at provided `dir_path`
+//fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
+    //let path = dir_path.join("settings.json");
+    //let file = File::open(path)?;
+    //let reader = std::io::BufReader::new(file);
+    //let metadata = serde_json::from_reader(reader)?;
+
+    //Ok(metadata)
+//}
+
+//pub fn import_dump(
+    //size: usize,
+    //uuid: Uuid,
+    //dump_path: &Path,
+    //db_path: &Path,
+    //primary_key: Option<&str>,
+//) -> anyhow::Result<()> {
+    //let index_path = db_path.join(&format!("indexes/index-{}", uuid));
+    //info!("Importing a dump from an old version of meilisearch with dump version 1");
+
+    //std::fs::create_dir_all(&index_path)?;
+    //let mut options = EnvOpenOptions::new();
+    //options.map_size(size);
+    //let index = milli::Index::new(options, index_path)?;
+    //let index = Index(Arc::new(index));
+
+    //// extract `settings.json` file and import content
+    //let settings = import_settings(&dump_path)?;
+    //let settings: index_controller::Settings<Unchecked> = settings.into();
+    //let update_builder = UpdateBuilder::new(0);
+    //index.update_settings(&settings.check(), update_builder)?;
+
+    //let update_builder = UpdateBuilder::new(1);
+    //let file = File::open(&dump_path.join("documents.jsonl"))?;
+    //let reader = std::io::BufReader::new(file);
+
+    //// TODO: TAMO: waiting for milli. We should use the result
+    //let _ = index.update_documents(
+        //UpdateFormat::JsonStream,
+        //IndexDocumentsMethod::ReplaceDocuments,
+        //Some(reader),
+        //update_builder,
+        //primary_key,
+    //);
+
+    //// the last step: we extract the original milli::Index and close it
+    //Arc::try_unwrap(index.0)
+        //.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
+        //.unwrap()
+        //.prepare_for_closing()
+        //.wait();
+
+    //// at this point we should handle the import of the updates, but since the update logic is not handled in
+    //// meilisearch we are just going to ignore this part
+
+    //Ok(())
+//}
diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
new file mode 100644
index 000000000..ee7044fd1
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
@@ -0,0 +1,179 @@
+use std::{fs::File, io::BufReader, marker::PhantomData, path::Path};
+
+use anyhow::Context;
+use chrono::{DateTime, Utc};
+use log::info;
+use serde::{Deserialize, Serialize};
+
+use crate::index_controller::uuid_resolver::store::UuidStore;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct MetadataV2<U> {
+    db_version: String,
+    index_db_size: usize,
+    update_db_size: usize,
+    dump_date: DateTime<Utc>,
+    _pth: PhantomData<U>,
+}
+
+impl<U> MetadataV2<U>
+where U: UuidStore,
+{
+    pub fn load_dump(self, src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+        info!(
+            "Loading dump from {}, dump database version: {}, dump version: V2",
+            self.dump_date, self.db_version
+        );
+        // get dir in which to load the db:
+        let dst_dir = dst
+            .as_ref()
+            .parent()
+            .with_context(|| format!("Invalid db path: {}", dst.as_ref().display()))?;
+
+        let tmp_dst = tempfile::tempdir_in(dst_dir)?;
+
+        self.load_index_resolver(&src, tmp_dst.path())?;
+        load_updates(&src, tmp_dst.path())?;
+        load_indexes(&src, tmp_dst.path())?;
+        Ok(())
+    }
+
+    fn load_index_resolver(
+        &self,
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+    ) -> anyhow::Result<()> {
+        info!("Loading index database.");
+        let uuid_resolver_path = dst.as_ref().join("uuid_resolver/");
+        std::fs::create_dir_all(&uuid_resolver_path)?;
+
+        U::load_dump(src.as_ref(), dst.as_ref())?;
+
+        Ok(())
+    }
+}
+
+
+fn load_updates(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+    info!("Loading updates.");
+    todo!()
+}
+
+fn load_indexes(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+    info!("Loading indexes");
+    todo!()
+}
+
+// Extract Settings from `settings.json` file present at provided `dir_path`
+//fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
+//let path = dir_path.join("settings.json");
+//let file = File::open(path)?;
+//let reader = BufReader::new(file);
+//let metadata: Settings<Unchecked> = serde_json::from_reader(reader)?;
+
+//Ok(metadata.check()) +//} + +//pub fn import_dump( +//_db_size: usize, +//update_db_size: usize, +//_uuid: Uuid, +//dump_path: impl AsRef, +//db_path: impl AsRef, +//_primary_key: Option<&str>, +//) -> anyhow::Result<()> { +//info!("Dump import started."); +//info!("Importing outstanding updates..."); + +//import_updates(&dump_path, &db_path, update_db_size)?; + +//info!("done importing updates"); + +//Ok(()) +////let index_path = db_path.join(&format!("indexes/index-{}", uuid)); +////std::fs::create_dir_all(&index_path)?; +////let mut options = EnvOpenOptions::new(); +////options.map_size(size); +////let index = milli::Index::new(options, index_path)?; +////let index = Index(Arc::new(index)); + +////let mut txn = index.write_txn()?; + +////info!("importing the settings..."); +////// extract `settings.json` file and import content +////let settings = import_settings(&dump_path)?; +////let update_builder = UpdateBuilder::new(0); +////index.update_settings_txn(&mut txn, &settings, update_builder)?; + +////// import the documents in the index +////let update_builder = UpdateBuilder::new(1); +////let file = File::open(&dump_path.join("documents.jsonl"))?; +////let reader = std::io::BufReader::new(file); + +////info!("importing the documents..."); +////// TODO: TAMO: currently we ignore any error caused by the importation of the documents because +////// if there is no documents nor primary key it'll throw an anyhow error, but we must remove +////// this before the merge on main +////index.update_documents_txn( +////&mut txn, +////UpdateFormat::JsonStream, +////IndexDocumentsMethod::ReplaceDocuments, +////Some(reader), +////update_builder, +////primary_key, +////)?; + +////txn.commit()?; + +////// the last step: we extract the original milli::Index and close it +////Arc::try_unwrap(index.0) +////.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index") +////.unwrap() +////.prepare_for_closing() +////.wait(); + +////info!("importing the updates..."); +////import_updates(dump_path, db_path) +//} + +//fn import_updates( +//src_path: impl AsRef, +//dst_path: impl AsRef, +//_update_db_size: usize +//) -> anyhow::Result<()> { +//let dst_update_path = dst_path.as_ref().join("updates"); +//std::fs::create_dir_all(&dst_update_path)?; + +//let dst_update_files_path = dst_update_path.join("update_files"); +//std::fs::create_dir_all(&dst_update_files_path)?; + +//let options = EnvOpenOptions::new(); +//let (update_store, _) = UpdateStore::create(options, &dst_update_path)?; + +//let src_update_path = src_path.as_ref().join("updates"); +//let src_update_files_path = src_update_path.join("update_files"); +//let update_data = File::open(&src_update_path.join("data.jsonl"))?; +//let mut update_data = BufReader::new(update_data); + +//let mut wtxn = update_store.env.write_txn()?; +//let mut line = String::new(); +//loop { +//match update_data.read_line(&mut line) { +//Ok(_) => { +//let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?; + +//if let Some(path) = update.content_path_mut() { +//let dst_file_path = dst_update_files_path.join(&path); +//let src_file_path = src_update_files_path.join(&path); +//*path = dst_update_files_path.join(&path); +//std::fs::copy(src_file_path, dst_file_path)?; +//} + +//update_store.register_raw_updates(&mut wtxn, update, uuid)?; +//} +//_ => break, +//} +//} +//wtxn.commit()?; +//Ok(()) +//} diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index 
1508f8eb7..f0eeb1be3 100644
--- a/meilisearch-http/src/index_controller/dump_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs
@@ -1,26 +1,18 @@
 mod actor;
 mod handle_impl;
 mod message;
-mod v1;
-mod v2;
+mod loaders;
 
-use std::{fs::File, path::Path, sync::Arc};
+use std::{fs::File, path::Path};
 
-use anyhow::bail;
-use heed::EnvOpenOptions;
-use log::{error, info};
-use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
+use log::error;
 #[cfg(test)]
 use mockall::automock;
 use serde::{Deserialize, Serialize};
-use tempfile::TempDir;
 use thiserror::Error;
-use uuid::Uuid;
 
-use super::IndexMetadata;
-use crate::helpers::compression;
-use crate::index::Index;
-use crate::index_controller::uuid_resolver;
+use loaders::v1::MetadataV1;
+use loaders::v2::MetadataV2;
 
 pub use actor::DumpActor;
 pub use handle_impl::*;
@@ -40,31 +32,6 @@ pub enum DumpError {
     DumpDoesNotExist(String),
 }
 
-#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
-enum DumpVersion {
-    V1,
-    V2,
-}
-
-impl DumpVersion {
-    const CURRENT: Self = Self::V2;
-
-    /// Select the good importation function from the `DumpVersion` of metadata
-    pub fn import_index(
-        self,
-        size: usize,
-        uuid: Uuid,
-        dump_path: &Path,
-        db_path: &Path,
-        primary_key: Option<&str>,
-    ) -> anyhow::Result<()> {
-        match self {
-            Self::V1 => v1::import_index(size, uuid, dump_path, db_path, primary_key),
-            Self::V2 => v2::import_index(size, uuid, dump_path, db_path, primary_key),
-        }
-    }
-}
-
 #[async_trait::async_trait]
 #[cfg_attr(test, automock)]
 pub trait DumpActorHandle {
@@ -78,23 +45,19 @@ pub trait DumpActorHandle {
 }
 
 #[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Metadata {
-    indexes: Vec<IndexMetadata>,
-    db_version: String,
-    dump_version: DumpVersion,
+#[serde(rename_all = "camelCase", tag = "dump_version")]
+pub enum Metadata {
+    V1 {
+        #[serde(flatten)]
+        meta: MetadataV1,
+    },
+    V2 {
+        #[serde(flatten)]
+        meta: MetadataV2,
+    },
 }
 
 impl Metadata {
-    /// Create a Metadata with the current dump version of meilisearch.
-    pub fn new(indexes: Vec<IndexMetadata>, db_version: String) -> Self {
-        Metadata {
-            indexes,
-            db_version,
-            dump_version: DumpVersion::CURRENT,
-        }
-    }
-
     /// Extract Metadata from `metadata.json` file present at provided `dir_path`
     fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
         let path = dir_path.join("metadata.json");
@@ -155,80 +118,19 @@ impl DumpInfo {
 }
 
 pub fn load_dump(
-    db_path: impl AsRef<Path>,
-    dump_path: impl AsRef<Path>,
-    size: usize,
+    dst_path: impl AsRef<Path>,
+    src_path: impl AsRef<Path>,
+    _index_db_size: u64,
+    _update_db_size: u64,
 ) -> anyhow::Result<()> {
-    info!("Importing dump from {}...", dump_path.as_ref().display());
-    let db_path = db_path.as_ref();
-    let dump_path = dump_path.as_ref();
-    let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?;
+    let meta_path = src_path.as_ref().join("metadata.json");
+    let mut meta_file = File::open(&meta_path)?;
+    let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
 
-    // extract the dump in a temporary directory
-    let tmp_dir = TempDir::new_in(db_path)?;
-    let tmp_dir_path = tmp_dir.path();
-    compression::from_tar_gz(dump_path, tmp_dir_path)?;
-
-    // read dump metadata
-    let metadata = Metadata::from_path(&tmp_dir_path)?;
-
-    // remove indexes which have same `uuid` than indexes to import and create empty indexes
-    let existing_index_uids = uuid_resolver.list()?;
-
-    info!("Deleting indexes already present in the db and provided in the dump...");
-    for idx in &metadata.indexes {
-        if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) {
-            // if we find the index in the `uuid_resolver` it's supposed to exist on the file system
-            // and we want to delete it
-            let path = db_path.join(&format!("indexes/index-{}", uuid));
-            info!("Deleting {}", path.display());
-            use std::io::ErrorKind::*;
-            match std::fs::remove_dir_all(path) {
-                Ok(()) => (),
-                // if an index was present in the metadata but missing of the fs we can ignore the
-                // problem because we are going to create it later
-                Err(e) if e.kind() == NotFound => (),
-                Err(e) => bail!(e),
-            }
-        } else {
-            // if the index does not exist in the `uuid_resolver` we create it
-            uuid_resolver.create_uuid(idx.uid.clone(), false)?;
-        }
+    match meta {
+        Metadata::V1 { meta } => meta.load_dump(src_path, dst_path)?,
+        Metadata::V2 { meta } => meta.load_dump(src_path, dst_path)?,
     }
 
-    // import each indexes content
-    for idx in metadata.indexes {
-        let dump_path = tmp_dir_path.join(&idx.uid);
-        // this cannot fail since we created all the missing uuid in the previous loop
-        let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
-
-        info!(
-            "Importing dump from {} into {}...",
-            dump_path.display(),
-            db_path.display()
-        );
-        metadata.dump_version.import_index(
-            size,
-            uuid,
-            &dump_path,
-            &db_path,
-            idx.meta.primary_key.as_ref().map(|s| s.as_ref()),
-        )?;
-        info!("Dump importation from {} succeed", dump_path.display());
-    }
-
-    // finally we can move all the unprocessed update file into our new DB
-    // this directory may not exists
-    let update_path = tmp_dir_path.join("update_files");
-    let db_update_path = db_path.join("updates/update_files");
-    if update_path.exists() {
-        let _ = std::fs::remove_dir_all(db_update_path);
-        std::fs::rename(
-            tmp_dir_path.join("update_files"),
-            db_path.join("updates/update_files"),
-        )?;
-    }
-
-    info!("Dump importation from {} succeed", dump_path.display());
     Ok(())
 }
diff --git a/meilisearch-http/src/index_controller/dump_actor/v1.rs b/meilisearch-http/src/index_controller/dump_actor/v1.rs
deleted file mode 100644
index 
6f199193c..000000000 --- a/meilisearch-http/src/index_controller/dump_actor/v1.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::{collections::{BTreeMap, BTreeSet}, marker::PhantomData}; - -use log::warn; -use serde::{Deserialize, Serialize}; -use crate::{index::Unchecked, index_controller}; -use crate::index::deserialize_some; -use super::*; - -/// This is the settings used in the last version of meilisearch exporting dump in V1 -#[derive(Default, Clone, Serialize, Deserialize, Debug)] -#[serde(rename_all = "camelCase", deny_unknown_fields)] -struct Settings { - #[serde(default, deserialize_with = "deserialize_some")] - pub ranking_rules: Option>>, - #[serde(default, deserialize_with = "deserialize_some")] - pub distinct_attribute: Option>, - #[serde(default, deserialize_with = "deserialize_some")] - pub searchable_attributes: Option>>, - #[serde(default, deserialize_with = "deserialize_some")] - pub displayed_attributes: Option>>, - #[serde(default, deserialize_with = "deserialize_some")] - pub stop_words: Option>>, - #[serde(default, deserialize_with = "deserialize_some")] - pub synonyms: Option>>>, - #[serde(default, deserialize_with = "deserialize_some")] - pub attributes_for_faceting: Option>>, -} - -/// we need to **always** be able to convert the old settings to the settings currently being used -impl From for index_controller::Settings { - fn from(settings: Settings) -> Self { - if settings.synonyms.flatten().is_some() { - error!("`synonyms` are not yet implemented and thus will be ignored"); - } - Self { - distinct_attribute: settings.distinct_attribute, - // we need to convert the old `Vec` into a `BTreeSet` - displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())), - searchable_attributes: settings.searchable_attributes, - // we previously had a `Vec` but now we have a `HashMap` - // representing the name of the faceted field + the type of the field. 
Since the type - // was not known in the V1 of the dump we are just going to assume everything is a - // String - attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())), - // we need to convert the old `Vec` into a `BTreeSet` - ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| { - match criterion.as_str() { - "words" | "typo" | "proximity" | "attribute" => Some(criterion), - s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion), - "wordsPosition" => { - warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPositon` will be ignored"); - Some(String::from("words")) - } - "exactness" => { - error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion); - None - } - s => { - error!("Unknown criterion found in the dump: `{}`, it will be ignored", s); - None - } - } - }).collect())), - // we need to convert the old `Vec` into a `BTreeSet` - stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())), - _kind: PhantomData, - } - } -} - -/// Extract Settings from `settings.json` file present at provided `dir_path` -fn import_settings(dir_path: &Path) -> anyhow::Result { - let path = dir_path.join("settings.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata = serde_json::from_reader(reader)?; - - Ok(metadata) -} - - -pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { - let index_path = db_path.join(&format!("indexes/index-{}", uuid)); - info!("Importing a dump from an old version of meilisearch with dump version 1"); - - std::fs::create_dir_all(&index_path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, index_path)?; - let index = Index(Arc::new(index)); - - // extract `settings.json` file and import content - let settings = import_settings(&dump_path)?; - let settings: index_controller::Settings = settings.into(); - let update_builder = UpdateBuilder::new(0); - index.update_settings(&settings.check(), update_builder)?; - - let update_builder = UpdateBuilder::new(1); - let file = File::open(&dump_path.join("documents.jsonl"))?; - let reader = std::io::BufReader::new(file); - - // TODO: TAMO: waiting for milli. 
We should use the result - let _ = index.update_documents( - UpdateFormat::JsonStream, - IndexDocumentsMethod::ReplaceDocuments, - Some(reader), - update_builder, - primary_key, - ); - - // the last step: we extract the original milli::Index and close it - Arc::try_unwrap(index.0) - .map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index") - .unwrap() - .prepare_for_closing() - .wait(); - - // at this point we should handle the import of the updates, but since the update logic is not handled in - // meilisearch we are just going to ignore this part - - Ok(()) -} diff --git a/meilisearch-http/src/index_controller/dump_actor/v2.rs b/meilisearch-http/src/index_controller/dump_actor/v2.rs deleted file mode 100644 index eeda78e8a..000000000 --- a/meilisearch-http/src/index_controller/dump_actor/v2.rs +++ /dev/null @@ -1,89 +0,0 @@ -use heed::EnvOpenOptions; -use log::info; -use uuid::Uuid; -use crate::{index::Unchecked, index_controller::{UpdateStatus, update_actor::UpdateStore}}; -use std::io::BufRead; -use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}}; -use crate::index::{Checked, Index}; -use crate::index_controller::Settings; -use std::{fs::File, path::Path, sync::Arc}; - -/// Extract Settings from `settings.json` file present at provided `dir_path` -fn import_settings(dir_path: &Path) -> anyhow::Result> { - let path = dir_path.join("settings.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata: Settings = serde_json::from_reader(reader)?; - - println!("Meta: {:?}", metadata); - - Ok(metadata.check()) -} - -pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { - let index_path = db_path.join(&format!("indexes/index-{}", uuid)); - std::fs::create_dir_all(&index_path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, index_path)?; - let index = Index(Arc::new(index)); - - let mut txn = index.write_txn()?; - - info!("importing the settings..."); - // extract `settings.json` file and import content - let settings = import_settings(&dump_path)?; - let update_builder = UpdateBuilder::new(0); - index.update_settings_txn(&mut txn, &settings, update_builder)?; - - // import the documents in the index - let update_builder = UpdateBuilder::new(1); - let file = File::open(&dump_path.join("documents.jsonl"))?; - let reader = std::io::BufReader::new(file); - - info!("importing the documents..."); - // TODO: TAMO: currently we ignore any error caused by the importation of the documents because - // if there is no documents nor primary key it'll throw an anyhow error, but we must remove - // this before the merge on main - index.update_documents_txn( - &mut txn, - UpdateFormat::JsonStream, - IndexDocumentsMethod::ReplaceDocuments, - Some(reader), - update_builder, - primary_key, - )?; - - txn.commit()?; - - // the last step: we extract the original milli::Index and close it - Arc::try_unwrap(index.0) - .map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index") - .unwrap() - .prepare_for_closing() - .wait(); - - info!("importing the updates..."); - import_updates(uuid, dump_path, db_path) -} - -fn import_updates(uuid: Uuid, dump_path: &Path, db_path: &Path) -> anyhow::Result<()> { - let update_path = db_path.join("updates"); - let options = EnvOpenOptions::new(); - // create an UpdateStore to import the updates - 
std::fs::create_dir_all(&update_path)?; - let (update_store, _) = UpdateStore::create(options, &update_path)?; - let file = File::open(&dump_path.join("updates.jsonl"))?; - let reader = std::io::BufReader::new(file); - - let mut wtxn = update_store.env.write_txn()?; - for update in reader.lines() { - let mut update: UpdateStatus = serde_json::from_str(&update?)?; - if let Some(path) = update.content_path_mut() { - *path = update_path.join("update_files").join(&path); - } - update_store.register_raw_updates(&mut wtxn, update, uuid)?; - } - wtxn.commit()?; - Ok(()) -} diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 61bc71114..4e40a9873 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -14,22 +14,20 @@ use tokio::sync::mpsc; use tokio::time::sleep; use uuid::Uuid; -pub use updates::*; -pub use dump_actor::{DumpInfo, DumpStatus}; use dump_actor::DumpActorHandle; +pub use dump_actor::{DumpInfo, DumpStatus}; use index_actor::IndexActorHandle; -use snapshot::{SnapshotService, load_snapshot}; +use snapshot::{load_snapshot, SnapshotService}; use update_actor::UpdateActorHandle; +pub use updates::*; use uuid_resolver::{UuidResolverError, UuidResolverHandle}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::option::Opt; -use dump_actor::load_dump; - +mod dump_actor; mod index_actor; mod snapshot; -mod dump_actor; mod update_actor; mod update_handler; mod updates; @@ -94,13 +92,8 @@ impl IndexController { options.ignore_snapshot_if_db_exists, options.ignore_missing_snapshot, )?; - } else if let Some(ref path) = options.import_dump { - load_dump( - &options.db_path, - path, - index_size, - )?; - + } else if let Some(ref _path) = options.import_dump { + todo!("implement load dump") } std::fs::create_dir_all(&path)?; @@ -112,7 +105,13 @@ impl IndexController { &path, update_store_size, )?; - let dump_handle = dump_actor::DumpActorHandleImpl::new(&options.dumps_dir, uuid_resolver.clone(), index_handle.clone(), update_handle.clone())?; + let dump_handle = dump_actor::DumpActorHandleImpl::new( + &options.dumps_dir, + uuid_resolver.clone(), + update_handle.clone(), + options.max_mdb_size.get_bytes(), + options.max_udb_size.get_bytes(), + )?; if options.schedule_snapshot { let snapshot_service = SnapshotService::new( @@ -158,7 +157,8 @@ impl IndexController { // prevent dead_locking between the update_handle::update that waits for the update to be // registered and the update_actor that waits for the the payload to be sent to it. 
tokio::task::spawn_local(async move { - payload.for_each(|r| async { + payload + .for_each(|r| async { let _ = sender.send(r).await; }) .await diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 8cd77e252..ba89eebe3 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -1,7 +1,7 @@ mod actor; mod handle_impl; mod message; -mod store; +pub mod store; use std::{collections::HashSet, path::PathBuf}; diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs index 8b75f9e5d..82b8d0136 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/dump.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs @@ -15,7 +15,7 @@ use super::UpdateStore; use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus}; #[derive(Serialize, Deserialize)] -struct UpdateEntry { +pub struct UpdateEntry { uuid: Uuid, update: UpdateStatus, } diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 52bd8d62a..58ac24720 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -1,4 +1,4 @@ -mod dump; +pub mod dump; mod codec; use std::collections::{BTreeMap, HashSet}; @@ -115,7 +115,6 @@ impl UpdateStore { let (notification_sender, notification_receiver) = mpsc::channel(10); // Send a first notification to trigger the process. - let _ = notification_sender.send(()); Ok(( Self { @@ -138,6 +137,9 @@ impl UpdateStore { let (update_store, mut notification_receiver) = Self::create(options, path)?; let update_store = Arc::new(update_store); + // trigger the update loop + let _ = update_store.notification_sender.send(()); + // Init update loop to perform any pending updates at launch. // Since we just launched the update store, and we still own the receiving end of the // channel, this call is guaranteed to succeed. 
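
Note on the uuid_resolver changes that follow: the dump is written as JSON Lines, one `DumpEntry` object per line in `index_uuids/data.jsonl`, and the new `load_dump` reads it back line by line. Below is a minimal, self-contained sketch of that round trip. It is illustrative only and not part of the patch; it assumes the `anyhow`, `serde`, `serde_json` and `uuid` crates (with the `v4` and `serde` features enabled for `uuid`), and the "movies" uid is a made-up example.

    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    // Mirrors the `DumpEntry` struct added to store.rs in this patch.
    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct DumpEntry {
        uuid: Uuid,
        uid: String,
    }

    fn main() -> anyhow::Result<()> {
        // Writing: serialize one entry per line, as `HeedUuidStore::dump` does.
        let entry = DumpEntry {
            uuid: Uuid::new_v4(),
            uid: "movies".to_string(),
        };
        let line = serde_json::to_string(&entry)?;
        println!("{}", line); // e.g. {"uuid":"9c7a54d6-...","uid":"movies"}

        // Reading: parse the line back, as the new `load_dump` does after `read_line`.
        let parsed: DumpEntry = serde_json::from_str(&line)?;
        assert_eq!(parsed, entry);
        Ok(())
    }
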
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
index b84025094..5bddadf02 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
@@ -1,7 +1,7 @@
 mod actor;
 mod handle_impl;
 mod message;
-mod store;
+pub mod store;
 
 use std::collections::HashSet;
 use std::path::PathBuf;
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
index b497116cb..0c6b66ddf 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashSet, io::Write};
+use std::{collections::HashSet, io::{BufReader, BufRead, Write}};
 use std::fs::{create_dir_all, File};
 use std::path::{Path, PathBuf};
 
@@ -7,12 +7,19 @@ use heed::{
     CompactionOption, Database, Env, EnvOpenOptions,
 };
 use uuid::Uuid;
+use serde::{Serialize, Deserialize};
 
 use super::{Result, UuidResolverError, UUID_STORE_SIZE};
 use crate::helpers::EnvSizer;
 
+#[derive(Serialize, Deserialize)]
+struct DumpEntry {
+    uuid: Uuid,
+    uid: String,
+}
+
 #[async_trait::async_trait]
-pub trait UuidStore {
+pub trait UuidStore: Sized {
     // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
     // the uuid otherwise.
     async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
@@ -23,6 +30,7 @@ pub trait UuidStore {
     async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
     async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
+    fn load_dump(src: &Path, dst: &Path) -> Result<()>;
 }
 
 #[derive(Clone)]
@@ -62,11 +70,7 @@ impl HeedUuidStore {
             Ok(uuid)
         }
     }
-    }
-
-    pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
-        let env = self.env.clone();
-        let db = self.db;
+    } pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { let env = self.env.clone(); let db = self.db;
         let txn = env.read_txn()?;
         match db.get(&txn, &name)? {
             Some(uuid) => {
@@ -149,11 +153,14 @@
         let txn = self.env.read_txn()?;
         for entry in self.db.iter(&txn)? {
-            let entry = entry?;
-            let uuid = Uuid::from_slice(entry.1)?;
+            let (uid, uuid) = entry?;
+            let uuid = Uuid::from_slice(uuid)?;
             uuids.insert(uuid);
-            serde_json::to_writer(&mut dump_file, &serde_json::json!({ "uid": entry.0, "uuid": uuid
-            }))?;
-            dump_file.write(b"\n").unwrap();
+            let entry = DumpEntry {
+                uuid,
+                uid: uid.to_string(),
+            };
+            serde_json::to_writer(&mut dump_file, &entry)?;
+            dump_file.write(b"\n").unwrap();
         }
 
         Ok(uuids)
@@ -200,4 +207,33 @@ impl UuidStore for HeedUuidStore {
         let this = self.clone();
         tokio::task::spawn_blocking(move || this.dump(path)).await?
     }
+
+    fn load_dump(src: &Path, dst: &Path) -> Result<()> {
+        let uuid_resolver_path = dst.join("uuid_resolver/");
+        std::fs::create_dir_all(&uuid_resolver_path)?;
+
+        let src_indexes = src.join("index_uuids/data.jsonl");
+        let indexes = File::open(&src_indexes)?;
+        let mut indexes = BufReader::new(indexes);
+        let mut line = String::new();
+
+        let db = Self::new(dst)?;
+        let mut txn = db.env.write_txn()?;
+
+        loop {
+            match indexes.read_line(&mut line) {
+                Ok(0) => break,
+                Ok(_) => {
+                    let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
+                    db.db.put(&mut txn, &uid, uuid.as_bytes())?;
+                }
+                Err(e) => Err(e)?,
+            }
+
+            line.clear();
+        }
+        txn.commit()?;
+
+        Ok(())
+    }
 }
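
A note on the version dispatch introduced in dump_actor/mod.rs: serde's `tag` attribute makes `metadata.json` self-describing, so `load_dump` only has to deserialize `Metadata` and match on the variant. The sketch below is a simplified, self-contained illustration of that pattern; the field sets are trimmed (the real `MetadataV2` also carries a dump date), and the sample values are invented, not taken from a real dump.

    use serde::Deserialize;

    #[derive(Deserialize, Debug)]
    struct MetadataV1 {
        db_version: String,
    }

    #[derive(Deserialize, Debug)]
    struct MetadataV2 {
        db_version: String,
        index_db_size: usize,
        update_db_size: usize,
    }

    // `tag = "dump_version"` reads the tag field first; `flatten` then fills the
    // matching variant from the remaining fields. With `rename_all = "camelCase"`
    // the variant tags serialize as "v1" and "v2".
    #[derive(Deserialize, Debug)]
    #[serde(rename_all = "camelCase", tag = "dump_version")]
    enum Metadata {
        V1 {
            #[serde(flatten)]
            meta: MetadataV1,
        },
        V2 {
            #[serde(flatten)]
            meta: MetadataV2,
        },
    }

    fn main() -> serde_json::Result<()> {
        let raw = r#"{"dump_version":"v2","db_version":"0.21.0","index_db_size":104857600,"update_db_size":104857600}"#;
        match serde_json::from_str::<Metadata>(raw)? {
            Metadata::V1 { meta } => println!("v1 dump from meilisearch {}", meta.db_version),
            Metadata::V2 { meta } => println!(
                "v2 dump, index map {} bytes, update map {} bytes",
                meta.index_db_size, meta.update_db_size
            ),
        }
        Ok(())
    }
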