Fix the watcher ordering of the auth/ node

This commit is contained in:
Clément Renault 2023-08-30 17:51:22 +02:00
parent 8c3ad57ef9
commit 9dd4423054
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -39,6 +39,34 @@ impl AuthController {
match controller.zookeeper { match controller.zookeeper {
// setup the auth zk environment, the `auth` node // setup the auth zk environment, the `auth` node
Some(ref zookeeper) => { Some(ref zookeeper) => {
// Zookeeper Event listener loop
let controller_clone = controller.clone();
let zkk = zookeeper.clone();
zookeeper.add_watch("/auth", AddWatchMode::PersistentRecursive, move |event| {
let WatchedEvent { event_type, path, keeper_state: _ } = dbg!(event);
match event_type {
WatchedEventType::NodeDeleted => {
// TODO: ugly unwraps
let path = path.unwrap();
let uuid = path.strip_prefix("/auth/").unwrap();
let uuid = Uuid::parse_str(&uuid).unwrap();
log::info!("The key {} has been deleted", uuid);
controller_clone.store.delete_api_key(uuid).unwrap();
}
WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => {
let path = path.unwrap();
if path.strip_prefix("/auth/").map_or(false, |s| !s.is_empty()) {
let (key, _stat) = zkk.get_data(&path, false).unwrap();
let key: Key = serde_json::from_slice(&key).unwrap();
log::info!("The key {} has been deleted", key.uid);
controller_clone.store.put_api_key(key).unwrap();
}
}
otherwise => panic!("Got the unexpected `{otherwise:?}` event!"),
}
})?;
// TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create // TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create
// for the moment we consider that `create` only returns Error::NodeExists. // for the moment we consider that `create` only returns Error::NodeExists.
match zookeeper.create( match zookeeper.create(
@ -86,33 +114,6 @@ impl AuthController {
// The second one will delete its DB, load nothing and install a watcher // The second one will delete its DB, load nothing and install a watcher
// The first one will push its keys and should wake up and update the second one. // The first one will push its keys and should wake up and update the second one.
// / BUT, if the second one delete its DB and the first one push its files before the second one install the watcher we're fucked // / BUT, if the second one delete its DB and the first one push its files before the second one install the watcher we're fucked
// Zookeeper Event listener loop
let controller_clone = controller.clone();
let zkk = zookeeper.clone();
zookeeper.add_watch("/auth", AddWatchMode::PersistentRecursive, move |event| {
let WatchedEvent { event_type, path, keeper_state: _ } = dbg!(event);
match event_type {
WatchedEventType::NodeDeleted => {
// TODO: ugly unwraps
let path = path.unwrap();
let uuid = path.strip_prefix("/auth/").unwrap();
let uuid = Uuid::parse_str(&uuid).unwrap();
log::info!("The key {} has been deleted", uuid);
dbg!(controller_clone.store.delete_api_key(uuid).unwrap());
}
WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => {
let path = path.unwrap();
let (key, _stat) = zkk.get_data(&path, false).unwrap();
let key: Key = serde_json::from_slice(&key).unwrap();
log::info!("The key {} has been deleted", key.uid);
dbg!(controller_clone.store.put_api_key(key).unwrap());
}
otherwise => panic!("Got the unexpected `{otherwise:?}` event!"),
}
})?;
} }
None => { None => {
if controller.store.is_empty()? { if controller.store.is_empty()? {
@ -149,17 +150,19 @@ impl AuthController {
pub fn put_key(&self, key: Key) -> Result<Key> { pub fn put_key(&self, key: Key) -> Result<Key> {
let store = self.store.clone(); let store = self.store.clone();
// TODO: we may commit only after zk persisted the keys match &self.zookeeper {
let key = store.put_api_key(key)?; Some(zookeeper) => {
if let Some(zookeeper) = &self.zookeeper { zookeeper.create(
zookeeper.create( &format!("/auth/{}", key.uid),
&format!("/auth/{}", key.uid), serde_json::to_vec_pretty(&key)?,
serde_json::to_vec_pretty(&key)?, Acl::open_unsafe().clone(),
Acl::open_unsafe().clone(), CreateMode::Persistent,
CreateMode::Persistent, )?;
)?;
Ok(key)
}
None => store.put_api_key(key),
} }
Ok(key)
} }
pub fn update_key(&self, uid: Uuid, patch: PatchApiKey) -> Result<Key> { pub fn update_key(&self, uid: Uuid, patch: PatchApiKey) -> Result<Key> {
@ -175,15 +178,18 @@ impl AuthController {
key.updated_at = OffsetDateTime::now_utc(); key.updated_at = OffsetDateTime::now_utc();
let store = self.store.clone(); let store = self.store.clone();
// TODO: we may commit only after zk persisted the keys // TODO: we may commit only after zk persisted the keys
let key = store.put_api_key(key)?; match &self.zookeeper {
if let Some(zookeeper) = &self.zookeeper { Some(zookeeper) => {
zookeeper.set_data( zookeeper.set_data(
&format!("/auth/{}", key.uid), &format!("/auth/{}", key.uid),
serde_json::to_vec_pretty(&key)?, serde_json::to_vec_pretty(&key)?,
None, None,
)?; )?;
Ok(key)
}
None => store.put_api_key(key),
} }
Ok(key)
} }
pub fn get_key(&self, uid: Uuid) -> Result<Key> { pub fn get_key(&self, uid: Uuid) -> Result<Key> {
@ -225,12 +231,19 @@ impl AuthController {
} }
pub fn delete_key(&self, uid: Uuid) -> Result<()> { pub fn delete_key(&self, uid: Uuid) -> Result<()> {
let store = self.store.clone(); let deleted = match &self.zookeeper {
let deleted = store.delete_api_key(uid)?; Some(zookeeper) => match zookeeper.delete(&format!("/auth/{}", uid), None) {
if deleted { Ok(()) => true,
if let Some(zookeeper) = &self.zookeeper { Err(ZkError::NoNode) => false,
zookeeper.delete(&format!("/auth/{}", uid), None)?; Err(e) => return Err(e.into()),
},
None => {
let store = self.store.clone();
store.delete_api_key(uid)?
} }
};
if deleted {
Ok(()) Ok(())
} else { } else {
Err(AuthControllerError::ApiKeyNotFound(uid.to_string())) Err(AuthControllerError::ApiKeyNotFound(uid.to_string()))