mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-23 05:14:27 +01:00
fix the error handling in case there is a panic while creating a dump
This commit is contained in:
parent
529f7962f4
commit
dcf29e1081
@ -10,7 +10,7 @@ use std::{
|
|||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use tokio::sync::{mpsc, Mutex};
|
use tokio::sync::{mpsc, oneshot, Mutex};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub const CONCURRENT_DUMP_MSG: usize = 10;
|
pub const CONCURRENT_DUMP_MSG: usize = 10;
|
||||||
@ -88,7 +88,7 @@ where
|
|||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
CreateDump { ret } => {
|
CreateDump { ret } => {
|
||||||
let _ = ret.send(self.inner.clone().handle_create_dump().await);
|
let _ = self.inner.clone().handle_create_dump(ret).await;
|
||||||
}
|
}
|
||||||
DumpInfo { ret, uid } => {
|
DumpInfo { ret, uid } => {
|
||||||
let _ = ret.send(self.inner.handle_dump_info(uid).await);
|
let _ = ret.send(self.inner.handle_dump_info(uid).await);
|
||||||
@ -103,38 +103,45 @@ where
|
|||||||
Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
|
Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
|
||||||
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
|
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
|
||||||
{
|
{
|
||||||
async fn handle_create_dump(self) -> DumpResult<DumpInfo> {
|
async fn handle_create_dump(self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
|
||||||
if self.is_running().await {
|
if self.is_running().await {
|
||||||
return Err(DumpError::DumpAlreadyRunning);
|
ret.send(Err(DumpError::DumpAlreadyRunning))
|
||||||
|
.expect("Dump actor is dead");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
let uid = generate_uid();
|
let uid = generate_uid();
|
||||||
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
|
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
|
||||||
*self.dump_info.lock().await = Some(info.clone());
|
*self.dump_info.lock().await = Some(info.clone());
|
||||||
|
|
||||||
let this = self.clone();
|
ret.send(Ok(info)).expect("Dump actor is dead");
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
let dump_info = self.dump_info.clone();
|
||||||
match this.perform_dump(uid).await {
|
let cloned_uid = uid.clone();
|
||||||
Ok(()) => {
|
|
||||||
if let Some(ref mut info) = *self.dump_info.lock().await {
|
let task_result = tokio::task::spawn(self.clone().perform_dump(cloned_uid)).await;
|
||||||
|
|
||||||
|
match task_result {
|
||||||
|
Ok(Ok(())) => {
|
||||||
|
if let Some(ref mut info) = *dump_info.lock().await {
|
||||||
info.done();
|
info.done();
|
||||||
} else {
|
} else {
|
||||||
warn!("dump actor was in an inconsistant state");
|
warn!("dump actor was in an inconsistant state");
|
||||||
}
|
}
|
||||||
info!("Dump succeed");
|
info!("Dump succeed");
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Ok(Err(e)) => {
|
||||||
if let Some(ref mut info) = *self.dump_info.lock().await {
|
if let Some(ref mut info) = *dump_info.lock().await {
|
||||||
info.with_error(e.to_string());
|
info.with_error(e.to_string());
|
||||||
} else {
|
} else {
|
||||||
warn!("dump actor was in an inconsistant state");
|
warn!("dump actor was in an inconsistant state");
|
||||||
}
|
}
|
||||||
error!("Dump failed: {}", e);
|
error!("Dump failed: {}", e);
|
||||||
}
|
}
|
||||||
|
Err(_) => {
|
||||||
|
error!("Dump panicked. Dump status set to failed");
|
||||||
|
*dump_info.lock().await = Some(DumpInfo::new(uid, DumpStatus::Failed));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
});
|
|
||||||
|
|
||||||
Ok(info)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn perform_dump(self, uid: String) -> anyhow::Result<()> {
|
async fn perform_dump(self, uid: String) -> anyhow::Result<()> {
|
||||||
|
@ -13,7 +13,6 @@ use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use mockall::automock;
|
use mockall::automock;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@ -129,7 +128,7 @@ pub struct DumpInfo {
|
|||||||
pub uid: String,
|
pub uid: String,
|
||||||
pub status: DumpStatus,
|
pub status: DumpStatus,
|
||||||
#[serde(skip_serializing_if = "Option::is_none", flatten)]
|
#[serde(skip_serializing_if = "Option::is_none", flatten)]
|
||||||
pub error: Option<serde_json::Value>,
|
pub error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DumpInfo {
|
impl DumpInfo {
|
||||||
@ -143,7 +142,7 @@ impl DumpInfo {
|
|||||||
|
|
||||||
pub fn with_error(&mut self, error: String) {
|
pub fn with_error(&mut self, error: String) {
|
||||||
self.status = DumpStatus::Failed;
|
self.status = DumpStatus::Failed;
|
||||||
self.error = Some(json!(error));
|
self.error = Some(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn done(&mut self) {
|
pub fn done(&mut self) {
|
||||||
|
Loading…
Reference in New Issue
Block a user