diff --git a/meilisearch-auth/src/store.rs b/meilisearch-auth/src/store.rs index 3b309e6b8..7672efbca 100644 --- a/meilisearch-auth/src/store.rs +++ b/meilisearch-auth/src/store.rs @@ -5,6 +5,7 @@ use std::convert::TryInto; use std::fs::create_dir_all; use std::path::Path; use std::str; +use std::sync::Arc; use chrono::{DateTime, Utc}; use heed::types::{ByteSlice, DecodeIgnore, SerdeJson}; @@ -23,11 +24,19 @@ pub type KeyId = [u8; KEY_ID_LENGTH]; #[derive(Clone)] pub struct HeedAuthStore { - env: Env, + env: Arc, keys: Database>, action_keyid_index_expiration: Database>>>, } +impl Drop for HeedAuthStore { + fn drop(&mut self) { + if Arc::strong_count(&self.env) == 1 { + self.env.as_ref().clone().prepare_for_closing(); + } + } +} + impl HeedAuthStore { pub fn new(path: impl AsRef) -> Result { let path = path.as_ref().join(AUTH_DB_PATH); @@ -35,7 +44,7 @@ impl HeedAuthStore { let mut options = EnvOpenOptions::new(); options.map_size(AUTH_STORE_SIZE); // 1GB options.max_dbs(2); - let env = options.open(path)?; + let env = Arc::new(options.open(path)?); let keys = env.create_database(Some(KEY_DB_NAME))?; let action_keyid_index_expiration = env.create_database(Some(KEY_ID_ACTION_INDEX_EXPIRATION_DB_NAME))?; diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs index 1878c3cc3..ea71298e3 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs @@ -1,4 +1,5 @@ use std::path::Path; +use std::sync::Arc; use heed::EnvOpenOptions; use log::info; @@ -26,7 +27,7 @@ pub fn load_dump( let mut options = EnvOpenOptions::new(); options.map_size(meta_env_size); options.max_dbs(100); - let env = options.open(&dst)?; + let env = Arc::new(options.open(&dst)?); IndexResolver::load_dump( src.as_ref(), diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 001bbdbd7..4e2343850 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -192,7 +192,7 @@ impl IndexControllerBuilder { options.map_size(task_store_size); options.max_dbs(20); - let meta_env = options.open(&db_path)?; + let meta_env = Arc::new(options.open(&db_path)?); let update_file_store = UpdateFileStore::new(&db_path)?; // Create or overwrite the version file for this DB diff --git a/meilisearch-lib/src/index_resolver/meta_store.rs b/meilisearch-lib/src/index_resolver/meta_store.rs index 30df1d9eb..6ca615dbf 100644 --- a/meilisearch-lib/src/index_resolver/meta_store.rs +++ b/meilisearch-lib/src/index_resolver/meta_store.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use std::fs::{create_dir_all, File}; use std::io::{BufRead, BufReader, Write}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use heed::types::{SerdeBincode, Str}; use heed::{CompactionOption, Database, Env}; @@ -42,12 +43,20 @@ pub struct IndexMeta { #[derive(Clone)] pub struct HeedMetaStore { - env: Env, + env: Arc, db: Database>, } +impl Drop for HeedMetaStore { + fn drop(&mut self) { + if Arc::strong_count(&self.env) == 1 { + self.env.as_ref().clone().prepare_for_closing(); + } + } +} + impl HeedMetaStore { - pub fn new(env: heed::Env) -> Result { + pub fn new(env: Arc) -> Result { let db = env.create_database(Some("uuids"))?; Ok(Self { env, db }) } @@ -144,7 +153,7 @@ impl HeedMetaStore { Ok(()) } - pub fn load_dump(src: impl AsRef, env: heed::Env) -> Result<()> { + pub fn load_dump(src: impl AsRef, env: Arc) -> Result<()> { let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl"); let indexes = File::open(&src_indexes)?; let mut indexes = BufReader::new(indexes); diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index 79493407e..0b6beb2f3 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -4,6 +4,7 @@ pub mod meta_store; use std::convert::TryInto; use std::path::Path; +use std::sync::Arc; use chrono::Utc; use error::{IndexResolverError, Result}; @@ -16,13 +17,11 @@ use serde::{Deserialize, Serialize}; use tokio::task::spawn_blocking; use uuid::Uuid; -use crate::index::update_handler::UpdateHandler; -use crate::index::{error::Result as IndexResult, Index}; +use crate::index::{error::Result as IndexResult, update_handler::UpdateHandler, Index}; use crate::options::IndexerOpts; use crate::tasks::batch::Batch; use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult}; -use crate::tasks::Pending; -use crate::tasks::TaskPerformer; +use crate::tasks::{Pending, TaskPerformer}; use crate::update_file_store::UpdateFileStore; use self::meta_store::IndexMeta; @@ -39,7 +38,7 @@ pub fn create_index_resolver( path: impl AsRef, index_size: usize, indexer_opts: &IndexerOpts, - meta_env: heed::Env, + meta_env: Arc, file_store: UpdateFileStore, ) -> anyhow::Result { let uuid_store = HeedMetaStore::new(meta_env)?; @@ -153,7 +152,7 @@ impl IndexResolver { src: impl AsRef, dst: impl AsRef, index_db_size: usize, - env: Env, + env: Arc, indexer_opts: &IndexerOpts, ) -> anyhow::Result<()> { HeedMetaStore::load_dump(&src, env)?; diff --git a/meilisearch-lib/src/tasks/mod.rs b/meilisearch-lib/src/tasks/mod.rs index fea5aa085..9d6de324a 100644 --- a/meilisearch-lib/src/tasks/mod.rs +++ b/meilisearch-lib/src/tasks/mod.rs @@ -32,7 +32,7 @@ pub trait TaskPerformer: Sync + Send + 'static { async fn finish(&self, batch: &Batch); } -pub fn create_task_store

