mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 06:44:27 +01:00
remove uuid resolver and index actor
This commit is contained in:
parent
6a1964f146
commit
9ac999ca59
@ -1,64 +0,0 @@
|
|||||||
use std::fmt;
|
|
||||||
|
|
||||||
use meilisearch_error::{Code, ErrorCode};
|
|
||||||
|
|
||||||
use crate::{error::MilliError, index::error::IndexError};
|
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, IndexActorError>;
|
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
|
||||||
pub enum IndexActorError {
|
|
||||||
#[error("{0}")]
|
|
||||||
IndexError(#[from] IndexError),
|
|
||||||
#[error("Index already exists")]
|
|
||||||
IndexAlreadyExists,
|
|
||||||
#[error("Index not found")]
|
|
||||||
UnexistingIndex,
|
|
||||||
#[error("A primary key is already present. It's impossible to update it")]
|
|
||||||
ExistingPrimaryKey,
|
|
||||||
#[error("Internal Error: {0}")]
|
|
||||||
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
|
|
||||||
#[error("{0}")]
|
|
||||||
Milli(#[from] milli::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for IndexActorError
|
|
||||||
where T: Send + Sync + 'static + fmt::Debug
|
|
||||||
{
|
|
||||||
fn from(other: tokio::sync::mpsc::error::SendError<T>) -> Self {
|
|
||||||
Self::Internal(Box::new(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<tokio::sync::oneshot::error::RecvError> for IndexActorError {
|
|
||||||
fn from(other: tokio::sync::oneshot::error::RecvError) -> Self {
|
|
||||||
Self::Internal(Box::new(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! internal_error {
|
|
||||||
($($other:path), *) => {
|
|
||||||
$(
|
|
||||||
impl From<$other> for IndexActorError {
|
|
||||||
fn from(other: $other) -> Self {
|
|
||||||
Self::Internal(Box::new(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)*
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
internal_error!(heed::Error, tokio::task::JoinError, std::io::Error);
|
|
||||||
|
|
||||||
impl ErrorCode for IndexActorError {
|
|
||||||
fn error_code(&self) -> Code {
|
|
||||||
match self {
|
|
||||||
IndexActorError::IndexError(e) => e.error_code(),
|
|
||||||
IndexActorError::IndexAlreadyExists => Code::IndexAlreadyExists,
|
|
||||||
IndexActorError::UnexistingIndex => Code::IndexNotFound,
|
|
||||||
IndexActorError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent,
|
|
||||||
IndexActorError::Internal(_) => Code::Internal,
|
|
||||||
IndexActorError::Milli(e) => MilliError(e).error_code(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,212 +0,0 @@
|
|||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
use tokio::sync::{mpsc, oneshot};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use super::error::Result;
|
|
||||||
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
|
|
||||||
use crate::index_controller::updates::status::{Failed, Processed, Processing};
|
|
||||||
use crate::index_controller::{IndexSettings, IndexStats};
|
|
||||||
|
|
||||||
use super::IndexMeta;
|
|
||||||
|
|
||||||
#[allow(clippy::large_enum_variant)]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum IndexMsg {
|
|
||||||
CreateIndex {
|
|
||||||
uuid: Uuid,
|
|
||||||
primary_key: Option<String>,
|
|
||||||
ret: oneshot::Sender<Result<IndexMeta>>,
|
|
||||||
},
|
|
||||||
Update {
|
|
||||||
uuid: Uuid,
|
|
||||||
meta: Processing,
|
|
||||||
ret: oneshot::Sender<Result<std::result::Result<Processed, Failed>>>,
|
|
||||||
},
|
|
||||||
Search {
|
|
||||||
uuid: Uuid,
|
|
||||||
query: SearchQuery,
|
|
||||||
ret: oneshot::Sender<Result<SearchResult>>,
|
|
||||||
},
|
|
||||||
Settings {
|
|
||||||
uuid: Uuid,
|
|
||||||
ret: oneshot::Sender<Result<Settings<Checked>>>,
|
|
||||||
},
|
|
||||||
Documents {
|
|
||||||
uuid: Uuid,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
offset: usize,
|
|
||||||
limit: usize,
|
|
||||||
ret: oneshot::Sender<Result<Vec<Document>>>,
|
|
||||||
},
|
|
||||||
Document {
|
|
||||||
uuid: Uuid,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
doc_id: String,
|
|
||||||
ret: oneshot::Sender<Result<Document>>,
|
|
||||||
},
|
|
||||||
Delete {
|
|
||||||
uuid: Uuid,
|
|
||||||
ret: oneshot::Sender<Result<()>>,
|
|
||||||
},
|
|
||||||
GetMeta {
|
|
||||||
uuid: Uuid,
|
|
||||||
ret: oneshot::Sender<Result<IndexMeta>>,
|
|
||||||
},
|
|
||||||
UpdateIndex {
|
|
||||||
uuid: Uuid,
|
|
||||||
index_settings: IndexSettings,
|
|
||||||
ret: oneshot::Sender<Result<IndexMeta>>,
|
|
||||||
},
|
|
||||||
Snapshot {
|
|
||||||
uuid: Uuid,
|
|
||||||
path: PathBuf,
|
|
||||||
ret: oneshot::Sender<Result<()>>,
|
|
||||||
},
|
|
||||||
Dump {
|
|
||||||
uuid: Uuid,
|
|
||||||
path: PathBuf,
|
|
||||||
ret: oneshot::Sender<Result<()>>,
|
|
||||||
},
|
|
||||||
GetStats {
|
|
||||||
uuid: Uuid,
|
|
||||||
ret: oneshot::Sender<Result<IndexStats>>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IndexMsg {
|
|
||||||
pub async fn search(
|
|
||||||
sender: &mpsc::Sender<Self>,
|
|
||||||
uuid: Uuid,
|
|
||||||
query: SearchQuery,
|
|
||||||
) -> Result<SearchResult> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::Search {
|
|
||||||
ret,
|
|
||||||
uuid,
|
|
||||||
query,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn update_index(
|
|
||||||
sender: &mpsc::Sender<Self>,
|
|
||||||
uuid: Uuid,
|
|
||||||
index_settings: IndexSettings,
|
|
||||||
) -> Result<IndexMeta> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::UpdateIndex {
|
|
||||||
ret,
|
|
||||||
uuid,
|
|
||||||
index_settings,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_index(
|
|
||||||
sender: &mpsc::Sender<Self>,
|
|
||||||
uuid: Uuid,
|
|
||||||
primary_key: Option<String>,
|
|
||||||
) -> Result<IndexMeta> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::CreateIndex {
|
|
||||||
ret,
|
|
||||||
uuid,
|
|
||||||
primary_key,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn index_meta(sender: &mpsc::Sender<Self>, uuid: Uuid) -> Result<IndexMeta> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::GetMeta { ret, uuid };
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn index_stats(sender: &mpsc::Sender<Self>, uuid: Uuid) -> Result<IndexStats> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::GetStats { ret, uuid };
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn settings(sender: &mpsc::Sender<Self>, uuid: Uuid) -> Result<Settings<Checked>> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::Settings { ret, uuid };
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn documents(
|
|
||||||
sender: &mpsc::Sender<Self>,
|
|
||||||
uuid: Uuid,
|
|
||||||
offset: usize,
|
|
||||||
limit: usize,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
) -> Result<Vec<Document>> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::Documents {
|
|
||||||
ret,
|
|
||||||
uuid,
|
|
||||||
attributes_to_retrieve,
|
|
||||||
offset,
|
|
||||||
limit,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn document(
|
|
||||||
sender: &mpsc::Sender<Self>,
|
|
||||||
uuid: Uuid,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
doc_id: String,
|
|
||||||
) -> Result<Document> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::Document {
|
|
||||||
ret,
|
|
||||||
uuid,
|
|
||||||
attributes_to_retrieve,
|
|
||||||
doc_id,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn update(sender: &mpsc::Sender<Self>, uuid: Uuid, meta: Processing) -> Result<std::result::Result<Processed, Failed>> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::Update {
|
|
||||||
ret,
|
|
||||||
uuid,
|
|
||||||
meta,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn snapshot(sender: &mpsc::Sender<IndexMsg>, uuid: Uuid, path: PathBuf) -> Result<()> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::Snapshot {
|
|
||||||
uuid,
|
|
||||||
path,
|
|
||||||
ret,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn dump(sender: &mpsc::Sender<Self>, uuid: Uuid, path: PathBuf) -> Result<()> {
|
|
||||||
let (ret, rcv) = oneshot::channel();
|
|
||||||
let msg = Self::Dump {
|
|
||||||
uuid,
|
|
||||||
ret,
|
|
||||||
path,
|
|
||||||
};
|
|
||||||
sender.send(msg).await?;
|
|
||||||
rcv.await?
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,483 +0,0 @@
|
|||||||
use std::path::{Path, PathBuf};
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use async_stream::stream;
|
|
||||||
use futures::stream::StreamExt;
|
|
||||||
use heed::CompactionOption;
|
|
||||||
use log::debug;
|
|
||||||
use milli::update::UpdateBuilder;
|
|
||||||
use tokio::task::spawn_blocking;
|
|
||||||
use tokio::{fs, sync::mpsc};
|
|
||||||
|
|
||||||
use crate::index::update_handler::UpdateHandler;
|
|
||||||
use crate::index_controller::updates::status::{Failed, Processed, Processing};
|
|
||||||
use crate::index_controller::{get_arc_ownership_blocking, IndexStats};
|
|
||||||
use crate::options::IndexerOpts;
|
|
||||||
|
|
||||||
pub const CONCURRENT_INDEX_MSG: usize = 10;
|
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
pub use message::IndexMsg;
|
|
||||||
|
|
||||||
use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings};
|
|
||||||
use error::Result;
|
|
||||||
|
|
||||||
use self::error::IndexActorError;
|
|
||||||
use self::store::{IndexStore, MapIndexStore};
|
|
||||||
|
|
||||||
use super::IndexSettings;
|
|
||||||
|
|
||||||
pub mod error;
|
|
||||||
mod message;
|
|
||||||
mod store;
|
|
||||||
|
|
||||||
pub type IndexHandlerSender = mpsc::Sender<IndexMsg>;
|
|
||||||
|
|
||||||
pub fn create_indexes_handler(
|
|
||||||
db_path: impl AsRef<Path>,
|
|
||||||
index_size: usize,
|
|
||||||
indexer_options: &IndexerOpts,
|
|
||||||
) -> anyhow::Result<IndexHandlerSender> {
|
|
||||||
let (sender, receiver) = mpsc::channel(100);
|
|
||||||
let store = MapIndexStore::new(&db_path, index_size, indexer_options);
|
|
||||||
let actor = IndexActor::new(receiver, store, indexer_options)?;
|
|
||||||
|
|
||||||
tokio::task::spawn(actor.run());
|
|
||||||
|
|
||||||
Ok(sender)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct IndexMeta {
|
|
||||||
created_at: DateTime<Utc>,
|
|
||||||
pub updated_at: DateTime<Utc>,
|
|
||||||
pub primary_key: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IndexMeta {
|
|
||||||
pub fn new(index: &Index) -> Result<Self> {
|
|
||||||
let txn = index.read_txn()?;
|
|
||||||
Self::new_txn(index, &txn)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
|
|
||||||
let created_at = index.created_at(txn)?;
|
|
||||||
let updated_at = index.updated_at(txn)?;
|
|
||||||
let primary_key = index.primary_key(txn)?.map(String::from);
|
|
||||||
Ok(Self {
|
|
||||||
created_at,
|
|
||||||
updated_at,
|
|
||||||
primary_key,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct IndexActor<S> {
|
|
||||||
receiver: Option<mpsc::Receiver<IndexMsg>>,
|
|
||||||
update_handler: Arc<UpdateHandler>,
|
|
||||||
store: S,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> IndexActor<S>
|
|
||||||
where
|
|
||||||
S: IndexStore + Sync + Send,
|
|
||||||
{
|
|
||||||
pub fn new(
|
|
||||||
receiver: mpsc::Receiver<IndexMsg>,
|
|
||||||
store: S,
|
|
||||||
options: &IndexerOpts,
|
|
||||||
) -> anyhow::Result<Self> {
|
|
||||||
let update_handler = Arc::new(UpdateHandler::new(options)?);
|
|
||||||
let receiver = Some(receiver);
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
receiver,
|
|
||||||
update_handler,
|
|
||||||
store,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send
|
|
||||||
/// through the read channel are processed concurrently, the messages sent through the write
|
|
||||||
/// channel are processed one at a time.
|
|
||||||
pub async fn run(mut self) {
|
|
||||||
let mut receiver = self
|
|
||||||
.receiver
|
|
||||||
.take()
|
|
||||||
.expect("Index Actor must have a inbox at this point.");
|
|
||||||
|
|
||||||
let stream = stream! {
|
|
||||||
loop {
|
|
||||||
match receiver.recv().await {
|
|
||||||
Some(msg) => yield msg,
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
stream
|
|
||||||
.for_each_concurrent(Some(CONCURRENT_INDEX_MSG), |msg| self.handle_message(msg))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_message(&self, msg: IndexMsg) {
|
|
||||||
use IndexMsg::*;
|
|
||||||
match msg {
|
|
||||||
CreateIndex {
|
|
||||||
uuid,
|
|
||||||
primary_key,
|
|
||||||
ret,
|
|
||||||
} => {
|
|
||||||
let _ = ret.send(self.handle_create_index(uuid, primary_key).await);
|
|
||||||
}
|
|
||||||
Update { ret, meta, uuid } => {
|
|
||||||
let _ = ret.send(self.handle_update(uuid, meta).await);
|
|
||||||
}
|
|
||||||
Search { ret, query, uuid } => {
|
|
||||||
let _ = ret.send(self.handle_search(uuid, query).await);
|
|
||||||
}
|
|
||||||
Settings { ret, uuid } => {
|
|
||||||
let _ = ret.send(self.handle_settings(uuid).await);
|
|
||||||
}
|
|
||||||
Documents {
|
|
||||||
ret,
|
|
||||||
uuid,
|
|
||||||
attributes_to_retrieve,
|
|
||||||
offset,
|
|
||||||
limit,
|
|
||||||
} => {
|
|
||||||
let _ = ret.send(
|
|
||||||
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve)
|
|
||||||
.await,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Document {
|
|
||||||
uuid,
|
|
||||||
attributes_to_retrieve,
|
|
||||||
doc_id,
|
|
||||||
ret,
|
|
||||||
} => {
|
|
||||||
let _ = ret.send(
|
|
||||||
self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve)
|
|
||||||
.await,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Delete { uuid, ret } => {
|
|
||||||
let _ = ret.send(self.handle_delete(uuid).await);
|
|
||||||
}
|
|
||||||
GetMeta { uuid, ret } => {
|
|
||||||
let _ = ret.send(self.handle_get_meta(uuid).await);
|
|
||||||
}
|
|
||||||
UpdateIndex {
|
|
||||||
uuid,
|
|
||||||
index_settings,
|
|
||||||
ret,
|
|
||||||
} => {
|
|
||||||
let _ = ret.send(self.handle_update_index(uuid, index_settings).await);
|
|
||||||
}
|
|
||||||
Snapshot { uuid, path, ret } => {
|
|
||||||
let _ = ret.send(self.handle_snapshot(uuid, path).await);
|
|
||||||
}
|
|
||||||
Dump { uuid, path, ret } => {
|
|
||||||
let _ = ret.send(self.handle_dump(uuid, path).await);
|
|
||||||
}
|
|
||||||
GetStats { uuid, ret } => {
|
|
||||||
let _ = ret.send(self.handle_get_stats(uuid).await);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
|
|
||||||
let index = self
|
|
||||||
.store
|
|
||||||
.get(uuid)
|
|
||||||
.await?
|
|
||||||
.ok_or(IndexActorError::UnexistingIndex)?;
|
|
||||||
let result = spawn_blocking(move || index.perform_search(query)).await??;
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_create_index(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
primary_key: Option<String>,
|
|
||||||
) -> Result<IndexMeta> {
|
|
||||||
let index = self.store.create(uuid, primary_key).await?;
|
|
||||||
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
|
|
||||||
Ok(meta)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_update(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
meta: Processing,
|
|
||||||
) -> Result<std::result::Result<Processed, Failed>> {
|
|
||||||
debug!("Processing update {}", meta.id());
|
|
||||||
let update_handler = self.update_handler.clone();
|
|
||||||
let index = match self.store.get(uuid).await? {
|
|
||||||
Some(index) => index,
|
|
||||||
None => self.store.create(uuid, None).await?,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(spawn_blocking(move || update_handler.handle_update(&index, meta)).await?)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
|
|
||||||
let index = self
|
|
||||||
.store
|
|
||||||
.get(uuid)
|
|
||||||
.await?
|
|
||||||
.ok_or(IndexActorError::UnexistingIndex)?;
|
|
||||||
let result = spawn_blocking(move || index.settings()).await??;
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_fetch_documents(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
offset: usize,
|
|
||||||
limit: usize,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
) -> Result<Vec<Document>> {
|
|
||||||
let index = self
|
|
||||||
.store
|
|
||||||
.get(uuid)
|
|
||||||
.await?
|
|
||||||
.ok_or(IndexActorError::UnexistingIndex)?;
|
|
||||||
let result =
|
|
||||||
spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_fetch_document(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
doc_id: String,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
) -> Result<Document> {
|
|
||||||
let index = self
|
|
||||||
.store
|
|
||||||
.get(uuid)
|
|
||||||
.await?
|
|
||||||
.ok_or(IndexActorError::UnexistingIndex)?;
|
|
||||||
|
|
||||||
let result =
|
|
||||||
spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve))
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
|
|
||||||
let index = self.store.delete(uuid).await?;
|
|
||||||
|
|
||||||
if let Some(index) = index {
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
let index = index.inner;
|
|
||||||
let store = get_arc_ownership_blocking(index).await;
|
|
||||||
spawn_blocking(move || {
|
|
||||||
store.prepare_for_closing().wait();
|
|
||||||
debug!("Index closed");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_get_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
|
|
||||||
match self.store.get(uuid).await? {
|
|
||||||
Some(index) => {
|
|
||||||
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
|
|
||||||
Ok(meta)
|
|
||||||
}
|
|
||||||
None => Err(IndexActorError::UnexistingIndex),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_update_index(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
index_settings: IndexSettings,
|
|
||||||
) -> Result<IndexMeta> {
|
|
||||||
let index = self
|
|
||||||
.store
|
|
||||||
.get(uuid)
|
|
||||||
.await?
|
|
||||||
.ok_or(IndexActorError::UnexistingIndex)?;
|
|
||||||
|
|
||||||
let result = spawn_blocking(move || match index_settings.primary_key {
|
|
||||||
Some(primary_key) => {
|
|
||||||
let mut txn = index.write_txn()?;
|
|
||||||
if index.primary_key(&txn)?.is_some() {
|
|
||||||
return Err(IndexActorError::ExistingPrimaryKey);
|
|
||||||
}
|
|
||||||
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
|
|
||||||
builder.set_primary_key(primary_key);
|
|
||||||
builder.execute(|_, _| ())?;
|
|
||||||
let meta = IndexMeta::new_txn(&index, &txn)?;
|
|
||||||
txn.commit()?;
|
|
||||||
Ok(meta)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
let meta = IndexMeta::new(&index)?;
|
|
||||||
Ok(meta)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> {
|
|
||||||
use tokio::fs::create_dir_all;
|
|
||||||
|
|
||||||
path.push("indexes");
|
|
||||||
create_dir_all(&path).await?;
|
|
||||||
|
|
||||||
if let Some(index) = self.store.get(uuid).await? {
|
|
||||||
let mut index_path = path.join(format!("index-{}", uuid));
|
|
||||||
|
|
||||||
create_dir_all(&index_path).await?;
|
|
||||||
|
|
||||||
index_path.push("data.mdb");
|
|
||||||
spawn_blocking(move || -> Result<()> {
|
|
||||||
// Get write txn to wait for ongoing write transaction before snapshot.
|
|
||||||
let _txn = index.write_txn()?;
|
|
||||||
index
|
|
||||||
.env
|
|
||||||
.copy_to_path(index_path, CompactionOption::Enabled)?;
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the
|
|
||||||
/// documents and all the settings.
|
|
||||||
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
|
|
||||||
let index = self
|
|
||||||
.store
|
|
||||||
.get(uuid)
|
|
||||||
.await?
|
|
||||||
.ok_or(IndexActorError::UnexistingIndex)?;
|
|
||||||
|
|
||||||
let path = path.join(format!("indexes/index-{}/", uuid));
|
|
||||||
fs::create_dir_all(&path).await?;
|
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || index.dump(path)).await??;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_get_stats(&self, uuid: Uuid) -> Result<IndexStats> {
|
|
||||||
let index = self
|
|
||||||
.store
|
|
||||||
.get(uuid)
|
|
||||||
.await?
|
|
||||||
.ok_or(IndexActorError::UnexistingIndex)?;
|
|
||||||
|
|
||||||
spawn_blocking(move || {
|
|
||||||
let rtxn = index.read_txn()?;
|
|
||||||
|
|
||||||
Ok(IndexStats {
|
|
||||||
size: index.size(),
|
|
||||||
number_of_documents: index.number_of_documents(&rtxn)?,
|
|
||||||
is_indexing: None,
|
|
||||||
field_distribution: index.field_distribution(&rtxn)?,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.await?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
/// Useful for passing around an `Arc<MockIndexActorHandle>` in tests.
|
|
||||||
impl IndexActorHandle for Arc<MockIndexActorHandle> {
|
|
||||||
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
|
|
||||||
self.as_ref().create_index(uuid, primary_key).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
meta: Processing,
|
|
||||||
data: Option<std::fs::File>,
|
|
||||||
) -> Result<std::result::Result<Processed, Failed>> {
|
|
||||||
self.as_ref().update(uuid, meta, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
|
|
||||||
self.as_ref().search(uuid, query).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
|
|
||||||
self.as_ref().settings(uuid).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn documents(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
offset: usize,
|
|
||||||
limit: usize,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
) -> Result<Vec<Document>> {
|
|
||||||
self.as_ref()
|
|
||||||
.documents(uuid, offset, limit, attributes_to_retrieve)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn document(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
doc_id: String,
|
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
|
||||||
) -> Result<Document> {
|
|
||||||
self.as_ref()
|
|
||||||
.document(uuid, doc_id, attributes_to_retrieve)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn delete(&self, uuid: Uuid) -> Result<()> {
|
|
||||||
self.as_ref().delete(uuid).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
|
|
||||||
self.as_ref().get_index_meta(uuid).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_index(
|
|
||||||
&self,
|
|
||||||
uuid: Uuid,
|
|
||||||
index_settings: IndexSettings,
|
|
||||||
) -> Result<IndexMeta> {
|
|
||||||
self.as_ref().update_index(uuid, index_settings).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
|
|
||||||
self.as_ref().snapshot(uuid, path).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
|
|
||||||
self.as_ref().dump(uuid, path).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
|
|
||||||
self.as_ref().get_index_stats(uuid).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,113 +0,0 @@
|
|||||||
use std::collections::HashMap;
|
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use milli::update::UpdateBuilder;
|
|
||||||
use tokio::fs;
|
|
||||||
use tokio::sync::RwLock;
|
|
||||||
use tokio::task::spawn_blocking;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use super::error::{IndexActorError, Result};
|
|
||||||
use crate::index::Index;
|
|
||||||
use crate::index::update_handler::UpdateHandler;
|
|
||||||
use crate::index_controller::update_file_store::UpdateFileStore;
|
|
||||||
|
|
||||||
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait IndexStore {
|
|
||||||
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
|
|
||||||
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
|
|
||||||
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct MapIndexStore {
|
|
||||||
index_store: AsyncMap<Uuid, Index>,
|
|
||||||
path: PathBuf,
|
|
||||||
index_size: usize,
|
|
||||||
update_file_store: Arc<UpdateFileStore>,
|
|
||||||
update_handler: Arc<UpdateHandler>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MapIndexStore {
|
|
||||||
pub fn new(path: impl AsRef<Path>, index_size: usize, update_handler: Arc<UpdateHandler>) -> Self {
|
|
||||||
let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap());
|
|
||||||
let path = path.as_ref().join("indexes/");
|
|
||||||
let index_store = Arc::new(RwLock::new(HashMap::new()));
|
|
||||||
Self {
|
|
||||||
index_store,
|
|
||||||
path,
|
|
||||||
index_size,
|
|
||||||
update_file_store,
|
|
||||||
update_handler,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl IndexStore for MapIndexStore {
|
|
||||||
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
|
|
||||||
// We need to keep the lock until we are sure the db file has been opened correclty, to
|
|
||||||
// ensure that another db is not created at the same time.
|
|
||||||
let mut lock = self.index_store.write().await;
|
|
||||||
|
|
||||||
if let Some(index) = lock.get(&uuid) {
|
|
||||||
return Ok(index.clone());
|
|
||||||
}
|
|
||||||
let path = self.path.join(format!("index-{}", uuid));
|
|
||||||
if path.exists() {
|
|
||||||
return Err(IndexActorError::IndexAlreadyExists);
|
|
||||||
}
|
|
||||||
|
|
||||||
let index_size = self.index_size;
|
|
||||||
let file_store = self.update_file_store.clone();
|
|
||||||
let update_handler = self.update_handler.clone();
|
|
||||||
let index = spawn_blocking(move || -> Result<Index> {
|
|
||||||
let index = Index::open(path, index_size, file_store, uuid, update_handler)?;
|
|
||||||
if let Some(primary_key) = primary_key {
|
|
||||||
let mut txn = index.write_txn()?;
|
|
||||||
|
|
||||||
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
|
|
||||||
builder.set_primary_key(primary_key);
|
|
||||||
builder.execute(|_, _| ())?;
|
|
||||||
|
|
||||||
txn.commit()?;
|
|
||||||
}
|
|
||||||
Ok(index)
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
lock.insert(uuid, index.clone());
|
|
||||||
|
|
||||||
Ok(index)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
|
|
||||||
let guard = self.index_store.read().await;
|
|
||||||
match guard.get(&uuid) {
|
|
||||||
Some(index) => Ok(Some(index.clone())),
|
|
||||||
None => {
|
|
||||||
// drop the guard here so we can perform the write after without deadlocking;
|
|
||||||
drop(guard);
|
|
||||||
let path = self.path.join(format!("index-{}", uuid));
|
|
||||||
if !path.exists() {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
let index_size = self.index_size;
|
|
||||||
let file_store = self.update_file_store.clone();
|
|
||||||
let index = spawn_blocking(move || Index::open(path, index_size, file_store)).await??;
|
|
||||||
self.index_store.write().await.insert(uuid, index.clone());
|
|
||||||
Ok(Some(index))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
|
|
||||||
let db_path = self.path.join(format!("index-{}", uuid));
|
|
||||||
fs::remove_dir_all(db_path).await?;
|
|
||||||
let index = self.index_store.write().await.remove(&uuid);
|
|
||||||
Ok(index)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,50 +0,0 @@
|
|||||||
use std::fmt;
|
|
||||||
|
|
||||||
use meilisearch_error::{Code, ErrorCode};
|
|
||||||
use tokio::sync::mpsc::error::SendError as MpscSendError;
|
|
||||||
use tokio::sync::oneshot::error::RecvError as OneshotRecvError;
|
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, UuidResolverError>;
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
pub enum UuidResolverError {
|
|
||||||
#[error("Index already exists.")]
|
|
||||||
NameAlreadyExist,
|
|
||||||
#[error("Index \"{0}\" not found.")]
|
|
||||||
UnexistingIndex(String),
|
|
||||||
#[error("Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_).")]
|
|
||||||
BadlyFormatted(String),
|
|
||||||
#[error("Internal error: {0}")]
|
|
||||||
Internal(Box<dyn std::error::Error + Sync + Send + 'static>),
|
|
||||||
}
|
|
||||||
|
|
||||||
internal_error!(
|
|
||||||
UuidResolverError: heed::Error,
|
|
||||||
uuid::Error,
|
|
||||||
std::io::Error,
|
|
||||||
tokio::task::JoinError,
|
|
||||||
serde_json::Error
|
|
||||||
);
|
|
||||||
|
|
||||||
impl<T: Sync + Send + 'static + fmt::Debug> From<MpscSendError<T>> for UuidResolverError {
|
|
||||||
fn from(other: MpscSendError<T>) -> Self {
|
|
||||||
Self::Internal(Box::new(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<OneshotRecvError> for UuidResolverError {
|
|
||||||
fn from(other: OneshotRecvError) -> Self {
|
|
||||||
Self::Internal(Box::new(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ErrorCode for UuidResolverError {
|
|
||||||
fn error_code(&self) -> Code {
|
|
||||||
match self {
|
|
||||||
UuidResolverError::NameAlreadyExist => Code::IndexAlreadyExists,
|
|
||||||
UuidResolverError::UnexistingIndex(_) => Code::IndexNotFound,
|
|
||||||
UuidResolverError::BadlyFormatted(_) => Code::InvalidIndexUid,
|
|
||||||
UuidResolverError::Internal(_) => Code::Internal,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,89 +0,0 @@
|
|||||||
use std::collections::HashSet;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
use tokio::sync::{mpsc, oneshot};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use super::error::Result;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum UuidResolverMsg {
|
|
||||||
Get {
|
|
||||||
uid: String,
|
|
||||||
ret: oneshot::Sender<Result<Uuid>>,
|
|
||||||
},
|
|
||||||
Delete {
|
|
||||||
uid: String,
|
|
||||||
ret: oneshot::Sender<Result<Uuid>>,
|
|
||||||
},
|
|
||||||
List {
|
|
||||||
ret: oneshot::Sender<Result<Vec<(String, Uuid)>>>,
|
|
||||||
},
|
|
||||||
Insert {
|
|
||||||
uuid: Uuid,
|
|
||||||
name: String,
|
|
||||||
ret: oneshot::Sender<Result<()>>,
|
|
||||||
},
|
|
||||||
SnapshotRequest {
|
|
||||||
path: PathBuf,
|
|
||||||
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
|
|
||||||
},
|
|
||||||
GetSize {
|
|
||||||
ret: oneshot::Sender<Result<u64>>,
|
|
||||||
},
|
|
||||||
DumpRequest {
|
|
||||||
path: PathBuf,
|
|
||||||
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UuidResolverMsg {
|
|
||||||
pub async fn get(channel: &mpsc::Sender<Self>, uid: String) -> Result<Uuid> {
|
|
||||||
let (ret, recv) = oneshot::channel();
|
|
||||||
let msg = Self::Get { uid, ret };
|
|
||||||
channel.send(msg).await?;
|
|
||||||
recv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn insert(channel: &mpsc::Sender<Self>, uuid: Uuid, name: String) -> Result<()> {
|
|
||||||
let (ret, recv) = oneshot::channel();
|
|
||||||
let msg = Self::Insert { name, uuid, ret };
|
|
||||||
channel.send(msg).await?;
|
|
||||||
recv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn list(channel: &mpsc::Sender<Self>) -> Result<Vec<(String, Uuid)>> {
|
|
||||||
let (ret, recv) = oneshot::channel();
|
|
||||||
let msg = Self::List { ret };
|
|
||||||
channel.send(msg).await?;
|
|
||||||
recv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_size(channel: &mpsc::Sender<Self>) -> Result<u64> {
|
|
||||||
let (ret, recv) = oneshot::channel();
|
|
||||||
let msg = Self::GetSize { ret };
|
|
||||||
channel.send(msg).await?;
|
|
||||||
recv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn dump(channel: &mpsc::Sender<Self>, path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
let (ret, recv) = oneshot::channel();
|
|
||||||
let msg = Self::DumpRequest { ret, path };
|
|
||||||
channel.send(msg).await?;
|
|
||||||
recv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn snapshot(channel: &mpsc::Sender<Self>, path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
let (ret, recv) = oneshot::channel();
|
|
||||||
let msg = Self::SnapshotRequest { ret, path };
|
|
||||||
channel.send(msg).await?;
|
|
||||||
recv.await?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn delete(channel: &mpsc::Sender<Self>, uid: String) -> Result<Uuid> {
|
|
||||||
let (ret, recv) = oneshot::channel();
|
|
||||||
let msg = Self::Delete { ret, uid };
|
|
||||||
channel.send(msg).await?;
|
|
||||||
recv.await?
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,118 +0,0 @@
|
|||||||
pub mod error;
|
|
||||||
mod message;
|
|
||||||
pub mod store;
|
|
||||||
|
|
||||||
use std::path::Path;
|
|
||||||
use std::{collections::HashSet, path::PathBuf};
|
|
||||||
|
|
||||||
use log::{trace, warn};
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
pub use self::error::UuidResolverError;
|
|
||||||
pub use self::message::UuidResolverMsg;
|
|
||||||
pub use self::store::{HeedUuidStore, UuidStore};
|
|
||||||
use self::error::Result;
|
|
||||||
|
|
||||||
pub type UuidResolverSender = mpsc::Sender<UuidResolverMsg>;
|
|
||||||
|
|
||||||
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
|
|
||||||
|
|
||||||
pub fn create_uuid_resolver(path: impl AsRef<Path>) -> Result<mpsc::Sender<UuidResolverMsg>> {
|
|
||||||
let (sender, reveiver) = mpsc::channel(100);
|
|
||||||
let store = HeedUuidStore::new(path)?;
|
|
||||||
let actor = UuidResolver::new(reveiver, store);
|
|
||||||
tokio::spawn(actor.run());
|
|
||||||
Ok(sender)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct UuidResolver<S> {
|
|
||||||
inbox: mpsc::Receiver<UuidResolverMsg>,
|
|
||||||
store: S,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S: UuidStore> UuidResolver<S> {
|
|
||||||
pub fn new(inbox: mpsc::Receiver<UuidResolverMsg>, store: S) -> Self {
|
|
||||||
Self { inbox, store }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run(mut self) {
|
|
||||||
use UuidResolverMsg::*;
|
|
||||||
|
|
||||||
trace!("uuid resolver started");
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.inbox.recv().await {
|
|
||||||
Some(Get { uid: name, ret }) => {
|
|
||||||
let _ = ret.send(self.handle_get(name).await);
|
|
||||||
}
|
|
||||||
Some(Delete { uid: name, ret }) => {
|
|
||||||
let _ = ret.send(self.handle_delete(name).await);
|
|
||||||
}
|
|
||||||
Some(List { ret }) => {
|
|
||||||
let _ = ret.send(self.handle_list().await);
|
|
||||||
}
|
|
||||||
Some(Insert { ret, uuid, name }) => {
|
|
||||||
let _ = ret.send(self.handle_insert(name, uuid).await);
|
|
||||||
}
|
|
||||||
Some(SnapshotRequest { path, ret }) => {
|
|
||||||
let _ = ret.send(self.handle_snapshot(path).await);
|
|
||||||
}
|
|
||||||
Some(GetSize { ret }) => {
|
|
||||||
let _ = ret.send(self.handle_get_size().await);
|
|
||||||
}
|
|
||||||
Some(DumpRequest { path, ret }) => {
|
|
||||||
let _ = ret.send(self.handle_dump(path).await);
|
|
||||||
}
|
|
||||||
// all senders have been dropped, need to quit.
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
warn!("exiting uuid resolver loop");
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_get(&self, uid: String) -> Result<Uuid> {
|
|
||||||
self.store
|
|
||||||
.get_uuid(uid.clone())
|
|
||||||
.await?
|
|
||||||
.ok_or(UuidResolverError::UnexistingIndex(uid))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
|
|
||||||
self.store
|
|
||||||
.delete(uid.clone())
|
|
||||||
.await?
|
|
||||||
.ok_or(UuidResolverError::UnexistingIndex(uid))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
|
|
||||||
let result = self.store.list().await?;
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
self.store.snapshot(path).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
self.store.dump(path).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
|
|
||||||
if !is_index_uid_valid(&uid) {
|
|
||||||
return Err(UuidResolverError::BadlyFormatted(uid));
|
|
||||||
}
|
|
||||||
self.store.insert(uid, uuid).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_get_size(&self) -> Result<u64> {
|
|
||||||
self.store.get_size().await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_index_uid_valid(uid: &str) -> bool {
|
|
||||||
uid.chars()
|
|
||||||
.all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_')
|
|
||||||
}
|
|
@ -1,225 +0,0 @@
|
|||||||
use std::collections::HashSet;
|
|
||||||
use std::fs::{create_dir_all, File};
|
|
||||||
use std::io::{BufRead, BufReader, Write};
|
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
|
|
||||||
use heed::types::{ByteSlice, Str};
|
|
||||||
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use super::UUID_STORE_SIZE;
|
|
||||||
use super::error::{UuidResolverError, Result};
|
|
||||||
use crate::EnvSizer;
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct DumpEntry {
|
|
||||||
uuid: Uuid,
|
|
||||||
uid: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
const UUIDS_DB_PATH: &str = "index_uuids";
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait UuidStore: Sized {
|
|
||||||
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
|
|
||||||
// the uuid otherwise.
|
|
||||||
async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>;
|
|
||||||
async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
|
|
||||||
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
|
|
||||||
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
|
|
||||||
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
|
|
||||||
async fn get_size(&self) -> Result<u64>;
|
|
||||||
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct HeedUuidStore {
|
|
||||||
env: Env,
|
|
||||||
db: Database<Str, ByteSlice>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HeedUuidStore {
|
|
||||||
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
|
|
||||||
let path = path.as_ref().join(UUIDS_DB_PATH);
|
|
||||||
create_dir_all(&path)?;
|
|
||||||
let mut options = EnvOpenOptions::new();
|
|
||||||
options.map_size(UUID_STORE_SIZE); // 1GB
|
|
||||||
let env = options.open(path)?;
|
|
||||||
let db = env.create_database(None)?;
|
|
||||||
Ok(Self { env, db })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
|
|
||||||
let env = self.env.clone();
|
|
||||||
let db = self.db;
|
|
||||||
let txn = env.read_txn()?;
|
|
||||||
match db.get(&txn, &name)? {
|
|
||||||
Some(uuid) => {
|
|
||||||
let uuid = Uuid::from_slice(uuid)?;
|
|
||||||
Ok(Some(uuid))
|
|
||||||
}
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn delete(&self, uid: String) -> Result<Option<Uuid>> {
|
|
||||||
let env = self.env.clone();
|
|
||||||
let db = self.db;
|
|
||||||
let mut txn = env.write_txn()?;
|
|
||||||
match db.get(&txn, &uid)? {
|
|
||||||
Some(uuid) => {
|
|
||||||
let uuid = Uuid::from_slice(uuid)?;
|
|
||||||
db.delete(&mut txn, &uid)?;
|
|
||||||
txn.commit()?;
|
|
||||||
Ok(Some(uuid))
|
|
||||||
}
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn list(&self) -> Result<Vec<(String, Uuid)>> {
|
|
||||||
let env = self.env.clone();
|
|
||||||
let db = self.db;
|
|
||||||
let txn = env.read_txn()?;
|
|
||||||
let mut entries = Vec::new();
|
|
||||||
for entry in db.iter(&txn)? {
|
|
||||||
let (name, uuid) = entry?;
|
|
||||||
let uuid = Uuid::from_slice(uuid)?;
|
|
||||||
entries.push((name.to_owned(), uuid))
|
|
||||||
}
|
|
||||||
Ok(entries)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
|
|
||||||
let env = self.env.clone();
|
|
||||||
let db = self.db;
|
|
||||||
let mut txn = env.write_txn()?;
|
|
||||||
|
|
||||||
if db.get(&txn, &name)?.is_some() {
|
|
||||||
return Err(UuidResolverError::NameAlreadyExist);
|
|
||||||
}
|
|
||||||
|
|
||||||
db.put(&mut txn, &name, uuid.as_bytes())?;
|
|
||||||
txn.commit()?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
let env = self.env.clone();
|
|
||||||
let db = self.db;
|
|
||||||
// Write transaction to acquire a lock on the database.
|
|
||||||
let txn = env.write_txn()?;
|
|
||||||
let mut entries = HashSet::new();
|
|
||||||
for entry in db.iter(&txn)? {
|
|
||||||
let (_, uuid) = entry?;
|
|
||||||
let uuid = Uuid::from_slice(uuid)?;
|
|
||||||
entries.insert(uuid);
|
|
||||||
}
|
|
||||||
|
|
||||||
// only perform snapshot if there are indexes
|
|
||||||
if !entries.is_empty() {
|
|
||||||
path.push(UUIDS_DB_PATH);
|
|
||||||
create_dir_all(&path).unwrap();
|
|
||||||
path.push("data.mdb");
|
|
||||||
env.copy_to_path(path, CompactionOption::Enabled)?;
|
|
||||||
}
|
|
||||||
Ok(entries)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_size(&self) -> Result<u64> {
|
|
||||||
Ok(self.env.size())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
let dump_path = path.join(UUIDS_DB_PATH);
|
|
||||||
create_dir_all(&dump_path)?;
|
|
||||||
let dump_file_path = dump_path.join("data.jsonl");
|
|
||||||
let mut dump_file = File::create(&dump_file_path)?;
|
|
||||||
let mut uuids = HashSet::new();
|
|
||||||
|
|
||||||
let txn = self.env.read_txn()?;
|
|
||||||
for entry in self.db.iter(&txn)? {
|
|
||||||
let (uid, uuid) = entry?;
|
|
||||||
let uid = uid.to_string();
|
|
||||||
let uuid = Uuid::from_slice(uuid)?;
|
|
||||||
|
|
||||||
let entry = DumpEntry { uuid, uid };
|
|
||||||
serde_json::to_writer(&mut dump_file, &entry)?;
|
|
||||||
dump_file.write_all(b"\n").unwrap();
|
|
||||||
|
|
||||||
uuids.insert(uuid);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(uuids)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<()> {
|
|
||||||
let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH);
|
|
||||||
std::fs::create_dir_all(&uuid_resolver_path)?;
|
|
||||||
|
|
||||||
let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
|
|
||||||
let indexes = File::open(&src_indexes)?;
|
|
||||||
let mut indexes = BufReader::new(indexes);
|
|
||||||
let mut line = String::new();
|
|
||||||
|
|
||||||
let db = Self::new(dst)?;
|
|
||||||
let mut txn = db.env.write_txn()?;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match indexes.read_line(&mut line) {
|
|
||||||
Ok(0) => break,
|
|
||||||
Ok(_) => {
|
|
||||||
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
|
|
||||||
println!("importing {} {}", uid, uuid);
|
|
||||||
db.db.put(&mut txn, &uid, uuid.as_bytes())?;
|
|
||||||
}
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
}
|
|
||||||
|
|
||||||
line.clear();
|
|
||||||
}
|
|
||||||
txn.commit()?;
|
|
||||||
|
|
||||||
db.env.prepare_for_closing().wait();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl UuidStore for HeedUuidStore {
|
|
||||||
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
|
|
||||||
let this = self.clone();
|
|
||||||
tokio::task::spawn_blocking(move || this.get_uuid(name)).await?
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn delete(&self, uid: String) -> Result<Option<Uuid>> {
|
|
||||||
let this = self.clone();
|
|
||||||
tokio::task::spawn_blocking(move || this.delete(uid)).await?
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list(&self) -> Result<Vec<(String, Uuid)>> {
|
|
||||||
let this = self.clone();
|
|
||||||
tokio::task::spawn_blocking(move || this.list()).await?
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
|
|
||||||
let this = self.clone();
|
|
||||||
tokio::task::spawn_blocking(move || this.insert(name, uuid)).await?
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
let this = self.clone();
|
|
||||||
tokio::task::spawn_blocking(move || this.snapshot(path)).await?
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_size(&self) -> Result<u64> {
|
|
||||||
self.get_size()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
|
||||||
let this = self.clone();
|
|
||||||
tokio::task::spawn_blocking(move || this.dump(path)).await?
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user