mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-26 14:40:05 +01:00
refactor uuid resolver
This commit is contained in:
parent
60518449fc
commit
def737edee
@ -8,17 +8,17 @@ use futures::{lock::Mutex, stream::StreamExt};
|
||||
use log::{error, trace};
|
||||
use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
use update_actor::UpdateActorHandle;
|
||||
use uuid_resolver::UuidResolverHandle;
|
||||
|
||||
use super::error::{DumpActorError, Result};
|
||||
use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
|
||||
use crate::index_controller::{update_actor, uuid_resolver};
|
||||
use crate::index_controller::uuid_resolver::UuidResolverSender;
|
||||
use crate::index_controller::update_actor;
|
||||
|
||||
pub const CONCURRENT_DUMP_MSG: usize = 10;
|
||||
|
||||
pub struct DumpActor<UuidResolver, Update> {
|
||||
pub struct DumpActor<Update> {
|
||||
inbox: Option<mpsc::Receiver<DumpMsg>>,
|
||||
uuid_resolver: UuidResolver,
|
||||
uuid_resolver: UuidResolverSender,
|
||||
update: Update,
|
||||
dump_path: PathBuf,
|
||||
lock: Arc<Mutex<()>>,
|
||||
@ -32,14 +32,13 @@ fn generate_uid() -> String {
|
||||
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
|
||||
}
|
||||
|
||||
impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
|
||||
impl<Update> DumpActor<Update>
|
||||
where
|
||||
UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static,
|
||||
Update: UpdateActorHandle + Send + Sync + Clone + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
inbox: mpsc::Receiver<DumpMsg>,
|
||||
uuid_resolver: UuidResolver,
|
||||
uuid_resolver: UuidResolverSender,
|
||||
update: Update,
|
||||
dump_path: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
|
@ -2,6 +2,8 @@ use std::path::Path;
|
||||
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use crate::index_controller::uuid_resolver::UuidResolverSender;
|
||||
|
||||
use super::error::Result;
|
||||
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg};
|
||||
|
||||
@ -30,7 +32,7 @@ impl DumpActorHandle for DumpActorHandleImpl {
|
||||
impl DumpActorHandleImpl {
|
||||
pub fn new(
|
||||
path: impl AsRef<Path>,
|
||||
uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
|
||||
uuid_resolver: UuidResolverSender,
|
||||
update: crate::index_controller::update_actor::UpdateActorHandleImpl,
|
||||
index_db_size: usize,
|
||||
update_db_size: usize,
|
||||
|
@ -7,7 +7,8 @@ use milli::update::Setting;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata};
|
||||
use crate::index_controller::uuid_resolver::store::HeedUuidStore;
|
||||
use crate::index_controller::{self, IndexMetadata};
|
||||
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
|
||||
use crate::{
|
||||
index::Unchecked,
|
||||
|
@ -5,7 +5,7 @@ use log::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::index::Index;
|
||||
use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore};
|
||||
use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::store::HeedUuidStore};
|
||||
use crate::options::IndexerOpts;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
@ -16,8 +16,10 @@ pub use actor::DumpActor;
|
||||
pub use handle_impl::*;
|
||||
pub use message::DumpMsg;
|
||||
|
||||
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
|
||||
use super::update_actor::UpdateActorHandle;
|
||||
use super::uuid_resolver::UuidResolverSender;
|
||||
use crate::index_controller::dump_actor::error::DumpActorError;
|
||||
use crate::index_controller::uuid_resolver::UuidResolverMsg;
|
||||
use crate::options::IndexerOpts;
|
||||
use error::Result;
|
||||
|
||||
@ -149,18 +151,17 @@ pub fn load_dump(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct DumpTask<U, P> {
|
||||
struct DumpTask<P> {
|
||||
path: PathBuf,
|
||||
uuid_resolver: U,
|
||||
uuid_resolver: UuidResolverSender,
|
||||
update_handle: P,
|
||||
uid: String,
|
||||
update_db_size: usize,
|
||||
index_db_size: usize,
|
||||
}
|
||||
|
||||
impl<U, P> DumpTask<U, P>
|
||||
impl<P> DumpTask<P>
|
||||
where
|
||||
U: UuidResolverHandle + Send + Sync + Clone + 'static,
|
||||
P: UpdateActorHandle + Send + Sync + Clone + 'static,
|
||||
|
||||
{
|
||||
@ -179,7 +180,7 @@ where
|
||||
let mut meta_file = File::create(&meta_path)?;
|
||||
serde_json::to_writer(&mut meta_file, &meta)?;
|
||||
|
||||
let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;
|
||||
let uuids = UuidResolverMsg::dump(&self.uuid_resolver, temp_dump_path.clone()).await?;
|
||||
|
||||
self.update_handle
|
||||
.dump(uuids, temp_dump_path.clone())
|
||||
|
@ -1,3 +1,5 @@
|
||||
use std::error::Error;
|
||||
|
||||
use meilisearch_error::Code;
|
||||
use meilisearch_error::ErrorCode;
|
||||
|
||||
@ -24,6 +26,8 @@ pub enum IndexControllerError {
|
||||
DumpActor(#[from] DumpActorError),
|
||||
#[error("{0}")]
|
||||
IndexError(#[from] IndexError),
|
||||
#[error("Internal error: {0}")]
|
||||
Internal(Box<dyn Error + Send + Sync + 'static>),
|
||||
}
|
||||
|
||||
impl ErrorCode for IndexControllerError {
|
||||
@ -35,6 +39,7 @@ impl ErrorCode for IndexControllerError {
|
||||
IndexControllerError::UpdateActor(e) => e.error_code(),
|
||||
IndexControllerError::DumpActor(e) => e.error_code(),
|
||||
IndexControllerError::IndexError(e) => e.error_code(),
|
||||
IndexControllerError::Internal(_) => Code::Internal,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,13 +20,14 @@ use index_actor::IndexActorHandle;
|
||||
use snapshot::load_snapshot;
|
||||
use update_actor::UpdateActorHandle;
|
||||
pub use updates::*;
|
||||
use uuid_resolver::{error::UuidResolverError, UuidResolverHandle};
|
||||
use uuid_resolver::error::UuidResolverError;
|
||||
|
||||
use crate::options::IndexerOpts;
|
||||
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
|
||||
use error::Result;
|
||||
|
||||
use self::dump_actor::load_dump;
|
||||
use self::uuid_resolver::UuidResolverMsg;
|
||||
|
||||
mod dump_actor;
|
||||
pub mod error;
|
||||
@ -71,7 +72,7 @@ pub struct IndexStats {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IndexController {
|
||||
uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
|
||||
uuid_resolver: uuid_resolver::UuidResolverSender,
|
||||
index_handle: index_actor::IndexActorHandleImpl,
|
||||
update_handle: update_actor::UpdateActorHandleImpl,
|
||||
dump_handle: dump_actor::DumpActorHandleImpl,
|
||||
@ -136,7 +137,7 @@ impl IndexControllerBuilder {
|
||||
|
||||
std::fs::create_dir_all(db_path.as_ref())?;
|
||||
|
||||
let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&db_path)?;
|
||||
let uuid_resolver = uuid_resolver::create_uuid_resolver(&db_path)?;
|
||||
let index_handle =
|
||||
index_actor::IndexActorHandleImpl::new(&db_path, index_size, &indexer_options)?;
|
||||
let update_handle = update_actor::UpdateActorHandleImpl::new(
|
||||
@ -231,7 +232,8 @@ impl IndexController {
|
||||
}
|
||||
|
||||
pub async fn register_update(&self, uid: &str, update: Update) -> Result<UpdateStatus> {
|
||||
match self.uuid_resolver.get(uid.to_string()).await {
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.to_string()).await;
|
||||
match uuid {
|
||||
Ok(uuid) => {
|
||||
let update_result = self.update_handle.update(uuid, update).await?;
|
||||
Ok(update_result)
|
||||
@ -241,7 +243,8 @@ impl IndexController {
|
||||
let update_result = self.update_handle.update(uuid, update).await?;
|
||||
// ignore if index creation fails now, since it may already have been created
|
||||
let _ = self.index_handle.create_index(uuid, None).await;
|
||||
self.uuid_resolver.insert(name, uuid).await?;
|
||||
UuidResolverMsg::insert(&self.uuid_resolver, uuid, name).await?;
|
||||
|
||||
Ok(update_result)
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
@ -374,22 +377,20 @@ impl IndexController {
|
||||
//}
|
||||
|
||||
pub async fn update_status(&self, uid: String, id: u64) -> Result<UpdateStatus> {
|
||||
let uuid = self.uuid_resolver.get(uid).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
|
||||
let result = self.update_handle.update_status(uuid, id).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn all_update_status(&self, uid: String) -> Result<Vec<UpdateStatus>> {
|
||||
let uuid = self.uuid_resolver.get(uid).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
|
||||
let result = self.update_handle.get_all_updates_status(uuid).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn list_indexes(&self) -> Result<Vec<IndexMetadata>> {
|
||||
let uuids = self.uuid_resolver.list().await?;
|
||||
|
||||
let uuids = UuidResolverMsg::list(&self.uuid_resolver).await?;
|
||||
let mut ret = Vec::new();
|
||||
|
||||
for (uid, uuid) in uuids {
|
||||
let meta = self.index_handle.get_index_meta(uuid).await?;
|
||||
let meta = IndexMetadata {
|
||||
@ -405,7 +406,7 @@ impl IndexController {
|
||||
}
|
||||
|
||||
pub async fn settings(&self, uid: String) -> Result<Settings<Checked>> {
|
||||
let uuid = self.uuid_resolver.get(uid.clone()).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
|
||||
let settings = self.index_handle.settings(uuid).await?;
|
||||
Ok(settings)
|
||||
}
|
||||
@ -417,7 +418,7 @@ impl IndexController {
|
||||
limit: usize,
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
) -> Result<Vec<Document>> {
|
||||
let uuid = self.uuid_resolver.get(uid.clone()).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
|
||||
let documents = self
|
||||
.index_handle
|
||||
.documents(uuid, offset, limit, attributes_to_retrieve)
|
||||
@ -431,7 +432,7 @@ impl IndexController {
|
||||
doc_id: String,
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
) -> Result<Document> {
|
||||
let uuid = self.uuid_resolver.get(uid.clone()).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
|
||||
let document = self
|
||||
.index_handle
|
||||
.document(uuid, doc_id, attributes_to_retrieve)
|
||||
@ -448,7 +449,7 @@ impl IndexController {
|
||||
index_settings.uid.take();
|
||||
}
|
||||
|
||||
let uuid = self.uuid_resolver.get(uid.clone()).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?;
|
||||
let meta = self.index_handle.update_index(uuid, index_settings).await?;
|
||||
let meta = IndexMetadata {
|
||||
uuid,
|
||||
@ -460,13 +461,13 @@ impl IndexController {
|
||||
}
|
||||
|
||||
pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> {
|
||||
let uuid = self.uuid_resolver.get(uid).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
|
||||
let result = self.index_handle.search(uuid, query).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> {
|
||||
let uuid = self.uuid_resolver.get(uid.clone()).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?;
|
||||
let meta = self.index_handle.get_index_meta(uuid).await?;
|
||||
let meta = IndexMetadata {
|
||||
uuid,
|
||||
@ -478,11 +479,12 @@ impl IndexController {
|
||||
}
|
||||
|
||||
pub async fn get_uuids_size(&self) -> Result<u64> {
|
||||
Ok(self.uuid_resolver.get_size().await?)
|
||||
let size = UuidResolverMsg::get_size(&self.uuid_resolver).await?;
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
|
||||
let uuid = self.uuid_resolver.get(uid).await?;
|
||||
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
|
||||
let update_infos = self.update_handle.get_info().await?;
|
||||
let mut stats = self.index_handle.get_index_stats(uuid).await?;
|
||||
// Check if the currently indexing update is from out index.
|
||||
|
@ -1,98 +0,0 @@
|
||||
use std::{collections::HashSet, path::PathBuf};
|
||||
|
||||
use log::{trace, warn};
|
||||
use tokio::sync::mpsc;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{error::UuidResolverError, Result, UuidResolveMsg, UuidStore};
|
||||
|
||||
pub struct UuidResolverActor<S> {
|
||||
inbox: mpsc::Receiver<UuidResolveMsg>,
|
||||
store: S,
|
||||
}
|
||||
|
||||
impl<S: UuidStore> UuidResolverActor<S> {
|
||||
pub fn new(inbox: mpsc::Receiver<UuidResolveMsg>, store: S) -> Self {
|
||||
Self { inbox, store }
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
use UuidResolveMsg::*;
|
||||
|
||||
trace!("uuid resolver started");
|
||||
|
||||
loop {
|
||||
match self.inbox.recv().await {
|
||||
Some(Get { uid: name, ret }) => {
|
||||
let _ = ret.send(self.handle_get(name).await);
|
||||
}
|
||||
Some(Delete { uid: name, ret }) => {
|
||||
let _ = ret.send(self.handle_delete(name).await);
|
||||
}
|
||||
Some(List { ret }) => {
|
||||
let _ = ret.send(self.handle_list().await);
|
||||
}
|
||||
Some(Insert { ret, uuid, name }) => {
|
||||
let _ = ret.send(self.handle_insert(name, uuid).await);
|
||||
}
|
||||
Some(SnapshotRequest { path, ret }) => {
|
||||
let _ = ret.send(self.handle_snapshot(path).await);
|
||||
}
|
||||
Some(GetSize { ret }) => {
|
||||
let _ = ret.send(self.handle_get_size().await);
|
||||
}
|
||||
Some(DumpRequest { path, ret }) => {
|
||||
let _ = ret.send(self.handle_dump(path).await);
|
||||
}
|
||||
// all senders have been dropped, need to quit.
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
warn!("exiting uuid resolver loop");
|
||||
}
|
||||
|
||||
async fn handle_get(&self, uid: String) -> Result<Uuid> {
|
||||
self.store
|
||||
.get_uuid(uid.clone())
|
||||
.await?
|
||||
.ok_or(UuidResolverError::UnexistingIndex(uid))
|
||||
}
|
||||
|
||||
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
|
||||
self.store
|
||||
.delete(uid.clone())
|
||||
.await?
|
||||
.ok_or(UuidResolverError::UnexistingIndex(uid))
|
||||
}
|
||||
|
||||
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
|
||||
let result = self.store.list().await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn handle_snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
self.store.snapshot(path).await
|
||||
}
|
||||
|
||||
async fn handle_dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
self.store.dump(path).await
|
||||
}
|
||||
|
||||
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
|
||||
if !is_index_uid_valid(&uid) {
|
||||
return Err(UuidResolverError::BadlyFormatted(uid));
|
||||
}
|
||||
self.store.insert(uid, uuid).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_get_size(&self) -> Result<u64> {
|
||||
self.store.get_size().await
|
||||
}
|
||||
}
|
||||
|
||||
fn is_index_uid_valid(uid: &str) -> bool {
|
||||
uid.chars()
|
||||
.all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_')
|
||||
}
|
@ -1,4 +1,8 @@
|
||||
use std::fmt;
|
||||
|
||||
use meilisearch_error::{Code, ErrorCode};
|
||||
use tokio::sync::mpsc::error::SendError as MpscSendError;
|
||||
use tokio::sync::oneshot::error::RecvError as OneshotRecvError;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, UuidResolverError>;
|
||||
|
||||
@ -22,6 +26,18 @@ internal_error!(
|
||||
serde_json::Error
|
||||
);
|
||||
|
||||
impl<T: Sync + Send + 'static + fmt::Debug> From<MpscSendError<T>> for UuidResolverError {
|
||||
fn from(other: MpscSendError<T>) -> Self {
|
||||
Self::Internal(Box::new(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OneshotRecvError> for UuidResolverError {
|
||||
fn from(other: OneshotRecvError) -> Self {
|
||||
Self::Internal(Box::new(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl ErrorCode for UuidResolverError {
|
||||
fn error_code(&self) -> Code {
|
||||
match self {
|
||||
|
@ -1,87 +0,0 @@
|
||||
use std::collections::HashSet;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{HeedUuidStore, Result, UuidResolveMsg, UuidResolverActor, UuidResolverHandle};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UuidResolverHandleImpl {
|
||||
sender: mpsc::Sender<UuidResolveMsg>,
|
||||
}
|
||||
|
||||
impl UuidResolverHandleImpl {
|
||||
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
|
||||
let (sender, reveiver) = mpsc::channel(100);
|
||||
let store = HeedUuidStore::new(path)?;
|
||||
let actor = UuidResolverActor::new(reveiver, store);
|
||||
tokio::spawn(actor.run());
|
||||
Ok(Self { sender })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl UuidResolverHandle for UuidResolverHandleImpl {
|
||||
async fn get(&self, name: String) -> Result<Uuid> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = UuidResolveMsg::Get { uid: name, ret };
|
||||
let _ = self.sender.send(msg).await;
|
||||
Ok(receiver
|
||||
.await
|
||||
.expect("Uuid resolver actor has been killed")?)
|
||||
}
|
||||
|
||||
async fn delete(&self, name: String) -> Result<Uuid> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = UuidResolveMsg::Delete { uid: name, ret };
|
||||
let _ = self.sender.send(msg).await;
|
||||
Ok(receiver
|
||||
.await
|
||||
.expect("Uuid resolver actor has been killed")?)
|
||||
}
|
||||
|
||||
async fn list(&self) -> Result<Vec<(String, Uuid)>> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = UuidResolveMsg::List { ret };
|
||||
let _ = self.sender.send(msg).await;
|
||||
Ok(receiver
|
||||
.await
|
||||
.expect("Uuid resolver actor has been killed")?)
|
||||
}
|
||||
|
||||
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = UuidResolveMsg::Insert { ret, name, uuid };
|
||||
let _ = self.sender.send(msg).await;
|
||||
Ok(receiver
|
||||
.await
|
||||
.expect("Uuid resolver actor has been killed")?)
|
||||
}
|
||||
|
||||
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = UuidResolveMsg::SnapshotRequest { path, ret };
|
||||
let _ = self.sender.send(msg).await;
|
||||
Ok(receiver
|
||||
.await
|
||||
.expect("Uuid resolver actor has been killed")?)
|
||||
}
|
||||
|
||||
async fn get_size(&self) -> Result<u64> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = UuidResolveMsg::GetSize { ret };
|
||||
let _ = self.sender.send(msg).await;
|
||||
Ok(receiver
|
||||
.await
|
||||
.expect("Uuid resolver actor has been killed")?)
|
||||
}
|
||||
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = UuidResolveMsg::DumpRequest { ret, path };
|
||||
let _ = self.sender.send(msg).await;
|
||||
Ok(receiver
|
||||
.await
|
||||
.expect("Uuid resolver actor has been killed")?)
|
||||
}
|
||||
}
|
@ -1,12 +1,13 @@
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::Result;
|
||||
use super::error::Result;
|
||||
|
||||
pub enum UuidResolveMsg {
|
||||
#[derive(Debug)]
|
||||
pub enum UuidResolverMsg {
|
||||
Get {
|
||||
uid: String,
|
||||
ret: oneshot::Sender<Result<Uuid>>,
|
||||
@ -35,3 +36,54 @@ pub enum UuidResolveMsg {
|
||||
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl UuidResolverMsg {
|
||||
pub async fn get(channel: &mpsc::Sender<Self>, uid: String) -> Result<Uuid> {
|
||||
let (ret, recv) = oneshot::channel();
|
||||
let msg = Self::Get { uid, ret };
|
||||
channel.send(msg).await?;
|
||||
recv.await?
|
||||
}
|
||||
|
||||
pub async fn insert(channel: &mpsc::Sender<Self>, uuid: Uuid, name: String) -> Result<()> {
|
||||
let (ret, recv) = oneshot::channel();
|
||||
let msg = Self::Insert { name, uuid, ret };
|
||||
channel.send(msg).await?;
|
||||
recv.await?
|
||||
}
|
||||
|
||||
pub async fn list(channel: &mpsc::Sender<Self>) -> Result<Vec<(String, Uuid)>> {
|
||||
let (ret, recv) = oneshot::channel();
|
||||
let msg = Self::List { ret };
|
||||
channel.send(msg).await?;
|
||||
recv.await?
|
||||
}
|
||||
|
||||
pub async fn get_size(channel: &mpsc::Sender<Self>) -> Result<u64> {
|
||||
let (ret, recv) = oneshot::channel();
|
||||
let msg = Self::GetSize { ret };
|
||||
channel.send(msg).await?;
|
||||
recv.await?
|
||||
}
|
||||
|
||||
pub async fn dump(channel: &mpsc::Sender<Self>, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
let (ret, recv) = oneshot::channel();
|
||||
let msg = Self::DumpRequest { ret, path };
|
||||
channel.send(msg).await?;
|
||||
recv.await?
|
||||
}
|
||||
|
||||
pub async fn snapshot(channel: &mpsc::Sender<Self>, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
let (ret, recv) = oneshot::channel();
|
||||
let msg = Self::SnapshotRequest { ret, path };
|
||||
channel.send(msg).await?;
|
||||
recv.await?
|
||||
}
|
||||
|
||||
pub async fn delete(channel: &mpsc::Sender<Self>, uid: String) -> Result<Uuid> {
|
||||
let (ret, recv) = oneshot::channel();
|
||||
let msg = Self::Delete { ret, uid };
|
||||
channel.send(msg).await?;
|
||||
recv.await?
|
||||
}
|
||||
}
|
||||
|
@ -1,35 +1,118 @@
|
||||
mod actor;
|
||||
pub mod error;
|
||||
mod handle_impl;
|
||||
mod message;
|
||||
pub mod store;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::path::Path;
|
||||
use std::{collections::HashSet, path::PathBuf};
|
||||
|
||||
use log::{trace, warn};
|
||||
use tokio::sync::mpsc;
|
||||
use uuid::Uuid;
|
||||
|
||||
use actor::UuidResolverActor;
|
||||
use error::Result;
|
||||
use message::UuidResolveMsg;
|
||||
use store::UuidStore;
|
||||
pub use self::error::UuidResolverError;
|
||||
pub use self::message::UuidResolverMsg;
|
||||
pub use self::store::{HeedUuidStore, UuidStore};
|
||||
use self::error::Result;
|
||||
|
||||
#[cfg(test)]
|
||||
use mockall::automock;
|
||||
|
||||
pub use handle_impl::UuidResolverHandleImpl;
|
||||
pub use store::HeedUuidStore;
|
||||
pub type UuidResolverSender = mpsc::Sender<UuidResolverMsg>;
|
||||
|
||||
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[cfg_attr(test, automock)]
|
||||
pub trait UuidResolverHandle {
|
||||
async fn get(&self, name: String) -> Result<Uuid>;
|
||||
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
|
||||
async fn delete(&self, name: String) -> Result<Uuid>;
|
||||
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
|
||||
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
|
||||
async fn get_size(&self) -> Result<u64>;
|
||||
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
|
||||
pub fn create_uuid_resolver(path: impl AsRef<Path>) -> Result<mpsc::Sender<UuidResolverMsg>> {
|
||||
let (sender, reveiver) = mpsc::channel(100);
|
||||
let store = HeedUuidStore::new(path)?;
|
||||
let actor = UuidResolver::new(reveiver, store);
|
||||
tokio::spawn(actor.run());
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
pub struct UuidResolver<S> {
|
||||
inbox: mpsc::Receiver<UuidResolverMsg>,
|
||||
store: S,
|
||||
}
|
||||
|
||||
impl<S: UuidStore> UuidResolver<S> {
|
||||
pub fn new(inbox: mpsc::Receiver<UuidResolverMsg>, store: S) -> Self {
|
||||
Self { inbox, store }
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
use UuidResolverMsg::*;
|
||||
|
||||
trace!("uuid resolver started");
|
||||
|
||||
loop {
|
||||
match self.inbox.recv().await {
|
||||
Some(Get { uid: name, ret }) => {
|
||||
let _ = ret.send(self.handle_get(name).await);
|
||||
}
|
||||
Some(Delete { uid: name, ret }) => {
|
||||
let _ = ret.send(self.handle_delete(name).await);
|
||||
}
|
||||
Some(List { ret }) => {
|
||||
let _ = ret.send(self.handle_list().await);
|
||||
}
|
||||
Some(Insert { ret, uuid, name }) => {
|
||||
let _ = ret.send(self.handle_insert(name, uuid).await);
|
||||
}
|
||||
Some(SnapshotRequest { path, ret }) => {
|
||||
let _ = ret.send(self.handle_snapshot(path).await);
|
||||
}
|
||||
Some(GetSize { ret }) => {
|
||||
let _ = ret.send(self.handle_get_size().await);
|
||||
}
|
||||
Some(DumpRequest { path, ret }) => {
|
||||
let _ = ret.send(self.handle_dump(path).await);
|
||||
}
|
||||
// all senders have been dropped, need to quit.
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
warn!("exiting uuid resolver loop");
|
||||
}
|
||||
|
||||
async fn handle_get(&self, uid: String) -> Result<Uuid> {
|
||||
self.store
|
||||
.get_uuid(uid.clone())
|
||||
.await?
|
||||
.ok_or(UuidResolverError::UnexistingIndex(uid))
|
||||
}
|
||||
|
||||
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
|
||||
self.store
|
||||
.delete(uid.clone())
|
||||
.await?
|
||||
.ok_or(UuidResolverError::UnexistingIndex(uid))
|
||||
}
|
||||
|
||||
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
|
||||
let result = self.store.list().await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn handle_snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
self.store.snapshot(path).await
|
||||
}
|
||||
|
||||
async fn handle_dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||
self.store.dump(path).await
|
||||
}
|
||||
|
||||
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
|
||||
if !is_index_uid_valid(&uid) {
|
||||
return Err(UuidResolverError::BadlyFormatted(uid));
|
||||
}
|
||||
self.store.insert(uid, uuid).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_get_size(&self) -> Result<u64> {
|
||||
self.store.get_size().await
|
||||
}
|
||||
}
|
||||
|
||||
fn is_index_uid_valid(uid: &str) -> bool {
|
||||
uid.chars()
|
||||
.all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_')
|
||||
}
|
||||
|
@ -8,7 +8,8 @@ use heed::{CompactionOption, Database, Env, EnvOpenOptions};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{error::UuidResolverError, Result, UUID_STORE_SIZE};
|
||||
use super::UUID_STORE_SIZE;
|
||||
use super::error::{UuidResolverError, Result};
|
||||
use crate::EnvSizer;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
Loading…
x
Reference in New Issue
Block a user