diff --git a/src/index_controller/actor_index_controller/index_actor.rs b/src/index_controller/actor_index_controller/index_actor.rs new file mode 100644 index 000000000..6de0f2b77 --- /dev/null +++ b/src/index_controller/actor_index_controller/index_actor.rs @@ -0,0 +1,132 @@ +use uuid::Uuid; +use std::path::{PathBuf, Path}; +use chrono::Utc; +use tokio::sync::{mpsc, oneshot, RwLock}; +use thiserror::Error; +use std::collections::HashMap; +use std::sync::Arc; +use milli::Index; +use std::collections::hash_map::Entry; +use std::fs::create_dir_all; +use heed::EnvOpenOptions; +use crate::index_controller::IndexMetadata; + +pub type Result = std::result::Result; +type AsyncMap = Arc>>; + +enum IndexMsg { + CreateIndex { uuid: Uuid, primary_key: Option, ret: oneshot::Sender> }, +} + +struct IndexActor { + inbox: mpsc::Receiver, + store: S, +} + +#[derive(Error, Debug)] +pub enum IndexError { + #[error("error with index: {0}")] + Error(#[from] anyhow::Error), + #[error("index already exists")] + IndexAlreadyExists, +} + +#[async_trait::async_trait] +trait IndexStore { + async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; +} + +impl IndexActor { + fn new(inbox: mpsc::Receiver, store: S) -> Self { + Self { inbox, store } + } + + async fn run(mut self) { + loop { + match self.inbox.recv().await { + Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await, + None => break, + } + } + } + + async fn handle_create_index(&self, uuid: Uuid, primary_key: Option, ret: oneshot::Sender>) { + let result = self.store.create_index(uuid, primary_key).await; + let _ = ret.send(result); + } +} + +#[derive(Clone)] +pub struct IndexActorHandle { + sender: mpsc::Sender, +} + +impl IndexActorHandle { + pub fn new() -> Self { + let (sender, receiver) = mpsc::channel(100); + + let store = MapIndexStore::new("data.ms"); + let actor = IndexActor::new(receiver, store); + tokio::task::spawn(actor.run()); + Self { sender } + } + + pub async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::CreateIndex { ret, uuid, primary_key }; + let _ = self.sender.send(msg).await; + receiver.await.expect("IndexActor has been killed") + } +} + +struct MapIndexStore { + root: PathBuf, + meta_store: AsyncMap, + index_store: AsyncMap, +} + +#[async_trait::async_trait] +impl IndexStore for MapIndexStore { + async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + let meta = match self.meta_store.write().await.entry(uuid.clone()) { + Entry::Vacant(entry) => { + let meta = IndexMetadata { + uuid, + created_at: Utc::now(), + updated_at: Utc::now(), + primary_key, + }; + entry.insert(meta).clone() + } + Entry::Occupied(_) => return Err(IndexError::IndexAlreadyExists), + }; + + let db_path = self.root.join(format!("index-{}", meta.uuid)); + + + println!("before blocking"); + let index: Result = tokio::task::spawn_blocking(move || { + create_dir_all(&db_path).expect("can't create db"); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100_000); + let index = Index::new(options, &db_path) + .map_err(|e| IndexError::Error(e))?; + Ok(index) + }).await.expect("thread died"); + println!("after blocking"); + + self.index_store.write().await.insert(meta.uuid.clone(), index?); + + Ok(meta) + } +} + +impl MapIndexStore { + fn new(root: impl AsRef) -> Self { + let mut root = root.as_ref().to_owned(); + root.push("indexes/"); + let meta_store = Arc::new(RwLock::new(HashMap::new())); + let index_store = Arc::new(RwLock::new(HashMap::new())); + Self { meta_store, index_store, root } + } +} diff --git a/src/index_controller/actor_index_controller/mod.rs b/src/index_controller/actor_index_controller/mod.rs new file mode 100644 index 000000000..bfcac7a3f --- /dev/null +++ b/src/index_controller/actor_index_controller/mod.rs @@ -0,0 +1,93 @@ +mod index_actor; +mod update_actor; +mod uuid_resolver; + +use tokio::fs::File; +use tokio::sync::oneshot; +use super::IndexController; +use uuid::Uuid; +use super::IndexMetadata; + + +pub struct ActorIndexController { + uuid_resolver: uuid_resolver::UuidResolverHandle, + index_actor: index_actor::IndexActorHandle, +} + +impl ActorIndexController { + pub fn new() -> Self { + let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); + let index_actor = index_actor::IndexActorHandle::new(); + Self { uuid_resolver, index_actor } + } +} + +enum IndexControllerMsg { + CreateIndex { + uuid: Uuid, + primary_key: Option, + ret: oneshot::Sender>, + }, + Shutdown, +} + +#[async_trait::async_trait] +impl IndexController for ActorIndexController { + async fn add_documents( + &self, + index: String, + method: milli::update::IndexDocumentsMethod, + format: milli::update::UpdateFormat, + data: File, + primary_key: Option, + ) -> anyhow::Result { + todo!() + } + + fn clear_documents(&self, index: String) -> anyhow::Result { + todo!() + } + + fn delete_documents(&self, index: String, document_ids: Vec) -> anyhow::Result { + todo!() + } + + fn update_settings(&self, index_uid: String, settings: super::Settings) -> anyhow::Result { + todo!() + } + + async fn create_index(&self, index_settings: super::IndexSettings) -> anyhow::Result { + let super::IndexSettings { name, primary_key } = index_settings; + let uuid = self.uuid_resolver.create(name.unwrap()).await?; + let index_meta = self.index_actor.create_index(uuid, primary_key).await?; + Ok(index_meta) + } + + fn delete_index(&self, index_uid: String) -> anyhow::Result<()> { + todo!() + } + + fn swap_indices(&self, index1_uid: String, index2_uid: String) -> anyhow::Result<()> { + todo!() + } + + fn index(&self, name: String) -> anyhow::Result>> { + todo!() + } + + fn update_status(&self, index: String, id: u64) -> anyhow::Result> { + todo!() + } + + fn all_update_status(&self, index: String) -> anyhow::Result> { + todo!() + } + + fn list_indexes(&self) -> anyhow::Result> { + todo!() + } + + fn update_index(&self, name: String, index_settings: super::IndexSettings) -> anyhow::Result { + todo!() + } +} diff --git a/src/index_controller/actor_index_controller/update_actor.rs b/src/index_controller/actor_index_controller/update_actor.rs new file mode 100644 index 000000000..9fd3cc39f --- /dev/null +++ b/src/index_controller/actor_index_controller/update_actor.rs @@ -0,0 +1,16 @@ +use super::index_actor::IndexActorHandle; +use uuid::Uuid; +use tokio::sync::{mpsc, oneshot}; + +enum UpdateMsg { + CreateIndex{ + uuid: Uuid, + ret: oneshot::Sender>, + } +} + +struct UpdateActor { + update_store: S, + inbox: mpsc::Receiver, + index_actor: IndexActorHandle, +} diff --git a/src/index_controller/actor_index_controller/uuid_resolver.rs b/src/index_controller/actor_index_controller/uuid_resolver.rs new file mode 100644 index 000000000..d5756d05e --- /dev/null +++ b/src/index_controller/actor_index_controller/uuid_resolver.rs @@ -0,0 +1,116 @@ +use thiserror::Error; +use tokio::sync::{RwLock, mpsc, oneshot}; +use uuid::Uuid; +use std::collections::HashMap; +use std::sync::Arc; +use std::collections::hash_map::Entry; +use log::info; + +pub type Result = std::result::Result; + +#[derive(Debug)] +enum UuidResolveMsg { + Resolve { + name: String, + ret: oneshot::Sender>>, + }, + Create { + name: String, + ret: oneshot::Sender>, + }, + Shutdown, + +} + +struct UuidResolverActor { + inbox: mpsc::Receiver, + store: S, +} + +impl UuidResolverActor { + fn new(inbox: mpsc::Receiver, store: S) -> Self { + Self { inbox, store } + } + + async fn run(mut self) { + use UuidResolveMsg::*; + + info!("uuid resolver started"); + + // TODO: benchmark and use buffered streams to improve throughput. + loop { + match self.inbox.recv().await { + Some(Create { name, ret }) => self.handle_create(name, ret).await, + Some(_) => (), + // all senders have ned dropped, need to quit. + None => break, + } + } + } + + async fn handle_create(&self, name: String, ret: oneshot::Sender>) { + let result = self.store.create_uuid(name).await; + let _ = ret.send(result); + } +} + +#[derive(Clone)] +pub struct UuidResolverHandle { + sender: mpsc::Sender, +} + +impl UuidResolverHandle { + pub fn new() -> Self { + let (sender, reveiver) = mpsc::channel(100); + let store = MapUuidStore(Arc::new(RwLock::new(HashMap::new()))); + let actor = UuidResolverActor::new(reveiver, store); + tokio::spawn(actor.run()); + Self { sender } + } + + pub async fn resolve(&self, name: String) -> anyhow::Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Resolve { name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + } + + pub async fn create(&self, name: String) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Create { name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + } +} + +#[derive(Clone, Debug, Error)] +pub enum UuidError { + #[error("Name already exist.")] + NameAlreadyExist, +} + +#[async_trait::async_trait] +trait UuidStore { + async fn create_uuid(&self, name: String) -> Result; + async fn get_uuid(&self, name: String) -> Result>; +} + +struct MapUuidStore(Arc>>); + +#[async_trait::async_trait] +impl UuidStore for MapUuidStore { + async fn create_uuid(&self, name: String) -> Result { + match self.0.write().await.entry(name) { + Entry::Occupied(_) => Err(UuidError::NameAlreadyExist), + Entry::Vacant(entry) => { + let uuid = Uuid::new_v4(); + let uuid = entry.insert(uuid); + Ok(uuid.clone()) + } + } + } + + async fn get_uuid(&self, name: String) -> Result> { + Ok(self.0.read().await.get(&name).cloned()) + } +}