diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs index 58fdb8041..2200d1bb7 100644 --- a/cluster/src/leader.rs +++ b/cluster/src/leader.rs @@ -7,11 +7,13 @@ use bus::{Bus, BusReader}; use crossbeam::channel::{unbounded, Receiver, Sender}; use ductile::{ChannelReceiver, ChannelSender, ChannelServer}; use log::info; +use meilisearch_types::keys::Key; use meilisearch_types::tasks::Task; use synchronoise::SignalEvent; +use uuid::Uuid; use crate::batch::Batch; -use crate::{Consistency, FollowerMsg, LeaderMsg}; +use crate::{ApiKeyOperation, Consistency, FollowerMsg, LeaderMsg}; #[derive(Clone)] pub struct Leader { @@ -157,12 +159,15 @@ impl Leader { info!("A follower left the cluster. {} members.", size); } + // ============= Everything related to the setup of the cluster pub fn join_me(&self, dump: Vec) { self.broadcast_to_follower .send(LeaderMsg::JoinFromDump(dump)) .expect("Lost the link with the followers"); } + // ============= Everything related to the scheduler + pub fn starts_batch(&self, batch: Batch) { let mut batch_id = self.batch_id.write().unwrap(); @@ -195,7 +200,7 @@ impl Leader { _ => (), } - // we can't wait forever here because the cluster size might get updated while we wait if a node dies + // we can't wait forever here because if a node dies the cluster size might get updated while we're stuck match self.task_ready_to_commit.recv_timeout(Duration::new(1, 0)) { Ok(id) if id == *batch_id => nodes_ready_to_commit += 1, _ => continue, @@ -213,4 +218,18 @@ impl Leader { .send(LeaderMsg::RegisterNewTask { task, update_file }) .expect("Main thread is dead"); } + + // ============= Everything related to the api-keys + + pub fn insert_key(&self, key: Key) { + self.broadcast_to_follower + .send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Insert(key))) + .unwrap() + } + + pub fn delete_key(&self, uuid: Uuid) { + self.broadcast_to_follower + .send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Delete(uuid))) + .unwrap() + } } diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index b5947cdcf..724b2d5ef 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -6,6 +6,7 @@ use batch::Batch; use crossbeam::channel::{unbounded, Receiver, Sender}; use ductile::{connect_channel, ChannelReceiver, ChannelSender}; use log::info; +use meilisearch_types::keys::Key; use meilisearch_types::tasks::{KindWithContent, Task}; use serde::{Deserialize, Serialize}; @@ -13,6 +14,7 @@ pub mod batch; mod leader; pub use leader::Leader; +use uuid::Uuid; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -24,14 +26,17 @@ pub enum Error { #[derive(Debug, Clone, Serialize, Deserialize)] pub enum LeaderMsg { - // A dump to join the cluster + /// A dump to join the cluster JoinFromDump(Vec), - // Starts a new batch + /// Starts a new batch StartBatch { id: u32, batch: Batch }, - // Tell the follower to commit the update asap + /// Tell the follower to commit the update asap Commit(u32), - // Tell the follower to commit the update asap + /// Tell the follower to commit the update asap RegisterNewTask { task: Task, update_file: Option> }, + + /// Tell the follower to commit the update asap + ApiKeyOperation(ApiKeyOperation), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -51,6 +56,12 @@ pub enum Consistency { All, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum ApiKeyOperation { + Insert(Key), + Delete(Uuid), +} + impl std::fmt::Display for Consistency { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -92,6 +103,8 @@ pub struct Follower { must_commit: Receiver, register_new_task: Receiver<(Task, Option>)>, + api_key_op: Receiver, + batch_id: Arc>, } @@ -112,9 +125,16 @@ impl Follower { let (get_batch_sender, get_batch_receiver) = unbounded(); let (must_commit_sender, must_commit_receiver) = unbounded(); let (register_task_sender, register_task_receiver) = unbounded(); + let (create_api_key_sender, create_api_key_receiver) = unbounded(); std::thread::spawn(move || { - Self::router(receiver, get_batch_sender, must_commit_sender, register_task_sender); + Self::router( + receiver, + get_batch_sender, + must_commit_sender, + register_task_sender, + create_api_key_sender, + ); }); ( @@ -123,6 +143,7 @@ impl Follower { get_batch: get_batch_receiver, must_commit: must_commit_receiver, register_new_task: register_task_receiver, + api_key_op: create_api_key_receiver, batch_id: Arc::default(), }, dump, @@ -134,6 +155,7 @@ impl Follower { get_batch: Sender<(u32, Batch)>, must_commit: Sender, register_new_task: Sender<(Task, Option>)>, + api_key_op: Sender, ) { loop { match receiver.recv().expect("Lost connection to the leader") { @@ -154,6 +176,9 @@ impl Follower { .send((task, update_file)) .expect("Lost connection to the main thread") } + LeaderMsg::ApiKeyOperation(key) => { + api_key_op.send(key).expect("Lost connection to the main thread") + } } } } @@ -185,6 +210,11 @@ impl Follower { } pub fn get_new_task(&self) -> (Task, Option>) { - self.register_new_task.recv().unwrap() + self.register_new_task.recv().expect("Lost connection to the leader") + } + + pub fn api_key_operation(&self) -> ApiKeyOperation { + info!("Creating a new api key"); + self.api_key_op.recv().expect("Lost connection to the leader") } } diff --git a/meilisearch-auth/src/lib.rs b/meilisearch-auth/src/lib.rs index 0872503ef..66f778bab 100644 --- a/meilisearch-auth/src/lib.rs +++ b/meilisearch-auth/src/lib.rs @@ -22,6 +22,7 @@ use uuid::Uuid; pub struct AuthController { store: Arc, master_key: Option, + cluster: Option, } @@ -37,7 +38,28 @@ impl AuthController { generate_default_keys(&store)?; } - Ok(Self { store: Arc::new(store), master_key: master_key.clone(), cluster }) + let this = Self { + store: Arc::new(store), + master_key: master_key.clone(), + cluster: cluster.clone(), + }; + + if let Some(Cluster::Follower(follower)) = cluster { + let this = this.clone(); + + std::thread::spawn(move || loop { + match follower.api_key_operation() { + cluster::ApiKeyOperation::Insert(key) => { + this.store.put_api_key(key).expect("Inconsistency with the leader"); + } + cluster::ApiKeyOperation::Delete(uuid) => { + this.store.delete_api_key(uuid).expect("Inconsistency with the leader"); + } + } + }); + } + + Ok(this) } /// Return the size of the `AuthController` database in bytes. @@ -48,7 +70,13 @@ impl AuthController { pub fn create_key(&self, create_key: CreateApiKey) -> Result { match self.store.get_api_key(create_key.uid)? { Some(_) => Err(AuthControllerError::ApiKeyAlreadyExists(create_key.uid.to_string())), - None => self.store.put_api_key(create_key.to_key()), + None => { + let key = self.store.put_api_key(create_key.to_key())?; + if let Some(Cluster::Leader(ref leader)) = self.cluster { + leader.insert_key(key.clone()); + } + Ok(key) + } } } @@ -63,7 +91,12 @@ impl AuthController { name => key.name = name.set(), }; key.updated_at = OffsetDateTime::now_utc(); - self.store.put_api_key(key) + + let key = self.store.put_api_key(key)?; + if let Some(Cluster::Leader(ref leader)) = self.cluster { + leader.insert_key(key.clone()); + } + Ok(key) } pub fn get_key(&self, uid: Uuid) -> Result { @@ -106,6 +139,9 @@ impl AuthController { pub fn delete_key(&self, uid: Uuid) -> Result<()> { if self.store.delete_api_key(uid)? { + if let Some(Cluster::Leader(ref leader)) = self.cluster { + leader.delete_key(uid); + } Ok(()) } else { Err(AuthControllerError::ApiKeyNotFound(uid.to_string()))