fix index creation bug

This commit is contained in:
mpostma 2021-06-09 11:52:36 +02:00
parent 9f40896f4a
commit 1a65eed724
8 changed files with 16 additions and 56 deletions

View File

@ -40,6 +40,11 @@ impl MapIndexStore {
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> IndexResult<Index> {
let mut lock = self.index_store.write().await;
if let Some(index) = lock.get(&uuid) {
return Ok(index.clone())
}
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexError::IndexAlreadyExists);
@ -57,7 +62,7 @@ impl IndexStore for MapIndexStore {
})
.await??;
self.index_store.write().await.insert(uuid, index.clone());
lock.insert(uuid, index.clone());
Ok(index)
}

View File

@ -249,8 +249,9 @@ impl IndexController {
) -> anyhow::Result<IndexMetadata> {
let IndexSettings { uid, primary_key } = index_settings;
let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?;
let uuid = self.uuid_resolver.create(uid.clone()).await?;
let uuid = Uuid::new_v4();
let meta = self.index_handle.create_index(uuid, primary_key).await?;
self.uuid_resolver.insert(uid.clone(), uuid).await?;
let meta = IndexMetadata {
uuid,
name: uid.clone(),

View File

@ -229,7 +229,7 @@ impl UpdateStore {
let mut txn = self.env.write_txn()?;
let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
let meta = Enqueued::new(meta, update_id, content);
let meta = dbg!(Enqueued::new(meta, update_id, content));
self.pending_queue
.put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
@ -280,7 +280,7 @@ impl UpdateStore {
) -> anyhow::Result<Option<()>> {
// Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?;
let first_meta = self.pending_queue.first(&rtxn)?;
let first_meta = dbg!(self.pending_queue.first(&rtxn)?);
drop(rtxn);
// If there is a pending update we process and only keep

View File

@ -23,9 +23,6 @@ impl<S: UuidStore> UuidResolverActor<S> {
loop {
match self.inbox.recv().await {
Some(Create { uid: name, ret }) => {
let _ = ret.send(self.handle_create(name).await);
}
Some(Get { uid: name, ret }) => {
let _ = ret.send(self.handle_get(name).await);
}
@ -55,13 +52,6 @@ impl<S: UuidStore> UuidResolverActor<S> {
warn!("exiting uuid resolver loop");
}
async fn handle_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, true).await
}
async fn handle_get(&self, uid: String) -> Result<Uuid> {
self.store
.get_uuid(uid.clone())

View File

@ -32,15 +32,6 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?)
}
async fn create(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Create { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn delete(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Delete { uid: name, ret };

View File

@ -11,10 +11,6 @@ pub enum UuidResolveMsg {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Create {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Delete {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,

View File

@ -28,7 +28,6 @@ pub type Result<T> = std::result::Result<T, UuidResolverError>;
pub trait UuidResolverHandle {
async fn get(&self, name: String) -> Result<Uuid>;
async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>;
async fn create(&self, name: String) -> anyhow::Result<Uuid>;
async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;

View File

@ -8,7 +8,7 @@ use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{Result, UuidResolverError, UUID_STORE_SIZE};
use super::{Result, UUID_STORE_SIZE, UuidResolverError};
use crate::helpers::EnvSizer;
#[derive(Serialize, Deserialize)]
@ -23,7 +23,6 @@ const UUIDS_DB_PATH: &str = "index_uuids";
pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>;
async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
@ -50,27 +49,6 @@ impl HeedUuidStore {
Ok(Self { env, db })
}
pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidResolverError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
}
}
None => {
let uuid = Uuid::new_v4();
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(uuid)
}
}
}
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
@ -116,6 +94,11 @@ impl HeedUuidStore {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
if db.get(&txn, &name)?.is_some() {
return Err(UuidResolverError::NameAlreadyExist)
}
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
@ -205,11 +188,6 @@ impl HeedUuidStore {
#[async_trait::async_trait]
impl UuidStore for HeedUuidStore {
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.create_uuid(name, err)).await?
}
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.get_uuid(name)).await?