remove anyhow refs & implement missing errors

This commit is contained in:
marin postma 2021-06-14 21:26:35 +02:00
parent c1b6f0e833
commit 58f9974be4
No known key found for this signature in database
GPG key ID: 6088B7721C3E39F9
40 changed files with 707 additions and 668 deletions

View file

@ -10,8 +10,9 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle;
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask};
use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
use crate::index_controller::{update_actor, uuid_resolver};
use super::error::{DumpActorError, Result};
pub const CONCURRENT_DUMP_MSG: usize = 10;
@ -95,14 +96,14 @@ where
}
}
async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
async fn handle_create_dump(&self, ret: oneshot::Sender<Result<DumpInfo>>) {
let uid = generate_uid();
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
let _lock = match self.lock.try_lock() {
Some(lock) => lock,
None => {
ret.send(Err(DumpError::DumpAlreadyRunning))
ret.send(Err(DumpActorError::DumpAlreadyRunning))
.expect("Dump actor is dead");
return;
}
@ -147,10 +148,10 @@ where
};
}
async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
async fn handle_dump_info(&self, uid: String) -> Result<DumpInfo> {
match self.dump_infos.read().await.get(&uid) {
Some(info) => Ok(info.clone()),
_ => Err(DumpError::DumpDoesNotExist(uid)),
_ => Err(DumpActorError::DumpDoesNotExist(uid)),
}
}
}

View file

@ -0,0 +1,51 @@
use meilisearch_error::{Code, ErrorCode};
use crate::index_controller::{update_actor::error::UpdateActorError, uuid_resolver::UuidResolverError};
pub type Result<T> = std::result::Result<T, DumpActorError>;
#[derive(thiserror::Error, Debug)]
pub enum DumpActorError {
#[error("dump already running")]
DumpAlreadyRunning,
#[error("dump `{0}` does not exist")]
DumpDoesNotExist(String),
#[error("Internal error: {0}")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("error while dumping uuids: {0}")]
UuidResolver(#[from] UuidResolverError),
#[error("error while dumping updates: {0}")]
UpdateActor(#[from] UpdateActorError),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for DumpActorError {
fn from(other: $other) -> Self {
Self::Internal(Box::new(other))
}
}
)*
}
}
internal_error!(
heed::Error,
std::io::Error,
tokio::task::JoinError,
serde_json::error::Error,
tempfile::PersistError
);
impl ErrorCode for DumpActorError {
fn error_code(&self) -> Code {
match self {
DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress,
DumpActorError::DumpDoesNotExist(_) => Code::DocumentNotFound,
DumpActorError::Internal(_) => Code::Internal,
DumpActorError::UuidResolver(e) => e.error_code(),
DumpActorError::UpdateActor(e) => e.error_code(),
}
}
}

View file

