From ece4c739f4fa29bd644054ec4b8f6b898a84dc54 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 5 Oct 2021 11:47:50 +0200 Subject: [PATCH] update store tests --- meilisearch-lib/src/index/mod.rs | 26 +- .../src/index_controller/updates/store/mod.rs | 316 +++++++++++------- 2 files changed, 211 insertions(+), 131 deletions(-) diff --git a/meilisearch-lib/src/index/mod.rs b/meilisearch-lib/src/index/mod.rs index e482cad8c..c4b1fd07d 100644 --- a/meilisearch-lib/src/index/mod.rs +++ b/meilisearch-lib/src/index/mod.rs @@ -103,20 +103,31 @@ pub mod test { } } - pub struct StubBuilder<'a> { + pub struct StubBuilder<'a, A, R> { name: String, store: &'a StubStore, times: Option, + _f: std::marker::PhantomData R> } - impl<'a> StubBuilder<'a> { + impl<'a, A: 'static, R: 'static> StubBuilder<'a, A, R> { + /// Asserts the stub has been called exactly `times` times. #[must_use] pub fn times(mut self, times: usize) -> Self { self.times = Some(times); self } - pub fn then(self, f: impl Fn(A) -> R + Sync + Send + 'static) { + /// Asserts the stub has been called exactly once. + #[must_use] + pub fn once(mut self) -> Self { + self.times = Some(1); + self + } + + /// The function that will be called when the stub is called. This needs to be called to + /// actually build the stub and register it to the stub store. + pub fn then(self, f: impl Fn(A) -> R + Sync + Send + 'static) { let stub = Stub { stub: Box::new(f), times: self.times, @@ -130,21 +141,18 @@ pub mod test { /// Mocker allows to stub metod call on any struct. you can register stubs by calling /// `Mocker::when` and retrieve it in the proxy implementation when with `Mocker::get`. - /// - /// Mocker uses unsafe code to erase function types, because `Any` is too restrictive with it's - /// requirement for all stub arguments to be static. Because of that panic inside a stub is UB, - /// and it has been observed to crash with an illegal hardware instruction. Use with caution. #[derive(Debug, Default)] pub struct Mocker { store: StubStore, } impl Mocker { - pub fn when(&self, name: &str) -> StubBuilder { + pub fn when(&self, name: &str) -> StubBuilder { StubBuilder { name: name.to_string(), store: &self.store, times: None, + _f: std::marker::PhantomData, } } @@ -191,7 +199,7 @@ pub mod test { pub fn handle_update(&self, update: Processing) -> std::result::Result { match self { MockIndex::Vrai(index) => index.handle_update(update), - MockIndex::Faux(_) => todo!(), + MockIndex::Faux(faux) => faux.get("handle_update").call(update), } } diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index 74f20517a..55d2a37db 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -569,149 +569,221 @@ impl UpdateStore { } } -//#[cfg(test)] -//mod test { -//use super::*; -//use crate::index_controller::{ -//index_actor::{error::IndexActorError, MockIndexActorHandle}, -//UpdateResult, -//}; +#[cfg(test)] +mod test { + use futures::future::ok; + use mockall::predicate::eq; -//use futures::future::ok; + use crate::index::error::IndexError; + use crate::index::test::Mocker; + use crate::index_controller::index_resolver::uuid_store::MockUuidStore; + use crate::index_controller::index_resolver::index_store::MockIndexStore; + use crate::index_controller::updates::status::{Failed, Processed}; -//#[actix_rt::test] -//async fn test_next_id() { -//let dir = tempfile::tempdir_in(".").unwrap(); -//let mut options = EnvOpenOptions::new(); -//let handle = Arc::new(MockIndexActorHandle::new()); -//options.map_size(4096 * 100); -//let update_store = UpdateStore::open( -//options, -//dir.path(), -//handle, -//Arc::new(AtomicBool::new(false)), -//) -//.unwrap(); + use super::*; -//let index1_uuid = Uuid::new_v4(); -//let index2_uuid = Uuid::new_v4(); + #[actix_rt::test] + async fn test_next_id() { + let dir = tempfile::tempdir_in(".").unwrap(); + let mut options = EnvOpenOptions::new(); + let index_store = MockIndexStore::new(); + let uuid_store = MockUuidStore::new(); + let index_resolver = IndexResolver::new(uuid_store, index_store); + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + options.map_size(4096 * 100); + let update_store = UpdateStore::open( + options, + dir.path(), + Arc::new(index_resolver), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); -//let mut txn = update_store.env.write_txn().unwrap(); -//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); -//txn.commit().unwrap(); -//assert_eq!((0, 0), ids); + let index1_uuid = Uuid::new_v4(); + let index2_uuid = Uuid::new_v4(); -//let mut txn = update_store.env.write_txn().unwrap(); -//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); -//txn.commit().unwrap(); -//assert_eq!((1, 0), ids); + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((0, 0), ids); -//let mut txn = update_store.env.write_txn().unwrap(); -//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); -//txn.commit().unwrap(); -//assert_eq!((2, 1), ids); -//} + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((1, 0), ids); -//#[actix_rt::test] -//async fn test_register_update() { -//let dir = tempfile::tempdir_in(".").unwrap(); -//let mut options = EnvOpenOptions::new(); -//let handle = Arc::new(MockIndexActorHandle::new()); -//options.map_size(4096 * 100); -//let update_store = UpdateStore::open( -//options, -//dir.path(), -//handle, -//Arc::new(AtomicBool::new(false)), -//) -//.unwrap(); -//let meta = UpdateMeta::ClearDocuments; -//let uuid = Uuid::new_v4(); -//let store_clone = update_store.clone(); -//tokio::task::spawn_blocking(move || { -//store_clone.register_update(meta, None, uuid).unwrap(); -//}) -//.await -//.unwrap(); + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((2, 1), ids); + } -//let txn = update_store.env.read_txn().unwrap(); -//assert!(update_store -//.pending_queue -//.get(&txn, &(0, uuid, 0)) -//.unwrap() -//.is_some()); -//} + #[actix_rt::test] + async fn test_register_update() { + let dir = tempfile::tempdir_in(".").unwrap(); + let index_store = MockIndexStore::new(); + let uuid_store = MockUuidStore::new(); + let index_resolver = IndexResolver::new(uuid_store, index_store); + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let update_store = UpdateStore::open( + options, + dir.path(), + Arc::new(index_resolver), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); + let update = Update::ClearDocuments; + let uuid = Uuid::new_v4(); + let store_clone = update_store.clone(); + tokio::task::spawn_blocking(move || { + store_clone.register_update(uuid, update).unwrap(); + }) + .await + .unwrap(); -//#[actix_rt::test] -//async fn test_process_update() { -//let dir = tempfile::tempdir_in(".").unwrap(); -//let mut handle = MockIndexActorHandle::new(); + let txn = update_store.env.read_txn().unwrap(); + assert!(update_store + .pending_queue + .get(&txn, &(0, uuid, 0)) + .unwrap() + .is_some()); + } -//handle -//.expect_update() -//.times(2) -//.returning(|_index_uuid, processing, _file| { -//if processing.id() == 0 { -//Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) -//} else { -//Box::pin(ok(Err( -//processing.fail(IndexActorError::ExistingPrimaryKey.into()) -//))) -//} -//}); + #[actix_rt::test] + async fn test_process_update_success() { + let dir = tempfile::tempdir_in(".").unwrap(); + let index_uuid = Uuid::new_v4(); -//let handle = Arc::new(handle); + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .with(eq(index_uuid)) + .returning(|_uuid| { + let mocker = Mocker::default(); + mocker + .when::>("handle_update") + .once() + .then(|update| Ok(update.process(status::UpdateResult::Other))); -//let mut options = EnvOpenOptions::new(); -//options.map_size(4096 * 100); -//let store = UpdateStore::open( -//options, -//dir.path(), -//handle.clone(), -//Arc::new(AtomicBool::new(false)), -//) -//.unwrap(); + Box::pin(ok(Some(Index::faux(mocker)))) + }); -//// wait a bit for the event loop exit. -//tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let uuid_store = MockUuidStore::new(); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); -//let mut txn = store.env.write_txn().unwrap(); -//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); -//let uuid = Uuid::new_v4(); + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let store = UpdateStore::open( + options, + dir.path(), + index_resolver.clone(), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); -//store -//.pending_queue -//.put(&mut txn, &(0, uuid, 0), &update) -//.unwrap(); + // wait a bit for the event loop exit. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; -//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); + let mut txn = store.env.write_txn().unwrap(); -//store -//.pending_queue -//.put(&mut txn, &(1, uuid, 1), &update) -//.unwrap(); + let update = Enqueued::new(Update::ClearDocuments, 0); -//txn.commit().unwrap(); + store + .pending_queue + .put(&mut txn, &(0, index_uuid, 0), &update) + .unwrap(); -//// Process the pending, and check that it has been moved to the update databases, and -//// removed from the pending database. -//let store_clone = store.clone(); -//tokio::task::spawn_blocking(move || { -//store_clone.process_pending_update(handle.clone()).unwrap(); -//store_clone.process_pending_update(handle).unwrap(); -//}) -//.await -//.unwrap(); -//let txn = store.env.read_txn().unwrap(); + txn.commit().unwrap(); -//assert!(store.pending_queue.first(&txn).unwrap().is_none()); -//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); + // Process the pending, and check that it has been moved to the update databases, and + // removed from the pending database. + let store_clone = store.clone(); + tokio::task::spawn_blocking(move || { + store_clone.process_pending_update(index_resolver).unwrap(); + }) + .await + .unwrap(); -//assert!(matches!(update, UpdateStatus::Processed(_))); -//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); + let txn = store.env.read_txn().unwrap(); -//assert!(matches!(update, UpdateStatus::Failed(_))); -//} -//} + assert!(store.pending_queue.first(&txn).unwrap().is_none()); + let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap(); + + assert!(matches!(update, UpdateStatus::Processed(_))); + } + + #[actix_rt::test] + async fn test_process_update_failure() { + let dir = tempfile::tempdir_in(".").unwrap(); + let index_uuid = Uuid::new_v4(); + + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .with(eq(index_uuid)) + .returning(|_uuid| { + let mocker = Mocker::default(); + mocker + .when::>("handle_update") + .once() + .then(|update| Err(update.fail(IndexError::ExistingPrimaryKey))); + + Box::pin(ok(Some(Index::faux(mocker)))) + }); + + let uuid_store = MockUuidStore::new(); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); + + + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let store = UpdateStore::open( + options, + dir.path(), + index_resolver.clone(), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); + + // wait a bit for the event loop exit. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let mut txn = store.env.write_txn().unwrap(); + + let update = Enqueued::new(Update::ClearDocuments, 0); + + store + .pending_queue + .put(&mut txn, &(0, index_uuid, 0), &update) + .unwrap(); + + + txn.commit().unwrap(); + + // Process the pending, and check that it has been moved to the update databases, and + // removed from the pending database. + let store_clone = store.clone(); + tokio::task::spawn_blocking(move || { + store_clone.process_pending_update(index_resolver).unwrap(); + }) + .await + .unwrap(); + + let txn = store.env.read_txn().unwrap(); + + assert!(store.pending_queue.first(&txn).unwrap().is_none()); + let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap(); + + assert!(matches!(update, UpdateStatus::Failed(_))); + } +}