diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 82a38cf96..fac67cbc0 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -1,7 +1,9 @@ use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; use crate::helpers::compression; use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata}; +use async_stream::stream; use chrono::Utc; +use futures::stream::StreamExt; use log::{error, info, warn}; use std::{ collections::HashSet, @@ -11,8 +13,10 @@ use std::{ use tokio::sync::{mpsc, Mutex}; use uuid::Uuid; +pub const CONCURRENT_DUMP_MSG: usize = 10; + pub struct DumpActor { - inbox: mpsc::Receiver, + inbox: Option>, inner: InnerDump, } @@ -44,7 +48,7 @@ where dump_path: impl AsRef, ) -> Self { Self { - inbox, + inbox: Some(inbox), inner: InnerDump { uuid_resolver, index, @@ -56,24 +60,41 @@ where } pub async fn run(mut self) { - use DumpMsg::*; - info!("Started dump actor."); - loop { - match self.inbox.recv().await { - Some(CreateDump { ret }) => { - let _ = ret.send(self.inner.clone().handle_create_dump().await); + let mut inbox = self + .inbox + .take() + .expect("Dump Actor must have a inbox at this point."); + + let stream = stream! { + loop { + match inbox.recv().await { + Some(msg) => yield msg, + None => break, } - Some(DumpInfo { ret, uid }) => { - let _ = ret.send(self.inner.handle_dump_info(uid).await); - } - None => break, } - } + }; + + stream + .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg)) + .await; error!("Dump actor stopped."); } + + async fn handle_message(&self, msg: DumpMsg) { + use DumpMsg::*; + + match msg { + CreateDump { ret } => { + let _ = ret.send(self.inner.clone().handle_create_dump().await); + } + DumpInfo { ret, uid } => { + let _ = ret.send(self.inner.handle_dump_info(uid).await); + } + } + } } impl InnerDump