handle parallel requests for the dump actor

This commit is contained in:
tamo 2021-05-24 15:42:12 +02:00
parent 8a11c6c429
commit 529f7962f4
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69

View File

@ -1,7 +1,9 @@
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
use crate::helpers::compression; use crate::helpers::compression;
use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata}; use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata};
use async_stream::stream;
use chrono::Utc; use chrono::Utc;
use futures::stream::StreamExt;
use log::{error, info, warn}; use log::{error, info, warn};
use std::{ use std::{
collections::HashSet, collections::HashSet,
@ -11,8 +13,10 @@ use std::{
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
use uuid::Uuid; use uuid::Uuid;
pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor<UuidResolver, Index, Update> { pub struct DumpActor<UuidResolver, Index, Update> {
inbox: mpsc::Receiver<DumpMsg>, inbox: Option<mpsc::Receiver<DumpMsg>>,
inner: InnerDump<UuidResolver, Index, Update>, inner: InnerDump<UuidResolver, Index, Update>,
} }
@ -44,7 +48,7 @@ where
dump_path: impl AsRef<Path>, dump_path: impl AsRef<Path>,
) -> Self { ) -> Self {
Self { Self {
inbox, inbox: Some(inbox),
inner: InnerDump { inner: InnerDump {
uuid_resolver, uuid_resolver,
index, index,
@ -56,24 +60,41 @@ where
} }
pub async fn run(mut self) { pub async fn run(mut self) {
use DumpMsg::*;
info!("Started dump actor."); info!("Started dump actor.");
let mut inbox = self
.inbox
.take()
.expect("Dump Actor must have a inbox at this point.");
let stream = stream! {
loop { loop {
match self.inbox.recv().await { match inbox.recv().await {
Some(CreateDump { ret }) => { Some(msg) => yield msg,
let _ = ret.send(self.inner.clone().handle_create_dump().await);
}
Some(DumpInfo { ret, uid }) => {
let _ = ret.send(self.inner.handle_dump_info(uid).await);
}
None => break, None => break,
} }
} }
};
stream
.for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
.await;
error!("Dump actor stopped."); error!("Dump actor stopped.");
} }
async fn handle_message(&self, msg: DumpMsg) {
use DumpMsg::*;
match msg {
CreateDump { ret } => {
let _ = ret.send(self.inner.clone().handle_create_dump().await);
}
DumpInfo { ret, uid } => {
let _ = ret.send(self.inner.handle_dump_info(uid).await);
}
}
}
} }
impl<UuidResolver, Index, Update> InnerDump<UuidResolver, Index, Update> impl<UuidResolver, Index, Update> InnerDump<UuidResolver, Index, Update>