MeiliSearch/meilisearch-http/src/index_controller/index_actor/handle_impl.rs

146 lines
4.7 KiB
Rust
Raw Normal View History

2021-03-24 11:29:11 +01:00
use std::path::{Path, PathBuf};
2021-03-23 11:00:50 +01:00
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
2021-04-01 16:44:42 +02:00
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{updates::Processing, UpdateMeta};
use crate::index_controller::{IndexSettings, IndexStats};
2021-03-24 11:29:11 +01:00
use super::{
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
};
2021-03-23 11:00:50 +01:00
#[derive(Clone)]
pub struct IndexActorHandleImpl {
2021-04-13 17:46:39 +02:00
sender: mpsc::Sender<IndexMsg>,
2021-03-23 11:00:50 +01:00
}
#[async_trait::async_trait]
impl IndexActorHandle for IndexActorHandleImpl {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex {
ret,
uuid,
primary_key,
};
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
receiver.await.expect("IndexActor has been killed")
}
async fn update(
&self,
2021-04-13 17:14:02 +02:00
uuid: Uuid,
2021-03-23 11:00:50 +01:00
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult> {
let (ret, receiver) = oneshot::channel();
2021-04-13 17:14:02 +02:00
let msg = IndexMsg::Update { ret, meta, data, uuid };
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Search { uuid, query, ret };
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn settings(&self, uuid: Uuid) -> Result<Settings> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Settings { uuid, ret };
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Documents {
uuid,
ret,
offset,
attributes_to_retrieve,
limit,
};
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Document {
uuid,
ret,
doc_id,
attributes_to_retrieve,
};
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Delete { uuid, ret };
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret };
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
2021-03-24 11:29:11 +01:00
async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta> {
2021-03-23 11:00:50 +01:00
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::UpdateIndex {
uuid,
index_settings,
ret,
};
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Snapshot { uuid, path, ret };
2021-04-13 17:46:39 +02:00
let _ = self.sender.send(msg).await;
2021-03-23 11:00:50 +01:00
Ok(receiver.await.expect("IndexActor has been killed")?)
}
2021-04-01 16:44:42 +02:00
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetStats { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
2021-03-23 11:00:50 +01:00
}
impl IndexActorHandleImpl {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
2021-04-13 17:46:39 +02:00
let (sender, receiver) = mpsc::channel(100);
2021-03-23 11:00:50 +01:00
let store = MapIndexStore::new(path, index_size);
2021-04-13 17:46:39 +02:00
let actor = IndexActor::new(receiver, store)?;
2021-03-23 11:00:50 +01:00
tokio::task::spawn(actor.run());
Ok(Self {
2021-04-13 17:46:39 +02:00
sender,
2021-03-23 11:00:50 +01:00
})
}
}