From 1fad72e019ef09279571335edd677cb8453f3e28 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 11 Mar 2021 17:59:47 +0100 Subject: [PATCH] fix test bug with tempdir --- .../src/index_controller/index_actor.rs | 316 ++++++++++-------- .../local_index_controller/index_store.rs | 3 +- meilisearch-http/src/index_controller/mod.rs | 2 +- .../src/index_controller/update_actor.rs | 13 +- .../src/index_controller/uuid_resolver.rs | 3 +- meilisearch-http/tests/common/server.rs | 9 +- .../tests/settings/get_settings.rs | 3 +- 7 files changed, 188 insertions(+), 161 deletions(-) diff --git a/meilisearch-http/src/index_controller/index_actor.rs b/meilisearch-http/src/index_controller/index_actor.rs index 1ddc041a1..d8dc5f014 100644 --- a/meilisearch-http/src/index_controller/index_actor.rs +++ b/meilisearch-http/src/index_controller/index_actor.rs @@ -1,4 +1,4 @@ -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::HashMap; use std::fs::{create_dir_all, remove_dir_all, File}; use std::future::Future; use std::path::{Path, PathBuf}; @@ -8,11 +8,15 @@ use async_stream::stream; use chrono::{DateTime, Utc}; use futures::pin_mut; use futures::stream::StreamExt; -use heed::EnvOpenOptions; -use log::info; +use heed::{ + types::{ByteSlice, SerdeBincode}, + Database, Env, EnvOpenOptions, +}; +use log::debug; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::{sync::{mpsc, oneshot, RwLock}}; +use tokio::task::spawn_blocking; use uuid::Uuid; use super::get_arc_ownership_blocking; @@ -96,6 +100,8 @@ pub enum IndexError { IndexAlreadyExists, #[error("Index doesn't exists")] UnexistingIndex, + #[error("Heed error: {0}")] + HeedError(#[from] heed::Error), } #[async_trait::async_trait] @@ -105,10 +111,9 @@ trait IndexStore { where F: FnOnce(Index) -> Result + Send + Sync + 'static, R: Sync + Send + 'static; - async fn get_or_create(&self, uuid: Uuid, primary_key: Option) -> Result; async fn get(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: &Uuid) -> Result>; - async fn get_meta(&self, uuid: &Uuid) -> Result>; + async fn delete(&self, uuid: Uuid) -> Result>; + async fn get_meta(&self, uuid: Uuid) -> Result>; } impl IndexActor { @@ -118,8 +123,8 @@ impl IndexActor { store: S, ) -> Result { let options = IndexerOpts::default(); - let update_handler = UpdateHandler::new(&options) - .map_err(|e| IndexError::Error(e.into()))?; + let update_handler = + UpdateHandler::new(&options).map_err(|e| IndexError::Error(e.into()))?; let update_handler = Arc::new(update_handler); let read_receiver = Some(read_receiver); let write_receiver = Some(write_receiver); @@ -227,11 +232,12 @@ impl IndexActor { } async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result { - let index = self.store + let index = self + .store .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - tokio::task::spawn_blocking(move || index.perform_search(query)).await? + spawn_blocking(move || index.perform_search(query)).await? } async fn handle_create_index( @@ -247,15 +253,14 @@ impl IndexActor { meta: Processing, data: File, ) -> Result { - info!("Processing update {}", meta.id()); + debug!("Processing update {}", meta.id()); let uuid = meta.index_uuid().clone(); let update_handler = self.update_handler.clone(); let handle = self .store .update_index(uuid, |index| { - let handle = tokio::task::spawn_blocking(move || { - update_handler.handle_update(meta, data, index) - }); + let handle = + spawn_blocking(move || update_handler.handle_update(meta, data, index)); Ok(handle) }) .await?; @@ -264,11 +269,12 @@ impl IndexActor { } async fn handle_settings(&self, uuid: Uuid) -> Result { - let index = self.store + let index = self + .store .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - tokio::task::spawn_blocking(move || index.settings().map_err(|e| IndexError::Error(e))) + spawn_blocking(move || index.settings().map_err(|e| IndexError::Error(e))) .await .map_err(|e| IndexError::Error(e.into()))? } @@ -280,10 +286,12 @@ impl IndexActor { limit: usize, attributes_to_retrieve: Option>, ) -> Result> { - let index = self.store.get(uuid) + let index = self + .store + .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { index .retrieve_documents(offset, limit, attributes_to_retrieve) .map_err(|e| IndexError::Error(e)) @@ -303,7 +311,7 @@ impl IndexActor { .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { index .retrieve_document(doc_id, attributes_to_retrieve) .map_err(|e| IndexError::Error(e)) @@ -313,15 +321,15 @@ impl IndexActor { } async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let index = self.store.delete(&uuid).await?; + let index = self.store.delete(uuid).await?; if let Some(index) = index { tokio::task::spawn(async move { let index = index.0; let store = get_arc_ownership_blocking(index).await; - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { store.prepare_for_closing().wait(); - info!("Index {} was closed.", uuid); + debug!("Index closed"); }); }); } @@ -330,7 +338,7 @@ impl IndexActor { } async fn handle_get_meta(&self, uuid: Uuid) -> Result> { - let result = self.store.get_meta(&uuid).await?; + let result = self.store.get_meta(uuid).await?; Ok(result) } } @@ -346,7 +354,7 @@ impl IndexActorHandle { let (read_sender, read_receiver) = mpsc::channel(100); let (write_sender, write_receiver) = mpsc::channel(100); - let store = MapIndexStore::new(path); + let store = HeedIndexStore::new(path)?; let actor = IndexActor::new(read_receiver, write_receiver, store)?; tokio::task::spawn(actor.run()); Ok(Self { @@ -442,149 +450,165 @@ impl IndexActorHandle { } } -struct MapIndexStore { - root: PathBuf, - meta_store: AsyncMap, +struct HeedIndexStore { + env: Env, + db: Database>, index_store: AsyncMap, + path: PathBuf, +} + +impl HeedIndexStore { + fn new(path: impl AsRef) -> anyhow::Result { + let mut options = EnvOpenOptions::new(); + options.map_size(1_073_741_824); //1GB + let path = path.as_ref().join("indexes/"); + create_dir_all(&path)?; + let env = options.open(&path)?; + let db = env.create_database(None)?; + let index_store = Arc::new(RwLock::new(HashMap::new())); + Ok(Self { + env, + db, + index_store, + path, + }) + } } #[async_trait::async_trait] -impl IndexStore for MapIndexStore { +impl IndexStore for HeedIndexStore { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { - let meta = match self.meta_store.write().await.entry(uuid.clone()) { - Entry::Vacant(entry) => { - let now = Utc::now(); - let meta = IndexMeta { - uuid, - created_at: now.clone(), - updated_at: now, - primary_key, - }; - entry.insert(meta).clone() - } - Entry::Occupied(_) => return Err(IndexError::IndexAlreadyExists), - }; + let path = self.path.join(format!("index-{}", uuid)); - let db_path = self.root.join(format!("index-{}", meta.uuid)); + if path.exists() { + return Err(IndexError::IndexAlreadyExists); + } - let index: Result = tokio::task::spawn_blocking(move || { - create_dir_all(&db_path).expect("can't create db"); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100_000); - let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?; - let index = Index(Arc::new(index)); - Ok(index) + let env = self.env.clone(); + let db = self.db.clone(); + let (index, meta) = spawn_blocking(move || -> Result<(Index, IndexMeta)> { + let now = Utc::now(); + let meta = IndexMeta { + uuid: uuid.clone(), + created_at: now.clone(), + updated_at: now, + primary_key, + }; + let mut txn = env.write_txn()?; + db.put(&mut txn, uuid.as_bytes(), &meta)?; + txn.commit()?; + + let index = open_index(&path, 4096 * 100_000)?; + + Ok((index, meta)) }) .await - .expect("thread died"); + .expect("thread died")?; - self.index_store - .write() - .await - .insert(meta.uuid.clone(), index?); + self.index_store.write().await.insert(uuid.clone(), index); Ok(meta) } - async fn get_or_create(&self, uuid: Uuid, primary_key: Option) -> Result { - match self.index_store.write().await.entry(uuid.clone()) { - Entry::Vacant(index_entry) => match self.meta_store.write().await.entry(uuid.clone()) { - Entry::Vacant(meta_entry) => { - let now = Utc::now(); - let meta = IndexMeta { - uuid, - created_at: now.clone(), - updated_at: now, - primary_key, - }; - let meta = meta_entry.insert(meta); - let db_path = self.root.join(format!("index-{}", meta.uuid)); - - let index: Result = tokio::task::spawn_blocking(move || { - create_dir_all(&db_path).expect("can't create db"); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100_000); - let index = milli::Index::new(options, &db_path) - .map_err(|e| IndexError::Error(e))?; - let index = Index(Arc::new(index)); - Ok(index) - }) - .await - .expect("thread died"); - - Ok(index_entry.insert(index?).clone()) - } - Entry::Occupied(entry) => { - let meta = entry.get(); - let db_path = self.root.join(format!("index-{}", meta.uuid)); - - let index: Result = tokio::task::spawn_blocking(move || { - create_dir_all(&db_path).expect("can't create db"); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100_000); - let index = milli::Index::new(options, &db_path) - .map_err(|e| IndexError::Error(e))?; - let index = Index(Arc::new(index)); - Ok(index) - }) - .await - .expect("thread died"); - - Ok(index_entry.insert(index?).clone()) - } - }, - Entry::Occupied(entry) => Ok(entry.get().clone()), - } - } - - async fn get(&self, uuid: Uuid) -> Result> { - Ok(self.index_store.read().await.get(&uuid).cloned()) - } - - async fn delete(&self, uuid: &Uuid) -> Result> { - let index = self.index_store.write().await.remove(&uuid); - if index.is_some() { - let db_path = self.root.join(format!("index-{}", uuid)); - remove_dir_all(db_path).unwrap(); - } - Ok(index) - } - - async fn get_meta(&self, uuid: &Uuid) -> Result> { - Ok(self.meta_store.read().await.get(uuid).cloned()) - } - async fn update_index(&self, uuid: Uuid, f: F) -> Result where F: FnOnce(Index) -> Result + Send + Sync + 'static, R: Sync + Send + 'static, { - let index = self.get_or_create(uuid.clone(), None).await?; - let mut meta = self - .get_meta(&uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - match f(index) { - Ok(r) => { - meta.updated_at = Utc::now(); - self.meta_store.write().await.insert(uuid, meta); - Ok(r) + let guard = self.index_store.read().await; + let index = match guard.get(&uuid) { + Some(index) => index.clone(), + None => { + drop(guard); + self.create_index(uuid.clone(), None).await?; + self.index_store + .read() + .await + .get(&uuid) + .expect("Index should exist") + .clone() + } + }; + + let env = self.env.clone(); + let db = self.db.clone(); + spawn_blocking(move || { + let mut txn = env.write_txn()?; + let mut meta = db.get(&txn, uuid.as_bytes())?.expect("unexisting index"); + match f(index) { + Ok(r) => { + meta.updated_at = Utc::now(); + db.put(&mut txn, uuid.as_bytes(), &meta)?; + txn.commit()?; + Ok(r) + } + Err(e) => Err(e), + } + }) + .await + .expect("thread died") + } + + async fn get(&self, uuid: Uuid) -> Result> { + let guard = self.index_store.read().await; + match guard.get(&uuid) { + Some(index) => Ok(Some(index.clone())), + None => { + // drop the guard here so we can perform the write after without deadlocking; + drop(guard); + let path = self.path.join(format!("index-{}", uuid)); + if !path.exists() { + return Ok(None); + } + + // TODO: set this info from the database + let index = spawn_blocking(|| open_index(path, 4096 * 100_000)) + .await + .expect("thread died")?; + self.index_store + .write() + .await + .insert(uuid.clone(), index.clone()); + println!("here"); + Ok(Some(index)) } - Err(e) => Err(e), } } + + async fn delete(&self, uuid: Uuid) -> Result> { + let env = self.env.clone(); + let db = self.db.clone(); + let db_path = self.path.join(format!("index-{}", uuid)); + spawn_blocking(move || -> Result<()> { + let mut txn = env.write_txn()?; + db.delete(&mut txn, uuid.as_bytes())?; + txn.commit()?; + remove_dir_all(db_path).unwrap(); + Ok(()) + }) + .await + .expect("thread died")?; + let index = self.index_store.write().await.remove(&uuid); + Ok(index) + } + + async fn get_meta(&self, uuid: Uuid) -> Result> { + let env = self.env.clone(); + let db = self.db.clone(); + spawn_blocking(move || { + let txn = env.read_txn()?; + let meta = db.get(&txn, uuid.as_bytes())?; + Ok(meta) + }) + .await + .expect("thread died") + } } -impl MapIndexStore { - fn new(root: impl AsRef) -> Self { - let mut root = root.as_ref().to_owned(); - root.push("indexes/"); - let meta_store = Arc::new(RwLock::new(HashMap::new())); - let index_store = Arc::new(RwLock::new(HashMap::new())); - Self { - meta_store, - index_store, - root, - } - } +fn open_index(path: impl AsRef, size: usize) -> Result { + create_dir_all(&path).expect("can't create db"); + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let index = milli::Index::new(options, &path).map_err(|e| IndexError::Error(e))?; + Ok(Index(Arc::new(index))) } diff --git a/meilisearch-http/src/index_controller/local_index_controller/index_store.rs b/meilisearch-http/src/index_controller/local_index_controller/index_store.rs index 3fe8a3f59..a690abaf4 100644 --- a/meilisearch-http/src/index_controller/local_index_controller/index_store.rs +++ b/meilisearch-http/src/index_controller/local_index_controller/index_store.rs @@ -12,8 +12,7 @@ use heed::{ }; use log::{error, info}; use milli::Index; -use rayon::ThreadPool; -use serde::{Deserialize, Serialize}; +use rayon::ThreadPool; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::update_handler::UpdateHandler; diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 85469728b..2c181817b 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -71,7 +71,7 @@ impl IndexController { pub fn new(path: impl AsRef) -> anyhow::Result { let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; let index_actor = index_actor::IndexActorHandle::new(&path)?; - let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path); + let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path)?; Ok(Self { uuid_resolver, index_handle: index_actor, update_handle }) } diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs index 32eda7bcb..708e8fd19 100644 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ b/meilisearch-http/src/index_controller/update_actor.rs @@ -68,10 +68,11 @@ where D: AsRef<[u8]> + Sized + 'static, S: UpdateStoreStore, { - fn new(store: S, inbox: mpsc::Receiver>, path: impl AsRef) -> Self { + fn new(store: S, inbox: mpsc::Receiver>, path: impl AsRef) -> anyhow::Result { let path = path.as_ref().to_owned().join("update_files"); - create_dir_all(&path).unwrap(); - Self { store, inbox, path } + create_dir_all(&path)?; + assert!(path.exists()); + Ok(Self { store, inbox, path }) } async fn run(mut self) { @@ -211,15 +212,15 @@ impl UpdateActorHandle where D: AsRef<[u8]> + Sized + 'static + Sync + Send, { - pub fn new(index_handle: IndexActorHandle, path: impl AsRef) -> Self { + pub fn new(index_handle: IndexActorHandle, path: impl AsRef) -> anyhow::Result { let path = path.as_ref().to_owned().join("updates"); let (sender, receiver) = mpsc::channel(100); let store = MapUpdateStoreStore::new(index_handle, &path); - let actor = UpdateActor::new(store, receiver, path); + let actor = UpdateActor::new(store, receiver, path)?; tokio::task::spawn(actor.run()); - Self { sender } + Ok(Self { sender }) } pub async fn update( diff --git a/meilisearch-http/src/index_controller/uuid_resolver.rs b/meilisearch-http/src/index_controller/uuid_resolver.rs index a369790f3..d8d39d922 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver.rs @@ -277,8 +277,7 @@ impl UuidStore for HeedUuidStore { entries.push((name.to_owned(), uuid)) } Ok(entries) - }).await? - } + }).await? } } #[cfg(test)] diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs index 943284736..090524601 100644 --- a/meilisearch-http/tests/common/server.rs +++ b/meilisearch-http/tests/common/server.rs @@ -12,15 +12,17 @@ use super::service::Service; pub struct Server { pub service: Service, + // hod ownership to the tempdir while we use the server instance. + _dir: tempdir::TempDir, } impl Server { pub async fn new() -> Self { - let tmp_dir = TempDir::new("meilisearch").unwrap(); + let dir = TempDir::new("meilisearch").unwrap(); let opt = Opt { - db_path: tmp_dir.path().join("db"), - dumps_dir: tmp_dir.path().join("dump"), + db_path: dir.path().join("db"), + dumps_dir: dir.path().join("dump"), dump_batch_size: 16, http_addr: "127.0.0.1:7700".to_owned(), master_key: None, @@ -55,6 +57,7 @@ impl Server { Server { service, + _dir: dir, } } diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs index e5fbbde35..ba5d9651c 100644 --- a/meilisearch-http/tests/settings/get_settings.rs +++ b/meilisearch-http/tests/settings/get_settings.rs @@ -35,7 +35,8 @@ async fn update_settings_unknown_field() { async fn test_partial_update() { let server = Server::new().await; let index = server.index("test"); - index.update_settings(json!({"displayedAttributes": ["foo"]})).await; + let (response, _code) = index.update_settings(json!({"displayedAttributes": ["foo"]})).await; + println!("response: {}", response); index.wait_update_id(0).await; let (response, code) = index.settings().await; assert_eq!(code, 200);