From 145f0e753cf3847ddcc0b396ebb00993144acdbe Mon Sep 17 00:00:00 2001
From: Tamo
Date: Thu, 16 Mar 2023 14:31:03 +0100
Subject: [PATCH] move the leader to a new design

---
 Cargo.lock            |  13 +++
 cluster/Cargo.toml    |   2 +
 cluster/src/leader.rs | 206 +++++++++++++++++++++++++-----------------
 cluster/src/lib.rs    |   2 +-
 4 files changed, 139 insertions(+), 84 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d6bc7830e..3bac445d5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -523,6 +523,17 @@ version = "3.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
 
+[[package]]
+name = "bus"
+version = "2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "80cb4625f5b60155ff1018c9d4ce2e38bf5ae3e5780dfab9fa68bb44a6b751e2"
+dependencies = [
+ "crossbeam-channel",
+ "num_cpus",
+ "parking_lot_core",
+]
+
 [[package]]
 name = "byte-unit"
 version = "4.0.18"
@@ -795,6 +806,8 @@
 name = "cluster"
 version = "1.1.0"
 dependencies = [
+ "bus",
+ "crossbeam",
  "ductile",
  "log",
  "meilisearch-types",
diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml
index 189110d29..3a2f125e6 100644
--- a/cluster/Cargo.toml
+++ b/cluster/Cargo.toml
@@ -18,3 +18,5 @@ thiserror = "1.0.39"
 meilisearch-types = { path = "../meilisearch-types" }
 roaring = "0.10.1"
 log = "0.4.17"
+crossbeam = "0.8.2"
+bus = "2.3.0"
diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs
index 9df62c4a1..aa749a996 100644
--- a/cluster/src/leader.rs
+++ b/cluster/src/leader.rs
@@ -1,111 +1,151 @@
 use std::net::ToSocketAddrs;
-use std::time::Duration;
+use std::sync::atomic::AtomicUsize;
+use std::sync::{atomic, Arc, Mutex};
 
+use bus::{Bus, BusReader};
+use crossbeam::channel::{unbounded, Receiver, Sender};
 use ductile::{ChannelReceiver, ChannelSender, ChannelServer};
-use serde::de::DeserializeOwned;
-use serde::Serialize;
+use log::info;
 use meilisearch_types::tasks::Task;
 
-use crate::{Consistency, Error, FollowerMsg, LeaderMsg};
+use crate::{Consistency, FollowerMsg, LeaderMsg};
 
 pub struct Leader {
-    listener: ChannelServer<LeaderMsg, FollowerMsg>,
-    active_followers: Vec<Follower>,
-    new_followers: Vec<Follower>,
-    dead_followers: Vec<Follower>,
+    task_ready_to_commit: Receiver<u32>,
+    broadcast_to_follower: Sender<LeaderMsg>,
+
+    cluster_size: Arc<AtomicUsize>,
 
     batch_id: u32,
-    tick: Duration,
-}
-
-struct Follower {
-    sender: ChannelSender<LeaderMsg>,
-    receiver: ChannelReceiver<FollowerMsg>,
 }
 
 impl Leader {
-    pub fn new(listen_on: impl ToSocketAddrs) -> Leader {
-        let listener = ChannelServer::bind(listen_on).unwrap();
+    pub fn new(listen_on: impl ToSocketAddrs + Send + 'static) -> Leader {
+        let cluster_size = Arc::new(AtomicUsize::new(1));
+        let (process_batch_sender, process_batch_receiver) = unbounded();
+        let (task_finished_sender, task_finished_receiver) = unbounded();
+
+        let cs = cluster_size.clone();
+        std::thread::spawn(move || {
+            Self::listener(listen_on, cs, process_batch_receiver, task_finished_sender)
+        });
 
         Leader {
-            listener,
-            active_followers: Vec::new(),
-            new_followers: Vec::new(),
-            dead_followers: Vec::new(),
+            task_ready_to_commit: task_finished_receiver,
+            broadcast_to_follower: process_batch_sender,
+            cluster_size,
 
             batch_id: 0,
-            tick: Duration::new(1, 0),
         }
     }
 
-    pub fn starts_batch(&mut self, batch: Vec<Task>) -> Result<(), Error> {
-        let mut dead_nodes = Vec::new();
+    /// Takes all the necessary channels to chat with the scheduler and gives them
+    /// to each new follower
+    fn listener(
+        listen_on: impl ToSocketAddrs,
+        cluster_size: Arc<AtomicUsize>,
+        broadcast_to_follower: Receiver<LeaderMsg>,
+        task_finished: Sender<u32>,
+    ) {
+        let listener: ChannelServer<LeaderMsg, FollowerMsg> =
+            ChannelServer::bind(listen_on).unwrap();
 
-        for (idx, follower) in self.active_followers.iter_mut().enumerate() {
-            match follower
-                .sender
-                .send(LeaderMsg::StartBatch { id: self.batch_id, batch: batch.clone() })
+        // We're going to broadcast all the batches to all our followers
+        let bus: Bus<LeaderMsg> = Bus::new(10);
+        let bus = Arc::new(Mutex::new(bus));
+        let b = bus.clone();
+
+        std::thread::spawn(move || loop {
+            let msg = broadcast_to_follower.recv().expect("Main thread is dead");
+            b.lock().unwrap().broadcast(msg);
+        });
+
+        for (sender, receiver, _addr) in listener {
+            let task_finished = task_finished.clone();
+            let cs = cluster_size.clone();
+
+            let process_batch = bus.lock().unwrap().add_rx();
+
+            std::thread::spawn(move || {
+                Self::follower(sender, receiver, cs, process_batch, task_finished)
+            });
+        }
+    }
+
+    /// Allows a follower to chat with the scheduler
+    fn follower(
+        sender: ChannelSender<LeaderMsg>,
+        receiver: ChannelReceiver<FollowerMsg>,
+        cluster_size: Arc<AtomicUsize>,
+        mut broadcast_to_follower: BusReader<LeaderMsg>,
+        task_finished: Sender<u32>,
+    ) {
+        let size = cluster_size.fetch_add(1, atomic::Ordering::Relaxed) + 1;
+        info!("A new follower joined the cluster. {} members.", size);
+
+        // send messages to the follower
+        std::thread::spawn(move || loop {
+            let msg = broadcast_to_follower.recv().expect("Main thread died");
+            if sender.send(msg).is_err() {
+                // the follower died; the logging and the cluster-size update are
+                // handled by the receiving loop in the other thread
+                break;
+            }
+        });
+
+        // receive messages from the follower
+        loop {
+            match receiver.recv() {
+                Err(_) => break,
+                Ok(msg) => match msg {
+                    FollowerMsg::ReadyToCommit(id) => {
+                        task_finished.send(id).expect("Can't reach the main thread")
+                    }
+                    FollowerMsg::RegisterNewTask(_) => todo!(),
+                },
+            }
+        }
+
+        // if we exited the previous loop, the follower is down and should
+        // be removed from the cluster
+        let size = cluster_size.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
+        info!("A follower left the cluster. {} members.", size);
+    }
+
+    pub fn starts_batch(&mut self, batch: Vec<Task>) {
+        assert!(
+            self.batch_id % 2 == 0,
+            "Tried to start processing a batch before committing the previous one"
+        );
+        self.batch_id += 1;
+
+        self.broadcast_to_follower
+            .send(LeaderMsg::StartBatch { id: self.batch_id, batch })
+            .expect("Can't reach the cluster");
+    }
+
+    pub fn commit(&mut self, consistency_level: Consistency) {
+        // if zero nodes need to be in sync we can commit right away and exit early
+        if consistency_level != Consistency::Zero {
+            // otherwise, we wait until enough nodes are ready to commit
+            for (ready_to_commit, _id) in self
+                .task_ready_to_commit
+                .iter()
+                // we need to filter out the messages from the old batches
+                .filter(|id| *id == self.batch_id)
+                .enumerate()
             {
-                Ok(_) => (),
-                // if a node can't be joined we consider it as dead
-                Err(_) => dead_nodes.push(idx),
-            }
-        }
+                let cluster_size = self.cluster_size.load(atomic::Ordering::Relaxed);
 
-        // we do it from the end so the indices stays correct while removing elements
-        for dead_node in dead_nodes.into_iter().rev() {
-            let dead = self.active_followers.swap_remove(dead_node);
-            self.dead_followers.push(dead);
-        }
-
-        Ok(())
-    }
-
-    pub fn commit(&mut self, consistency_level: Consistency) -> Result<(), Error> {
-        let mut dead_nodes = Vec::new();
-        let mut ready_to_commit = 0;
-        // get the size of the cluster to compute what a quorum means
-        // it's mutable because if followers die we must remove them
-        // from the quorum
-        let mut cluster_size = self.active_followers.len();
-
-        // wait till enough nodes are ready to commit
-        for (idx, follower) in self.active_followers.iter_mut().enumerate() {
-            match consistency_level {
-                Consistency::Zero => break,
-                Consistency::One if ready_to_commit >= 1 => break,
-                Consistency::Two if ready_to_commit >= 2 => break,
-                Consistency::Quorum if ready_to_commit >= (cluster_size / 2) => break,
-                _ => (),
-            }
-            match follower.receiver.recv() {
-                Ok(FollowerMsg::ReadyToCommit(id)) if id == self.batch_id => ready_to_commit += 1,
-                Ok(FollowerMsg::RegisterNewTask(_)) => log::warn!("Missed a task"),
-                Ok(_) => (),
-                // if a node can't be joined we consider it as dead
-                Err(_) => {
-                    dead_nodes.push(idx);
-                    cluster_size -= 1
+                match consistency_level {
+                    Consistency::One if ready_to_commit >= 1 => break,
+                    Consistency::Two if ready_to_commit >= 2 => break,
+                    Consistency::Quorum if ready_to_commit >= (cluster_size / 2) => break,
+                    _ => (),
                 }
             }
         }
 
-        let dn = dead_nodes.clone();
-        for (idx, follower) in
-            self.active_followers.iter_mut().enumerate().filter(|(i, _)| !dn.contains(i))
-        {
-            match follower.sender.send(LeaderMsg::Commit(self.batch_id)) {
-                Ok(_) => (),
-                Err(_) => dead_nodes.push(idx),
-            }
-        }
-
-        // we do it from the end so the indices stays correct while removing elements
-        for dead_node in dead_nodes.into_iter().rev() {
-            let dead = self.active_followers.swap_remove(dead_node);
-            self.dead_followers.push(dead);
-        }
+        self.broadcast_to_follower.send(LeaderMsg::Commit(self.batch_id)).unwrap();
 
         self.batch_id += 1;
-
-        Ok(())
     }
 }
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs
index 6ea5bd1bf..ac595fe95 100644
--- a/cluster/src/lib.rs
+++ b/cluster/src/lib.rs
@@ -16,7 +16,7 @@ pub enum Error {
     SerdeJson(#[from] serde_json::Error),
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum LeaderMsg {
     // Starts a new batch
     StartBatch { id: u32, batch: Vec<Task> },
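
Usage sketch (illustrative, not part of the patch): with this design the scheduler
drives the whole cluster through three calls. `Leader::new` spawns the listener
thread and returns immediately, `starts_batch` broadcasts a batch to every
follower, and `commit` blocks until the requested `Consistency` level is reached
before broadcasting the commit order. The sketch below assumes the `cluster`
crate re-exports `Leader` and `Consistency` at its root; `next_batch` is a
hypothetical helper standing in for the scheduler's queue, and the listen
address is arbitrary.

    use cluster::{Consistency, Leader};
    use meilisearch_types::tasks::Task;

    // hypothetical helper: the real scheduler would return the enqueued tasks
    fn next_batch() -> Vec<Task> {
        Vec::new()
    }

    fn main() {
        // accepts followers on a background thread and returns immediately
        let mut leader = Leader::new("0.0.0.0:9999");

        loop {
            // batch_id goes from even to odd: a batch is now in flight
            leader.starts_batch(next_batch());

            // ... the leader processes the batch locally here ...

            // wait until enough followers sent `ReadyToCommit`, then
            // broadcast the commit; batch_id becomes even again
            leader.commit(Consistency::Quorum);
        }
    }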