dump uuid resolver

This commit is contained in:
Marin Postma 2021-05-24 16:05:43 +02:00
parent cbcf50960f
commit 2185fb8367
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
6 changed files with 82 additions and 21 deletions

View File

@ -20,7 +20,7 @@ use dump_actor::DumpActorHandle;
use index_actor::IndexActorHandle;
use snapshot::{SnapshotService, load_snapshot};
use update_actor::UpdateActorHandle;
use uuid_resolver::{UuidError, UuidResolverHandle};
use uuid_resolver::{UuidResolverError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt;
@ -176,7 +176,7 @@ impl IndexController {
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_update(uuid).await?),
Err(UuidError::UnexistingIndex(name)) => {
Err(UuidResolverError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4();
let status = perform_update(uuid).await?;
// ignore if index creation fails now, since it may already have been created
@ -230,7 +230,7 @@ impl IndexController {
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_udpate(uuid).await?),
Err(UuidError::UnexistingIndex(name)) if create => {
Err(UuidResolverError::UnexistingIndex(name)) if create => {
let uuid = Uuid::new_v4();
let status = perform_udpate(uuid).await?;
// ignore if index creation fails now, since it may already have been created

View File

@ -4,7 +4,7 @@ use log::{info, warn};
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{Result, UuidError, UuidResolveMsg, UuidStore};
use super::{Result, UuidResolverError, UuidResolveMsg, UuidStore};
pub struct UuidResolverActor<S> {
inbox: mpsc::Receiver<UuidResolveMsg>,
@ -44,6 +44,9 @@ impl<S: UuidStore> UuidResolverActor<S> {
Some(GetSize { ret }) => {
let _ = ret.send(self.handle_get_size().await);
}
Some(DumpRequest { path, ret }) => {
let _ = ret.send(self.handle_dump(path).await);
}
// all senders have been dropped, need to quit.
None => break,
}
@ -54,7 +57,7 @@ impl<S: UuidStore> UuidResolverActor<S> {
async fn handle_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, true).await
}
@ -63,14 +66,14 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store
.get_uuid(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
.ok_or(UuidResolverError::UnexistingIndex(uid))
}
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
self.store
.delete(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
.ok_or(UuidResolverError::UnexistingIndex(uid))
}
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
@ -82,9 +85,13 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store.snapshot(path).await
}
async fn handle_dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
self.store.dump(path).await
}
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.insert(uid, uuid).await?;
Ok(())

View File

@ -85,4 +85,12 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::DumpRequest { ret, path };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
}

View File

@ -34,4 +34,8 @@ pub enum UuidResolveMsg {
GetSize {
ret: oneshot::Sender<Result<u64>>,
},
DumpRequest {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
}
}

View File

@ -16,12 +16,12 @@ use store::UuidStore;
#[cfg(test)]
use mockall::automock;
pub use store::HeedUuidStore;
pub use handle_impl::UuidResolverHandleImpl;
pub use store::HeedUuidStore;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
pub type Result<T> = std::result::Result<T, UuidError>;
pub type Result<T> = std::result::Result<T, UuidResolverError>;
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
@ -33,20 +33,37 @@ pub trait UuidResolverHandle {
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
}
#[derive(Debug, Error)]
pub enum UuidError {
pub enum UuidResolverError {
#[error("Name already exist.")]
NameAlreadyExist,
#[error("Index \"{0}\" doesn't exist.")]
UnexistingIndex(String),
#[error("Error performing task: {0}")]
TokioTask(#[from] tokio::task::JoinError),
#[error("Database error: {0}")]
Heed(#[from] heed::Error),
#[error("Uuid error: {0}")]
Uuid(#[from] uuid::Error),
#[error("Badly formatted index uid: {0}")]
BadlyFormatted(String),
#[error("Internal error resolving index uid: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UuidResolverError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
heed::Error,
uuid::Error,
std::io::Error,
tokio::task::JoinError,
serde_json::Error
);

View File

@ -1,5 +1,5 @@
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::{collections::HashSet, io::Write};
use std::fs::{create_dir_all, File};
use std::path::{Path, PathBuf};
use heed::{
@ -8,7 +8,7 @@ use heed::{
};
use uuid::Uuid;
use super::{Result, UuidError, UUID_STORE_SIZE};
use super::{Result, UuidResolverError, UUID_STORE_SIZE};
use crate::helpers::EnvSizer;
#[async_trait::async_trait]
@ -22,6 +22,7 @@ pub trait UuidStore {
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
}
#[derive(Clone)]
@ -48,7 +49,7 @@ impl HeedUuidStore {
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidError::NameAlreadyExist)
Err(UuidResolverError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
@ -138,6 +139,25 @@ impl HeedUuidStore {
pub fn get_size(&self) -> Result<u64> {
Ok(self.env.size())
}
pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let dump_path = path.join("index_uuids");
create_dir_all(&dump_path)?;
let dump_file_path = dump_path.join("data.jsonl");
let mut dump_file = File::create(&dump_file_path)?;
let mut uuids = HashSet::new();
let txn = self.env.read_txn()?;
for entry in self.db.iter(&txn)? {
let entry = entry?;
let uuid = Uuid::from_slice(entry.1)?;
uuids.insert(uuid);
serde_json::to_writer(&mut dump_file, &serde_json::json!({ "uid": entry.0, "uuid": uuid }))?;
dump_file.write(b"\n").unwrap();
}
Ok(uuids)
}
}
#[async_trait::async_trait]
@ -175,4 +195,9 @@ impl UuidStore for HeedUuidStore {
async fn get_size(&self) -> Result<u64> {
self.get_size()
}
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.dump(path)).await?
}
}