@ -3,7 +3,8 @@ use std::path::Path;
use actix_web::web::Bytes;
use tokio::sync::{mpsc, oneshot};
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg};
use super::error::Result;
#[derive(Clone)]
pub struct DumpActorHandleImpl {
@ -12,14 +13,14 @@ pub struct DumpActorHandleImpl {
#[async_trait::async_trait]
impl DumpActorHandle for DumpActorHandleImpl {
async fn create_dump(&self) -> DumpResult<DumpInfo> {
async fn create_dump(&self) -> Result<DumpInfo> {
let (ret, receiver) = oneshot::channel();
let msg = DumpMsg::CreateDump { ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
async fn dump_info(&self, uid: String) -> Result<DumpInfo> {
let (ret, receiver) = oneshot::channel();
let msg = DumpMsg::DumpInfo { ret, uid };
let _ = self.sender.send(msg).await;
@ -34,7 +35,7 @@ impl DumpActorHandleImpl {
update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
index_db_size: usize,
update_db_size: usize,
) -> anyhow::Result<Self> {
) -> std::result::Result<Self, Box<dyn std::error::Error>> {
let (sender, receiver) = mpsc::channel(10);
let actor = DumpActor::new(
receiver,

View file

@ -31,7 +31,7 @@ impl MetadataV1 {
dst: impl AsRef<Path>,
size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<()> {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
info!(
"Loading dump, dump database version: {}, dump version: V1",
self.db_version
@ -83,7 +83,7 @@ fn load_index(
primary_key: Option<&str>,
size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<()> {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
create_dir_all(&index_path)?;
@ -172,7 +172,7 @@ impl From<Settings> for index_controller::Settings<Unchecked> {
}
/// Extract Settings from `settings.json` file present at provided `dir_path`
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
fn import_settings(dir_path: impl AsRef<Path>) -> std::result::Result<Settings, Box<dyn std::error::Error>> {
let path = dir_path.as_ref().join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);

View file

@ -34,7 +34,7 @@ impl MetadataV2 {
index_db_size: usize,
update_db_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
info!(
"Loading dump from {}, dump database version: {}, dump version: V2",
self.dump_date, self.db_version

View file

@ -1,13 +1,14 @@
use tokio::sync::oneshot;
use super::{DumpInfo, DumpResult};
use super::DumpInfo;
use super::error::Result;
pub enum DumpMsg {
CreateDump {
ret: oneshot::Sender<DumpResult<DumpInfo>>,
ret: oneshot::Sender<Result<DumpInfo>>,
},
DumpInfo {
uid: String,
ret: oneshot::Sender<DumpResult<DumpInfo>>,
ret: oneshot::Sender<Result<DumpInfo>>,
},
}

View file

@ -1,13 +1,11 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use anyhow::Context;
use chrono::{DateTime, Utc};
use log::{error, info, warn};
use log::{info, warn};
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs::create_dir_all;
use loaders::v1::MetadataV1;
@ -18,39 +16,28 @@ pub use handle_impl::*;
pub use message::DumpMsg;
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
use crate::index_controller::dump_actor::error::DumpActorError;
use crate::{helpers::compression, option::IndexerOpts};
use error::Result;
mod actor;
mod handle_impl;
mod loaders;
mod message;
pub mod error;
const META_FILE_NAME: &str = "metadata.json";
pub type DumpResult<T> = std::result::Result<T, DumpError>;
#[derive(Error, Debug)]
pub enum DumpError {
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
#[error("dump already running")]
DumpAlreadyRunning,
#[error("dump `{0}` does not exist")]
DumpDoesNotExist(String),
}
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait DumpActorHandle {
/// Start the creation of a dump
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
async fn create_dump(&self) -> DumpResult<DumpInfo>;
async fn create_dump(&self) -> Result<DumpInfo>;
/// Return the status of an already created dump
/// Implementation: [handle_impl::DumpActorHandleImpl::dump_status]
async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo>;
async fn dump_info(&self, uid: String) -> Result<DumpInfo>;
}
#[derive(Debug, Serialize, Deserialize)]
@ -120,7 +107,7 @@ pub fn load_dump(
index_db_size: usize,
update_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let tmp_src = tempfile::tempdir_in(".")?;
let tmp_src_path = tmp_src.path();
@ -133,7 +120,9 @@ pub fn load_dump(
let dst_dir = dst_path
.as_ref()
.parent()
.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?;
// TODO
//.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?;
.unwrap();
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
@ -175,7 +164,7 @@ where
U: UuidResolverHandle + Send + Sync + Clone + 'static,
P: UpdateActorHandle + Send + Sync + Clone + 'static,
{
async fn run(self) -> anyhow::Result<()> {
async fn run(self) -> Result<()> {
info!("Performing dump.");
create_dir_all(&self.path).await?;
@ -196,9 +185,10 @@ where
.dump(uuids, temp_dump_path.clone())
.await?;
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;
compression::to_tar_gz(temp_dump_path, temp_dump_file.path())
.map_err(|e| DumpActorError::Internal(e))?;
let dump_path = self.path.join(self.uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?;

View file

@ -0,0 +1,40 @@
use std::error::Error;
use meilisearch_error::Code;
use meilisearch_error::ErrorCode;
use super::dump_actor::error::DumpActorError;
use super::index_actor::error::IndexActorError;
use super::update_actor::error::UpdateActorError;
use super::uuid_resolver::UuidResolverError;
pub type Result<T> = std::result::Result<T, IndexControllerError>;
#[derive(Debug, thiserror::Error)]
pub enum IndexControllerError {
#[error("Internal error: {0}")]
Internal(Box<dyn Error>),
#[error("Missing index uid")]
MissingUid,
#[error("error resolving index uid: {0}")]
Uuid(#[from] UuidResolverError),
#[error("error with index: {0}")]
IndexActor(#[from] IndexActorError),
#[error("error with update: {0}")]
UpdateActor(#[from] UpdateActorError),
#[error("error with dump: {0}")]
DumpActor(#[from] DumpActorError),
}
impl ErrorCode for IndexControllerError {
fn error_code(&self) -> Code {
match self {
IndexControllerError::Internal(_) => Code::Internal,
IndexControllerError::MissingUid => Code::InvalidIndexUid,
IndexControllerError::Uuid(e) => e.error_code(),
IndexControllerError::IndexActor(e) => e.error_code(),
IndexControllerError::UpdateActor(e) => e.error_code(),
IndexControllerError::DumpActor(e) => e.error_code(),
}
}
}

View file

@ -19,7 +19,8 @@ use crate::index_controller::{
};
use crate::option::IndexerOpts;
use super::{IndexError, IndexMeta, IndexMsg, IndexResult, IndexSettings, IndexStore};
use super::{IndexMeta, IndexMsg, IndexSettings, IndexStore};
use super::error::{Result, IndexActorError};
pub const CONCURRENT_INDEX_MSG: usize = 10;
@ -30,7 +31,7 @@ pub struct IndexActor<S> {
}
impl<S: IndexStore + Sync + Send> IndexActor<S> {
pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> IndexResult<Self> {
pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> std::result::Result<Self, Box<dyn std::error::Error>> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options)?;
let update_handler = Arc::new(update_handler);
@ -137,20 +138,22 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
}
}
async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result<SearchResult> {
async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.perform_search(query)).await?
.ok_or(IndexActorError::UnexistingIndex)?;
let result = spawn_blocking(move || index.perform_search(query)).await??;
Ok(result)
}
async fn handle_create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> IndexResult<IndexMeta> {
) -> Result<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
@ -161,7 +164,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
uuid: Uuid,
meta: Processing,
data: Option<File>,
) -> IndexResult<Result<Processed, Failed>> {
) -> Result<std::result::Result<Processed, Failed>> {
debug!("Processing update {}", meta.id());
let update_handler = self.update_handler.clone();
let index = match self.store.get(uuid).await? {
@ -172,12 +175,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?)
}
async fn handle_settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> {
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
.ok_or(IndexActorError::UnexistingIndex)?;
let result = spawn_blocking(move || index.settings()).await??;
Ok(result)
}
@ -188,12 +191,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Vec<Document>> {
) -> Result<Vec<Document>> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
.ok_or(IndexActorError::UnexistingIndex)?;
let result =
spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
.await??;
@ -206,12 +209,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Document> {
) -> Result<Document> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
.ok_or(IndexActorError::UnexistingIndex)?;
let result =
spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve))
@ -220,7 +223,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(result)
}
async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> {
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let index = self.store.delete(uuid).await?;
if let Some(index) = index {
@ -237,13 +240,13 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(())
}
async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
async fn handle_get_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
match self.store.get(uuid).await? {
Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
}
None => Err(IndexError::UnexistingIndex),
None => Err(IndexActorError::UnexistingIndex),
}
}
@ -251,23 +254,23 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> IndexResult<IndexMeta> {
) -> Result<IndexMeta> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
.ok_or(IndexActorError::UnexistingIndex)?;
let result = spawn_blocking(move || match index_settings.primary_key {
Some(primary_key) => {
let mut txn = index.write_txn()?;
if index.primary_key(&txn)?.is_some() {
return Err(IndexError::ExistingPrimaryKey);
return Err(IndexActorError::ExistingPrimaryKey);
}
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
builder.set_primary_key(primary_key);
builder.execute(|_, _| ())
.map_err(|e| IndexError::Internal(e.to_string()))?;
.map_err(|e| IndexActorError::Internal(Box::new(e)))?;
let meta = IndexMeta::new_txn(&index, &txn)?;
txn.commit()?;
Ok(meta)
@ -282,7 +285,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(result)
}
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> {
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> {
use tokio::fs::create_dir_all;
path.push("indexes");
@ -294,7 +297,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
create_dir_all(&index_path).await?;
index_path.push("data.mdb");
spawn_blocking(move || -> anyhow::Result<()> {
spawn_blocking(move || -> Result<()> {
// Get write txn to wait for ongoing write transaction before snapshot.
let _txn = index.write_txn()?;
index
@ -310,12 +313,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
/// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the
/// documents and all the settings.
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
.ok_or(IndexActorError::UnexistingIndex)?;
let path = path.join(format!("indexes/index-{}/", uuid));
fs::create_dir_all(&path).await?;
@ -325,12 +328,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(())
}
async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
async fn handle_get_stats(&self, uuid: Uuid) -> Result<IndexStats> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
.ok_or(IndexActorError::UnexistingIndex)?;
spawn_blocking(move || {
let rtxn = index.read_txn()?;
@ -338,9 +341,10 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(IndexStats {
size: index.size(),
number_of_documents: index.number_of_documents(&rtxn)
.map_err(|e| IndexError::Internal(e.to_string()))?,
.map_err(|e| IndexActorError::Internal(Box::new(e)))?,
is_indexing: None,
fields_distribution: index.fields_distribution(&rtxn)?,
fields_distribution: index.fields_distribution(&rtxn)
.map_err(|e| IndexActorError::Internal(e.into()))?,
})
})
.await?

View file

@ -0,0 +1,49 @@
use meilisearch_error::{Code, ErrorCode};
use crate::index::error::IndexError;
pub type Result<T> = std::result::Result<T, IndexActorError>;
#[derive(thiserror::Error, Debug)]
pub enum IndexActorError {
#[error("index error: {0}")]
IndexError(#[from] IndexError),
#[error("index already exists")]
IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
#[error("Existing primary key")]
ExistingPrimaryKey,
#[error("Internal Index Error: {0}")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for IndexActorError {
fn from(other: $other) -> Self {
Self::Internal(Box::new(other))
}
}
)*
}
}
internal_error!(
heed::Error,
tokio::task::JoinError,
std::io::Error
);
impl ErrorCode for IndexActorError {
fn error_code(&self) -> Code {
match self {
IndexActorError::IndexError(e) => e.error_code(),
IndexActorError::IndexAlreadyExists => Code::IndexAlreadyExists,
IndexActorError::UnexistingIndex => Code::IndexNotFound,
IndexActorError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent,
IndexActorError::Internal(_) => Code::Internal,
}
}
}

View file

@ -12,7 +12,8 @@ use crate::{
index_controller::{Failed, Processed},
};
use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, IndexResult, MapIndexStore};
use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore};
use super::error::Result;
#[derive(Clone)]
pub struct IndexActorHandleImpl {
@ -25,7 +26,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> IndexResult<IndexMeta> {
) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex {
ret,
@ -41,7 +42,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
uuid: Uuid,
meta: Processing,
data: Option<std::fs::File>,
) -> anyhow::Result<Result<Processed, Failed>> {
) -> Result<std::result::Result<Processed, Failed>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update {
ret,
@ -53,14 +54,14 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult<SearchResult> {
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Search { uuid, query, ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> {
async fn settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Settings { uuid, ret };
let _ = self.sender.send(msg).await;
@ -73,7 +74,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Vec<Document>> {
) -> Result<Vec<Document>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Documents {
uuid,
@ -91,7 +92,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Document> {
) -> Result<Document> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Document {
uuid,
@ -103,14 +104,14 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn delete(&self, uuid: Uuid) -> IndexResult<()> {
async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Delete { uuid, ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret };
let _ = self.sender.send(msg).await;
@ -121,7 +122,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> IndexResult<IndexMeta> {
) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::UpdateIndex {
uuid,
@ -132,21 +133,21 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Snapshot { uuid, path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Dump { uuid, path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetStats { uuid, ret };
let _ = self.sender.send(msg).await;
@ -155,7 +156,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
}
impl IndexActorHandleImpl {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> std::result::Result<Self, Box<dyn std::error::Error>> {
let (sender, receiver) = mpsc::channel(100);
let store = MapIndexStore::new(path, index_size);

View file

@ -5,8 +5,9 @@ use uuid::Uuid;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
use super::error::Result as IndexResult;
use super::{IndexMeta, IndexResult, IndexSettings};
use super::{IndexMeta, IndexSettings};
#[allow(clippy::large_enum_variant)]
pub enum IndexMsg {
@ -24,7 +25,7 @@ pub enum IndexMsg {
Search {
uuid: Uuid,
query: SearchQuery,
ret: oneshot::Sender<anyhow::Result<SearchResult>>,
ret: oneshot::Sender<IndexResult<SearchResult>>,
},
Settings {
uuid: Uuid,

View file

@ -5,7 +5,6 @@ use chrono::{DateTime, Utc};
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
use actor::IndexActor;
@ -16,6 +15,9 @@ use store::{IndexStore, MapIndexStore};
use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
use error::Result;
use self::error::IndexActorError;
use super::IndexSettings;
@ -23,8 +25,7 @@ mod actor;
mod handle_impl;
mod message;
mod store;
pub type IndexResult<T> = std::result::Result<T, IndexError>;
pub mod error;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
@ -35,18 +36,18 @@ pub struct IndexMeta {
}
impl IndexMeta {
fn new(index: &Index) -> IndexResult<Self> {
fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
fn new_txn(index: &Index, txn: &heed::RoTxn) -> IndexResult<Self> {
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
let created_at = index
.created_at(&txn)
.map_err(|e| IndexError::Internal(e.to_string()))?;
.map_err(|e| IndexActorError::Internal(Box::new(e)))?;
let updated_at = index
.updated_at(&txn)
.map_err(|e| IndexError::Internal(e.to_string()))?;
.map_err(|e| IndexActorError::Internal(Box::new(e)))?;
let primary_key = index.primary_key(&txn)?.map(String::from);
Ok(Self {
created_at,
@ -56,50 +57,19 @@ impl IndexMeta {
}
}
#[derive(Error, Debug)]
pub enum IndexError {
#[error("index already exists")]
IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
#[error("Existing primary key")]
ExistingPrimaryKey,
#[error("Internal Index Error: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for IndexError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
anyhow::Error,
heed::Error,
tokio::task::JoinError,
std::io::Error
);
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait IndexActorHandle {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>)
-> IndexResult<IndexMeta>;
-> Result<IndexMeta>;
async fn update(
&self,
uuid: Uuid,
meta: Processing,
data: Option<File>,
) -> anyhow::Result<Result<Processed, Failed>>;
async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult<SearchResult>;
async fn settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>>;
) -> Result<std::result::Result<Processed, Failed>>;
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult>;
async fn settings(&self, uuid: Uuid) -> Result<Settings<Checked>>;
async fn documents(
&self,
@ -107,23 +77,23 @@ pub trait IndexActorHandle {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Vec<Document>>;
) -> Result<Vec<Document>>;
async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Document>;
async fn delete(&self, uuid: Uuid) -> IndexResult<()>;
async fn get_index_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta>;
) -> Result<Document>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>;
async fn update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> IndexResult<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats>;
) -> Result<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats>;
}
#[cfg(test)]
@ -139,7 +109,7 @@ mod test {
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> IndexResult<IndexMeta> {
) -> Result<IndexMeta> {
self.as_ref().create_index(uuid, primary_key).await
}
@ -148,15 +118,15 @@ mod test {
uuid: Uuid,
meta: Processing,
data: Option<std::fs::File>,
) -> anyhow::Result<Result<Processed, Failed>> {
) -> Result<std::result::Result<Processed, Failed>> {
self.as_ref().update(uuid, meta, data).await
}
async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult<SearchResult> {
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
self.as_ref().search(uuid, query).await
}
async fn settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> {
async fn settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
self.as_ref().settings(uuid).await
}
@ -166,7 +136,7 @@ mod test {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Vec<Document>> {
) -> Result<Vec<Document>> {
self.as_ref()
.documents(uuid, offset, limit, attributes_to_retrieve)
.await
@ -177,17 +147,17 @@ mod test {
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Document> {
) -> Result<Document> {
self.as_ref()
.document(uuid, doc_id, attributes_to_retrieve)
.await
}
async fn delete(&self, uuid: Uuid) -> IndexResult<()> {
async fn delete(&self, uuid: Uuid) -> Result<()> {
self.as_ref().delete(uuid).await
}
async fn get_index_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
self.as_ref().get_index_meta(uuid).await
}
@ -195,19 +165,19 @@ mod test {
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> IndexResult<IndexMeta> {
) -> Result<IndexMeta> {
self.as_ref().update_index(uuid, index_settings).await
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
self.as_ref().snapshot(uuid, path).await
}
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
self.as_ref().dump(uuid, path).await
}
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
self.as_ref().get_index_stats(uuid).await
}
}

View file

@ -8,16 +8,16 @@ use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::{IndexError, IndexResult};
use super::error::{IndexActorError, Result};
use crate::index::Index;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[async_trait::async_trait]
pub trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> IndexResult<Index>;
async fn get(&self, uuid: Uuid) -> IndexResult<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>>;
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
}
pub struct MapIndexStore {
@ -40,7 +40,7 @@ impl MapIndexStore {
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> IndexResult<Index> {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
// We need to keep the lock until we are sure the db file has been opened correclty, to
// ensure that another db is not created at the same time.
let mut lock = self.index_store.write().await;
@ -50,11 +50,11 @@ impl IndexStore for MapIndexStore {
}
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexError::IndexAlreadyExists);
return Err(IndexActorError::IndexAlreadyExists);
}
let index_size = self.index_size;
let index = spawn_blocking(move || -> IndexResult<Index> {
let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
@ -62,7 +62,7 @@ impl IndexStore for MapIndexStore {
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
builder.set_primary_key(primary_key);
builder.execute(|_, _| ())
.map_err(|e| IndexError::Internal(e.to_string()))?;
.map_err(|e| IndexActorError::Internal(Box::new(e)))?;
txn.commit()?;
}
@ -75,7 +75,7 @@ impl IndexStore for MapIndexStore {
Ok(index)
}
async fn get(&self, uuid: Uuid) -> IndexResult<Option<Index>> {
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
@ -95,7 +95,7 @@ impl IndexStore for MapIndexStore {
}
}
async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>> {
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path).await?;
let index = self.index_store.write().await.remove(&uuid);

View file

@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;
use actix_web::web::{Bytes, Payload};
use anyhow::bail;
use chrono::{DateTime, Utc};
use futures::stream::StreamExt;
use log::info;
@ -24,8 +23,10 @@ use uuid_resolver::{UuidResolverError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt;
use error::Result;
use self::dump_actor::load_dump;
use self::error::IndexControllerError;
mod dump_actor;
mod index_actor;
@ -33,6 +34,7 @@ mod snapshot;
mod update_actor;
mod updates;
mod uuid_resolver;
pub mod error;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
@ -81,7 +83,7 @@ pub struct Stats {
}
impl IndexController {
pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
pub fn new(path: impl AsRef<Path>, options: &Opt) -> std::result::Result<Self, Box<dyn std::error::Error>> {
let index_size = options.max_mdb_size.get_bytes() as usize;
let update_store_size = options.max_udb_size.get_bytes() as usize;
@ -151,7 +153,7 @@ impl IndexController {
format: milli::update::UpdateFormat,
payload: Payload,
primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus> {
) -> Result<UpdateStatus> {
let perform_update = |uuid| async move {
let meta = UpdateMeta::DocumentsAddition {
method,
@ -189,7 +191,7 @@ impl IndexController {
}
}
pub async fn clear_documents(&self, uid: String) -> anyhow::Result<UpdateStatus> {
pub async fn clear_documents(&self, uid: String) -> Result<UpdateStatus> {
let uuid = self.uuid_resolver.get(uid).await?;
let meta = UpdateMeta::ClearDocuments;
let (_, receiver) = mpsc::channel(1);
@ -201,7 +203,7 @@ impl IndexController {
&self,
uid: String,
documents: Vec<String>,
) -> anyhow::Result<UpdateStatus> {
) -> Result<UpdateStatus> {
let uuid = self.uuid_resolver.get(uid).await?;
let meta = UpdateMeta::DeleteDocuments { ids: documents };
let (_, receiver) = mpsc::channel(1);
@ -214,7 +216,7 @@ impl IndexController {
uid: String,
settings: Settings<Checked>,
create: bool,
) -> anyhow::Result<UpdateStatus> {
) -> Result<UpdateStatus> {
let perform_udpate = |uuid| async move {
let meta = UpdateMeta::Settings(settings.into_unchecked());
// Nothing so send, drop the sender right away, as not to block the update actor.
@ -239,9 +241,9 @@ impl IndexController {
pub async fn create_index(
&self,
index_settings: IndexSettings,
) -> anyhow::Result<IndexMetadata> {
) -> Result<IndexMetadata> {
let IndexSettings { uid, primary_key } = index_settings;
let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?;
let uid = uid.ok_or(IndexControllerError::MissingUid)?;
let uuid = Uuid::new_v4();
let meta = self.index_handle.create_index(uuid, primary_key).await?;
self.uuid_resolver.insert(uid.clone(), uuid).await?;
@ -255,26 +257,26 @@ impl IndexController {
Ok(meta)
}
pub async fn delete_index(&self, uid: String) -> anyhow::Result<()> {
pub async fn delete_index(&self, uid: String) -> Result<()> {
let uuid = self.uuid_resolver.delete(uid).await?;
self.update_handle.delete(uuid).await?;
self.index_handle.delete(uuid).await?;
Ok(())
}
pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result<UpdateStatus> {
pub async fn update_status(&self, uid: String, id: u64) -> Result<UpdateStatus> {
let uuid = self.uuid_resolver.get(uid).await?;
let result = self.update_handle.update_status(uuid, id).await?;
Ok(result)
}
pub async fn all_update_status(&self, uid: String) -> anyhow::Result<Vec<UpdateStatus>> {
pub async fn all_update_status(&self, uid: String) -> Result<Vec<UpdateStatus>> {
let uuid = self.uuid_resolver.get(uid).await?;
let result = self.update_handle.get_all_updates_status(uuid).await?;
Ok(result)
}
pub async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
pub async fn list_indexes(&self) -> Result<Vec<IndexMetadata>> {
let uuids = self.uuid_resolver.list().await?;
let mut ret = Vec::new();
@ -293,7 +295,7 @@ impl IndexController {
Ok(ret)
}
pub async fn settings(&self, uid: String) -> anyhow::Result<Settings<Checked>> {
pub async fn settings(&self, uid: String) -> Result<Settings<Checked>> {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let settings = self.index_handle.settings(uuid).await?;
Ok(settings)
@ -305,7 +307,7 @@ impl IndexController {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Vec<Document>> {
) -> Result<Vec<Document>> {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let documents = self
.index_handle
@ -319,7 +321,7 @@ impl IndexController {
uid: String,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Document> {
) -> Result<Document> {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let document = self
.index_handle
@ -332,9 +334,9 @@ impl IndexController {
&self,
uid: String,
index_settings: IndexSettings,
) -> anyhow::Result<IndexMetadata> {
) -> Result<IndexMetadata> {
if index_settings.uid.is_some() {
bail!("Can't change the index uid.")
todo!("Can't change the index uid.")
}
let uuid = self.uuid_resolver.get(uid.clone()).await?;
@ -348,13 +350,13 @@ impl IndexController {
Ok(meta)
}
pub async fn search(&self, uid: String, query: SearchQuery) -> anyhow::Result<SearchResult> {
pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> {
let uuid = self.uuid_resolver.get(uid).await?;
let result = self.index_handle.search(uuid, query).await?;
Ok(result)
}
pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> {
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata {
@ -366,11 +368,11 @@ impl IndexController {
Ok(meta)
}
pub async fn get_uuids_size(&self) -> anyhow::Result<u64> {
pub async fn get_uuids_size(&self) -> Result<u64> {
Ok(self.uuid_resolver.get_size().await?)
}
pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let uuid = self.uuid_resolver.get(uid).await?;
let update_infos = self.update_handle.get_info().await?;
let mut stats = self.index_handle.get_index_stats(uuid).await?;
@ -379,7 +381,7 @@ impl IndexController {
Ok(stats)
}
pub async fn get_all_stats(&self) -> anyhow::Result<Stats> {
pub async fn get_all_stats(&self) -> Result<Stats> {
let update_infos = self.update_handle.get_info().await?;
let mut database_size = self.get_uuids_size().await? + update_infos.size;
let mut last_update: Option<DateTime<_>> = None;
@ -405,11 +407,11 @@ impl IndexController {
})
}
pub async fn create_dump(&self) -> anyhow::Result<DumpInfo> {
pub async fn create_dump(&self) -> Result<DumpInfo> {
Ok(self.dump_handle.create_dump().await?)
}
pub async fn dump_info(&self, uid: String) -> anyhow::Result<DumpInfo> {
pub async fn dump_info(&self, uid: String) -> Result<DumpInfo> {
Ok(self.dump_handle.dump_info(uid).await?)
}
}

View file

@ -1,7 +1,6 @@
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::bail;
use log::{error, info};
use tokio::fs;
use tokio::task::spawn_blocking;
@ -53,7 +52,7 @@ where
}
}
async fn perform_snapshot(&self) -> anyhow::Result<()> {
async fn perform_snapshot(&self) -> std::result::Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
info!("Performing snapshot.");
let snapshot_dir = self.snapshot_path.clone();
@ -78,7 +77,7 @@ where
let snapshot_path = self
.snapshot_path
.join(format!("{}.snapshot", self.db_name));
let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let snapshot_path = spawn_blocking(move || -> Result<PathBuf, Box<dyn std::error::Error + Sync + Send + 'static>> {
let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
@ -98,7 +97,7 @@ pub fn load_snapshot(
snapshot_path: impl AsRef<Path>,
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool,
) -> anyhow::Result<()> {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() {
match compression::from_tar_gz(snapshot_path, &db_path) {
Ok(()) => Ok(()),
@ -109,7 +108,7 @@ pub fn load_snapshot(
}
}
} else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists {
bail!(
todo!(
"database already exists at {:?}, try to delete it or rename it",
db_path
.as_ref()
@ -117,7 +116,7 @@ pub fn load_snapshot(
.unwrap_or_else(|_| db_path.as_ref().to_owned())
)
} else if !snapshot_path.as_ref().exists() && !ignore_missing_snapshot {
bail!(
todo!(
"snapshot doesn't exist at {:?}",
snapshot_path
.as_ref()
@ -142,7 +141,7 @@ mod test {
use super::*;
use crate::index_controller::index_actor::MockIndexActorHandle;
use crate::index_controller::update_actor::{
MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError,
MockUpdateActorHandle, UpdateActorHandleImpl, error::UpdateActorError,
};
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidResolverError};
@ -224,7 +223,7 @@ mod test {
update_handle
.expect_snapshot()
// abitrary error
.returning(|_, _| Box::pin(err(UpdateError::UnexistingUpdate(0))));
.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0))));
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(

View file

@ -13,7 +13,8 @@ use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
use super::{PayloadData, UpdateMsg, UpdateStore, UpdateStoreInfo};
use super::error::{Result, UpdateActorError};
use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{UpdateMeta, UpdateStatus};
@ -35,7 +36,7 @@ where
inbox: mpsc::Receiver<UpdateMsg<D>>,
path: impl AsRef<Path>,
index_handle: I,
) -> anyhow::Result<Self> {
) -> std::result::Result<Self, Box<dyn std::error::Error>> {
let path = path.as_ref().join("updates");
std::fs::create_dir_all(&path)?;
@ -202,7 +203,7 @@ where
tokio::task::spawn_blocking(move || {
let result = store
.meta(uuid, id)?
.ok_or(UpdateError::UnexistingUpdate(id))?;
.ok_or(UpdateActorError::UnexistingUpdate(id))?;
Ok(result)
})
.await?
@ -230,7 +231,7 @@ where
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
tokio::task::spawn_blocking(move || -> Result<()> {
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
Ok(())
})
@ -241,7 +242,7 @@ where
async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
let update_store = self.store.clone();
let info = tokio::task::spawn_blocking(move || -> anyhow::Result<UpdateStoreInfo> {
let info = tokio::task::spawn_blocking(move || -> Result<UpdateStoreInfo> {
let info = update_store.get_info()?;
Ok(info)
})

View file

@ -0,0 +1,64 @@
use std::error::Error;
use meilisearch_error::{Code, ErrorCode};
use crate::index_controller::index_actor::error::IndexActorError;
pub type Result<T> = std::result::Result<T, UpdateActorError>;
#[derive(Debug, thiserror::Error)]
pub enum UpdateActorError {
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
#[error("Internal error processing update: {0}")]
Internal(Box<dyn Error + Send + Sync + 'static>),
#[error("error with index: {0}")]
IndexActor(#[from] IndexActorError),
#[error(
"Update store was shut down due to a fatal error, please check your logs for more info."
)]
FatalUpdateStoreError,
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UpdateActorError {
fn from(other: $other) -> Self {
Self::Internal(Box::new(other))
}
}
)*
}
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateActorError {
fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::FatalUpdateStoreError
}
}
impl From<tokio::sync::oneshot::error::RecvError> for UpdateActorError {
fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
Self::FatalUpdateStoreError
}
}
internal_error!(
heed::Error,
std::io::Error,
serde_json::Error,
actix_http::error::PayloadError,
tokio::task::JoinError
);
impl ErrorCode for UpdateActorError {
fn error_code(&self) -> Code {
match self {
UpdateActorError::UnexistingUpdate(_) => Code::NotFound,
UpdateActorError::Internal(_) => Code::Internal,
UpdateActorError::IndexActor(e) => e.error_code(),
UpdateActorError::FatalUpdateStoreError => Code::Internal,
}
}
}

View file

@ -6,10 +6,8 @@ use uuid::Uuid;
use crate::index_controller::{IndexActorHandle, UpdateStatus};
use super::{
PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateError, UpdateMeta, UpdateMsg,
UpdateStoreInfo,
};
use super::error::Result;
use super::{PayloadData, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo};
#[derive(Clone)]
pub struct UpdateActorHandleImpl<D> {
@ -24,7 +22,7 @@ where
index_handle: I,
path: impl AsRef<Path>,
update_store_size: usize,
) -> anyhow::Result<Self>
) -> std::result::Result<Self, Box<dyn std::error::Error>>
where
I: IndexActorHandle + Clone + Send + Sync + 'static,
{
@ -48,72 +46,42 @@ where
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret };
self.sender
.send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
self.sender.send(msg).await?;
receiver.await?
}
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetUpdate { uuid, id, ret };
self.sender
.send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
self.sender.send(msg).await?;
receiver.await?
}
async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Delete { uuid, ret };
self.sender
.send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
self.sender.send(msg).await?;
receiver.await?
}
async fn snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Snapshot { uuids, path, ret };
self.sender
.send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
self.sender.send(msg).await?;
receiver.await?
}
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Dump { uuids, path, ret };
self.sender
.send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
self.sender.send(msg).await?;
receiver.await?
}
async fn get_info(&self) -> Result<UpdateStoreInfo> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetInfo { ret };
self.sender
.send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
self.sender.send(msg).await?;
receiver.await?
}
async fn update(
@ -129,12 +97,7 @@ where
meta,
ret,
};
self.sender
.send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
self.sender.send(msg).await?;
receiver.await?
}
}

View file

@ -4,7 +4,8 @@ use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::{PayloadData, Result, UpdateMeta, UpdateStatus, UpdateStoreInfo};
use super::{PayloadData, UpdateMeta, UpdateStatus, UpdateStoreInfo};
use super::error::Result;
pub enum UpdateMsg<D> {
Update {

View file

@ -1,12 +1,6 @@
mod actor;
mod handle_impl;
mod message;
pub mod store;
use std::{collections::HashSet, path::PathBuf};
use actix_http::error::PayloadError;
use thiserror::Error;
use tokio::sync::mpsc;
use uuid::Uuid;
@ -14,49 +8,22 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
use actor::UpdateActor;
use message::UpdateMsg;
use error::Result;
pub use handle_impl::UpdateActorHandleImpl;
pub use store::{UpdateStore, UpdateStoreInfo};
pub type Result<T> = std::result::Result<T, UpdateError>;
mod actor;
mod handle_impl;
mod message;
pub mod error;
pub mod store;
type PayloadData<D> = std::result::Result<D, PayloadError>;
#[cfg(test)]
use mockall::automock;
#[derive(Debug, Error)]
pub enum UpdateError {
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
#[error("Internal error processing update: {0}")]
Internal(String),
#[error(
"Update store was shut down due to a fatal error, please check your logs for more info."
)]
FatalUpdateStoreError,
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UpdateError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
heed::Error,
std::io::Error,
serde_json::Error,
PayloadError,
tokio::task::JoinError,
anyhow::Error
);
#[async_trait::async_trait]
#[cfg_attr(test, automock(type Data=Vec<u8>;))]
pub trait UpdateActorHandle {

View file

@ -9,7 +9,7 @@ use heed::{EnvOpenOptions, RoTxn};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{State, UpdateStore};
use super::{State, UpdateStore, Result};
use crate::index_controller::{
index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued,
UpdateStatus,
@ -27,7 +27,7 @@ impl UpdateStore {
uuids: &HashSet<Uuid>,
path: PathBuf,
handle: impl IndexActorHandle,
) -> anyhow::Result<()> {
) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
@ -52,7 +52,7 @@ impl UpdateStore {
txn: &RoTxn,
uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
) -> anyhow::Result<()> {
) -> Result<()> {
let dump_data_path = path.as_ref().join("data.jsonl");
let mut dump_data_file = File::create(dump_data_path)?;
@ -71,7 +71,7 @@ impl UpdateStore {
uuids: &HashSet<Uuid>,
mut file: &mut File,
dst_path: impl AsRef<Path>,
) -> anyhow::Result<()> {
) -> Result<()> {
let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
for pending in pendings {
@ -103,7 +103,7 @@ impl UpdateStore {
txn: &RoTxn,
uuids: &HashSet<Uuid>,
mut file: &mut File,
) -> anyhow::Result<()> {
) -> Result<()> {
let updates = self.updates.iter(txn)?.lazily_decode_data();
for update in updates {
@ -125,7 +125,7 @@ impl UpdateStore {
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
db_size: usize,
) -> anyhow::Result<()> {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let dst_update_path = dst.as_ref().join("updates/");
create_dir_all(&dst_update_path)?;
@ -175,7 +175,7 @@ async fn dump_indexes(
uuids: &HashSet<Uuid>,
handle: impl IndexActorHandle,
path: impl AsRef<Path>,
) -> anyhow::Result<()> {
) -> Result<()> {
for uuid in uuids {
handle.dump(*uuid, path.as_ref().to_owned()).await?;
}

View file

@ -24,8 +24,9 @@ use uuid::Uuid;
use codec::*;
use super::UpdateMeta;
use super::error::Result;
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult};
use crate::helpers::EnvSizer;
#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;
@ -109,7 +110,7 @@ impl UpdateStore {
fn new(
mut options: EnvOpenOptions,
path: impl AsRef<Path>,
) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
) -> std::result::Result<(Self, mpsc::Receiver<()>), Box<dyn std::error::Error>> {
options.max_dbs(5);
let env = options.open(&path)?;
@ -140,7 +141,7 @@ impl UpdateStore {
path: impl AsRef<Path>,
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
must_exit: Arc<AtomicBool>,
) -> anyhow::Result<Arc<Self>> {
) -> std::result::Result<Arc<Self>, Box<dyn std::error::Error>> {
let (update_store, mut notification_receiver) = Self::new(options, path)?;
let update_store = Arc::new(update_store);
@ -285,7 +286,7 @@ impl UpdateStore {
fn process_pending_update(
&self,
index_handle: impl IndexActorHandle,
) -> anyhow::Result<Option<()>> {
) -> Result<Option<()>> {
// Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?;
let first_meta = self.pending_queue.first(&rtxn)?;
@ -320,7 +321,7 @@ impl UpdateStore {
index_handle: impl IndexActorHandle,
index_uuid: Uuid,
global_id: u64,
) -> anyhow::Result<Option<()>> {
) -> Result<Option<()>> {
let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid));
let update_id = processing.id();
@ -368,7 +369,7 @@ impl UpdateStore {
}
/// List the updates for `index_uuid`.
pub fn list(&self, index_uuid: Uuid) -> anyhow::Result<Vec<UpdateStatus>> {
pub fn list(&self, index_uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let mut update_list = BTreeMap::<u64, UpdateStatus>::new();
let txn = self.env.read_txn()?;
@ -437,7 +438,7 @@ impl UpdateStore {
}
/// Delete all updates for an index from the update store.
pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
let mut txn = self.env.write_txn()?;
// Contains all the content file paths that we need to be removed if the deletion was successful.
let mut uuids_to_remove = Vec::new();
@ -488,7 +489,7 @@ impl UpdateStore {
uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
handle: impl IndexActorHandle + Clone,
) -> anyhow::Result<()> {
) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Snapshoting);
@ -535,13 +536,13 @@ impl UpdateStore {
while let Some(res) = stream.next().await {
res?;
}
Ok(()) as IndexResult<()>
Ok(()) as Result<()>
})?;
Ok(())
}
pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
pub fn get_info(&self) -> Result<UpdateStoreInfo> {
let mut size = self.env.size();
let txn = self.env.read_txn()?;
for entry in self.pending_queue.iter(&txn)? {

View file

@ -12,7 +12,7 @@ pub struct UuidResolverHandleImpl {
}
impl UuidResolverHandleImpl {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let (sender, reveiver) = mpsc::channel(100);
let store = HeedUuidStore::new(path)?;
let actor = UuidResolverActor::new(reveiver, store);
@ -32,7 +32,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?)
}
async fn delete(&self, name: String) -> anyhow::Result<Uuid> {
async fn delete(&self, name: String) -> Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Delete { uid: name, ret };
let _ = self.sender.send(msg).await;
@ -41,7 +41,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?)
}
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>> {
async fn list(&self) -> Result<Vec<(String, Uuid)>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::List { ret };
let _ = self.sender.send(msg).await;
@ -50,7 +50,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?)
}
async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> {
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Insert { ret, name, uuid };
let _ = self.sender.send(msg).await;

View file

@ -6,6 +6,8 @@ pub mod store;
use std::collections::HashSet;
use std::path::PathBuf;
use meilisearch_error::Code;
use meilisearch_error::ErrorCode;
use thiserror::Error;
use uuid::Uuid;
@ -27,9 +29,9 @@ pub type Result<T> = std::result::Result<T, UuidResolverError>;
#[cfg_attr(test, automock)]
pub trait UuidResolverHandle {
async fn get(&self, name: String) -> Result<Uuid>;
async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>;
async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn delete(&self, name: String) -> Result<Uuid>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
@ -44,7 +46,7 @@ pub enum UuidResolverError {
#[error("Badly formatted index uid: {0}")]
BadlyFormatted(String),
#[error("Internal error resolving index uid: {0}")]
Internal(String),
Internal(Box<dyn std::error::Error + Sync + Send + 'static>),
}
macro_rules! internal_error {
@ -52,7 +54,7 @@ macro_rules! internal_error {
$(
impl From<$other> for UuidResolverError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
Self::Internal(Box::new(other))
}
}
)*
@ -66,3 +68,14 @@ internal_error!(
tokio::task::JoinError,
serde_json::Error
);
impl ErrorCode for UuidResolverError {
fn error_code(&self) -> Code {
match self {
UuidResolverError::NameAlreadyExist => Code::IndexAlreadyExists,
UuidResolverError::UnexistingIndex(_) => Code::IndexNotFound,
UuidResolverError::BadlyFormatted(_) => Code::InvalidIndexUid,
UuidResolverError::Internal(_) => Code::Internal,
}
}
}

View file

@ -39,7 +39,7 @@ pub struct HeedUuidStore {
}
impl HeedUuidStore {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().join(UUIDS_DB_PATH);
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
@ -153,7 +153,7 @@ impl HeedUuidStore {
Ok(uuids)
}
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<()> {
let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH);
std::fs::create_dir_all(&uuid_resolver_path)?;