mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 05:54:30 +01:00
test dumps
This commit is contained in:
parent
ece4c739f4
commit
4b4ebad9a9
@ -292,7 +292,7 @@ pub mod test {
|
||||
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.dump(path),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
MockIndex::Faux(faux) => faux.get("dump").call(path.as_ref()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,14 +10,16 @@ use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
|
||||
use super::error::{DumpActorError, Result};
|
||||
use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
|
||||
use crate::index_controller::index_resolver::HardStateIndexResolver;
|
||||
use crate::index_controller::index_resolver::IndexResolver;
|
||||
use crate::index_controller::index_resolver::index_store::IndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::UuidStore;
|
||||
use crate::index_controller::updates::UpdateSender;
|
||||
|
||||
pub const CONCURRENT_DUMP_MSG: usize = 10;
|
||||
|
||||
pub struct DumpActor {
|
||||
pub struct DumpActor<U, I> {
|
||||
inbox: Option<mpsc::Receiver<DumpMsg>>,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update: UpdateSender,
|
||||
dump_path: PathBuf,
|
||||
lock: Arc<Mutex<()>>,
|
||||
@ -31,10 +33,14 @@ fn generate_uid() -> String {
|
||||
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
|
||||
}
|
||||
|
||||
impl DumpActor {
|
||||
impl<U, I> DumpActor<U, I>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
inbox: mpsc::Receiver<DumpMsg>,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update: UpdateSender,
|
||||
dump_path: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
@ -114,7 +120,7 @@ impl DumpActor {
|
||||
let task = DumpTask {
|
||||
path: self.dump_path.clone(),
|
||||
index_resolver: self.index_resolver.clone(),
|
||||
update_handle: self.update.clone(),
|
||||
update_sender: self.update.clone(),
|
||||
uid: uid.clone(),
|
||||
update_db_size: self.update_db_size,
|
||||
index_db_size: self.index_db_size,
|
||||
|
@ -13,7 +13,9 @@ pub use actor::DumpActor;
|
||||
pub use handle_impl::*;
|
||||
pub use message::DumpMsg;
|
||||
|
||||
use super::index_resolver::HardStateIndexResolver;
|
||||
use super::index_resolver::index_store::IndexStore;
|
||||
use super::index_resolver::uuid_store::UuidStore;
|
||||
use super::index_resolver::IndexResolver;
|
||||
use super::updates::UpdateSender;
|
||||
use crate::compression::{from_tar_gz, to_tar_gz};
|
||||
use crate::index_controller::dump_actor::error::DumpActorError;
|
||||
@ -218,16 +220,20 @@ pub fn load_dump(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct DumpTask {
|
||||
struct DumpTask<U, I> {
|
||||
path: PathBuf,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
update_handle: UpdateSender,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update_sender: UpdateSender,
|
||||
uid: String,
|
||||
update_db_size: usize,
|
||||
index_db_size: usize,
|
||||
}
|
||||
|
||||
impl DumpTask {
|
||||
impl<U, I> DumpTask<U, I>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
async fn run(self) -> Result<()> {
|
||||
trace!("Performing dump.");
|
||||
|
||||
@ -243,7 +249,7 @@ impl DumpTask {
|
||||
|
||||
let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
|
||||
|
||||
UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?;
|
||||
UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?;
|
||||
|
||||
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
|
||||
let temp_dump_file = tempfile::NamedTempFile::new()?;
|
||||
@ -262,3 +268,110 @@ impl DumpTask {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use futures::future::{err, ok};
|
||||
use once_cell::sync::Lazy;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::*;
|
||||
use crate::index::test::Mocker;
|
||||
use crate::index::Index;
|
||||
use crate::index_controller::index_resolver::index_store::MockIndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
|
||||
use crate::index_controller::updates::create_update_handler;
|
||||
use crate::index::error::Result as IndexResult;
|
||||
use crate::index_controller::index_resolver::error::IndexResolverError;
|
||||
|
||||
fn setup() {
|
||||
static SETUP: Lazy<()> = Lazy::new(|| {
|
||||
if cfg!(windows) {
|
||||
std::env::set_var("TMP", ".");
|
||||
} else {
|
||||
std::env::set_var("TMPDIR", ".");
|
||||
}
|
||||
});
|
||||
|
||||
// just deref to make sure the env is setup
|
||||
*SETUP
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_dump_normal() {
|
||||
setup();
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
let uuids = std::iter::repeat_with(Uuid::new_v4)
|
||||
.take(4)
|
||||
.collect::<HashSet<_>>();
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
let uuids_cloned = uuids.clone();
|
||||
uuid_store
|
||||
.expect_dump()
|
||||
.once()
|
||||
.returning(move |_| Box::pin(ok(uuids_cloned.clone())));
|
||||
|
||||
let mut index_store = MockIndexStore::new();
|
||||
index_store.expect_get().times(4).returning(move |uuid| {
|
||||
let mocker = Mocker::default();
|
||||
let uuids_clone = uuids.clone();
|
||||
mocker.when::<(), Uuid>("uuid").once().then(move |_| {
|
||||
assert!(uuids_clone.contains(&uuid));
|
||||
uuid
|
||||
});
|
||||
mocker.when::<&Path, IndexResult<()>>("dump").once().then(move |_| {
|
||||
Ok(())
|
||||
});
|
||||
Box::pin(ok(Some(Index::faux(mocker))))
|
||||
});
|
||||
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
|
||||
let update_sender =
|
||||
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
|
||||
|
||||
let task = DumpTask {
|
||||
path: tmp.path().to_owned(),
|
||||
index_resolver,
|
||||
update_sender,
|
||||
uid: String::from("test"),
|
||||
update_db_size: 4096 * 10,
|
||||
index_db_size: 4096 * 10,
|
||||
};
|
||||
|
||||
task.run().await.unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn error_performing_dump() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
uuid_store
|
||||
.expect_dump()
|
||||
.once()
|
||||
.returning(move |_| Box::pin(err(IndexResolverError::ExistingPrimaryKey)));
|
||||
|
||||
let index_store = MockIndexStore::new();
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
let update_sender =
|
||||
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
|
||||
|
||||
let task = DumpTask {
|
||||
path: tmp.path().to_owned(),
|
||||
index_resolver,
|
||||
update_sender,
|
||||
uid: String::from("test"),
|
||||
update_db_size: 4096 * 10,
|
||||
index_db_size: 4096 * 10,
|
||||
};
|
||||
|
||||
assert!(task.run().await.is_err());
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user