diff --git a/meilisearch-lib/src/dump/actor.rs b/meilisearch-lib/src/dump/actor.rs deleted file mode 100644 index b7f615e44..000000000 --- a/meilisearch-lib/src/dump/actor.rs +++ /dev/null @@ -1,190 +0,0 @@ -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use async_stream::stream; -use futures::{lock::Mutex, stream::StreamExt}; -use log::{error, trace}; -use time::macros::format_description; -use time::OffsetDateTime; -use tokio::sync::{mpsc, oneshot, RwLock}; - -use super::error::{DumpError, Result}; -use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus}; -use crate::tasks::Scheduler; -use crate::update_file_store::UpdateFileStore; - -pub const CONCURRENT_DUMP_MSG: usize = 10; - -pub struct DumpActor { - inbox: Option>, - update_file_store: UpdateFileStore, - scheduler: Arc>, - dump_path: PathBuf, - analytics_path: PathBuf, - lock: Arc>, - dump_infos: Arc>>, - update_db_size: usize, - index_db_size: usize, -} - -/// Generate uid from creation date -fn generate_uid() -> String { - OffsetDateTime::now_utc() - .format(format_description!( - "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" - )) - .unwrap() -} - -impl DumpActor { - pub fn new( - inbox: mpsc::Receiver, - update_file_store: UpdateFileStore, - scheduler: Arc>, - dump_path: impl AsRef, - analytics_path: impl AsRef, - index_db_size: usize, - update_db_size: usize, - ) -> Self { - let dump_infos = Arc::new(RwLock::new(HashMap::new())); - let lock = Arc::new(Mutex::new(())); - Self { - inbox: Some(inbox), - scheduler, - update_file_store, - dump_path: dump_path.as_ref().into(), - analytics_path: analytics_path.as_ref().into(), - dump_infos, - lock, - index_db_size, - update_db_size, - } - } - - pub async fn run(mut self) { - trace!("Started dump actor."); - - let mut inbox = self - .inbox - .take() - .expect("Dump Actor must have a inbox at this point."); - - let stream = stream! { - loop { - match inbox.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - stream - .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg)) - .await; - - error!("Dump actor stopped."); - } - - async fn handle_message(&self, msg: DumpMsg) { - use DumpMsg::*; - - match msg { - CreateDump { ret } => { - let _ = self.handle_create_dump(ret).await; - } - DumpInfo { ret, uid } => { - let _ = ret.send(self.handle_dump_info(uid).await); - } - } - } - - async fn handle_create_dump(&self, ret: oneshot::Sender>) { - let uid = generate_uid(); - let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); - - let _lock = match self.lock.try_lock() { - Some(lock) => lock, - None => { - ret.send(Err(DumpError::DumpAlreadyRunning)) - .expect("Dump actor is dead"); - return; - } - }; - - self.dump_infos - .write() - .await - .insert(uid.clone(), info.clone()); - - ret.send(Ok(info)).expect("Dump actor is dead"); - - let task = DumpJob { - dump_path: self.dump_path.clone(), - db_path: self.analytics_path.clone(), - update_file_store: self.update_file_store.clone(), - uid: uid.clone(), - update_db_size: self.update_db_size, - index_db_size: self.index_db_size, - }; - - let task_result = tokio::task::spawn_local(task.run()).await; - - let mut dump_infos = self.dump_infos.write().await; - let dump_infos = dump_infos - .get_mut(&uid) - .expect("dump entry deleted while lock was acquired"); - - match task_result { - Ok(Ok(())) => { - dump_infos.done(); - trace!("Dump succeed"); - } - Ok(Err(e)) => { - dump_infos.with_error(e.to_string()); - error!("Dump failed: {}", e); - } - Err(_) => { - dump_infos.with_error("Unexpected error while performing dump.".to_string()); - error!("Dump panicked. Dump status set to failed"); - } - }; - } - - async fn handle_dump_info(&self, uid: String) -> Result { - match self.dump_infos.read().await.get(&uid) { - Some(info) => Ok(info.clone()), - _ => Err(DumpError::DumpDoesNotExist(uid)), - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_generate_uid() { - let current = OffsetDateTime::now_utc(); - - let uid = generate_uid(); - let (date, time) = uid.split_once('-').unwrap(); - - let date = time::Date::parse( - date, - &format_description!("[year repr:full][month repr:numerical][day padding:zero]"), - ) - .unwrap(); - let time = time::Time::parse( - time, - &format_description!( - "[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" - ), - ) - .unwrap(); - let datetime = time::PrimitiveDateTime::new(date, time); - let datetime = datetime.assume_utc(); - - assert!(current - datetime < time::Duration::SECOND); - } -} diff --git a/meilisearch-lib/src/dump/handle_impl.rs b/meilisearch-lib/src/dump/handle_impl.rs deleted file mode 100644 index 9577b3663..000000000 --- a/meilisearch-lib/src/dump/handle_impl.rs +++ /dev/null @@ -1,26 +0,0 @@ -use tokio::sync::{mpsc, oneshot}; - -use super::error::Result; -use super::{DumpActorHandle, DumpMsg}; - -#[derive(Clone)] -pub struct DumpActorHandleImpl { - pub sender: mpsc::Sender, -} - -#[async_trait::async_trait] -impl DumpActorHandle for DumpActorHandleImpl { - async fn create_dump(&self) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = DumpMsg::CreateDump { ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") - } - - async fn dump_info(&self, uid: String) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = DumpMsg::DumpInfo { ret, uid }; - let _ = self.sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") - } -} diff --git a/meilisearch-lib/src/dump/message.rs b/meilisearch-lib/src/dump/message.rs deleted file mode 100644 index 8ebeb3b57..000000000 --- a/meilisearch-lib/src/dump/message.rs +++ /dev/null @@ -1,13 +0,0 @@ -use tokio::sync::oneshot; - -use super::error::Result; - -pub enum DumpMsg { - CreateDump { - ret: oneshot::Sender>, - }, - DumpInfo { - uid: String, - ret: oneshot::Sender>, - }, -}