diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 7278918aa..af755f65f 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -1,10 +1,8 @@ use std::fs::File; -use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use async_stream::stream; -use futures::pin_mut; use futures::stream::StreamExt; use heed::CompactionOption; use log::debug; @@ -22,8 +20,7 @@ use crate::option::IndexerOpts; use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult}; pub struct IndexActor { - read_receiver: Option>, - write_receiver: Option>, + receiver: Option>, update_handler: Arc, processing: RwLock>, store: S, @@ -31,18 +28,16 @@ pub struct IndexActor { impl IndexActor { pub fn new( - read_receiver: mpsc::Receiver, - write_receiver: mpsc::Receiver, + receiver: mpsc::Receiver, store: S, ) -> Result { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; let update_handler = Arc::new(update_handler); - let read_receiver = Some(read_receiver); - let write_receiver = Some(write_receiver); + let receiver = Some(receiver); Ok(Self { - read_receiver, - write_receiver, + receiver, + store, update_handler, processing: RwLock::new(None), store, @@ -53,44 +48,21 @@ impl IndexActor { /// through the read channel are processed concurrently, the messages sent through the write /// channel are processed one at a time. pub async fn run(mut self) { - let mut read_receiver = self - .read_receiver + let mut receiver = self + .receiver .take() .expect("Index Actor must have a inbox at this point."); - let read_stream = stream! { + let stream = stream! { loop { - match read_receiver.recv().await { + match receiver.recv().await { Some(msg) => yield msg, None => break, } } }; - let mut write_receiver = self - .write_receiver - .take() - .expect("Index Actor must have a inbox at this point."); - - let write_stream = stream! { - loop { - match write_receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - pin_mut!(write_stream); - pin_mut!(read_stream); - - let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)); - let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg)); - - let fut1: Box + Unpin + Send> = Box::new(fut1); - let fut2: Box + Unpin + Send> = Box::new(fut2); - - tokio::join!(fut1, fut2); + stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)).await; } async fn handle_message(&self, msg: IndexMsg) { diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 5a734cdbf..629fd0db7 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -13,8 +13,7 @@ use super::{ #[derive(Clone)] pub struct IndexActorHandleImpl { - read_sender: mpsc::Sender, - write_sender: mpsc::Sender, + sender: mpsc::Sender, } #[async_trait::async_trait] @@ -26,7 +25,7 @@ impl IndexActorHandle for IndexActorHandleImpl { uuid, primary_key, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } @@ -38,21 +37,21 @@ impl IndexActorHandle for IndexActorHandleImpl { ) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Update { ret, meta, data, uuid }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Search { uuid, query, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } async fn settings(&self, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Settings { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -71,7 +70,7 @@ impl IndexActorHandle for IndexActorHandleImpl { attributes_to_retrieve, limit, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -88,21 +87,21 @@ impl IndexActorHandle for IndexActorHandleImpl { doc_id, attributes_to_retrieve, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } async fn delete(&self, uuid: Uuid) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Delete { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } async fn get_index_meta(&self, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetMeta { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -113,14 +112,14 @@ impl IndexActorHandle for IndexActorHandleImpl { index_settings, ret, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Snapshot { uuid, path, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -134,15 +133,13 @@ impl IndexActorHandle for IndexActorHandleImpl { impl IndexActorHandleImpl { pub fn new(path: impl AsRef, index_size: usize) -> anyhow::Result { - let (read_sender, read_receiver) = mpsc::channel(100); - let (write_sender, write_receiver) = mpsc::channel(100); + let (sender, receiver) = mpsc::channel(100); let store = MapIndexStore::new(path, index_size); - let actor = IndexActor::new(read_receiver, write_receiver, store)?; + let actor = IndexActor::new(receiver, store)?; tokio::task::spawn(actor.run()); Ok(Self { - read_sender, - write_sender, + sender, }) } }