From 10fc870684faa6e7a6014391b450898012f9854b Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Mon, 31 May 2021 15:34:03 +0200 Subject: [PATCH] improve dump info reports --- .../src/index_controller/dump_actor/actor.rs | 68 +++++++++---------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index ff4c39f6d..5ac5ca9b9 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -1,9 +1,9 @@ -use std::path::{Path, PathBuf}; +use std::{collections::HashMap, path::{Path, PathBuf}}; use std::sync::Arc; use async_stream::stream; use chrono::Utc; -use futures::stream::StreamExt; +use futures::{lock::Mutex, stream::StreamExt}; use log::{error, info}; use update_actor::UpdateActorHandle; use uuid_resolver::UuidResolverHandle; @@ -19,7 +19,8 @@ pub struct DumpActor { uuid_resolver: UuidResolver, update: Update, dump_path: PathBuf, - dump_info: Arc>>, + lock: Arc>, + dump_infos: Arc>>, update_db_size: u64, index_db_size: u64, } @@ -42,12 +43,15 @@ where index_db_size: u64, update_db_size: u64, ) -> Self { + let dump_infos = Arc::new(RwLock::new(HashMap::new())); + let lock = Arc::new(Mutex::new(())); Self { inbox: Some(inbox), uuid_resolver, update, dump_path: dump_path.as_ref().into(), - dump_info: Arc::new(RwLock::new(None)), + dump_infos, + lock, index_db_size, update_db_size, } @@ -91,21 +95,22 @@ where } async fn handle_create_dump(&self, ret: oneshot::Sender>) { - if self.is_running().await { - ret.send(Err(DumpError::DumpAlreadyRunning)) - .expect("Dump actor is dead"); - return; - } let uid = generate_uid(); - let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); - *self.dump_info.write().await = Some(info.clone()); + let _lock = match self.lock.try_lock() { + Some(lock) => lock, + None => { + ret.send(Err(DumpError::DumpAlreadyRunning)) + .expect("Dump actor is dead"); + return; + } + }; + + self.dump_infos.write().await.insert(uid.clone(), info.clone()); ret.send(Ok(info)).expect("Dump actor is dead"); - let dump_info = self.dump_info.clone(); - let task = DumpTask { path: self.dump_path.clone(), uuid_resolver: self.uuid_resolver.clone(), @@ -117,45 +122,34 @@ where let task_result = tokio::task::spawn(task.run()).await; + let mut dump_infos = self.dump_infos + .write() + .await; + let dump_infos = + dump_infos + .get_mut(&uid) + .expect("dump entry deleted while lock was acquired"); + match task_result { Ok(Ok(())) => { - (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done(); + dump_infos.done(); info!("Dump succeed"); } Ok(Err(e)) => { - (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string()); + dump_infos.with_error(e.to_string()); error!("Dump failed: {}", e); } Err(_) => { + dump_infos.with_error("Unexpected error while performing dump.".to_string()); error!("Dump panicked. Dump status set to failed"); - (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error("Unexpected error while performing dump.".to_string()); } }; } async fn handle_dump_info(&self, uid: String) -> DumpResult { - match &*self.dump_info.read().await { - None => self.dump_from_fs(uid).await, - Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await, + match self.dump_infos.read().await.get(&uid) { Some(info) => Ok(info.clone()), + _ => Err(DumpError::DumpDoesNotExist(uid)), } } - - async fn dump_from_fs(&self, uid: String) -> DumpResult { - self.dump_path - .join(format!("{}.dump", &uid)) - .exists() - .then(|| DumpInfo::new(uid.clone(), DumpStatus::Done)) - .ok_or(DumpError::DumpDoesNotExist(uid)) - } - - async fn is_running(&self) -> bool { - matches!( - *self.dump_info.read().await, - Some(DumpInfo { - status: DumpStatus::InProgress, - .. - }) - ) - } }