diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs index 2200d1bb7..44fea225c 100644 --- a/cluster/src/leader.rs +++ b/cluster/src/leader.rs @@ -19,6 +19,8 @@ use crate::{ApiKeyOperation, Consistency, FollowerMsg, LeaderMsg}; pub struct Leader { task_ready_to_commit: Receiver, broadcast_to_follower: Sender, + needs_key_sender: Sender>>, + needs_key_receiver: Receiver>>, pub wake_up: Arc, @@ -35,6 +37,7 @@ impl Leader { let wake_up = Arc::new(SignalEvent::auto(true)); let (broadcast_to_follower, process_batch_receiver) = unbounded(); let (task_finished_sender, task_finished_receiver) = unbounded(); + let (needs_key_sender, needs_key_receiver) = unbounded(); let nf = new_followers.clone(); let af = active_followers.clone(); @@ -46,6 +49,8 @@ impl Leader { Leader { task_ready_to_commit: task_finished_receiver, broadcast_to_follower, + needs_key_sender, + needs_key_receiver, wake_up, @@ -232,4 +237,14 @@ impl Leader { .send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Delete(uuid))) .unwrap() } + + pub fn needs_keys(&self) -> Sender> { + self.needs_key_receiver.recv().expect("The cluster is dead") + } + + pub fn get_keys(&self) -> Vec { + let (send, rcv) = crossbeam::channel::bounded(1); + self.needs_key_sender.send(send).expect("The cluster is dead"); + rcv.recv().expect("The auth controller is dead") + } } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 52134bdae..8e857374a 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1306,6 +1306,7 @@ impl IndexScheduler { }; if let Some(Cluster::Leader(leader)) = &self.cluster { + // first, onboard the new followers if leader.has_new_followers() { info!("New followers are trying to join the cluster"); let started_at = OffsetDateTime::now_utc(); @@ -1321,8 +1322,7 @@ impl IndexScheduler { details: None, status: Status::Enqueued, kind: KindWithContent::DumpCreation { - // TODO cluster: handle the keys - keys: vec![], + keys: leader.get_keys(), // TODO cluster: should we unify the instance_uid between every instances? instance_uid: None, }, @@ -1338,6 +1338,7 @@ impl IndexScheduler { leader.join_me(buffer); } + // second, starts processing the batch if let Some(ref batch) = batch { leader.starts_batch(batch.clone().into()); } diff --git a/meilisearch-auth/src/lib.rs b/meilisearch-auth/src/lib.rs index 66f778bab..3e0a54352 100644 --- a/meilisearch-auth/src/lib.rs +++ b/meilisearch-auth/src/lib.rs @@ -57,6 +57,14 @@ impl AuthController { } } }); + } else if let Some(Cluster::Leader(leader)) = cluster { + let this = this.clone(); + + std::thread::spawn(move || loop { + let channel = leader.needs_keys(); + let keys = this.list_keys().expect("auth controller is dead"); + channel.send(keys).expect("Cluster is dead"); + }); } Ok(this)