send the api keys in a dump

This commit is contained in:
Tamo 2023-03-22 18:45:08 +01:00
parent 0a772fb391
commit 7ad14373e9
3 changed files with 26 additions and 2 deletions

View File

@ -19,6 +19,8 @@ use crate::{ApiKeyOperation, Consistency, FollowerMsg, LeaderMsg};
pub struct Leader {
task_ready_to_commit: Receiver<u32>,
broadcast_to_follower: Sender<LeaderMsg>,
needs_key_sender: Sender<Sender<Vec<Key>>>,
needs_key_receiver: Receiver<Sender<Vec<Key>>>,
pub wake_up: Arc<SignalEvent>,
@ -35,6 +37,7 @@ impl Leader {
let wake_up = Arc::new(SignalEvent::auto(true));
let (broadcast_to_follower, process_batch_receiver) = unbounded();
let (task_finished_sender, task_finished_receiver) = unbounded();
let (needs_key_sender, needs_key_receiver) = unbounded();
let nf = new_followers.clone();
let af = active_followers.clone();
@ -46,6 +49,8 @@ impl Leader {
Leader {
task_ready_to_commit: task_finished_receiver,
broadcast_to_follower,
needs_key_sender,
needs_key_receiver,
wake_up,
@ -232,4 +237,14 @@ impl Leader {
.send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Delete(uuid)))
.unwrap()
}
pub fn needs_keys(&self) -> Sender<Vec<Key>> {
self.needs_key_receiver.recv().expect("The cluster is dead")
}
pub fn get_keys(&self) -> Vec<Key> {
let (send, rcv) = crossbeam::channel::bounded(1);
self.needs_key_sender.send(send).expect("The cluster is dead");
rcv.recv().expect("The auth controller is dead")
}
}

View File

@ -1306,6 +1306,7 @@ impl IndexScheduler {
};
if let Some(Cluster::Leader(leader)) = &self.cluster {
// first, onboard the new followers
if leader.has_new_followers() {
info!("New followers are trying to join the cluster");
let started_at = OffsetDateTime::now_utc();
@ -1321,8 +1322,7 @@ impl IndexScheduler {
details: None,
status: Status::Enqueued,
kind: KindWithContent::DumpCreation {
// TODO cluster: handle the keys
keys: vec![],
keys: leader.get_keys(),
// TODO cluster: should we unify the instance_uid between every instances?
instance_uid: None,
},
@ -1338,6 +1338,7 @@ impl IndexScheduler {
leader.join_me(buffer);
}
// second, starts processing the batch
if let Some(ref batch) = batch {
leader.starts_batch(batch.clone().into());
}

View File

@ -57,6 +57,14 @@ impl AuthController {
}
}
});
} else if let Some(Cluster::Leader(leader)) = cluster {
let this = this.clone();
std::thread::spawn(move || loop {
let channel = leader.needs_keys();
let keys = this.list_keys().expect("auth controller is dead");
channel.send(keys).expect("Cluster is dead");
});
}
Ok(this)