fix test bug with tempdir

mpostma 2021-03-11 17:59:47 +01:00
parent 2ae90f9c5d
commit 1fad72e019
GPG Key ID: CBC8A7C1D7A28C3A
7 changed files with 188 additions and 161 deletions


@@ -1,4 +1,4 @@
use std::collections::{hash_map::Entry, HashMap};
use std::collections::HashMap;
use std::fs::{create_dir_all, remove_dir_all, File};
use std::future::Future;
use std::path::{Path, PathBuf};
@@ -8,11 +8,15 @@ use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::pin_mut;
use futures::stream::StreamExt;
use heed::EnvOpenOptions;
use log::info;
use heed::{
types::{ByteSlice, SerdeBincode},
Database, Env, EnvOpenOptions,
};
use log::debug;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::{sync::{mpsc, oneshot, RwLock}};
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::get_arc_ownership_blocking;
@@ -96,6 +100,8 @@ pub enum IndexError {
IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
}
#[async_trait::async_trait]
@@ -105,10 +111,9 @@ trait IndexStore {
where
F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static;
async fn get_or_create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>>;
async fn get_meta(&self, uuid: &Uuid) -> Result<Option<IndexMeta>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>>;
}
impl<S: IndexStore + Sync + Send> IndexActor<S> {
@@ -118,8 +123,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
store: S,
) -> Result<Self> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options)
.map_err(|e| IndexError::Error(e.into()))?;
let update_handler =
UpdateHandler::new(&options).map_err(|e| IndexError::Error(e.into()))?;
let update_handler = Arc::new(update_handler);
let read_receiver = Some(read_receiver);
let write_receiver = Some(write_receiver);
@@ -227,11 +232,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
}
async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result<SearchResult> {
let index = self.store
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
tokio::task::spawn_blocking(move || index.perform_search(query)).await?
spawn_blocking(move || index.perform_search(query)).await?
}
async fn handle_create_index(
@@ -247,15 +253,14 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
meta: Processing<UpdateMeta>,
data: File,
) -> Result<UpdateResult> {
info!("Processing update {}", meta.id());
debug!("Processing update {}", meta.id());
let uuid = meta.index_uuid().clone();
let update_handler = self.update_handler.clone();
let handle = self
.store
.update_index(uuid, |index| {
let handle = tokio::task::spawn_blocking(move || {
update_handler.handle_update(meta, data, index)
});
let handle =
spawn_blocking(move || update_handler.handle_update(meta, data, index));
Ok(handle)
})
.await?;
@@ -264,11 +269,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
}
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
let index = self.store
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
tokio::task::spawn_blocking(move || index.settings().map_err(|e| IndexError::Error(e)))
spawn_blocking(move || index.settings().map_err(|e| IndexError::Error(e)))
.await
.map_err(|e| IndexError::Error(e.into()))?
}
@@ -280,10 +286,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let index = self.store.get(uuid)
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
tokio::task::spawn_blocking(move || {
spawn_blocking(move || {
index
.retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e))
@@ -303,7 +311,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
tokio::task::spawn_blocking(move || {
spawn_blocking(move || {
index
.retrieve_document(doc_id, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e))
@@ -313,15 +321,15 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let index = self.store.delete(&uuid).await?;
let index = self.store.delete(uuid).await?;
if let Some(index) = index {
tokio::task::spawn(async move {
let index = index.0;
let store = get_arc_ownership_blocking(index).await;
tokio::task::spawn_blocking(move || {
spawn_blocking(move || {
store.prepare_for_closing().wait();
info!("Index {} was closed.", uuid);
debug!("Index closed");
});
});
}
@@ -330,7 +338,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
}
async fn handle_get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> {
let result = self.store.get_meta(&uuid).await?;
let result = self.store.get_meta(uuid).await?;
Ok(result)
}
}
@@ -346,7 +354,7 @@ impl IndexActorHandle {
let (read_sender, read_receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100);
let store = MapIndexStore::new(path);
let store = HeedIndexStore::new(path)?;
let actor = IndexActor::new(read_receiver, write_receiver, store)?;
tokio::task::spawn(actor.run());
Ok(Self {
@@ -442,149 +450,165 @@ impl IndexActorHandle {
}
}
struct MapIndexStore {
root: PathBuf,
meta_store: AsyncMap<Uuid, IndexMeta>,
struct HeedIndexStore {
env: Env,
db: Database<ByteSlice, SerdeBincode<IndexMeta>>,
index_store: AsyncMap<Uuid, Index>,
path: PathBuf,
}
impl HeedIndexStore {
fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let mut options = EnvOpenOptions::new();
options.map_size(1_073_741_824); //1GB
let path = path.as_ref().join("indexes/");
create_dir_all(&path)?;
let env = options.open(&path)?;
let db = env.create_database(None)?;
let index_store = Arc::new(RwLock::new(HashMap::new()));
Ok(Self {
env,
db,
index_store,
path,
})
}
}
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
impl IndexStore for HeedIndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let meta = match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(entry) => {
let now = Utc::now();
let meta = IndexMeta {
uuid,
created_at: now.clone(),
updated_at: now,
primary_key,
};
entry.insert(meta).clone()
}
Entry::Occupied(_) => return Err(IndexError::IndexAlreadyExists),
};
let path = self.path.join(format!("index-{}", uuid));
let db_path = self.root.join(format!("index-{}", meta.uuid));
if path.exists() {
return Err(IndexError::IndexAlreadyExists);
}
let index: Result<Index> = tokio::task::spawn_blocking(move || {
create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100_000);
let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?;
let index = Index(Arc::new(index));
Ok(index)
let env = self.env.clone();
let db = self.db.clone();
let (index, meta) = spawn_blocking(move || -> Result<(Index, IndexMeta)> {
let now = Utc::now();
let meta = IndexMeta {
uuid: uuid.clone(),
created_at: now.clone(),
updated_at: now,
primary_key,
};
let mut txn = env.write_txn()?;
db.put(&mut txn, uuid.as_bytes(), &meta)?;
txn.commit()?;
let index = open_index(&path, 4096 * 100_000)?;
Ok((index, meta))
})
.await
.expect("thread died");
.expect("thread died")?;
self.index_store
.write()
.await
.insert(meta.uuid.clone(), index?);
self.index_store.write().await.insert(uuid.clone(), index);
Ok(meta)
}
async fn get_or_create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
match self.index_store.write().await.entry(uuid.clone()) {
Entry::Vacant(index_entry) => match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(meta_entry) => {
let now = Utc::now();
let meta = IndexMeta {
uuid,
created_at: now.clone(),
updated_at: now,
primary_key,
};
let meta = meta_entry.insert(meta);
let db_path = self.root.join(format!("index-{}", meta.uuid));
let index: Result<Index> = tokio::task::spawn_blocking(move || {
create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100_000);
let index = milli::Index::new(options, &db_path)
.map_err(|e| IndexError::Error(e))?;
let index = Index(Arc::new(index));
Ok(index)
})
.await
.expect("thread died");
Ok(index_entry.insert(index?).clone())
}
Entry::Occupied(entry) => {
let meta = entry.get();
let db_path = self.root.join(format!("index-{}", meta.uuid));
let index: Result<Index> = tokio::task::spawn_blocking(move || {
create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100_000);
let index = milli::Index::new(options, &db_path)
.map_err(|e| IndexError::Error(e))?;
let index = Index(Arc::new(index));
Ok(index)
})
.await
.expect("thread died");
Ok(index_entry.insert(index?).clone())
}
},
Entry::Occupied(entry) => Ok(entry.get().clone()),
}
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
Ok(self.index_store.read().await.get(&uuid).cloned())
}
async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>> {
let index = self.index_store.write().await.remove(&uuid);
if index.is_some() {
let db_path = self.root.join(format!("index-{}", uuid));
remove_dir_all(db_path).unwrap();
}
Ok(index)
}
async fn get_meta(&self, uuid: &Uuid) -> Result<Option<IndexMeta>> {
Ok(self.meta_store.read().await.get(uuid).cloned())
}
async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
where
F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static,
{
let index = self.get_or_create(uuid.clone(), None).await?;
let mut meta = self
.get_meta(&uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
match f(index) {
Ok(r) => {
meta.updated_at = Utc::now();
self.meta_store.write().await.insert(uuid, meta);
Ok(r)
let guard = self.index_store.read().await;
let index = match guard.get(&uuid) {
Some(index) => index.clone(),
None => {
drop(guard);
self.create_index(uuid.clone(), None).await?;
self.index_store
.read()
.await
.get(&uuid)
.expect("Index should exist")
.clone()
}
};
let env = self.env.clone();
let db = self.db.clone();
spawn_blocking(move || {
let mut txn = env.write_txn()?;
let mut meta = db.get(&txn, uuid.as_bytes())?.expect("unexisting index");
match f(index) {
Ok(r) => {
meta.updated_at = Utc::now();
db.put(&mut txn, uuid.as_bytes(), &meta)?;
txn.commit()?;
Ok(r)
}
Err(e) => Err(e),
}
})
.await
.expect("thread died")
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
None => {
// drop the guard here so we can perform the write afterwards without deadlocking
drop(guard);
let path = self.path.join(format!("index-{}", uuid));
if !path.exists() {
return Ok(None);
}
// TODO: set this info from the database
let index = spawn_blocking(|| open_index(path, 4096 * 100_000))
.await
.expect("thread died")?;
self.index_store
.write()
.await
.insert(uuid.clone(), index.clone());
println!("here");
Ok(Some(index))
}
Err(e) => Err(e),
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let env = self.env.clone();
let db = self.db.clone();
let db_path = self.path.join(format!("index-{}", uuid));
spawn_blocking(move || -> Result<()> {
let mut txn = env.write_txn()?;
db.delete(&mut txn, uuid.as_bytes())?;
txn.commit()?;
remove_dir_all(db_path).unwrap();
Ok(())
})
.await
.expect("thread died")?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
async fn get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> {
let env = self.env.clone();
let db = self.db.clone();
spawn_blocking(move || {
let txn = env.read_txn()?;
let meta = db.get(&txn, uuid.as_bytes())?;
Ok(meta)
})
.await
.expect("thread died")
}
}
impl MapIndexStore {
fn new(root: impl AsRef<Path>) -> Self {
let mut root = root.as_ref().to_owned();
root.push("indexes/");
let meta_store = Arc::new(RwLock::new(HashMap::new()));
let index_store = Arc::new(RwLock::new(HashMap::new()));
Self {
meta_store,
index_store,
root,
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> Result<Index> {
create_dir_all(&path).expect("can't create db");
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path).map_err(|e| IndexError::Error(e))?;
Ok(Index(Arc::new(index)))
}
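
The `get` method in the diff above drops its read guard before re-locking `index_store` for writing. Below is a minimal, self-contained sketch of that guard-drop pattern with tokio's `RwLock`; the `Cache` type and `get_or_insert` helper are hypothetical stand-ins for the real store, and it assumes tokio with the `rt` and `macros` features.

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::RwLock;

// Hypothetical cache standing in for HeedIndexStore's index_store map.
type Cache = Arc<RwLock<HashMap<u32, String>>>;

async fn get_or_insert(cache: &Cache, key: u32) -> String {
    let guard = cache.read().await;
    if let Some(value) = guard.get(&key) {
        return value.clone();
    }
    // The read guard must be released before calling `write().await`:
    // the write lock waits for every reader, including this one, so
    // keeping `guard` alive here would make the task wait on itself.
    drop(guard);
    let mut guard = cache.write().await;
    guard
        .entry(key)
        .or_insert_with(|| format!("index-{}", key))
        .clone()
}

#[tokio::main]
async fn main() {
    let cache: Cache = Arc::new(RwLock::new(HashMap::new()));
    assert_eq!(get_or_insert(&cache, 1).await, "index-1");
    // The second call hits the cached entry and never takes the write lock.
    assert_eq!(get_or_insert(&cache, 1).await, "index-1");
}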


@@ -12,8 +12,7 @@ use heed::{
};
use log::{error, info};
use milli::Index;
use rayon::ThreadPool;
use serde::{Deserialize, Serialize};
use rayon::ThreadPool; use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::update_handler::UpdateHandler;


@@ -71,7 +71,7 @@ impl IndexController {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?;
let index_actor = index_actor::IndexActorHandle::new(&path)?;
let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path);
let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path)?;
Ok(Self { uuid_resolver, index_handle: index_actor, update_handle })
}


@@ -68,10 +68,11 @@ where
D: AsRef<[u8]> + Sized + 'static,
S: UpdateStoreStore,
{
fn new(store: S, inbox: mpsc::Receiver<UpdateMsg<D>>, path: impl AsRef<Path>) -> Self {
fn new(store: S, inbox: mpsc::Receiver<UpdateMsg<D>>, path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned().join("update_files");
create_dir_all(&path).unwrap();
Self { store, inbox, path }
create_dir_all(&path)?;
assert!(path.exists());
Ok(Self { store, inbox, path })
}
async fn run(mut self) {
@@ -211,15 +212,15 @@ impl<D> UpdateActorHandle<D>
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
pub fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self {
pub fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned().join("updates");
let (sender, receiver) = mpsc::channel(100);
let store = MapUpdateStoreStore::new(index_handle, &path);
let actor = UpdateActor::new(store, receiver, path);
let actor = UpdateActor::new(store, receiver, path)?;
tokio::task::spawn(actor.run());
Self { sender }
Ok(Self { sender })
}
pub async fn update(


@@ -277,8 +277,7 @@ impl UuidStore for HeedUuidStore {
entries.push((name.to_owned(), uuid))
}
Ok(entries)
}).await?
}
}).await? }
}
#[cfg(test)]


@@ -12,15 +12,17 @@ use super::service::Service;
pub struct Server {
pub service: Service,
// hold ownership of the tempdir while we use the server instance.
_dir: tempdir::TempDir,
}
impl Server {
pub async fn new() -> Self {
let tmp_dir = TempDir::new("meilisearch").unwrap();
let dir = TempDir::new("meilisearch").unwrap();
let opt = Opt {
db_path: tmp_dir.path().join("db"),
dumps_dir: tmp_dir.path().join("dump"),
db_path: dir.path().join("db"),
dumps_dir: dir.path().join("dump"),
dump_batch_size: 16,
http_addr: "127.0.0.1:7700".to_owned(),
master_key: None,
@@ -55,6 +57,7 @@ impl Server {
Server {
service,
_dir: dir,
}
}
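
For context on the fix this commit makes: `tempdir::TempDir` deletes its directory when the value is dropped, so the test server must own the tempdir for as long as it uses paths derived from it, which is what the new `_dir` field does. Below is a minimal sketch of that ownership pattern, where `Server` is a stripped-down stand-in for the real test helper, not the actual type.

use std::fs;

use tempdir::TempDir;

// Stripped-down stand-in for the test Server above.
struct Server {
    // Keeping the TempDir in the struct keeps the on-disk directory
    // alive for the whole lifetime of the Server; the directory is
    // removed when the Server (and thus the TempDir) is dropped.
    _dir: TempDir,
}

fn main() -> std::io::Result<()> {
    let dir = TempDir::new("meilisearch")?;
    let db_path = dir.path().join("db");
    fs::create_dir_all(&db_path)?;

    // Moving `dir` into the struct is the fix: if it were dropped at the
    // end of the constructor instead, db_path would already be gone
    // while the server is still using it.
    let server = Server { _dir: dir };
    assert!(db_path.exists());

    drop(server); // the temporary directory is cleaned up here
    Ok(())
}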


@@ -35,7 +35,8 @@ async fn update_settings_unknown_field() {
async fn test_partial_update() {
let server = Server::new().await;
let index = server.index("test");
index.update_settings(json!({"displayedAttributes": ["foo"]})).await;
let (response, _code) = index.update_settings(json!({"displayedAttributes": ["foo"]})).await;
println!("response: {}", response);
index.wait_update_id(0).await;
let (response, code) = index.settings().await;
assert_eq!(code, 200);