From 38100d5c057bcfb36090771994397e4da2d47a24 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 23 Mar 2023 12:28:36 +0100 Subject: [PATCH] securise the connecions between the leader and followers + forbid joining the cluster without the right master key --- cluster/src/leader.rs | 36 +++++++++++++++++++++++++++++++----- cluster/src/lib.rs | 21 ++++++++++++++++----- meilisearch/src/lib.rs | 8 ++++++-- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs index 44fea225c..0c19b1d7b 100644 --- a/cluster/src/leader.rs +++ b/cluster/src/leader.rs @@ -6,7 +6,7 @@ use std::time::Duration; use bus::{Bus, BusReader}; use crossbeam::channel::{unbounded, Receiver, Sender}; use ductile::{ChannelReceiver, ChannelSender, ChannelServer}; -use log::info; +use log::{info, warn}; use meilisearch_types::keys::Key; use meilisearch_types::tasks::Task; use synchronoise::SignalEvent; @@ -31,7 +31,10 @@ pub struct Leader { } impl Leader { - pub fn new(listen_on: impl ToSocketAddrs + Send + 'static) -> Leader { + pub fn new( + listen_on: impl ToSocketAddrs + Send + 'static, + master_key: Option, + ) -> Leader { let new_followers = Arc::new(AtomicUsize::new(0)); let active_followers = Arc::new(AtomicUsize::new(1)); let wake_up = Arc::new(SignalEvent::auto(true)); @@ -43,7 +46,15 @@ impl Leader { let af = active_followers.clone(); let wu = wake_up.clone(); std::thread::spawn(move || { - Self::listener(listen_on, nf, af, wu, process_batch_receiver, task_finished_sender) + Self::listener( + listen_on, + master_key, + nf, + af, + wu, + process_batch_receiver, + task_finished_sender, + ) }); Leader { @@ -68,14 +79,29 @@ impl Leader { /// to each new followers fn listener( listen_on: impl ToSocketAddrs, + master_key: Option, new_followers: Arc, active_followers: Arc, wake_up: Arc, broadcast_to_follower: Receiver, task_finished: Sender, ) { - let listener: ChannelServer = - ChannelServer::bind(listen_on).unwrap(); + let listener: ChannelServer = if let Some(ref master_key) = + master_key + { + let mut enc = [0; 32]; + let master_key = master_key.as_bytes(); + if master_key.len() < 32 { + warn!("Master key is not secure, use a longer master key (at least 32 bytes long)"); + } + enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk); + info!("Listening with encryption enabled"); + ChannelServer::bind_with_enc(listen_on, enc).unwrap() + } else { + ChannelServer::bind(listen_on).unwrap() + }; + + info!("Ready to the receive connections"); // We're going to broadcast all the batches to all our follower let bus: Bus = Bus::new(10); diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 724b2d5ef..43b3321fa 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -4,8 +4,8 @@ use std::sync::{Arc, RwLock}; use batch::Batch; use crossbeam::channel::{unbounded, Receiver, Sender}; -use ductile::{connect_channel, ChannelReceiver, ChannelSender}; -use log::info; +use ductile::{connect_channel, connect_channel_with_enc, ChannelReceiver, ChannelSender}; +use log::{info, warn}; use meilisearch_types::keys::Key; use meilisearch_types::tasks::{KindWithContent, Task}; use serde::{Deserialize, Serialize}; @@ -109,8 +109,19 @@ pub struct Follower { } impl Follower { - pub fn join(leader: impl ToSocketAddrs) -> (Follower, Vec) { - let (sender, receiver) = connect_channel(leader).unwrap(); + pub fn join(leader: impl ToSocketAddrs, master_key: Option) -> (Follower, Vec) { + let (sender, receiver) = if let Some(master_key) = master_key { + let mut enc = [0; 32]; + let master_key = master_key.as_bytes(); + if master_key.len() < 32 { + warn!("Master key is not secure, use a longer master key (at least 32 bytes long)"); + } + enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk); + info!("Connecting with encryption enabled"); + connect_channel_with_enc(leader, &enc).unwrap() + } else { + connect_channel(leader).unwrap() + }; info!("Connection to the leader established"); @@ -160,7 +171,7 @@ impl Follower { loop { match receiver.recv().expect("Lost connection to the leader") { LeaderMsg::JoinFromDump(_) => { - panic!("Received a join from dump msg but I’m already running") + warn!("Received a join from dump msg but I’m already running : ignoring the message") } LeaderMsg::StartBatch { id, batch } => { info!("Starting to process a new batch"); diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 292f1c589..d34dd4267 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -197,7 +197,11 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Auth info!("Starting as a leader"); let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap(); addr.set_port(6666); - open_or_create_database(opt, empty_db, Some(Cluster::Leader(Leader::new(addr))))? + open_or_create_database( + opt, + empty_db, + Some(Cluster::Leader(Leader::new(addr, opt.master_key.clone()))), + )? } "follower" => { info!("Starting as a follower"); @@ -215,7 +219,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Auth .unwrap(); addr.set_port(6666); - let (follower, dump) = Follower::join(addr); + let (follower, dump) = Follower::join(addr, opt.master_key.clone()); let mut dump_file = tempfile::NamedTempFile::new().unwrap(); dump_file.write_all(&dump).unwrap();