From c4c1240ab8971627533021d74935de165d3df84a Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 14 Mar 2023 17:38:21 +0100 Subject: [PATCH] start distributing meilisearch --- Cargo.lock | 85 +++++++++++++++++++++++---- Cargo.toml | 1 + cluster/Cargo.toml | 20 +++++++ cluster/src/leader.rs | 111 +++++++++++++++++++++++++++++++++++ cluster/src/lib.rs | 79 +++++++++++++++++++++++++ index-scheduler/Cargo.toml | 1 + index-scheduler/src/batch.rs | 1 + index-scheduler/src/lib.rs | 19 ++++++ 8 files changed, 304 insertions(+), 13 deletions(-) create mode 100644 cluster/Cargo.toml create mode 100644 cluster/src/leader.rs create mode 100644 cluster/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 853d1a896..d6bc7830e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -252,7 +252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" dependencies = [ "cfg-if", - "cipher", + "cipher 0.3.0", "cpufeatures", "opaque-debug", ] @@ -641,6 +641,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chacha20" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7fc89c7c5b9e7a02dfe45cd2367bae382f9ed31c61ca8debe5f827c420a2f08" +dependencies = [ + "cfg-if", + "cipher 0.4.4", + "cpufeatures", +] + [[package]] name = "change-detection" version = "1.2.0" @@ -712,6 +723,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "3.2.23" @@ -770,6 +791,19 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "cluster" +version = "1.1.0" +dependencies = [ + "ductile", + "log", + "meilisearch-types", + "roaring", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "concat-arrays" version = "0.1.2" @@ -1148,6 +1182,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "ductile" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cde25956886749c891a27249630ae99471f1ba05c4a924aad1a6ffe6932812" +dependencies = [ + "anyhow", + "bincode", + "chacha20", + "crossbeam-channel", + "log", + "rand", + "serde", +] + [[package]] name = "dump" version = "1.1.0" @@ -1175,9 +1224,9 @@ dependencies = [ [[package]] name = "either" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" dependencies = [ "serde", ] @@ -1895,6 +1944,7 @@ dependencies = [ "anyhow", "big_s", "bincode", + "cluster", "crossbeam", "csv", "derive_builder", @@ -1929,6 +1979,15 @@ dependencies = [ "serde", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "insta" version = "1.26.0" @@ -3499,9 +3558,9 @@ checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" [[package]] name = "serde" -version = "1.0.152" +version = "1.0.155" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "71f2b4817415c6d4210bfe1c7bfcf4801b2d904cb4d0e1a8fdb651013c9e86b8" dependencies = [ "serde_derive", ] @@ -3517,9 +3576,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.152" +version = "1.0.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +checksum = "d071a94a3fac4aff69d023a7f411e33f40f3483f8c5190b1953822b6b76d7630" dependencies = [ "proc-macro2", "quote", @@ -3528,9 +3587,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.91" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" +checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea" dependencies = [ "indexmap", "itoa 1.0.5", @@ -3816,18 +3875,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index b1f475410..a140a57a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "dump", "file-store", "permissive-json-pointer", + "cluster", "milli", "filter-parser", "flatten-serde-json", diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml new file mode 100644 index 000000000..189110d29 --- /dev/null +++ b/cluster/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "cluster" +publish = false + +version.workspace = true +authors.workspace = true +description.workspace = true +homepage.workspace = true +readme.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +ductile = "0.3.0" +serde = { version = "1.0.155", features = ["derive"] } +serde_json = "1.0.94" +thiserror = "1.0.39" +meilisearch-types = { path = "../meilisearch-types" } +roaring = "0.10.1" +log = "0.4.17" diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs new file mode 100644 index 000000000..9df62c4a1 --- /dev/null +++ b/cluster/src/leader.rs @@ -0,0 +1,111 @@ +use std::net::ToSocketAddrs; +use std::time::Duration; + +use ductile::{ChannelReceiver, ChannelSender, ChannelServer}; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::{Consistency, Error, FollowerMsg, LeaderMsg}; + +pub struct Leader { + listener: ChannelServer, + active_followers: Vec, + new_followers: Vec, + dead_followers: Vec, + + batch_id: u32, + tick: Duration, +} + +struct Follower { + sender: ChannelSender, + receiver: ChannelReceiver, +} + +impl Leader { + pub fn new(listen_on: impl ToSocketAddrs) -> Leader { + let listener = ChannelServer::bind(listen_on).unwrap(); + + Leader { + listener, + active_followers: Vec::new(), + new_followers: Vec::new(), + dead_followers: Vec::new(), + batch_id: 0, + tick: 
+        }
+    }
+
+    pub fn starts_batch(&mut self, batch: Vec<u32>) -> Result<(), Error> {
+        let mut dead_nodes = Vec::new();
+
+        for (idx, follower) in self.active_followers.iter_mut().enumerate() {
+            match follower
+                .sender
+                .send(LeaderMsg::StartBatch { id: self.batch_id, batch: batch.clone() })
+            {
+                Ok(_) => (),
+                // if a node can't be reached, we consider it dead
+                Err(_) => dead_nodes.push(idx),
+            }
+        }
+
+        // we iterate from the end so the indices stay correct while removing elements
+        for dead_node in dead_nodes.into_iter().rev() {
+            let dead = self.active_followers.swap_remove(dead_node);
+            self.dead_followers.push(dead);
+        }
+
+        Ok(())
+    }
+
+    pub fn commit(&mut self, consistency_level: Consistency) -> Result<(), Error> {
+        let mut dead_nodes = Vec::new();
+        let mut ready_to_commit = 0;
+        // get the size of the cluster to compute what a quorum means;
+        // it's mutable because if followers die we must remove them
+        // from the quorum
+        let mut cluster_size = self.active_followers.len();
+
+        // wait until enough nodes are ready to commit
+        for (idx, follower) in self.active_followers.iter_mut().enumerate() {
+            match consistency_level {
+                Consistency::Zero => break,
+                Consistency::One if ready_to_commit >= 1 => break,
+                Consistency::Two if ready_to_commit >= 2 => break,
+                Consistency::Quorum if ready_to_commit >= (cluster_size / 2) => break,
+                _ => (),
+            }
+            match follower.receiver.recv() {
+                Ok(FollowerMsg::ReadyToCommit(id)) if id == self.batch_id => ready_to_commit += 1,
+                Ok(FollowerMsg::RegisterNewTask(_)) => log::warn!("Missed a task"),
+                Ok(_) => (),
+                // if a node can't be reached, we consider it dead
+                Err(_) => {
+                    dead_nodes.push(idx);
+                    cluster_size -= 1;
+                }
+            }
+        }
+
+        let dn = dead_nodes.clone();
+        for (idx, follower) in
+            self.active_followers.iter_mut().enumerate().filter(|(i, _)| !dn.contains(i))
+        {
+            match follower.sender.send(LeaderMsg::Commit(self.batch_id)) {
+                Ok(_) => (),
+                Err(_) => dead_nodes.push(idx),
+            }
+        }
+
+        // we iterate from the end so the indices stay correct while removing elements
+        for dead_node in dead_nodes.into_iter().rev() {
+            let dead = self.active_followers.swap_remove(dead_node);
+            self.dead_followers.push(dead);
+        }
+
+        self.batch_id += 1;
+
+        Ok(())
+    }
+}
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs
new file mode 100644
index 000000000..6ea5bd1bf
--- /dev/null
+++ b/cluster/src/lib.rs
@@ -0,0 +1,79 @@
+use std::net::ToSocketAddrs;
+
+use ductile::{connect_channel, ChannelReceiver, ChannelSender};
+use meilisearch_types::tasks::KindWithContent;
+use serde::{Deserialize, Serialize};
+
+mod leader;
+
+pub use leader::Leader;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("Network issue occurred")]
+    NetworkIssue,
+    #[error("Internal error: {0}")]
+    SerdeJson(#[from] serde_json::Error),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum LeaderMsg {
+    // Starts a new batch
+    StartBatch { id: u32, batch: Vec<u32> },
+    // Tell the follower to commit the update asap
+    Commit(u32),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum FollowerMsg {
+    // Let the leader know you're ready to commit
+    ReadyToCommit(u32),
+    RegisterNewTask(KindWithContent),
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Consistency {
+    Zero,
+    One,
+    Two,
+    Quorum,
+    All,
+}
+
+pub struct Follower {
+    sender: ChannelSender<FollowerMsg>,
+    receiver: ChannelReceiver<LeaderMsg>,
+    batch_id: u32,
+}
+
+impl Follower {
+    pub fn join(leader: impl ToSocketAddrs) -> Follower {
+        let (sender, receiver) = connect_channel(leader).unwrap();
+        Follower { sender, receiver, batch_id: 0 }
+    }
+
+    pub fn get_new_batch(&mut self) -> Vec<u32> {
+        loop {
+            match self.receiver.recv() {
+                Ok(LeaderMsg::StartBatch { id, batch }) if id == self.batch_id => {
+                    self.batch_id = id;
+                    break batch;
+                }
+                Err(_) => log::error!("lost connection to the leader"),
+                _ => (),
+            }
+        }
+    }
+
+    pub fn ready_to_commit(&mut self) {
+        self.sender.send(FollowerMsg::ReadyToCommit(self.batch_id)).unwrap();
+
+        loop {
+            match self.receiver.recv() {
+                Ok(LeaderMsg::Commit(id)) if id == self.batch_id => break,
+                Err(_) => panic!("lost connection to the leader"),
+                _ => (),
+            }
+        }
+    }
+}
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index 99dfaa493..bea3c7d63 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -13,6 +13,7 @@ license.workspace = true
 [dependencies]
 anyhow = "1.0.64"
 bincode = "1.3.3"
+cluster = { path = "../cluster" }
 csv = "1.1.6"
 derive_builder = "0.11.2"
 dump = { path = "../dump" }
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 66c516d9b..c103bccc5 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -840,6 +840,7 @@ impl IndexScheduler {
         let mut index_wtxn = index.write_txn()?;
         let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
 
+        // TODO cluster: ready to commit
         index_wtxn.commit()?;
 
         Ok(tasks)
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index e23e4ff8b..19b402bbc 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -38,6 +38,7 @@ use std::sync::atomic::Ordering::Relaxed;
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
+use cluster::{Consistency, Follower, Leader};
 use dump::{KindDump, TaskDump, UpdateFile};
 pub use error::Error;
 use file_store::FileStore;
@@ -302,6 +303,9 @@ pub struct IndexScheduler {
     /// The path to the version file of Meilisearch.
     pub(crate) version_file_path: PathBuf,
 
+    /// The role in the cluster
+    pub(crate) cluster: Option<Cluster>,
+
     // ================= test
     // The next entry is dedicated to the tests.
     /// Provide a way to set a breakpoint in multiple part of the scheduler.
@@ -321,6 +325,11 @@ pub struct IndexScheduler {
     run_loop_iteration: Arc<RwLock<usize>>,
 }
 
+enum Cluster {
+    Leader(Leader),
+    Follower(Follower),
+}
+
 impl IndexScheduler {
     fn private_clone(&self) -> IndexScheduler {
         IndexScheduler {
@@ -343,6 +352,7 @@ impl IndexScheduler {
             dumps_path: self.dumps_path.clone(),
             auth_path: self.auth_path.clone(),
             version_file_path: self.version_file_path.clone(),
+            cluster: None,
             #[cfg(test)]
             test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
             #[cfg(test)]
@@ -416,6 +426,7 @@ impl IndexScheduler {
             snapshots_path: options.snapshots_path,
             auth_path: options.auth_path,
             version_file_path: options.version_file_path,
+            cluster: None,
             #[cfg(test)]
             test_breakpoint_sdr,
 
@@ -1050,6 +1061,9 @@ impl IndexScheduler {
             self.breakpoint(Breakpoint::Start);
         }
 
+        // TODO cluster: If
+        // - I'm a leader => create the batch and send it to everyone
+        // - I'm a follower => get the batch from the leader and gather the tasks from my task queue
         let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
         let batch =
             match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
@@ -1059,6 +1073,8 @@ impl IndexScheduler {
         let index_uid = batch.index_uid().map(ToOwned::to_owned);
         drop(rtxn);
 
+        // TODO cluster: Should we send the starting date as well so everyone is in sync?
+
         // 1. store the starting date with the bitmap of processing tasks.
         let mut ids = batch.ids();
         ids.sort_unstable();
 
@@ -1073,6 +1089,9 @@ impl IndexScheduler {
         #[cfg(test)]
         self.breakpoint(Breakpoint::BatchCreated);
 
+        // TODO cluster: Inside the processing of the tasks we need to check if we should commit
+        // the batch or not
+
         // 2. Process the tasks
         let res = {
             let cloned_index_scheduler = self.private_clone();
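
The cluster TODOs in tick() above only describe the intent: the leader builds the batch and broadcasts it, a follower fetches the batch from the leader and, once the work is applied, waits for the commit order. The sketch below is illustrative only and is not part of this patch; it assumes the batch is exchanged as the list of task ids produced by batch.ids(), that the scheduler can obtain mutable access to its cluster field from tick() (which currently only takes &self, so a lock would be needed in practice), and it leaves the choice of Consistency level to the caller.

// Illustrative sketch only, not part of the patch.
// Assumes the batch is exchanged as task ids (Vec<u32>), and that the caller
// holds a `&mut Cluster` (same shape as the enum added to the scheduler above).
use cluster::{Consistency, Follower, Leader};

enum Cluster {
    Leader(Leader),
    Follower(Follower),
}

/// Agree on the ids of the tasks that belong to the next batch.
fn synchronize_batch(cluster: &mut Cluster, local_ids: Vec<u32>) -> Vec<u32> {
    match cluster {
        // The leader decides what the batch is and broadcasts it to every follower.
        Cluster::Leader(leader) => {
            leader.starts_batch(local_ids.clone()).expect("could not broadcast the batch");
            local_ids
        }
        // A follower blocks until the leader announces the next batch.
        Cluster::Follower(follower) => follower.get_new_batch(),
    }
}

/// Once the tasks have been processed, synchronize the commit across the cluster.
fn synchronize_commit(cluster: &mut Cluster, consistency: Consistency) {
    match cluster {
        // Wait until enough followers are ready, then order everyone to commit.
        Cluster::Leader(leader) => {
            leader.commit(consistency).expect("commit failed on the leader")
        }
        // Signal readiness to the leader and block until it confirms the commit.
        Cluster::Follower(follower) => follower.ready_to_commit(),
    }
}

The Consistency levels map directly onto how many ReadyToCommit acknowledgements the leader waits for before sending Commit, which is exactly what Leader::commit implements in cluster/src/leader.rs.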