(env: heed::Env, performer: Arc

) -> Result +pub fn create_task_store

(env: Arc, performer: Arc

) -> Result where P: TaskPerformer, { diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index 46b1c9cd1..d8e286ff3 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -114,7 +114,7 @@ impl Clone for TaskStore { } impl TaskStore { - pub fn new(env: heed::Env) -> Result { + pub fn new(env: Arc) -> Result { let mut store = Store::new(env)?; let unfinished_tasks = store.reset_and_return_unfinished_tasks()?; let store = Arc::new(store); @@ -293,7 +293,7 @@ impl TaskStore { Ok(()) } - pub fn load_dump(src: impl AsRef, env: Env) -> anyhow::Result<()> { + pub fn load_dump(src: impl AsRef, env: Arc) -> anyhow::Result<()> { // create a dummy update field store, since it is not needed right now. let store = Self::new(env.clone())?; @@ -340,7 +340,7 @@ pub mod test { } impl MockTaskStore { - pub fn new(env: heed::Env) -> Result { + pub fn new(env: Arc) -> Result { Ok(Self::Real(TaskStore::new(env)?)) } @@ -432,7 +432,7 @@ pub mod test { } } - pub fn load_dump(path: impl AsRef, env: Env) -> anyhow::Result<()> { + pub fn load_dump(path: impl AsRef, env: Arc) -> anyhow::Result<()> { TaskStore::load_dump(path, env) } } diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs index 936e366c0..49413f167 100644 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ b/meilisearch-lib/src/tasks/task_store/store.rs @@ -10,6 +10,7 @@ use std::convert::TryInto; use std::mem::size_of; use std::ops::Range; use std::result::Result as StdResult; +use std::sync::Arc; use heed::types::{ByteSlice, OwnedType, SerdeJson, Unit}; use heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn}; @@ -53,18 +54,26 @@ impl<'a> BytesDecode<'a> for IndexUidTaskIdCodec { } pub struct Store { - env: Env, + env: Arc, uids_task_ids: Database, tasks: Database, SerdeJson>, } +impl Drop for Store { + fn drop(&mut self) { + if Arc::strong_count(&self.env) == 1 { + self.env.as_ref().clone().prepare_for_closing(); + } + } +} + impl Store { /// Create a new store from the specified `Path`. /// Be really cautious when calling this function, the returned `Store` may /// be in an invalid state, with dangling processing tasks. /// You want to patch all un-finished tasks and put them in your pending /// queue with the `reset_and_return_unfinished_update` method. - pub fn new(env: heed::Env) -> Result { + pub fn new(env: Arc) -> Result { let uids_task_ids = env.create_database(Some(UID_TASK_IDS))?; let tasks = env.create_database(Some(TASKS))?; @@ -257,10 +266,10 @@ pub mod test { Fake(Mocker), } - pub struct TmpEnv(TempDir, heed::Env); + pub struct TmpEnv(TempDir, Arc); impl TmpEnv { - pub fn env(&self) -> heed::Env { + pub fn env(&self) -> Arc { self.1.clone() } } @@ -271,13 +280,13 @@ pub mod test { let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100000); options.max_dbs(1000); - let env = options.open(tmp.path()).unwrap(); + let env = Arc::new(options.open(tmp.path()).unwrap()); TmpEnv(tmp, env) } impl MockStore { - pub fn new(env: heed::Env) -> Result { + pub fn new(env: Arc) -> Result { Ok(Self::Real(Store::new(env)?)) }