// Source: MeiliSearch/meilisearch-http/src/index_controller/dump_actor/actor.rs (162 lines, 4.8 KiB, Rust)

2021-05-27 14:30:20 +02:00
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_stream::stream;
2021-05-10 20:25:09 +02:00
use chrono::Utc;
use futures::stream::StreamExt;
2021-05-25 15:23:13 +02:00
use log::{error, info};
2021-05-27 10:51:19 +02:00
use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle;
2021-05-27 14:30:20 +02:00
use tokio::sync::{mpsc, oneshot, RwLock};
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask};
use crate::index_controller::{update_actor, uuid_resolver};
2021-05-10 20:25:09 +02:00
/// Upper bound on the number of dump messages that `DumpActor::run` handles concurrently
/// (it is the limit passed to `for_each_concurrent`).
pub const CONCURRENT_DUMP_MSG: usize = 10;
2021-05-26 20:42:09 +02:00
pub struct DumpActor<UuidResolver, Update> {
inbox: Option<mpsc::Receiver<DumpMsg>>,
2021-05-24 18:06:20 +02:00
uuid_resolver: UuidResolver,
update: Update,
dump_path: PathBuf,
2021-05-24 18:19:34 +02:00
dump_info: Arc<RwLock<Option<DumpInfo>>>,
2021-05-27 10:51:19 +02:00
update_db_size: u64,
index_db_size: u64,
2021-05-10 20:25:09 +02:00
}
/// Builds a dump uid from the current UTC time.
///
/// The uid has the shape `YYYYMMDD-HHMMSSmmm`, where `mmm` is the millisecond part
/// produced by chrono's `%3f` specifier.
fn generate_uid() -> String {
    const UID_FORMAT: &str = "%Y%m%d-%H%M%S%3f";

    let created_at = Utc::now();
    created_at.format(UID_FORMAT).to_string()
}
2021-05-26 20:42:09 +02:00
impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
2021-05-10 20:25:09 +02:00
where
2021-05-27 10:51:19 +02:00
UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static,
Update: UpdateActorHandle + Send + Sync + Clone + 'static,
2021-05-10 20:25:09 +02:00
{
pub fn new(
inbox: mpsc::Receiver<DumpMsg>,
uuid_resolver: UuidResolver,
update: Update,
dump_path: impl AsRef<Path>,
2021-05-27 10:51:19 +02:00
index_db_size: u64,
update_db_size: u64,
2021-05-10 20:25:09 +02:00
) -> Self {
Self {
inbox: Some(inbox),
2021-05-24 18:06:20 +02:00
uuid_resolver,
update,
dump_path: dump_path.as_ref().into(),
2021-05-24 18:19:34 +02:00
dump_info: Arc::new(RwLock::new(None)),
2021-05-27 10:51:19 +02:00
index_db_size,
update_db_size,
2021-05-10 20:25:09 +02:00
}
}
pub async fn run(mut self) {
info!("Started dump actor.");
let mut inbox = self
.inbox
.take()
.expect("Dump Actor must have a inbox at this point.");
let stream = stream! {
loop {
match inbox.recv().await {
Some(msg) => yield msg,
None => break,
2021-05-10 20:25:09 +02:00
}
}
};
stream
.for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
.await;
2021-05-10 20:25:09 +02:00
error!("Dump actor stopped.");
}
async fn handle_message(&self, msg: DumpMsg) {
use DumpMsg::*;
match msg {
CreateDump { ret } => {
2021-05-24 18:06:20 +02:00
let _ = self.handle_create_dump(ret).await;
}
DumpInfo { ret, uid } => {
2021-05-24 18:06:20 +02:00
let _ = ret.send(self.handle_dump_info(uid).await);
}
}
}
2021-05-10 20:25:09 +02:00
2021-05-24 18:06:20 +02:00
async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
2021-05-10 20:25:09 +02:00
if self.is_running().await {
ret.send(Err(DumpError::DumpAlreadyRunning))
.expect("Dump actor is dead");
return;
2021-05-10 20:25:09 +02:00
}
let uid = generate_uid();
2021-05-30 15:55:17 +02:00
2021-05-10 20:25:09 +02:00
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
2021-05-30 15:55:17 +02:00
2021-05-24 18:19:34 +02:00
*self.dump_info.write().await = Some(info.clone());
2021-05-10 20:25:09 +02:00
ret.send(Ok(info)).expect("Dump actor is dead");
let dump_info = self.dump_info.clone();
2021-05-27 10:51:19 +02:00
let task = DumpTask {
path: self.dump_path.clone(),
uuid_resolver: self.uuid_resolver.clone(),
update_handle: self.update.clone(),
uid: uid.clone(),
update_db_size: self.update_db_size,
index_db_size: self.index_db_size,
};
let task_result = tokio::task::spawn(task.run()).await;
match task_result {
Ok(Ok(())) => {
2021-05-25 15:47:57 +02:00
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done();
info!("Dump succeed");
}
Ok(Err(e)) => {
2021-05-25 15:47:57 +02:00
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string());
error!("Dump failed: {}", e);
}
Err(_) => {
error!("Dump panicked. Dump status set to failed");
2021-05-30 15:55:17 +02:00
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error("Unexpected error while performing dump.".to_string());
}
};
2021-05-10 20:25:09 +02:00
}
async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
2021-05-24 18:19:34 +02:00
match &*self.dump_info.read().await {
None => self.dump_from_fs(uid).await,
Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await,
2021-05-10 20:25:09 +02:00
Some(info) => Ok(info.clone()),
}
}
async fn dump_from_fs(&self, uid: String) -> DumpResult<DumpInfo> {
self.dump_path
.join(format!("{}.dump", &uid))
.exists()
.then(|| DumpInfo::new(uid.clone(), DumpStatus::Done))
.ok_or(DumpError::DumpDoesNotExist(uid))
}
2021-05-10 20:25:09 +02:00
async fn is_running(&self) -> bool {
matches!(
2021-05-24 18:19:34 +02:00
*self.dump_info.read().await,
2021-05-10 20:25:09 +02:00
Some(DumpInfo {
status: DumpStatus::InProgress,
..
})
)
}
2021-05-24 18:06:20 +02:00
}