From 9112b26cd157bc479122cc37a85cc3d89a1ea892 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 22 Mar 2023 14:24:53 +0100 Subject: [PATCH] fix the consistency rules --- Cargo.lock | 1 + cluster/src/leader.rs | 41 +++++++++++++++++-------------------- cluster/src/lib.rs | 29 +++++++++++++++++++++----- index-scheduler/src/lib.rs | 8 +------- meilisearch-auth/Cargo.toml | 1 + meilisearch-auth/src/lib.rs | 10 +++++++-- meilisearch/src/lib.rs | 6 +++--- 7 files changed, 57 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50f9211a1..e9ff46ff8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2621,6 +2621,7 @@ name = "meilisearch-auth" version = "1.1.0" dependencies = [ "base64 0.13.1", + "cluster", "enum-iterator", "hmac", "maplit", diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs index 69fb1d53a..ca699329e 100644 --- a/cluster/src/leader.rs +++ b/cluster/src/leader.rs @@ -1,6 +1,7 @@ use std::net::ToSocketAddrs; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{atomic, Arc, Mutex, RwLock}; +use std::time::Duration; use bus::{Bus, BusReader}; use crossbeam::channel::{unbounded, Receiver, Sender}; @@ -182,31 +183,27 @@ impl Leader { let batch_id = self.batch_id.write().unwrap(); - // if zero nodes needs to be sync we can commit right away and early exit - if consistency_level != Consistency::One { - // else, we wait till enough nodes are ready to commit - for ready_to_commit in self - .task_ready_to_commit - .iter() - // we need to filter out the messages from the old batches - .filter(|id| *id == *batch_id) - .enumerate() - // we do a +2 because enumerate starts at 1 and we must includes ourselves in the count - .map(|(id, _)| id + 2) - { - // TODO: if the last node dies we're stuck on the iterator + let mut nodes_ready_to_commit = 1; - // we need to reload the cluster size everytime in case a node dies - let size = self.active_followers.load(atomic::Ordering::Relaxed); + loop { + let size = self.active_followers.load(atomic::Ordering::Relaxed); - info!("{ready_to_commit} nodes are ready to commit for a cluster size of {size}"); - match consistency_level { - Consistency::Two if ready_to_commit >= 1 => break, - Consistency::Quorum if ready_to_commit >= (size / 2) => break, - Consistency::All if ready_to_commit == size => break, - _ => (), - } + info!("{nodes_ready_to_commit} nodes are ready to commit for a cluster size of {size}"); + let all = nodes_ready_to_commit == size; + + match consistency_level { + Consistency::One if nodes_ready_to_commit >= 1 || all => break, + Consistency::Two if nodes_ready_to_commit >= 2 || all => break, + Consistency::Quorum if nodes_ready_to_commit >= (size / 2) || all => break, + Consistency::All if all => break, + _ => (), } + + // we can't wait forever here because the cluster size might get updated while we wait if a node dies + match self.task_ready_to_commit.recv_timeout(Duration::new(1, 0)) { + Ok(id) if id == *batch_id => nodes_ready_to_commit += 1, + _ => continue, + }; } info!("Tells all the follower to commit"); diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 94a58d2a8..b5947cdcf 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -42,29 +42,48 @@ pub enum FollowerMsg { } #[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] pub enum Consistency { - #[default] One, Two, Quorum, + #[default] All, } impl std::fmt::Display for Consistency { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let s = serde_json::to_string(self).unwrap(); - write!(f, "{s}") + match self { + Consistency::One => write!(f, "one"), + Consistency::Two => write!(f, "two"), + Consistency::Quorum => write!(f, "quorum"), + Consistency::All => write!(f, "all"), + } } } impl FromStr for Consistency { - type Err = serde_json::Error; + type Err = String; fn from_str(s: &str) -> Result { - serde_json::from_str(s) + match s { + "one" => Ok(Consistency::One), + "two" => Ok(Consistency::Two), + "quorum" => Ok(Consistency::Quorum), + "all" => Ok(Consistency::All), + s => Err(format!( + "Unexpected value `{s}`, expected one of `one`, `two`, `quorum`, `all`" + )), + } } } +#[derive(Clone)] +pub enum Cluster { + Leader(Leader), + Follower(Follower), +} + #[derive(Clone)] pub struct Follower { sender: ChannelSender, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 5ff0193c3..52134bdae 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -40,7 +40,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use batch::Batch; -use cluster::{Consistency, Follower, Leader}; +use cluster::{Cluster, Consistency}; use dump::{KindDump, TaskDump, UpdateFile}; pub use error::Error; use file_store::FileStore; @@ -349,12 +349,6 @@ impl std::str::FromStr for ClusterMode { } } -#[derive(Clone)] -pub enum Cluster { - Leader(Leader), - Follower(Follower), -} - impl IndexScheduler { fn private_clone(&self) -> IndexScheduler { IndexScheduler { diff --git a/meilisearch-auth/Cargo.toml b/meilisearch-auth/Cargo.toml index 9a00140fa..964486d75 100644 --- a/meilisearch-auth/Cargo.toml +++ b/meilisearch-auth/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true [dependencies] base64 = "0.13.1" +cluster = { path = "../cluster" } enum-iterator = "1.1.3" hmac = "0.12.1" maplit = "1.0.2" diff --git a/meilisearch-auth/src/lib.rs b/meilisearch-auth/src/lib.rs index 8d5457766..0872503ef 100644 --- a/meilisearch-auth/src/lib.rs +++ b/meilisearch-auth/src/lib.rs @@ -6,6 +6,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; +use cluster::Cluster; use error::{AuthControllerError, Result}; use maplit::hashset; use meilisearch_types::index_uid_pattern::IndexUidPattern; @@ -21,17 +22,22 @@ use uuid::Uuid; pub struct AuthController { store: Arc, master_key: Option, + cluster: Option, } impl AuthController { - pub fn new(db_path: impl AsRef, master_key: &Option) -> Result { + pub fn new( + db_path: impl AsRef, + master_key: &Option, + cluster: Option, + ) -> Result { let store = HeedAuthStore::new(db_path)?; if store.is_empty()? { generate_default_keys(&store)?; } - Ok(Self { store: Arc::new(store), master_key: master_key.clone() }) + Ok(Self { store: Arc::new(store), master_key: master_key.clone(), cluster }) } /// Return the size of the `AuthController` database in bytes. diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index a35538334..292f1c589 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -26,11 +26,11 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest}; use analytics::Analytics; use anyhow::bail; -use cluster::{Follower, Leader}; +use cluster::{Cluster, Follower, Leader}; use error::PayloadError; use extractors::payload::PayloadConfig; use http::header::CONTENT_TYPE; -use index_scheduler::{Cluster, IndexScheduler, IndexSchedulerOptions}; +use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use log::{error, info}; use meilisearch_auth::AuthController; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; @@ -270,7 +270,7 @@ fn open_or_create_database_unchecked( ) -> anyhow::Result<(IndexScheduler, AuthController)> { // we don't want to create anything in the data.ms yet, thus we // wrap our two builders in a closure that'll be executed later. - let auth_controller = AuthController::new(&opt.db_path, &opt.master_key); + let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, cluster.clone()); let index_scheduler_builder = || -> anyhow::Result<_> { Ok(IndexScheduler::new(