From 3df58831c6afe8cf863fb7c069d3379c343eae19 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 21 Mar 2023 18:25:53 +0100 Subject: [PATCH] make the consistency configurable --- cluster/src/lib.rs | 19 ++++++++++++++++++- index-scheduler/src/batch.rs | 9 ++++----- index-scheduler/src/insta_snapshot.rs | 1 + index-scheduler/src/lib.rs | 10 ++++++++-- meilisearch/src/lib.rs | 1 + meilisearch/src/option.rs | 8 ++++++-- 6 files changed, 38 insertions(+), 10 deletions(-) diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index a62703776..94a58d2a8 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -1,4 +1,5 @@ use std::net::ToSocketAddrs; +use std::str::FromStr; use std::sync::{Arc, RwLock}; use batch::Batch; @@ -40,14 +41,30 @@ pub enum FollowerMsg { RegisterNewTask(KindWithContent), } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum Consistency { + #[default] One, Two, Quorum, All, } +impl std::fmt::Display for Consistency { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = serde_json::to_string(self).unwrap(); + write!(f, "{s}") + } +} + +impl FromStr for Consistency { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + serde_json::from_str(s) + } +} + #[derive(Clone)] pub struct Follower { sender: ChannelSender, diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 51302bddf..18423f6f9 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -22,7 +22,6 @@ use std::ffi::OsStr; use std::fs::{self, File}; use std::io::BufWriter; -use cluster::Consistency; use crossbeam::utils::Backoff; use dump::{DumpWriter, IndexMetadata}; use log::{debug, error, info}; @@ -589,7 +588,7 @@ impl IndexScheduler { } match &self.cluster { - Some(Cluster::Leader(leader)) => leader.commit(Consistency::All), + Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level), Some(Cluster::Follower(follower)) => follower.ready_to_commit(), None => (), } @@ -639,7 +638,7 @@ impl IndexScheduler { } match &self.cluster { - Some(Cluster::Leader(leader)) => leader.commit(Consistency::All), + Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level), Some(Cluster::Follower(follower)) => follower.ready_to_commit(), None => (), } @@ -770,7 +769,7 @@ impl IndexScheduler { let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; match &self.cluster { - Some(Cluster::Leader(leader)) => leader.commit(Consistency::All), + Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level), Some(Cluster::Follower(follower)) => follower.ready_to_commit(), None => (), } @@ -875,7 +874,7 @@ impl IndexScheduler { } match &self.cluster { - Some(Cluster::Leader(leader)) => leader.commit(Consistency::All), + Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level), Some(Cluster::Follower(follower)) => follower.ready_to_commit(), None => (), } diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 9323ace17..29a2da3dd 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -34,6 +34,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { auth_path: _, version_file_path: _, cluster: _, + consistency_level: _, test_breakpoint_sdr: _, planned_failures: _, run_loop_iteration: _, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index b16472d9b..5ff0193c3 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -40,7 +40,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use batch::Batch; -use cluster::{Follower, Leader}; +use cluster::{Consistency, Follower, Leader}; use dump::{KindDump, TaskDump, UpdateFile}; pub use error::Error; use file_store::FileStore; @@ -309,6 +309,8 @@ pub struct IndexScheduler { /// The role in the cluster pub(crate) cluster: Option, + /// The Consistency level used by the leader. Ignored if the node is not in a leader in cluster mode. + pub(crate) consistency_level: Consistency, // ================= test // The next entry is dedicated to the tests. @@ -376,6 +378,7 @@ impl IndexScheduler { auth_path: self.auth_path.clone(), version_file_path: self.version_file_path.clone(), cluster: self.cluster.clone(), + consistency_level: self.consistency_level, #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), #[cfg(test)] @@ -391,6 +394,7 @@ impl IndexScheduler { pub fn new( options: IndexSchedulerOptions, cluster: Option, + consistency_level: Consistency, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, ) -> Result { @@ -451,6 +455,7 @@ impl IndexScheduler { auth_path: options.auth_path, version_file_path: options.version_file_path, cluster, + consistency_level, #[cfg(test)] test_breakpoint_sdr, @@ -1461,7 +1466,8 @@ mod tests { autobatching_enabled, }; - let index_scheduler = Self::new(options, None, sender, planned_failures).unwrap(); + let index_scheduler = + Self::new(options, None, Consistency::default(), sender, planned_failures).unwrap(); // To be 100% consistent between all test we're going to start the scheduler right now // and ensure it's in the expected starting state. diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index e06751310..a35538334 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -291,6 +291,7 @@ fn open_or_create_database_unchecked( index_count: DEFAULT_INDEX_COUNT, }, cluster, + opt.cluster_configuration.consistency, )?) }; diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 8cda6d72d..027894b86 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -12,7 +12,7 @@ use std::{env, fmt, fs}; use byte_unit::{Byte, ByteError}; use clap::Parser; -use index_scheduler::ClusterMode; +use cluster::Consistency; use meilisearch_types::milli::update::IndexerConfig; use rustls::server::{ AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, ServerSessionMemoryCache, @@ -390,7 +390,7 @@ impl Opt { #[cfg(all(not(debug_assertions), feature = "analytics"))] no_analytics, experimental_enable_metrics: enable_metrics_route, - cluster_configuration, + cluster_configuration: _, } = self; export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr); @@ -533,6 +533,10 @@ pub struct ClusterOpts { #[clap(long)] #[serde(default)] pub leader: Option, + + #[clap(long, default_value_t)] + #[serde(default)] + pub consistency: Consistency, } impl TryFrom<&IndexerOpts> for IndexerConfig {