mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
fix tests
This commit is contained in:
parent
ddfd7def35
commit
692c676625
24 changed files with 325 additions and 481 deletions
|
@ -2,7 +2,6 @@ use std::fs::File;
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::{info, trace, warn};
|
||||
#[cfg(test)]
|
||||
|
@ -112,22 +111,16 @@ pub fn load_dump(
|
|||
update_db_size: usize,
|
||||
indexer_opts: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
let tmp_src = tempfile::tempdir_in(".")?;
|
||||
let tmp_src = tempfile::tempdir()?;
|
||||
let tmp_src_path = tmp_src.path();
|
||||
|
||||
println!("importing to {}", dst_path.as_ref().display());
|
||||
crate::from_tar_gz(&src_path, tmp_src_path)?;
|
||||
|
||||
let meta_path = tmp_src_path.join(META_FILE_NAME);
|
||||
let mut meta_file = File::open(&meta_path)?;
|
||||
let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
|
||||
|
||||
let dst_dir = dst_path
|
||||
.as_ref()
|
||||
.parent()
|
||||
.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?;
|
||||
|
||||
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
|
||||
let tmp_dst = tempfile::tempdir()?;
|
||||
|
||||
match meta {
|
||||
Metadata::V1(meta) => {
|
||||
|
@ -168,9 +161,8 @@ impl DumpTask {
|
|||
|
||||
create_dir_all(&self.path).await?;
|
||||
|
||||
let path_clone = self.path.clone();
|
||||
let temp_dump_dir =
|
||||
tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
|
||||
tokio::task::spawn_blocking(|| tempfile::TempDir::new()).await??;
|
||||
let temp_dump_path = temp_dump_dir.path().to_owned();
|
||||
|
||||
let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
|
||||
|
@ -183,7 +175,7 @@ impl DumpTask {
|
|||
UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?;
|
||||
|
||||
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
|
||||
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
|
||||
let temp_dump_file = tempfile::NamedTempFile::new()?;
|
||||
crate::to_tar_gz(temp_dump_path, temp_dump_file.path())
|
||||
.map_err(|e| DumpActorError::Internal(e.into()))?;
|
||||
|
||||
|
|
|
@ -84,11 +84,14 @@ where U: UuidStore,
|
|||
Ok(indexes)
|
||||
}
|
||||
|
||||
pub async fn create_index(&self, uid: String, primary_key: Option<String>) -> Result<(Uuid, Index)> {
|
||||
pub async fn create_index(&self, uid: String, primary_key: Option<String>) -> Result<Index> {
|
||||
if !is_index_uid_valid(&uid) {
|
||||
return Err(IndexResolverError::BadlyFormatted(uid));
|
||||
}
|
||||
let uuid = Uuid::new_v4();
|
||||
let index = self.index_store.create(uuid, primary_key).await?;
|
||||
self.index_uuid_store.insert(uid, uuid).await?;
|
||||
Ok((uuid, index))
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
|
||||
|
@ -109,11 +112,11 @@ where U: UuidStore,
|
|||
Ok(indexes)
|
||||
}
|
||||
|
||||
pub async fn delete_index(&self, uid: String) -> Result<()> {
|
||||
pub async fn delete_index(&self, uid: String) -> Result<Uuid> {
|
||||
match self.index_uuid_store.delete(uid.clone()).await? {
|
||||
Some(uuid) => {
|
||||
let _ = self.index_store.delete(uuid).await;
|
||||
Ok(())
|
||||
Ok(uuid)
|
||||
}
|
||||
None => Err(IndexResolverError::UnexistingIndex(uid)),
|
||||
}
|
||||
|
@ -148,3 +151,8 @@ where U: UuidStore,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_index_uid_valid(uid: &str) -> bool {
|
||||
uid.chars()
|
||||
.all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_')
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ use crate::index_controller::index_resolver::create_index_resolver;
|
|||
use crate::index_controller::snapshot::SnapshotService;
|
||||
use crate::options::IndexerOpts;
|
||||
use error::Result;
|
||||
use crate::index::error::Result as IndexResult;
|
||||
use crate::index::error::{Result as IndexResult};
|
||||
|
||||
use self::dump_actor::load_dump;
|
||||
use self::index_resolver::HardStateIndexResolver;
|
||||
|
@ -33,29 +33,15 @@ use self::updates::UpdateMsg;
|
|||
|
||||
mod dump_actor;
|
||||
pub mod error;
|
||||
//pub mod indexes;
|
||||
mod snapshot;
|
||||
pub mod update_file_store;
|
||||
pub mod updates;
|
||||
//mod uuid_resolver;
|
||||
mod index_resolver;
|
||||
|
||||
pub type Payload = Box<
|
||||
dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin,
|
||||
>;
|
||||
|
||||
macro_rules! time {
|
||||
($e:expr) => {
|
||||
{
|
||||
let now = std::time::Instant::now();
|
||||
let result = $e;
|
||||
let elapsed = now.elapsed();
|
||||
println!("elapsed at line {}: {}ms ({}ns)", line!(), elapsed.as_millis(), elapsed.as_nanos());
|
||||
result
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IndexMetadata {
|
||||
|
@ -260,148 +246,27 @@ impl IndexController {
|
|||
IndexControllerBuilder::default()
|
||||
}
|
||||
|
||||
pub async fn register_update(&self, uid: String, update: Update) -> Result<UpdateStatus> {
|
||||
pub async fn register_update(&self, uid: String, update: Update, create_index: bool) -> Result<UpdateStatus> {
|
||||
match self.index_resolver.get_uuid(uid).await {
|
||||
Ok(uuid) => {
|
||||
let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?;
|
||||
Ok(update_result)
|
||||
}
|
||||
Err(IndexResolverError::UnexistingIndex(name)) => {
|
||||
let (uuid, _) = self.index_resolver.create_index(name, None).await?;
|
||||
let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?;
|
||||
// ignore if index creation fails now, since it may already have been created
|
||||
if create_index {
|
||||
let index = self.index_resolver.create_index(name, None).await?;
|
||||
let update_result = UpdateMsg::update(&self.update_sender, index.uuid, update).await?;
|
||||
// ignore if index creation fails now, since it may already have been created
|
||||
|
||||
Ok(update_result)
|
||||
Ok(update_result)
|
||||
} else {
|
||||
Err(IndexResolverError::UnexistingIndex(name).into())
|
||||
}
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
//pub async fn add_documents(
|
||||
//&self,
|
||||
//uid: String,
|
||||
//method: milli::update::IndexDocumentsMethod,
|
||||
//payload: Payload,
|
||||
//primary_key: Option<String>,
|
||||
//) -> Result<UpdateStatus> {
|
||||
//let perform_update = |uuid| async move {
|
||||
//let meta = UpdateMeta::DocumentsAddition {
|
||||
//method,
|
||||
//primary_key,
|
||||
//};
|
||||
//let (sender, receiver) = mpsc::channel(10);
|
||||
|
||||
//// It is necessary to spawn a local task to send the payload to the update handle to
|
||||
//// prevent dead_locking between the update_handle::update that waits for the update to be
|
||||
//// registered and the update_actor that waits for the the payload to be sent to it.
|
||||
//tokio::task::spawn_local(async move {
|
||||
//payload
|
||||
//.for_each(|r| async {
|
||||
//let _ = sender.send(r).await;
|
||||
//})
|
||||
//.await
|
||||
//});
|
||||
|
||||
//// This must be done *AFTER* spawning the task.
|
||||
//self.update_handle.update(meta, receiver, uuid).await
|
||||
//};
|
||||
|
||||
//match self.uuid_resolver.get(uid).await {
|
||||
//Ok(uuid) => Ok(perform_update(uuid).await?),
|
||||
//Err(UuidResolverError::UnexistingIndex(name)) => {
|
||||
//let uuid = Uuid::new_v4();
|
||||
//let status = perform_update(uuid).await?;
|
||||
//// ignore if index creation fails now, since it may already have been created
|
||||
//let _ = self.index_handle.create_index(uuid, None).await;
|
||||
//self.uuid_resolver.insert(name, uuid).await?;
|
||||
//Ok(status)
|
||||
//}
|
||||
//Err(e) => Err(e.into()),
|
||||
//}
|
||||
//}
|
||||
|
||||
//pub async fn clear_documents(&self, uid: String) -> Result<UpdateStatus> {
|
||||
//let uuid = self.uuid_resolver.get(uid).await?;
|
||||
//let meta = UpdateMeta::ClearDocuments;
|
||||
//let (_, receiver) = mpsc::channel(1);
|
||||
//let status = self.update_handle.update(meta, receiver, uuid).await?;
|
||||
//Ok(status)
|
||||
//}
|
||||
|
||||
//pub async fn delete_documents(
|
||||
//&self,
|
||||
//uid: String,
|
||||
//documents: Vec<String>,
|
||||
//) -> Result<UpdateStatus> {
|
||||
//let uuid = self.uuid_resolver.get(uid).await?;
|
||||
//let meta = UpdateMeta::DeleteDocuments { ids: documents };
|
||||
//let (_, receiver) = mpsc::channel(1);
|
||||
//let status = self.update_handle.update(meta, receiver, uuid).await?;
|
||||
//Ok(status)
|
||||
//}
|
||||
|
||||
//pub async fn update_settings(
|
||||
//&self,
|
||||
//uid: String,
|
||||
//settings: Settings<Checked>,
|
||||
//create: bool,
|
||||
//) -> Result<UpdateStatus> {
|
||||
//let perform_udpate = |uuid| async move {
|
||||
//let meta = UpdateMeta::Settings(settings.into_unchecked());
|
||||
//// Nothing so send, drop the sender right away, as not to block the update actor.
|
||||
//let (_, receiver) = mpsc::channel(1);
|
||||
//self.update_handle.update(meta, receiver, uuid).await
|
||||
//};
|
||||
|
||||
//match self.uuid_resolver.get(uid).await {
|
||||
//Ok(uuid) => Ok(perform_udpate(uuid).await?),
|
||||
//Err(UuidResolverError::UnexistingIndex(name)) if create => {
|
||||
//let uuid = Uuid::new_v4();
|
||||
//let status = perform_udpate(uuid).await?;
|
||||
//// ignore if index creation fails now, since it may already have been created
|
||||
//let _ = self.index_handle.create_index(uuid, None).await;
|
||||
//self.uuid_resolver.insert(name, uuid).await?;
|
||||
//Ok(status)
|
||||
//}
|
||||
//Err(e) => Err(e.into()),
|
||||
//}
|
||||
//}
|
||||
|
||||
//pub async fn create_index(&self, index_settings: IndexSettings) -> Result<IndexMetadata> {
|
||||
//let IndexSettings { uid, primary_key } = index_settings;
|
||||
//let uid = uid.ok_or(IndexControllerError::MissingUid)?;
|
||||
//let uuid = Uuid::new_v4();
|
||||
//let meta = self.index_handle.create_index(uuid, primary_key).await?;
|
||||
//self.uuid_resolver.insert(uid.clone(), uuid).await?;
|
||||
//let meta = IndexMetadata {
|
||||
//uuid,
|
||||
//name: uid.clone(),
|
||||
//uid,
|
||||
//meta,
|
||||
//};
|
||||
|
||||
//Ok(meta)
|
||||
//}
|
||||
|
||||
//pub async fn delete_index(&self, uid: String) -> Result<()> {
|
||||
//let uuid = self.uuid_resolver.delete(uid).await?;
|
||||
|
||||
//// We remove the index from the resolver synchronously, and effectively perform the index
|
||||
//// deletion as a background task.
|
||||
//let update_handle = self.update_handle.clone();
|
||||
//let index_handle = self.index_handle.clone();
|
||||
//tokio::spawn(async move {
|
||||
//if let Err(e) = update_handle.delete(uuid).await {
|
||||
//error!("Error while deleting index: {}", e);
|
||||
//}
|
||||
//if let Err(e) = index_handle.delete(uuid).await {
|
||||
//error!("Error while deleting index: {}", e);
|
||||
//}
|
||||
//});
|
||||
|
||||
//Ok(())
|
||||
//}
|
||||
|
||||
pub async fn update_status(&self, uid: String, id: u64) -> Result<UpdateStatus> {
|
||||
let uuid = self.index_resolver.get_uuid(uid).await?;
|
||||
let result = UpdateMsg::get_update(&self.update_sender, uuid, id).await?;
|
||||
|
@ -481,8 +346,8 @@ impl IndexController {
|
|||
}
|
||||
|
||||
pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> {
|
||||
let index = time!(self.index_resolver.get_index(uid.clone()).await?);
|
||||
let result = time!(spawn_blocking(move || time!(index.perform_search(query))).await??);
|
||||
let index = self.index_resolver.get_index(uid.clone()).await?;
|
||||
let result = spawn_blocking(move || index.perform_search(query)).await??;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
@ -549,6 +414,33 @@ impl IndexController {
|
|||
pub async fn dump_info(&self, uid: String) -> Result<DumpInfo> {
|
||||
Ok(self.dump_handle.dump_info(uid).await?)
|
||||
}
|
||||
|
||||
pub async fn create_index(&self, uid: String, primary_key: Option<String>) -> Result<IndexMetadata> {
|
||||
let index = self.index_resolver.create_index(uid.clone(), primary_key).await?;
|
||||
let meta = spawn_blocking(move || -> IndexResult<_> {
|
||||
let meta = index.meta()?;
|
||||
let meta = IndexMetadata {
|
||||
uuid: index.uuid,
|
||||
uid: uid.clone(),
|
||||
name: uid,
|
||||
meta,
|
||||
};
|
||||
Ok(meta)
|
||||
}).await??;
|
||||
|
||||
Ok(meta)
|
||||
}
|
||||
|
||||
pub async fn delete_index(&self, uid: String) -> Result<()> {
|
||||
let uuid = self.index_resolver.delete_index(uid).await?;
|
||||
|
||||
let update_sender = self.update_sender.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = UpdateMsg::delete(&update_sender, uuid).await;
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
|
||||
|
|
|
@ -57,7 +57,7 @@ impl SnapshotService {
|
|||
let snapshot_dir = self.snapshot_path.clone();
|
||||
fs::create_dir_all(&snapshot_dir).await?;
|
||||
let temp_snapshot_dir =
|
||||
spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??;
|
||||
spawn_blocking(move || tempfile::tempdir()).await??;
|
||||
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
|
||||
|
||||
let indexes = self
|
||||
|
@ -71,12 +71,11 @@ impl SnapshotService {
|
|||
|
||||
UpdateMsg::snapshot(&self.update_sender, temp_snapshot_path.clone(), indexes).await?;
|
||||
|
||||
let snapshot_dir = self.snapshot_path.clone();
|
||||
let snapshot_path = self
|
||||
.snapshot_path
|
||||
.join(format!("{}.snapshot", self.db_name));
|
||||
let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
|
||||
let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
|
||||
let temp_snapshot_file = tempfile::NamedTempFile::new()?;
|
||||
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
|
||||
crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
|
||||
temp_snapshot_file.persist(&snapshot_path)?;
|
||||
|
@ -137,13 +136,6 @@ pub fn load_snapshot(
|
|||
//use uuid::Uuid;
|
||||
|
||||
//use super::*;
|
||||
//use crate::index_controller::index_actor::MockIndexActorHandle;
|
||||
//use crate::index_controller::updates::{
|
||||
//error::UpdateActorError, MockUpdateActorHandle, UpdateActorHandleImpl,
|
||||
//};
|
||||
//use crate::index_controller::uuid_resolver::{
|
||||
//error::UuidResolverError, MockUuidResolverHandle,
|
||||
//};
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn test_normal() {
|
||||
|
@ -191,7 +183,7 @@ pub fn load_snapshot(
|
|||
//uuid_resolver
|
||||
//.expect_snapshot()
|
||||
//.times(1)
|
||||
//abitrary error
|
||||
////abitrary error
|
||||
//.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
|
||||
|
||||
//let update_handle = MockUpdateActorHandle::new();
|
||||
|
@ -206,7 +198,7 @@ pub fn load_snapshot(
|
|||
//);
|
||||
|
||||
//assert!(snapshot_service.perform_snapshot().await.is_err());
|
||||
//Nothing was written to the file
|
||||
////Nothing was written to the file
|
||||
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
|
||||
//}
|
||||
|
||||
|
@ -222,7 +214,7 @@ pub fn load_snapshot(
|
|||
//let mut update_handle = MockUpdateActorHandle::new();
|
||||
//update_handle
|
||||
//.expect_snapshot()
|
||||
//abitrary error
|
||||
////abitrary error
|
||||
//.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0))));
|
||||
|
||||
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
|
||||
|
@ -235,7 +227,7 @@ pub fn load_snapshot(
|
|||
//);
|
||||
|
||||
//assert!(snapshot_service.perform_snapshot().await.is_err());
|
||||
//Nothing was written to the file
|
||||
////Nothing was written to the file
|
||||
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
|
||||
//}
|
||||
|
||||
|
@ -244,9 +236,9 @@ pub fn load_snapshot(
|
|||
//let mut uuid_resolver = MockUuidResolverHandle::new();
|
||||
//uuid_resolver
|
||||
//.expect_snapshot()
|
||||
//we expect the funtion to be called between 2 and 3 time in the given interval.
|
||||
////we expect the funtion to be called between 2 and 3 time in the given interval.
|
||||
//.times(2..4)
|
||||
//abitrary error, to short-circuit the function
|
||||
////abitrary error, to short-circuit the function
|
||||
//.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
|
||||
|
||||
//let update_handle = MockUpdateActorHandle::new();
|
||||
|
|
|
@ -24,7 +24,7 @@ pub enum UpdateMsg {
|
|||
ret: oneshot::Sender<Result<UpdateStatus>>,
|
||||
id: u64,
|
||||
},
|
||||
Delete {
|
||||
DeleteIndex {
|
||||
uuid: Uuid,
|
||||
ret: oneshot::Sender<Result<()>>,
|
||||
},
|
||||
|
@ -99,4 +99,11 @@ impl UpdateMsg {
|
|||
sender.send(msg).await?;
|
||||
rcv.await?
|
||||
}
|
||||
|
||||
pub async fn delete(sender: &mpsc::Sender<Self>, uuid: Uuid) -> Result<()> {
|
||||
let (ret, rcv) = oneshot::channel();
|
||||
let msg = Self::DeleteIndex { ret, uuid };
|
||||
sender.send(msg).await?;
|
||||
rcv.await?
|
||||
}
|
||||
}
|
||||
|
|
|
@ -172,7 +172,7 @@ impl UpdateLoop {
|
|||
GetUpdate { uuid, ret, id } => {
|
||||
let _ = ret.send(self.handle_get_update(uuid, id).await);
|
||||
}
|
||||
Delete { uuid, ret } => {
|
||||
DeleteIndex { uuid, ret } => {
|
||||
let _ = ret.send(self.handle_delete(uuid).await);
|
||||
}
|
||||
Snapshot { indexes, path, ret } => {
|
||||
|
|
|
@ -552,149 +552,149 @@ impl UpdateStore {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::index_controller::{
|
||||
index_actor::{error::IndexActorError, MockIndexActorHandle},
|
||||
UpdateResult,
|
||||
};
|
||||
//#[cfg(test)]
|
||||
//mod test {
|
||||
//use super::*;
|
||||
//use crate::index_controller::{
|
||||
//index_actor::{error::IndexActorError, MockIndexActorHandle},
|
||||
//UpdateResult,
|
||||
//};
|
||||
|
||||
use futures::future::ok;
|
||||
//use futures::future::ok;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_next_id() {
|
||||
let dir = tempfile::tempdir_in(".").unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let handle = Arc::new(MockIndexActorHandle::new());
|
||||
options.map_size(4096 * 100);
|
||||
let update_store = UpdateStore::open(
|
||||
options,
|
||||
dir.path(),
|
||||
handle,
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
)
|
||||
.unwrap();
|
||||
//#[actix_rt::test]
|
||||
//async fn test_next_id() {
|
||||
//let dir = tempfile::tempdir_in(".").unwrap();
|
||||
//let mut options = EnvOpenOptions::new();
|
||||
//let handle = Arc::new(MockIndexActorHandle::new());
|
||||
//options.map_size(4096 * 100);
|
||||
//let update_store = UpdateStore::open(
|
||||
//options,
|
||||
//dir.path(),
|
||||
//handle,
|
||||
//Arc::new(AtomicBool::new(false)),
|
||||
//)
|
||||
//.unwrap();
|
||||
|
||||
let index1_uuid = Uuid::new_v4();
|
||||
let index2_uuid = Uuid::new_v4();
|
||||
//let index1_uuid = Uuid::new_v4();
|
||||
//let index2_uuid = Uuid::new_v4();
|
||||
|
||||
let mut txn = update_store.env.write_txn().unwrap();
|
||||
let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
txn.commit().unwrap();
|
||||
assert_eq!((0, 0), ids);
|
||||
//let mut txn = update_store.env.write_txn().unwrap();
|
||||
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
//txn.commit().unwrap();
|
||||
//assert_eq!((0, 0), ids);
|
||||
|
||||
let mut txn = update_store.env.write_txn().unwrap();
|
||||
let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
|
||||
txn.commit().unwrap();
|
||||
assert_eq!((1, 0), ids);
|
||||
//let mut txn = update_store.env.write_txn().unwrap();
|
||||
//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
|
||||
//txn.commit().unwrap();
|
||||
//assert_eq!((1, 0), ids);
|
||||
|
||||
let mut txn = update_store.env.write_txn().unwrap();
|
||||
let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
txn.commit().unwrap();
|
||||
assert_eq!((2, 1), ids);
|
||||
}
|
||||
//let mut txn = update_store.env.write_txn().unwrap();
|
||||
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
//txn.commit().unwrap();
|
||||
//assert_eq!((2, 1), ids);
|
||||
//}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_register_update() {
|
||||
let dir = tempfile::tempdir_in(".").unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let handle = Arc::new(MockIndexActorHandle::new());
|
||||
options.map_size(4096 * 100);
|
||||
let update_store = UpdateStore::open(
|
||||
options,
|
||||
dir.path(),
|
||||
handle,
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
)
|
||||
.unwrap();
|
||||
let meta = UpdateMeta::ClearDocuments;
|
||||
let uuid = Uuid::new_v4();
|
||||
let store_clone = update_store.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
store_clone.register_update(meta, None, uuid).unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
//#[actix_rt::test]
|
||||
//async fn test_register_update() {
|
||||
//let dir = tempfile::tempdir_in(".").unwrap();
|
||||
//let mut options = EnvOpenOptions::new();
|
||||
//let handle = Arc::new(MockIndexActorHandle::new());
|
||||
//options.map_size(4096 * 100);
|
||||
//let update_store = UpdateStore::open(
|
||||
//options,
|
||||
//dir.path(),
|
||||
//handle,
|
||||
//Arc::new(AtomicBool::new(false)),
|
||||
//)
|
||||
//.unwrap();
|
||||
//let meta = UpdateMeta::ClearDocuments;
|
||||
//let uuid = Uuid::new_v4();
|
||||
//let store_clone = update_store.clone();
|
||||
//tokio::task::spawn_blocking(move || {
|
||||
//store_clone.register_update(meta, None, uuid).unwrap();
|
||||
//})
|
||||
//.await
|
||||
//.unwrap();
|
||||
|
||||
let txn = update_store.env.read_txn().unwrap();
|
||||
assert!(update_store
|
||||
.pending_queue
|
||||
.get(&txn, &(0, uuid, 0))
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
//let txn = update_store.env.read_txn().unwrap();
|
||||
//assert!(update_store
|
||||
//.pending_queue
|
||||
//.get(&txn, &(0, uuid, 0))
|
||||
//.unwrap()
|
||||
//.is_some());
|
||||
//}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_process_update() {
|
||||
let dir = tempfile::tempdir_in(".").unwrap();
|
||||
let mut handle = MockIndexActorHandle::new();
|
||||
//#[actix_rt::test]
|
||||
//async fn test_process_update() {
|
||||
//let dir = tempfile::tempdir_in(".").unwrap();
|
||||
//let mut handle = MockIndexActorHandle::new();
|
||||
|
||||
handle
|
||||
.expect_update()
|
||||
.times(2)
|
||||
.returning(|_index_uuid, processing, _file| {
|
||||
if processing.id() == 0 {
|
||||
Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
|
||||
} else {
|
||||
Box::pin(ok(Err(
|
||||
processing.fail(IndexActorError::ExistingPrimaryKey.into())
|
||||
)))
|
||||
}
|
||||
});
|
||||
//handle
|
||||
//.expect_update()
|
||||
//.times(2)
|
||||
//.returning(|_index_uuid, processing, _file| {
|
||||
//if processing.id() == 0 {
|
||||
//Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
|
||||
//} else {
|
||||
//Box::pin(ok(Err(
|
||||
//processing.fail(IndexActorError::ExistingPrimaryKey.into())
|
||||
//)))
|
||||
//}
|
||||
//});
|
||||
|
||||
let handle = Arc::new(handle);
|
||||
//let handle = Arc::new(handle);
|
||||
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(4096 * 100);
|
||||
let store = UpdateStore::open(
|
||||
options,
|
||||
dir.path(),
|
||||
handle.clone(),
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
)
|
||||
.unwrap();
|
||||
//let mut options = EnvOpenOptions::new();
|
||||
//options.map_size(4096 * 100);
|
||||
//let store = UpdateStore::open(
|
||||
//options,
|
||||
//dir.path(),
|
||||
//handle.clone(),
|
||||
//Arc::new(AtomicBool::new(false)),
|
||||
//)
|
||||
//.unwrap();
|
||||
|
||||
// wait a bit for the event loop exit.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
//// wait a bit for the event loop exit.
|
||||
//tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
let mut txn = store.env.write_txn().unwrap();
|
||||
//let mut txn = store.env.write_txn().unwrap();
|
||||
|
||||
let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
|
||||
let uuid = Uuid::new_v4();
|
||||
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
|
||||
//let uuid = Uuid::new_v4();
|
||||
|
||||
store
|
||||
.pending_queue
|
||||
.put(&mut txn, &(0, uuid, 0), &update)
|
||||
.unwrap();
|
||||
//store
|
||||
//.pending_queue
|
||||
//.put(&mut txn, &(0, uuid, 0), &update)
|
||||
//.unwrap();
|
||||
|
||||
let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);
|
||||
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);
|
||||
|
||||
store
|
||||
.pending_queue
|
||||
.put(&mut txn, &(1, uuid, 1), &update)
|
||||
.unwrap();
|
||||
//store
|
||||
//.pending_queue
|
||||
//.put(&mut txn, &(1, uuid, 1), &update)
|
||||
//.unwrap();
|
||||
|
||||
txn.commit().unwrap();
|
||||
//txn.commit().unwrap();
|
||||
|
||||
// Process the pending, and check that it has been moved to the update databases, and
|
||||
// removed from the pending database.
|
||||
let store_clone = store.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
store_clone.process_pending_update(handle.clone()).unwrap();
|
||||
store_clone.process_pending_update(handle).unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
//// Process the pending, and check that it has been moved to the update databases, and
|
||||
//// removed from the pending database.
|
||||
//let store_clone = store.clone();
|
||||
//tokio::task::spawn_blocking(move || {
|
||||
//store_clone.process_pending_update(handle.clone()).unwrap();
|
||||
//store_clone.process_pending_update(handle).unwrap();
|
||||
//})
|
||||
//.await
|
||||
//.unwrap();
|
||||
|
||||
let txn = store.env.read_txn().unwrap();
|
||||
//let txn = store.env.read_txn().unwrap();
|
||||
|
||||
assert!(store.pending_queue.first(&txn).unwrap().is_none());
|
||||
let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();
|
||||
//assert!(store.pending_queue.first(&txn).unwrap().is_none());
|
||||
//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();
|
||||
|
||||
assert!(matches!(update, UpdateStatus::Processed(_)));
|
||||
let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();
|
||||
//assert!(matches!(update, UpdateStatus::Processed(_)));
|
||||
//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();
|
||||
|
||||
assert!(matches!(update, UpdateStatus::Failed(_)));
|
||||
}
|
||||
}
|
||||
//assert!(matches!(update, UpdateStatus::Failed(_)));
|
||||
//}
|
||||
//}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue