fix unexisting update store + race conditions

This commit is contained in:
mpostma 2021-03-11 22:11:58 +01:00
parent 3f68460d6c
commit 40b3451a4e
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
3 changed files with 48 additions and 14 deletions

View File

@ -144,6 +144,7 @@ impl IndexController {
let name = name.unwrap(); let name = name.unwrap();
let uuid = self.uuid_resolver.create(name.clone()).await?; let uuid = self.uuid_resolver.create(name.clone()).await?;
let meta = self.index_handle.create_index(uuid, primary_key).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?;
let _ = self.update_handle.create(uuid).await?;
let meta = IndexMetadata { name, meta }; let meta = IndexMetadata { name, meta };
Ok(meta) Ok(meta)

View File

@ -47,6 +47,10 @@ enum UpdateMsg<D> {
uuid: Uuid, uuid: Uuid,
ret: oneshot::Sender<Result<()>>, ret: oneshot::Sender<Result<()>>,
}, },
Create {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
}
} }
struct UpdateActor<D, S> { struct UpdateActor<D, S> {
@ -102,7 +106,10 @@ where
Some(Delete { uuid, ret }) => { Some(Delete { uuid, ret }) => {
let _ = ret.send(self.handle_delete(uuid).await); let _ = ret.send(self.handle_delete(uuid).await);
} }
None => {} Some(Create { uuid, ret }) => {
let _ = ret.send(self.handle_create(uuid).await);
}
None => break,
} }
} }
} }
@ -190,6 +197,11 @@ where
Ok(()) Ok(())
} }
async fn handle_create(&self, uuid: Uuid) -> Result<()> {
let _ = self.store.get_or_create(uuid).await?;
Ok(())
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -249,6 +261,13 @@ where
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }
pub async fn create(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Create { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
} }
struct MapUpdateStoreStore { struct MapUpdateStoreStore {
@ -282,7 +301,7 @@ impl UpdateStoreStore for MapUpdateStoreStore {
let store = UpdateStore::open(options, &path, move |meta, file| { let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file)) futures::executor::block_on(index_handle.update(meta, file))
}) })
.unwrap(); .map_err(|e| UpdateError::Error(e.into()))?;
let store = e.insert(store); let store = e.insert(store);
Ok(store.clone()) Ok(store.clone())
} }
@ -291,22 +310,35 @@ impl UpdateStoreStore for MapUpdateStoreStore {
} }
async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> { async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> {
// attemps to get pre-loaded ref to the index let guard = self.db.read().await;
match self.db.read().await.get(uuid) { match guard.get(uuid) {
Some(uuid) => Ok(Some(uuid.clone())), Some(uuid) => Ok(Some(uuid.clone())),
None => { None => {
// otherwise we try to check if it exists, and load it. // The index is not found in the found in the loaded indexes, so we attempt to load
// it from disk. We need to acquire a write lock **before** attempting to open the
// index, because someone could be trying to open it at the same time as us.
drop(guard);
let path = self.path.clone().join(format!("updates-{}", uuid)); let path = self.path.clone().join(format!("updates-{}", uuid));
if path.exists() { if path.exists() {
let index_handle = self.index_handle.clone(); let mut guard = self.db.write().await;
let mut options = heed::EnvOpenOptions::new(); match guard.entry(uuid.clone()) {
options.map_size(4096 * 100_000); Entry::Vacant(entry) => {
let store = UpdateStore::open(options, &path, move |meta, file| { // We can safely load the index
futures::executor::block_on(index_handle.update(meta, file)) let index_handle = self.index_handle.clone();
}) let mut options = heed::EnvOpenOptions::new();
.unwrap(); options.map_size(4096 * 100_000);
self.db.write().await.insert(uuid.clone(), store.clone()); let store = UpdateStore::open(options, &path, move |meta, file| {
Ok(Some(store)) futures::executor::block_on(index_handle.update(meta, file))
})
.map_err(|e| UpdateError::Error(e.into()))?;
let store = entry.insert(store.clone());
Ok(Some(store.clone()))
}
Entry::Occupied(entry) => {
// The index was loaded while we attempted to to iter
Ok(Some(entry.get().clone()))
}
}
} else { } else {
Ok(None) Ok(None)
} }

View File

@ -46,6 +46,7 @@ async fn list_no_updates() {
let index = server.index("test"); let index = server.index("test");
index.create(None).await; index.create(None).await;
let (response, code) = index.list_updates().await; let (response, code) = index.list_updates().await;
println!("response: {}", response);
assert_eq!(code, 200); assert_eq!(code, 200);
assert!(response.as_array().unwrap().is_empty()); assert!(response.as_array().unwrap().is_empty());
} }