diff --git a/meilisearch-http/src/helpers/mod.rs b/meilisearch-http/src/helpers/mod.rs index c664f15aa..0b72c3694 100644 --- a/meilisearch-http/src/helpers/mod.rs +++ b/meilisearch-http/src/helpers/mod.rs @@ -1,4 +1,4 @@ -pub mod compression; +//pub mod compression; mod env; pub use env::EnvSizer; diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs index 77f439d05..dfa4bcc2d 100644 --- a/meilisearch-http/src/main.rs +++ b/meilisearch-http/src/main.rs @@ -1,4 +1,4 @@ -use std::env; +use std::{env, path::Path, time::Duration}; use actix_web::HttpServer; use meilisearch_http::{create_app, Opt}; @@ -12,6 +12,7 @@ use meilisearch_http::analytics; #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; +/// does all the setup before meilisearch is launched fn setup(opt: &Opt) -> anyhow::Result<()> { let mut log_builder = env_logger::Builder::new(); log_builder.parse_filters(&opt.log_level); @@ -22,12 +23,19 @@ fn setup(opt: &Opt) -> anyhow::Result<()> { log_builder.init(); + + Ok(()) +} + +/// Cleans and setup the temporary file folder in the database directory. This must be done after +/// the meilisearch instance has been created, to not interfere with the snapshot and dump loading. +fn setup_temp_dir(db_path: impl AsRef) -> anyhow::Result<()> { // Set the tempfile directory in the current db path, to avoid cross device references. Also // remove the previous outstanding files found there // // TODO: if two processes open the same db, one might delete the other tmpdir. Need to make // sure that no one is using it before deleting it. - let temp_path = opt.db_path.join("tmp"); + let temp_path = db_path.as_ref().join("tmp"); // Ignore error if tempdir doesn't exist let _ = std::fs::remove_dir_all(&temp_path); std::fs::create_dir_all(&temp_path)?; @@ -48,15 +56,21 @@ fn setup_meilisearch(opt: &Opt) -> anyhow::Result { .set_ignore_missing_snapshot(opt.ignore_missing_snapshot) .set_ignore_snapshot_if_db_exists(opt.ignore_snapshot_if_db_exists) .set_dump_dst(opt.dumps_dir.clone()) + .set_snapshot_interval(Duration::from_secs(opt.snapshot_interval_sec)) .set_snapshot_dir(opt.snapshot_dir.clone()); if let Some(ref path) = opt.import_snapshot { meilisearch.set_import_snapshot(path.clone()); } + if let Some(ref path) = opt.import_dump { meilisearch.set_dump_src(path.clone()); } + if opt.schedule_snapshot { + meilisearch.set_schedule_snapshot(); + } + meilisearch.build(opt.db_path.clone(), opt.indexer_options.clone()) } @@ -78,6 +92,8 @@ async fn main() -> anyhow::Result<()> { let meilisearch = setup_meilisearch(&opt)?; + setup_temp_dir(&opt.db_path)?; + #[cfg(all(not(debug_assertions), feature = "analytics"))] if !opt.no_analytics { let analytics_data = meilisearch.clone(); diff --git a/meilisearch-http/src/helpers/compression.rs b/meilisearch-lib/src/compression.rs similarity index 67% rename from meilisearch-http/src/helpers/compression.rs rename to meilisearch-lib/src/compression.rs index c4747cb21..f9620eb2a 100644 --- a/meilisearch-http/src/helpers/compression.rs +++ b/meilisearch-lib/src/compression.rs @@ -16,11 +16,11 @@ pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Resul Ok(()) } -pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { - let f = File::open(&src)?; - let gz = GzDecoder::new(f); - let mut ar = Archive::new(gz); - create_dir_all(&dest)?; - ar.unpack(&dest)?; - Ok(()) -} +//pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { + //let f = File::open(&src)?; + //let gz = GzDecoder::new(f); + //let mut ar = Archive::new(gz); + //create_dir_all(&dest)?; + //ar.unpack(&dest)?; + //Ok(()) +//} diff --git a/meilisearch-lib/src/index/mod.rs b/meilisearch-lib/src/index/mod.rs index 911a22464..c4fa812b1 100644 --- a/meilisearch-lib/src/index/mod.rs +++ b/meilisearch-lib/src/index/mod.rs @@ -71,11 +71,15 @@ impl IndexMeta { } } -#[derive(Clone)] +#[derive(Clone, derivative::Derivative)] +#[derivative(Debug)] pub struct Index { pub uuid: Uuid, + #[derivative(Debug="ignore")] pub inner: Arc, + #[derivative(Debug="ignore")] update_file_store: Arc, + #[derivative(Debug="ignore")] update_handler: Arc, } @@ -258,4 +262,13 @@ impl Index { displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid)); Ok(displayed_fields_ids) } + + pub fn snapshot(&self, path: impl AsRef) -> Result<()> { + let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); + create_dir_all(&dst)?; + dst.push("data.mdb"); + let _txn = self.write_txn()?; + self.inner.env.copy_to_path(dst, heed::CompactionOption::Enabled)?; + Ok(()) + } } diff --git a/meilisearch-lib/src/index_controller/index_resolver/index_store.rs b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs index c038ceb20..5969108de 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/index_store.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs @@ -57,7 +57,7 @@ impl IndexStore for MapIndexStore { if let Some(index) = lock.get(&uuid) { return Ok(index.clone()); } - let path = self.path.join(format!("index-{}", uuid)); + let path = self.path.join(format!("{}", uuid)); if path.exists() { return Err(IndexResolverError::IndexAlreadyExists); } @@ -92,7 +92,7 @@ impl IndexStore for MapIndexStore { None => { // drop the guard here so we can perform the write after without deadlocking; drop(guard); - let path = self.path.join(format!("index-{}", uuid)); + let path = self.path.join(format!("{}", uuid)); if !path.exists() { return Ok(None); } @@ -108,7 +108,7 @@ impl IndexStore for MapIndexStore { } async fn delete(&self, uuid: Uuid) -> Result> { - let db_path = self.path.join(format!("index-{}", uuid)); + let db_path = self.path.join(format!("{}", uuid)); fs::remove_dir_all(db_path).await?; let index = self.index_store.write().await.remove(&uuid); Ok(index) diff --git a/meilisearch-lib/src/index_controller/index_resolver/mod.rs b/meilisearch-lib/src/index_controller/index_resolver/mod.rs index eebb8ef95..5721fce8a 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/mod.rs @@ -45,10 +45,18 @@ where U: UuidStore, pub async fn get_size(&self) -> Result { todo!() + //Ok(self.index_store.get_size()? + self.index_uuid_store.get_size().await?) } - pub async fn perform_snapshot(&self, _path: impl AsRef) -> Result<()> { - todo!() + pub async fn snapshot(&self, path: impl AsRef) -> Result> { + let uuids = self.index_uuid_store.snapshot(path.as_ref().to_owned()).await?; + let mut indexes = Vec::new(); + + for uuid in uuids { + indexes.push(self.get_index_by_uuid(uuid).await?); + } + + Ok(indexes) } pub async fn create_index(&self, uid: String, primary_key: Option) -> Result<(Uuid, Index)> { diff --git a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs index 7974bf4ae..a4bcd17d4 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs @@ -46,8 +46,9 @@ impl HeedUuidStore { create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); options.map_size(UUID_STORE_SIZE); // 1GB + options.max_dbs(1); let env = options.open(path)?; - let db = env.create_database(None)?; + let db = env.create_database(Some("uuids"))?; Ok(Self { env, db }) } diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 3c53ab9eb..29f5348b1 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -20,6 +20,7 @@ use snapshot::load_snapshot; use crate::index::{Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked}; use crate::index_controller::index_resolver::create_index_resolver; +use crate::index_controller::snapshot::SnapshotService; use crate::options::IndexerOpts; use error::Result; use crate::index::error::Result as IndexResult; @@ -75,7 +76,7 @@ pub struct IndexSettings { #[derive(Clone)] pub struct IndexController { index_resolver: Arc, - update_handle: updates::UpdateSender, + update_sender: updates::UpdateSender, dump_handle: dump_actor::DumpActorHandleImpl, } @@ -113,8 +114,10 @@ pub struct IndexControllerBuilder { max_update_store_size: Option, snapshot_dir: Option, import_snapshot: Option, + snapshot_interval: Option, ignore_snapshot_if_db_exists: bool, ignore_missing_snapshot: bool, + schedule_snapshot: bool, dump_src: Option, dump_dst: Option, } @@ -155,36 +158,36 @@ impl IndexControllerBuilder { let index_resolver = Arc::new(create_index_resolver(&db_path, index_size, &indexer_options)?); #[allow(unreachable_code)] - let update_handle = updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?; + let update_sender = updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?; let dump_path = self.dump_dst.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; let dump_handle = dump_actor::DumpActorHandleImpl::new( dump_path, index_resolver.clone(), - update_handle.clone(), + update_sender.clone(), index_size, update_store_size, )?; - //if options.schedule_snapshot { - //let snapshot_service = SnapshotService::new( - //uuid_resolver.clone(), - //update_handle.clone(), - //Duration::from_secs(options.snapshot_interval_sec), - //options.snapshot_dir.clone(), - //options - //.db_path - //.file_name() - //.map(|n| n.to_owned().into_string().expect("invalid path")) - //.unwrap_or_else(|| String::from("data.ms")), - //); + if self.schedule_snapshot { + let snapshot_service = SnapshotService::new( + index_resolver.clone(), + update_sender.clone(), + self.snapshot_interval.ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?, + self.snapshot_dir.ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?, + db_path + .as_ref() + .file_name() + .map(|n| n.to_owned().into_string().expect("invalid path")) + .unwrap_or_else(|| String::from("data.ms")), + ); - //tokio::task::spawn(snapshot_service.run()); - //} + tokio::task::spawn(snapshot_service.run()); + } Ok(IndexController { index_resolver, - update_handle, + update_sender, dump_handle, }) } @@ -238,6 +241,18 @@ impl IndexControllerBuilder { self.import_snapshot.replace(import_snapshot); self } + + /// Set the index controller builder's snapshot interval sec. + pub fn set_snapshot_interval(&mut self, snapshot_interval: Duration) -> &mut Self { + self.snapshot_interval = Some(snapshot_interval); + self + } + + /// Set the index controller builder's schedule snapshot. + pub fn set_schedule_snapshot(&mut self) -> &mut Self { + self.schedule_snapshot = true; + self + } } impl IndexController { @@ -248,12 +263,12 @@ impl IndexController { pub async fn register_update(&self, uid: String, update: Update) -> Result { match self.index_resolver.get_uuid(uid).await { Ok(uuid) => { - let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; + let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?; Ok(update_result) } Err(IndexResolverError::UnexistingIndex(name)) => { let (uuid, _) = self.index_resolver.create_index(name, None).await?; - let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; + let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?; // ignore if index creation fails now, since it may already have been created Ok(update_result) @@ -389,13 +404,13 @@ impl IndexController { pub async fn update_status(&self, uid: String, id: u64) -> Result { let uuid = self.index_resolver.get_uuid(uid).await?; - let result = UpdateMsg::get_update(&self.update_handle, uuid, id).await?; + let result = UpdateMsg::get_update(&self.update_sender, uuid, id).await?; Ok(result) } pub async fn all_update_status(&self, uid: String) -> Result> { let uuid = self.index_resolver.get_uuid(uid).await?; - let result = UpdateMsg::list_updates(&self.update_handle, uuid).await?; + let result = UpdateMsg::list_updates(&self.update_sender, uuid).await?; Ok(result) } @@ -490,7 +505,7 @@ impl IndexController { } pub async fn get_index_stats(&self, uid: String) -> Result { - let update_infos = UpdateMsg::get_info(&self.update_handle).await?; + let update_infos = UpdateMsg::get_info(&self.update_sender).await?; let index = self.index_resolver.get_index(uid).await?; let uuid = index.uuid; let mut stats = spawn_blocking(move || index.stats()).await??; @@ -500,7 +515,7 @@ impl IndexController { } pub async fn get_all_stats(&self) -> Result { - let update_infos = UpdateMsg::get_info(&self.update_handle).await?; + let update_infos = UpdateMsg::get_info(&self.update_sender).await?; let mut database_size = self.get_uuids_size().await? + update_infos.size; let mut last_update: Option> = None; let mut indexes = BTreeMap::new(); diff --git a/meilisearch-lib/src/index_controller/snapshot.rs b/meilisearch-lib/src/index_controller/snapshot.rs index 7c999fd74..132745c96 100644 --- a/meilisearch-lib/src/index_controller/snapshot.rs +++ b/meilisearch-lib/src/index_controller/snapshot.rs @@ -1,88 +1,94 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; use anyhow::bail; +use log::{error, info, trace}; +use tokio::task::spawn_blocking; +use tokio::time::sleep; +use tokio::fs; -//pub struct SnapshotService { - //uuid_resolver_handle: R, - //update_handle: U, - //snapshot_period: Duration, - //snapshot_path: PathBuf, - //db_name: String, -//} +use crate::index_controller::updates::UpdateMsg; -//impl SnapshotService -//where - //U: UpdateActorHandle, - //R: UuidResolverHandle, -//{ - //pub fn new( - //uuid_resolver_handle: R, - //update_handle: U, - //snapshot_period: Duration, - //snapshot_path: PathBuf, - //db_name: String, - //) -> Self { - //Self { - //uuid_resolver_handle, - //update_handle, - //snapshot_period, - //snapshot_path, - //db_name, - //} - //} +use super::updates::UpdateSender; +use super::index_resolver::HardStateIndexResolver; - //pub async fn run(self) { - //info!( - //"Snapshot scheduled every {}s.", - //self.snapshot_period.as_secs() - //); - //loop { - //if let Err(e) = self.perform_snapshot().await { - //error!("Error while performing snapshot: {}", e); - //} - //sleep(self.snapshot_period).await; - //} - //} +pub struct SnapshotService { + index_resolver: Arc, + update_sender: UpdateSender, + snapshot_period: Duration, + snapshot_path: PathBuf, + db_name: String, +} - //async fn perform_snapshot(&self) -> anyhow::Result<()> { - //trace!("Performing snapshot."); +impl SnapshotService { + pub fn new( + index_resolver: Arc, + update_sender: UpdateSender, + snapshot_period: Duration, + snapshot_path: PathBuf, + db_name: String, + ) -> Self { + Self { + index_resolver, + update_sender, + snapshot_period, + snapshot_path, + db_name, + } + } - //let snapshot_dir = self.snapshot_path.clone(); - //fs::create_dir_all(&snapshot_dir).await?; - //let temp_snapshot_dir = - //spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; - //let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); + pub async fn run(self) { + info!( + "Snapshot scheduled every {}s.", + self.snapshot_period.as_secs() + ); + loop { + if let Err(e) = self.perform_snapshot().await { + error!("Error while performing snapshot: {}", e); + } + sleep(self.snapshot_period).await; + } + } - //let uuids = self - //.uuid_resolver_handle - //.snapshot(temp_snapshot_path.clone()) - //.await?; + async fn perform_snapshot(&self) -> anyhow::Result<()> { + trace!("Performing snapshot."); - //if uuids.is_empty() { - //return Ok(()); - //} + let snapshot_dir = self.snapshot_path.clone(); + fs::create_dir_all(&snapshot_dir).await?; + let temp_snapshot_dir = + spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; + let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); - //self.update_handle - //.snapshot(uuids, temp_snapshot_path.clone()) - //.await?; - //let snapshot_dir = self.snapshot_path.clone(); - //let snapshot_path = self - //.snapshot_path - //.join(format!("{}.snapshot", self.db_name)); - //let snapshot_path = spawn_blocking(move || -> anyhow::Result { - //let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; - //let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); - //compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; - //temp_snapshot_file.persist(&snapshot_path)?; - //Ok(snapshot_path) - //}) - //.await??; + let indexes = self + .index_resolver + .snapshot(temp_snapshot_path.clone()) + .await?; - //trace!("Created snapshot in {:?}.", snapshot_path); + if indexes.is_empty() { + return Ok(()); + } - //Ok(()) - //} -//} + UpdateMsg::snapshot(&self.update_sender, temp_snapshot_path.clone(), indexes).await?; + + let snapshot_dir = self.snapshot_path.clone(); + let snapshot_path = self + .snapshot_path + .join(format!("{}.snapshot", self.db_name)); + let snapshot_path = spawn_blocking(move || -> anyhow::Result { + let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; + let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); + crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; + temp_snapshot_file.persist(&snapshot_path)?; + Ok(snapshot_path) + }) + .await??; + + trace!("Created snapshot in {:?}.", snapshot_path); + + Ok(()) + } +} pub fn load_snapshot( db_path: impl AsRef, @@ -94,7 +100,7 @@ pub fn load_snapshot( match crate::from_tar_gz(snapshot_path, &db_path) { Ok(()) => Ok(()), Err(e) => { - // clean created db folder + //clean created db folder std::fs::remove_dir_all(&db_path)?; Err(e) } @@ -120,140 +126,140 @@ pub fn load_snapshot( } } -#[cfg(test)] -mod test { - use std::iter::FromIterator; - use std::{collections::HashSet, sync::Arc}; +//#[cfg(test)] +//mod test { + //use std::iter::FromIterator; + //use std::{collections::HashSet, sync::Arc}; - use futures::future::{err, ok}; - use rand::Rng; - use tokio::time::timeout; - use uuid::Uuid; + //use futures::future::{err, ok}; + //use rand::Rng; + //use tokio::time::timeout; + //use uuid::Uuid; - use super::*; - use crate::index_controller::index_actor::MockIndexActorHandle; - use crate::index_controller::updates::{ - error::UpdateActorError, MockUpdateActorHandle, UpdateActorHandleImpl, - }; - use crate::index_controller::uuid_resolver::{ - error::UuidResolverError, MockUuidResolverHandle, - }; + //use super::*; + //use crate::index_controller::index_actor::MockIndexActorHandle; + //use crate::index_controller::updates::{ + //error::UpdateActorError, MockUpdateActorHandle, UpdateActorHandleImpl, + //}; + //use crate::index_controller::uuid_resolver::{ + //error::UuidResolverError, MockUuidResolverHandle, + //}; - #[actix_rt::test] - async fn test_normal() { - let mut rng = rand::thread_rng(); - let uuids_num: usize = rng.gen_range(5..10); - let uuids = (0..uuids_num) - .map(|_| Uuid::new_v4()) - .collect::>(); + //#[actix_rt::test] + //async fn test_normal() { + //let mut rng = rand::thread_rng(); + //let uuids_num: usize = rng.gen_range(5..10); + //let uuids = (0..uuids_num) + //.map(|_| Uuid::new_v4()) + //.collect::>(); - let mut uuid_resolver = MockUuidResolverHandle::new(); - let uuids_clone = uuids.clone(); - uuid_resolver - .expect_snapshot() - .times(1) - .returning(move |_| Box::pin(ok(uuids_clone.clone()))); + //let mut uuid_resolver = MockUuidResolverHandle::new(); + //let uuids_clone = uuids.clone(); + //uuid_resolver + //.expect_snapshot() + //.times(1) + //.returning(move |_| Box::pin(ok(uuids_clone.clone()))); - let uuids_clone = uuids.clone(); - let mut index_handle = MockIndexActorHandle::new(); - index_handle - .expect_snapshot() - .withf(move |uuid, _path| uuids_clone.contains(uuid)) - .times(uuids_num) - .returning(move |_, _| Box::pin(ok(()))); + //let uuids_clone = uuids.clone(); + //let mut index_handle = MockIndexActorHandle::new(); + //index_handle + //.expect_snapshot() + //.withf(move |uuid, _path| uuids_clone.contains(uuid)) + //.times(uuids_num) + //.returning(move |_, _| Box::pin(ok(()))); - let dir = tempfile::tempdir_in(".").unwrap(); - let handle = Arc::new(index_handle); - let update_handle = - UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); + //let dir = tempfile::tempdir_in(".").unwrap(); + //let handle = Arc::new(index_handle); + //let update_handle = + //UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); - let snapshot_path = tempfile::tempdir_in(".").unwrap(); - let snapshot_service = SnapshotService::new( - uuid_resolver, - update_handle, - Duration::from_millis(100), - snapshot_path.path().to_owned(), - "data.ms".to_string(), - ); + //let snapshot_path = tempfile::tempdir_in(".").unwrap(); + //let snapshot_service = SnapshotService::new( + //uuid_resolver, + //update_handle, + //Duration::from_millis(100), + //snapshot_path.path().to_owned(), + //"data.ms".to_string(), + //); - snapshot_service.perform_snapshot().await.unwrap(); - } + //snapshot_service.perform_snapshot().await.unwrap(); + //} - #[actix_rt::test] - async fn error_performing_uuid_snapshot() { - let mut uuid_resolver = MockUuidResolverHandle::new(); - uuid_resolver - .expect_snapshot() - .times(1) - // abitrary error - .returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + //#[actix_rt::test] + //async fn error_performing_uuid_snapshot() { + //let mut uuid_resolver = MockUuidResolverHandle::new(); + //uuid_resolver + //.expect_snapshot() + //.times(1) + //abitrary error + //.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); - let update_handle = MockUpdateActorHandle::new(); + //let update_handle = MockUpdateActorHandle::new(); - let snapshot_path = tempfile::tempdir_in(".").unwrap(); - let snapshot_service = SnapshotService::new( - uuid_resolver, - update_handle, - Duration::from_millis(100), - snapshot_path.path().to_owned(), - "data.ms".to_string(), - ); + //let snapshot_path = tempfile::tempdir_in(".").unwrap(); + //let snapshot_service = SnapshotService::new( + //uuid_resolver, + //update_handle, + //Duration::from_millis(100), + //snapshot_path.path().to_owned(), + //"data.ms".to_string(), + //); - assert!(snapshot_service.perform_snapshot().await.is_err()); - // Nothing was written to the file - assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); - } + //assert!(snapshot_service.perform_snapshot().await.is_err()); + //Nothing was written to the file + //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); + //} - #[actix_rt::test] - async fn error_performing_index_snapshot() { - let uuid = Uuid::new_v4(); - let mut uuid_resolver = MockUuidResolverHandle::new(); - uuid_resolver - .expect_snapshot() - .times(1) - .returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); + //#[actix_rt::test] + //async fn error_performing_index_snapshot() { + //let uuid = Uuid::new_v4(); + //let mut uuid_resolver = MockUuidResolverHandle::new(); + //uuid_resolver + //.expect_snapshot() + //.times(1) + //.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); - let mut update_handle = MockUpdateActorHandle::new(); - update_handle - .expect_snapshot() - // abitrary error - .returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); + //let mut update_handle = MockUpdateActorHandle::new(); + //update_handle + //.expect_snapshot() + //abitrary error + //.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); - let snapshot_path = tempfile::tempdir_in(".").unwrap(); - let snapshot_service = SnapshotService::new( - uuid_resolver, - update_handle, - Duration::from_millis(100), - snapshot_path.path().to_owned(), - "data.ms".to_string(), - ); + //let snapshot_path = tempfile::tempdir_in(".").unwrap(); + //let snapshot_service = SnapshotService::new( + //uuid_resolver, + //update_handle, + //Duration::from_millis(100), + //snapshot_path.path().to_owned(), + //"data.ms".to_string(), + //); - assert!(snapshot_service.perform_snapshot().await.is_err()); - // Nothing was written to the file - assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); - } + //assert!(snapshot_service.perform_snapshot().await.is_err()); + //Nothing was written to the file + //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); + //} - #[actix_rt::test] - async fn test_loop() { - let mut uuid_resolver = MockUuidResolverHandle::new(); - uuid_resolver - .expect_snapshot() - // we expect the funtion to be called between 2 and 3 time in the given interval. - .times(2..4) - // abitrary error, to short-circuit the function - .returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + //#[actix_rt::test] + //async fn test_loop() { + //let mut uuid_resolver = MockUuidResolverHandle::new(); + //uuid_resolver + //.expect_snapshot() + //we expect the funtion to be called between 2 and 3 time in the given interval. + //.times(2..4) + //abitrary error, to short-circuit the function + //.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); - let update_handle = MockUpdateActorHandle::new(); + //let update_handle = MockUpdateActorHandle::new(); - let snapshot_path = tempfile::tempdir_in(".").unwrap(); - let snapshot_service = SnapshotService::new( - uuid_resolver, - update_handle, - Duration::from_millis(100), - snapshot_path.path().to_owned(), - "data.ms".to_string(), - ); + //let snapshot_path = tempfile::tempdir_in(".").unwrap(); + //let snapshot_service = SnapshotService::new( + //uuid_resolver, + //update_handle, + //Duration::from_millis(100), + //snapshot_path.path().to_owned(), + //"data.ms".to_string(), + //); - let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; - } -} + //let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; + //} +//} diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs index 1c60bcec9..f21560f73 100644 --- a/meilisearch-lib/src/index_controller/update_file_store.rs +++ b/meilisearch-lib/src/index_controller/update_file_store.rs @@ -2,9 +2,13 @@ use std::fs::File; use std::path::{Path, PathBuf}; use std::ops::{Deref, DerefMut}; +//use milli::documents::DocumentBatchReader; +//use serde_json::Map; use tempfile::NamedTempFile; use uuid::Uuid; +const UPDATE_FILES_PATH: &str = "updates/updates_files"; + use super::error::Result; pub struct UpdateFile { @@ -14,7 +18,6 @@ pub struct UpdateFile { impl UpdateFile { pub fn persist(self) { - println!("persisting in {}", self.path.display()); self.file.persist(&self.path).unwrap(); } } @@ -40,11 +43,14 @@ pub struct UpdateFileStore { impl UpdateFileStore { pub fn new(path: impl AsRef) -> Result { - let path = path.as_ref().join("updates/updates_files"); + let path = path.as_ref().join(UPDATE_FILES_PATH); std::fs::create_dir_all(&path).unwrap(); Ok(Self { path }) } + /// Created a new temporary update file. + /// + /// A call to persist is needed to persist in the database. pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { let file = NamedTempFile::new().unwrap(); let uuid = Uuid::new_v4(); @@ -54,10 +60,45 @@ impl UpdateFileStore { Ok((uuid, update_file)) } + /// Returns a the file corresponding to the requested uuid. pub fn get_update(&self, uuid: Uuid) -> Result { let path = self.path.join(uuid.to_string()); - println!("reading in {}", path.display()); let file = File::open(path).unwrap(); Ok(file) } + + /// Copies the content of the update file poited to by uuid to dst directory. + pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { + let src = self.path.join(uuid.to_string()); + let mut dst = dst.as_ref().join(UPDATE_FILES_PATH); + std::fs::create_dir_all(&dst).unwrap(); + dst.push(uuid.to_string()); + std::fs::copy(src, dst).unwrap(); + Ok(()) + } + + /// Peform a dump of the given update file uuid into the provided snapshot path. + pub fn dump(&self, _uuid: Uuid, _snapshot_path: impl AsRef) -> Result<()> { + todo!() + //let update_file_path = self.path.join(uuid.to_string()); + //let snapshot_file_path: snapshot_path.as_ref().join(format!("update_files/uuid", uuid)); + + //let update_file = File::open(update_file_path).unwrap(); + + + //let mut document_reader = DocumentBatchReader::from_reader(update_file).unwrap(); + + //let mut document_buffer = Map::new(); + //// TODO: we need to find a way to do this more efficiently. (create a custom serializer to + //// jsonl for example...) + //while let Some((index, document)) = document_reader.next_document_with_index().unwrap() { + //for (field_id, content) in document.iter() { + //let field_name = index.get_by_left(&field_id).unwrap(); + //let content = serde_json::from_slice(content).unwrap(); + //document_buffer.insert(field_name.to_string(), content); + //} + + //} + //Ok(()) + } } diff --git a/meilisearch-lib/src/index_controller/updates/message.rs b/meilisearch-lib/src/index_controller/updates/message.rs index 09dc7443a..f96c707fd 100644 --- a/meilisearch-lib/src/index_controller/updates/message.rs +++ b/meilisearch-lib/src/index_controller/updates/message.rs @@ -4,6 +4,8 @@ use std::path::PathBuf; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; +use crate::index::Index; + use super::error::Result; use super::{Update, UpdateStatus, UpdateStoreInfo}; @@ -28,7 +30,7 @@ pub enum UpdateMsg { ret: oneshot::Sender>, }, Snapshot { - uuids: HashSet, + indexes: Vec, path: PathBuf, ret: oneshot::Sender>, }, @@ -43,17 +45,20 @@ pub enum UpdateMsg { } impl UpdateMsg { + pub async fn snapshot(sender: &mpsc::Sender, path: PathBuf, indexes: Vec) -> Result<()> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Snapshot { path, indexes, ret }; + sender.send(msg).await?; + rcv.await? + } + pub async fn dump( sender: &mpsc::Sender, uuids: HashSet, path: PathBuf, ) -> Result<()> { let (ret, rcv) = oneshot::channel(); - let msg = Self::Dump { - path, - uuids, - ret, - }; + let msg = Self::Dump { path, uuids, ret }; sender.send(msg).await?; rcv.await? } @@ -63,11 +68,7 @@ impl UpdateMsg { update: Update, ) -> Result { let (ret, rcv) = oneshot::channel(); - let msg = Self::Update { - uuid, - update, - ret, - }; + let msg = Self::Update { uuid, update, ret }; sender.send(msg).await?; rcv.await? } @@ -78,11 +79,7 @@ impl UpdateMsg { id: u64, ) -> Result { let (ret, rcv) = oneshot::channel(); - let msg = Self::GetUpdate { - uuid, - id, - ret, - }; + let msg = Self::GetUpdate { uuid, id, ret }; sender.send(msg).await?; rcv.await? } @@ -92,21 +89,14 @@ impl UpdateMsg { uuid: Uuid, ) -> Result> { let (ret, rcv) = oneshot::channel(); - let msg = Self::ListUpdates { - uuid, - ret, - }; + let msg = Self::ListUpdates { uuid, ret }; sender.send(msg).await?; rcv.await? } - pub async fn get_info( - sender: &mpsc::Sender, - ) -> Result { + pub async fn get_info(sender: &mpsc::Sender) -> Result { let (ret, rcv) = oneshot::channel(); - let msg = Self::GetInfo { - ret, - }; + let msg = Self::GetInfo { ret }; sender.send(msg).await?; rcv.await? } diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 2027f5245..63716928f 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -24,7 +24,7 @@ use uuid::Uuid; use self::error::{Result, UpdateLoopError}; pub use self::message::UpdateMsg; use self::store::{UpdateStore, UpdateStoreInfo}; -use crate::index::{Settings, Unchecked}; +use crate::index::{Index, Settings, Unchecked}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; @@ -123,12 +123,11 @@ impl UpdateLoop { let must_exit = Arc::new(AtomicBool::new(false)); - let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone())?; + let update_file_store = UpdateFileStore::new(&path).unwrap(); + let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone(), update_file_store.clone())?; let inbox = Some(inbox); - let update_file_store = UpdateFileStore::new(&path).unwrap(); - Ok(Self { store, inbox, @@ -179,8 +178,8 @@ impl UpdateLoop { Delete { uuid, ret } => { let _ = ret.send(self.handle_delete(uuid).await); } - Snapshot { uuids, path, ret } => { - let _ = ret.send(self.handle_snapshot(uuids, path).await); + Snapshot { indexes, path, ret } => { + let _ = ret.send(self.handle_snapshot(indexes, path).await); } GetInfo { ret } => { let _ = ret.send(self.handle_get_info().await); @@ -270,15 +269,13 @@ impl UpdateLoop { Ok(()) } - async fn handle_snapshot(&self, _uuids: HashSet,_pathh: PathBuf) -> Result<()> { - todo!() - //let index_handle = self.index_resolver.clone(); - //let update_store = self.store.clone(); + async fn handle_snapshot(&self, indexes: Vec, path: PathBuf) -> Result<()> { + let update_store = self.store.clone(); - //tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) - //.await??; + tokio::task::spawn_blocking(move || update_store.snapshot(indexes, path)) + .await??; - //Ok(()) + Ok(()) } async fn handle_dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { diff --git a/meilisearch-lib/src/index_controller/updates/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs index cf5d7e842..996bc3432 100644 --- a/meilisearch-lib/src/index_controller/updates/store/dump.rs +++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs @@ -45,16 +45,17 @@ impl UpdateStore { uuids: &HashSet, path: impl AsRef, ) -> Result<()> { - let dump_data_path = path.as_ref().join("data.jsonl"); - let mut dump_data_file = File::create(dump_data_path)?; + //let dump_data_path = path.as_ref().join("data.jsonl"); + //let mut dump_data_file = File::create(dump_data_path)?; - let update_files_path = path.as_ref().join(super::UPDATE_DIR); - create_dir_all(&update_files_path)?; + //let update_files_path = path.as_ref().join(super::UPDATE_DIR); + //create_dir_all(&update_files_path)?; - self.dump_pending(txn, uuids, &mut dump_data_file, &path)?; - self.dump_completed(txn, uuids, &mut dump_data_file)?; + //self.dump_pending(txn, uuids, &mut dump_data_file, &path)?; + //self.dump_completed(txn, uuids, &mut dump_data_file)?; - Ok(()) + //Ok(()) + todo!() } fn dump_pending( diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index 8d40d8309..b7bf1b457 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -22,6 +22,7 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tokio::time::timeout; use uuid::Uuid; +use rayon::prelude::*; use codec::*; @@ -31,12 +32,11 @@ use super::status::{Enqueued, Processing}; use crate::EnvSizer; use crate::index_controller::update_files_path; use crate::index_controller::updates::*; +use crate::index::Index; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; -const UPDATE_DIR: &str = "update_files"; - #[derive(Debug)] pub struct UpdateStoreInfo { /// Size of the update store in bytes. @@ -108,6 +108,7 @@ pub struct UpdateStore { state: Arc, /// Wake up the loop when a new event occurs. notification_sender: mpsc::Sender<()>, + update_file_store: UpdateFileStore, path: PathBuf, } @@ -115,6 +116,7 @@ impl UpdateStore { fn new( mut options: EnvOpenOptions, path: impl AsRef, + update_file_store: UpdateFileStore, ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { options.max_dbs(5); @@ -138,6 +140,7 @@ impl UpdateStore { state, notification_sender, path: path.as_ref().to_owned(), + update_file_store, }, notification_receiver, )) @@ -148,8 +151,9 @@ impl UpdateStore { path: impl AsRef, index_resolver: Arc, must_exit: Arc, + update_file_store: UpdateFileStore, ) -> anyhow::Result> { - let (update_store, mut notification_receiver) = Self::new(options, path)?; + let (update_store, mut notification_receiver) = Self::new(options, path, update_file_store)?; let update_store = Arc::new(update_store); // Send a first notification to trigger the process. @@ -482,13 +486,13 @@ impl UpdateStore { pub fn snapshot( &self, - _uuids: &HashSet, + indexes: Vec, path: impl AsRef, - handle: Arc, ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); + let txn = self.env.write_txn()?; let update_path = path.as_ref().join("updates"); @@ -501,42 +505,28 @@ impl UpdateStore { // create db snapshot self.env.copy_to_path(&db_path, CompactionOption::Enabled)?; - let update_files_path = update_path.join(UPDATE_DIR); - create_dir_all(&update_files_path)?; - let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); + let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid).collect(); for entry in pendings { - let ((_, _uuid, _), _pending) = entry?; - //if uuids.contains(&uuid) { - //if let Enqueued { - //content: Some(uuid), - //.. - //} = pending.decode()? - //{ - //let path = update_uuid_to_file_path(&self.path, uuid); - //copy(path, &update_files_path)?; - //} - //} + let ((_, uuid, _), pending) = entry?; + if uuids.contains(&uuid) { + if let Enqueued { + meta: RegisterUpdate::DocumentAddition { + content_uuid, .. + }, + .. + } = pending.decode()? + { + self.update_file_store.snapshot(content_uuid, &path).unwrap(); + } + } } - let _path = &path.as_ref().to_path_buf(); - let _handle = &handle; - // Perform the snapshot of each index concurently. Only a third of the capabilities of - // the index actor at a time not to put too much pressure on the index actor - todo!() - //let mut stream = futures::stream::iter(uuids.iter()) - //.map(move |uuid| IndexMsg::snapshot(handle,*uuid, path.clone())) - //.buffer_unordered(CONCURRENT_INDEX_MSG / 3); + let path = path.as_ref().to_owned(); + indexes.par_iter().try_for_each(|index| index.snapshot(&path)).unwrap(); - //Handle::current().block_on(async { - //while let Some(res) = stream.next().await { - //res?; - //} - //Ok(()) as Result<()> - //})?; - - //Ok(()) + Ok(()) } pub fn get_info(&self) -> Result { diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index 64f93695e..23538099c 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -7,6 +7,8 @@ pub mod index_controller; pub use index_controller::{IndexController as MeiliSearch, updates::RegisterUpdate}; +mod compression; + use walkdir::WalkDir; pub trait EnvSizer {