diff --git a/Cargo.lock b/Cargo.lock
index a7cdd16e8..50f9211a1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -814,6 +814,7 @@ dependencies = [
  "roaring",
  "serde",
  "serde_json",
+ "synchronoise",
  "thiserror",
  "time",
  "uuid 1.3.0",
diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml
index 529c4212b..75956d2d9 100644
--- a/cluster/Cargo.toml
+++ b/cluster/Cargo.toml
@@ -22,3 +22,4 @@ crossbeam = "0.8.2"
 bus = "2.3.0"
 time = "0.3.20"
 uuid = { version = "1.3.0", features = ["v4"] }
+synchronoise = "1.0.1"
diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs
index b91d27688..69fb1d53a 100644
--- a/cluster/src/leader.rs
+++ b/cluster/src/leader.rs
@@ -7,44 +7,62 @@ use crossbeam::channel::{unbounded, Receiver, Sender};
 use ductile::{ChannelReceiver, ChannelSender, ChannelServer};
 use log::info;
 use meilisearch_types::tasks::Task;
+use synchronoise::SignalEvent;
 
 use crate::batch::Batch;
 use crate::{Consistency, FollowerMsg, LeaderMsg};
 
-#[derive(Clone, Debug)]
+#[derive(Clone)]
 pub struct Leader {
     task_ready_to_commit: Receiver<u32>,
     broadcast_to_follower: Sender<LeaderMsg>,
-    cluster_size: Arc<AtomicUsize>,
+
+    pub wake_up: Arc<SignalEvent>,
+
+    new_followers: Arc<AtomicUsize>,
+    active_followers: Arc<AtomicUsize>,
 
     batch_id: Arc<RwLock<u32>>,
 }
 
 impl Leader {
     pub fn new(listen_on: impl ToSocketAddrs + Send + 'static) -> Leader {
-        let cluster_size = Arc::new(AtomicUsize::new(1));
-        let (process_batch_sender, process_batch_receiver) = unbounded();
+        let new_followers = Arc::new(AtomicUsize::new(0));
+        let active_followers = Arc::new(AtomicUsize::new(1));
+        let wake_up = Arc::new(SignalEvent::auto(true));
+        let (broadcast_to_follower, process_batch_receiver) = unbounded();
         let (task_finished_sender, task_finished_receiver) = unbounded();
 
-        let cs = cluster_size.clone();
+        let nf = new_followers.clone();
+        let af = active_followers.clone();
+        let wu = wake_up.clone();
         std::thread::spawn(move || {
-            Self::listener(listen_on, cs, process_batch_receiver, task_finished_sender)
+            Self::listener(listen_on, nf, af, wu, process_batch_receiver, task_finished_sender)
         });
 
         Leader {
             task_ready_to_commit: task_finished_receiver,
-            broadcast_to_follower: process_batch_sender,
-            cluster_size,
+            broadcast_to_follower,
+
+            wake_up,
+
+            new_followers,
+            active_followers,
 
             batch_id: Arc::default(),
         }
     }
 
+    pub fn has_new_followers(&self) -> bool {
+        self.new_followers.load(Ordering::Relaxed) != 0
+    }
+
     /// Takes all the necessary channels to chat with the scheduler and give them
     /// to each new follower
     fn listener(
         listen_on: impl ToSocketAddrs,
-        cluster_size: Arc<AtomicUsize>,
+        new_followers: Arc<AtomicUsize>,
+        active_followers: Arc<AtomicUsize>,
+        wake_up: Arc<SignalEvent>,
         broadcast_to_follower: Receiver<LeaderMsg>,
         task_finished: Sender<u32>,
     ) {
@@ -63,12 +81,14 @@ impl Leader {
 
         for (sender, receiver, _addr) in listener {
             let task_finished = task_finished.clone();
-            let cs = cluster_size.clone();
+            let nf = new_followers.clone();
+            let af = active_followers.clone();
+            let wu = wake_up.clone();
 
             let process_batch = bus.lock().unwrap().add_rx();
 
             std::thread::spawn(move || {
-                Self::follower(sender, receiver, cs, process_batch, task_finished)
+                Self::follower(sender, receiver, nf, af, wu, process_batch, task_finished)
             });
         }
     }
 
@@ -77,20 +97,43 @@ impl Leader {
     fn follower(
         sender: ChannelSender<LeaderMsg>,
         receiver: ChannelReceiver<FollowerMsg>,
-        cluster_size: Arc<AtomicUsize>,
+        new_followers: Arc<AtomicUsize>,
+        active_followers: Arc<AtomicUsize>,
+        wake_up: Arc<SignalEvent>,
         mut broadcast_to_follower: BusReader<LeaderMsg>,
         task_finished: Sender<u32>,
     ) {
-        let size = cluster_size.fetch_add(1, atomic::Ordering::Relaxed) + 1;
+        let size = new_followers.fetch_add(1, Ordering::Relaxed) + 1;
+        wake_up.signal();
 
         info!("A new follower joined the cluster. {} members.", size);
 
+        loop {
+            if let msg @ LeaderMsg::JoinFromDump(_) =
+                broadcast_to_follower.recv().expect("Main thread died")
+            {
+                // we leave the new-follower state and become an active follower even though
+                // the dump will take some time to index
+                new_followers.fetch_sub(1, Ordering::Relaxed);
+                let size = active_followers.fetch_add(1, Ordering::Relaxed) + 1;
+                info!("A new follower became active. {} active members.", size);
+
+                sender.send(msg).unwrap();
+                break;
+            }
+        }
+
         // send messages to the follower
         std::thread::spawn(move || loop {
            let msg = broadcast_to_follower.recv().expect("Main thread died");
-            if sender.send(msg).is_err() {
-                // the follower died; the logging and the cluster size update are done
-                // in the other thread
-                break;
+            match msg {
+                LeaderMsg::JoinFromDump(_) => (),
+                msg => {
+                    if sender.send(msg).is_err() {
+                        // the follower died; the logging and the cluster size update are done
                        // in the other thread
+                        break;
+                    }
+                }
             }
         });
 
@@ -109,10 +152,20 @@ impl Leader {
 
         // if we exited from the previous loop it means the follower is down and should
         // be removed from the cluster
-        let size = cluster_size.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
+        let size = active_followers.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
         info!("A follower left the cluster. {} members.", size);
     }
 
+    pub fn wake_up(&self) {
+        todo!()
+    }
+
+    pub fn join_me(&self, dump: Vec<u8>) {
+        self.broadcast_to_follower
+            .send(LeaderMsg::JoinFromDump(dump))
+            .expect("Lost the link with the followers");
+    }
+
     pub fn starts_batch(&self, batch: Batch) {
         let mut batch_id = self.batch_id.write().unwrap();
 
@@ -127,7 +180,7 @@ impl Leader {
     pub fn commit(&self, consistency_level: Consistency) {
         info!("Wait until enough followers are ready to commit a batch");
 
-        let mut batch_id = self.batch_id.write().unwrap();
+        let batch_id = self.batch_id.write().unwrap();
 
         // if zero nodes need to be in sync we can commit right away and exit early
         if consistency_level != Consistency::One {
@@ -144,13 +197,13 @@ impl Leader {
             // TODO: if the last node dies we're stuck on the iterator
 
             // we need to reload the cluster size every time in case a node dies
-            let cluster_size = self.cluster_size.load(atomic::Ordering::Relaxed);
+            let size = self.active_followers.load(atomic::Ordering::Relaxed);
 
-            info!("{ready_to_commit} nodes are ready to commit for a cluster size of {cluster_size}");
+            info!("{ready_to_commit} nodes are ready to commit for a cluster size of {size}");
 
             match consistency_level {
                 Consistency::Two if ready_to_commit >= 1 => break,
-                Consistency::Quorum if ready_to_commit >= (cluster_size / 2) => break,
-                Consistency::All if ready_to_commit == cluster_size => break,
+                Consistency::Quorum if ready_to_commit >= (size / 2) => break,
+                Consistency::All if ready_to_commit == size => break,
                 _ => (),
             }
         }
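For reference, a minimal sketch (not part of the patch) of the decision `Leader::commit` loops on, assuming `One`, `Two`, `Quorum`, and `All` are the only `Consistency` variants — the enum itself lives in the `cluster` crate and may carry more variants than are visible here:

```rust
/// Illustrative stand-in for `cluster::Consistency`.
pub enum Consistency {
    One,
    Two,
    Quorum,
    All,
}

/// `ready` is how many followers acknowledged the batch;
/// `size` is the current number of active followers.
fn can_commit(consistency: &Consistency, ready: usize, size: usize) -> bool {
    match consistency {
        Consistency::One => true,       // the leader alone suffices; commit() skips the wait loop
        Consistency::Two => ready >= 1, // the leader plus at least one follower
        Consistency::Quorum => ready >= size / 2,
        Consistency::All => ready == size,
    }
}
```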
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs
index 8e55f2080..a62703776 100644
--- a/cluster/src/lib.rs
+++ b/cluster/src/lib.rs
@@ -23,6 +23,8 @@ pub enum Error {
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum LeaderMsg {
+    // A dump to join the cluster
+    JoinFromDump(Vec<u8>),
     // Starts a new batch
     StartBatch { id: u32, batch: Batch },
     // Tell the follower to commit the update asap
@@ -58,11 +60,19 @@ pub struct Follower {
 }
 
 impl Follower {
-    pub fn join(leader: impl ToSocketAddrs) -> Follower {
+    pub fn join(leader: impl ToSocketAddrs) -> (Follower, Vec<u8>) {
         let (sender, receiver) = connect_channel(leader).unwrap();
 
         info!("Connection to the leader established");
 
+        info!("Waiting for the leader to contact us");
+        let state = receiver.recv().unwrap();
+
+        let dump = match state {
+            LeaderMsg::JoinFromDump(dump) => dump,
+            msg => panic!("Received unexpected message {msg:?}"),
+        };
+
         let (get_batch_sender, get_batch_receiver) = unbounded();
         let (must_commit_sender, must_commit_receiver) = unbounded();
         let (register_task_sender, register_task_receiver) = unbounded();
@@ -71,13 +81,16 @@ impl Follower {
             Self::router(receiver, get_batch_sender, must_commit_sender, register_task_sender);
         });
 
-        Follower {
-            sender,
-            get_batch: get_batch_receiver,
-            must_commit: must_commit_receiver,
-            register_new_task: register_task_receiver,
-            batch_id: Arc::default(),
-        }
+        (
+            Follower {
+                sender,
+                get_batch: get_batch_receiver,
+                must_commit: must_commit_receiver,
+                register_new_task: register_task_receiver,
+                batch_id: Arc::default(),
+            },
+            dump,
+        )
     }
 
     fn router(
@@ -88,6 +101,9 @@ impl Follower {
     ) {
         loop {
             match receiver.recv().expect("Lost connection to the leader") {
+                LeaderMsg::JoinFromDump(_) => {
+                    panic!("Received a join-from-dump message but I'm already running")
+                }
                 LeaderMsg::StartBatch { id, batch } => {
                     info!("Starting to process a new batch");
                     get_batch.send((id, batch)).expect("Lost connection to the main thread")
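The new `join` signature turns the follower bootstrap into a handshake: connect, then block until the leader broadcasts a `JoinFromDump` carrying a full snapshot of its state. A hedged usage sketch — the address and file handling here are illustrative; the real wiring appears in `meilisearch/src/lib.rs` further down:

```rust
use std::io::Write;

fn bootstrap(leader_addr: &str) -> anyhow::Result<()> {
    // Blocks until the leader sends our copy of its state as a dump.
    let (follower, dump) = cluster::Follower::join(leader_addr);

    // Persist the bytes so the ordinary dump-import path can load them,
    // then start processing batches through `follower`.
    let mut dump_file = tempfile::NamedTempFile::new()?;
    dump_file.write_all(&dump)?;

    let _ = follower; // hand off to the index scheduler in the real code
    Ok(())
}
```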
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index d08290c17..51302bddf 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -24,7 +24,7 @@ use std::io::BufWriter;
 
 use cluster::Consistency;
 use crossbeam::utils::Backoff;
-use dump::IndexMetadata;
+use dump::{DumpWriter, IndexMetadata};
 use log::{debug, error, info};
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@@ -738,96 +738,9 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             Batch::Dump(mut task) => {
+                // TODO: It would be better to use the started_at from the task instead of generating a new one
                 let started_at = OffsetDateTime::now_utc();
-                let (keys, instance_uid) =
-                    if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
-                        (keys, instance_uid)
-                    } else {
-                        unreachable!();
-                    };
-                let dump = dump::DumpWriter::new(*instance_uid)?;
-
-                // 1. dump the keys
-                let mut dump_keys = dump.create_keys()?;
-                for key in keys {
-                    dump_keys.push_key(key)?;
-                }
-                dump_keys.flush()?;
-
-                let rtxn = self.env.read_txn()?;
-
-                // 2. dump the tasks
-                let mut dump_tasks = dump.create_tasks_queue()?;
-                for ret in self.all_tasks.iter(&rtxn)? {
-                    let (_, mut t) = ret?;
-                    let status = t.status;
-                    let content_file = t.content_uuid();
-
-                    // In the case we're dumping ourselves we want to be marked as finished
-                    // to not loop over ourselves indefinitely.
-                    if t.uid == task.uid {
-                        let finished_at = OffsetDateTime::now_utc();
-
-                        // We're going to fake the date because we don't know if everything is going to go well.
-                        // But we need to dump the task as finished and successful.
-                        // If something fails everything will be set appropriately in the end.
-                        t.status = Status::Succeeded;
-                        t.started_at = Some(started_at);
-                        t.finished_at = Some(finished_at);
-                    }
-                    let mut dump_content_file = dump_tasks.push_task(&t.into())?;
-
-                    // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
-                    if let Some(content_file) = content_file {
-                        if status == Status::Enqueued {
-                            let content_file = self.file_store.get_update(content_file)?;
-
-                            let reader = DocumentsBatchReader::from_reader(content_file)
-                                .map_err(milli::Error::from)?;
-
-                            let (mut cursor, documents_batch_index) =
-                                reader.into_cursor_and_fields_index();
-
-                            while let Some(doc) =
-                                cursor.next_document().map_err(milli::Error::from)?
-                            {
-                                dump_content_file.push_document(&obkv_to_object(
-                                    &doc,
-                                    &documents_batch_index,
-                                )?)?;
-                            }
-                            dump_content_file.flush()?;
-                        }
-                    }
-                }
-                dump_tasks.flush()?;
-
-                // 3. Dump the indexes
-                self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
-                    let rtxn = index.read_txn()?;
-                    let metadata = IndexMetadata {
-                        uid: uid.to_owned(),
-                        primary_key: index.primary_key(&rtxn)?.map(String::from),
-                        created_at: index.created_at(&rtxn)?,
-                        updated_at: index.updated_at(&rtxn)?,
-                    };
-                    let mut index_dumper = dump.create_index(uid, &metadata)?;
-
-                    let fields_ids_map = index.fields_ids_map(&rtxn)?;
-                    let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
-
-                    // 3.1. Dump the documents
-                    for ret in index.all_documents(&rtxn)? {
-                        let (_id, doc) = ret?;
-                        let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
-                        index_dumper.push_document(&document)?;
-                    }
-
-                    // 3.2. Dump the settings
-                    let settings = meilisearch_types::settings::settings(index, &rtxn)?;
-                    index_dumper.settings(&settings)?;
-                    Ok(())
-                })?;
+                let dump = self.create_dump(&task, &started_at)?;
 
                 let dump_uid = started_at.format(format_description!(
                     "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
@@ -974,6 +887,99 @@ impl IndexScheduler {
         }
     }
 
+    pub(crate) fn create_dump(
+        &self,
+        task: &Task,
+        started_at: &OffsetDateTime,
+    ) -> Result<DumpWriter> {
+        let (keys, instance_uid) =
+            if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
+                (keys, instance_uid)
+            } else {
+                unreachable!();
+            };
+        let dump = dump::DumpWriter::new(*instance_uid)?;
+
+        // 1. dump the keys
+        let mut dump_keys = dump.create_keys()?;
+        for key in keys {
+            dump_keys.push_key(key)?;
+        }
+        dump_keys.flush()?;
+
+        let rtxn = self.env.read_txn()?;
+
+        // 2. dump the tasks
+        let mut dump_tasks = dump.create_tasks_queue()?;
+        for ret in self.all_tasks.iter(&rtxn)? {
+            let (_, mut t) = ret?;
+            let status = t.status;
+            let content_file = t.content_uuid();
+
+            // In the case we're dumping ourselves we want to be marked as finished
+            // to not loop over ourselves indefinitely.
+            if t.uid == task.uid {
+                let finished_at = OffsetDateTime::now_utc();
+
+                // We're going to fake the date because we don't know if everything is going to go well.
+                // But we need to dump the task as finished and successful.
+                // If something fails everything will be set appropriately in the end.
+                t.status = Status::Succeeded;
+                t.started_at = Some(*started_at);
+                t.finished_at = Some(finished_at);
+            }
+            let mut dump_content_file = dump_tasks.push_task(&t.into())?;
+
+            // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
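Extracting the dump logic into `create_dump` lets two callers share it: the `Batch::Dump` handler above and the leader's join path added in `index-scheduler/src/lib.rs` below. A sketch of the second call shape — the helper name is illustrative; the real code builds a synthetic `Task` with `KindWithContent::DumpCreation` first:

```rust
impl IndexScheduler {
    /// Illustrative helper: build a dump and ship it to a joining follower.
    fn send_join_dump(&self, task: &Task, leader: &cluster::Leader) -> Result<()> {
        let started_at = OffsetDateTime::now_utc();
        // Same code path as the regular dump task; returns a dump::DumpWriter.
        let dump = self.create_dump(task, &started_at)?;

        let mut buffer = Vec::new();
        // Patch TODO: this buffers the whole dump in RAM before sending it.
        dump.persist_to(&mut buffer).unwrap();
        leader.join_me(buffer);
        Ok(())
    }
}
```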
+            if let Some(content_file) = content_file {
+                if status == Status::Enqueued {
+                    let content_file = self.file_store.get_update(content_file)?;
+
+                    let reader = DocumentsBatchReader::from_reader(content_file)
+                        .map_err(milli::Error::from)?;
+
+                    let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
+
+                    while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? {
+                        dump_content_file
+                            .push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
+                    }
+                    dump_content_file.flush()?;
+                }
+            }
+        }
+        dump_tasks.flush()?;
+
+        // 3. Dump the indexes
+        self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
+            let rtxn = index.read_txn()?;
+            let metadata = IndexMetadata {
+                uid: uid.to_owned(),
+                primary_key: index.primary_key(&rtxn)?.map(String::from),
+                created_at: index.created_at(&rtxn)?,
+                updated_at: index.updated_at(&rtxn)?,
+            };
+            let mut index_dumper = dump.create_index(uid, &metadata)?;
+
+            let fields_ids_map = index.fields_ids_map(&rtxn)?;
+            let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
+
+            // 3.1. Dump the documents
+            for ret in index.all_documents(&rtxn)? {
+                let (_id, doc) = ret?;
+                let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
+                index_dumper.push_document(&document)?;
+            }
+
+            // 3.2. Dump the settings
+            let settings = meilisearch_types::settings::settings(index, &rtxn)?;
+            index_dumper.settings(&settings)?;
+            Ok(())
+        })?;
+
+        Ok(dump)
+    }
+
     /// Swap the index `lhs` with the index `rhs`.
     fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
         // 1. Verify that both lhs and rhs are existing indexes
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index f790a0866..4b1af6624 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -551,17 +551,27 @@ impl IndexScheduler {
         }
 
         // if we're a follower we start a thread to register the tasks coming from the leader
-        if let Some(Cluster::Follower(follower)) = self.cluster.clone() {
+        if let Some(Cluster::Follower(ref follower)) = self.cluster {
             let this = self.private_clone();
             if this.cluster.is_some() {
                 log::warn!("this in a cluster");
             } else {
                 log::warn!("this not in a cluster");
             }
+            let follower = follower.clone();
             std::thread::spawn(move || loop {
                 let (task, content) = follower.get_new_task();
                 this.register_raw_task(task, content);
             });
+        } else if let Some(Cluster::Leader(ref leader)) = self.cluster {
+            // we need a way to let the leader come out of its loop if a new follower joins the cluster
+            let cluster = leader.wake_up.clone();
+            let scheduler = self.wake_up.clone();
+
+            std::thread::spawn(move || loop {
+                cluster.wait();
+                scheduler.signal();
+            });
         }
 
         std::thread::Builder::new()
@@ -1307,16 +1317,40 @@ impl IndexScheduler {
             }
         };
 
-        if self.cluster.is_some() {
-            println!("HERE: Part of a cluster");
-        } else if self.cluster.is_none() {
-            println!("HERE: Not part of a cluster");
-        }
-
         info!("before checking if i'm a leader");
         if let Some(Cluster::Leader(leader)) = &self.cluster {
-            info!("I'm a leader");
+            if leader.has_new_followers() {
+                info!("New followers are trying to join the cluster");
+                let started_at = OffsetDateTime::now_utc();
+                let dump = self
+                    .create_dump(
+                        &Task {
+                            uid: TaskId::MAX,
+                            enqueued_at: started_at,
+                            started_at: Some(started_at),
+                            finished_at: Some(started_at),
+                            error: None,
+                            canceled_by: None,
+                            details: None,
+                            status: Status::Enqueued,
+                            kind: KindWithContent::DumpCreation {
+                                // TODO cluster: handle the keys
+                                keys: vec![],
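`synchronoise::SignalEvent` is used here as a cross-thread wake-up: the leader signals its `wake_up` event when a follower connects, and the thread spawned above forwards that signal to the scheduler's own event so the tick loop re-runs and can ship a join dump. A standalone sketch of the bridge, under the assumption that both sides use auto-resetting events as `Leader::new` does:

```rust
use std::sync::Arc;
use synchronoise::SignalEvent;

fn main() {
    // `auto(true)` builds an auto-resetting event that starts signaled,
    // matching `Leader::new` in this patch.
    let cluster_wake_up = Arc::new(SignalEvent::auto(true));
    let scheduler_wake_up = Arc::new(SignalEvent::auto(true));

    let (cluster, scheduler) = (cluster_wake_up.clone(), scheduler_wake_up.clone());
    std::thread::spawn(move || loop {
        cluster.wait(); // parked until `Leader::follower` calls `wake_up.signal()`
        scheduler.signal(); // wake the scheduler tick
    });

    // elsewhere, the scheduler loop blocks on `scheduler_wake_up.wait()`
}
```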
+                                keys: vec![],
+                                // TODO cluster: should we unify the instance_uid across all instances?
+                                instance_uid: None,
+                            },
+                        },
+                        &started_at,
+                    )
+                    .unwrap();
+
+                let mut buffer = Vec::new();
+                // TODO cluster: stop writing everything in RAM
+                dump.persist_to(&mut buffer).unwrap();
+
+                leader.join_me(buffer);
+            }
+
             if let Some(ref batch) = batch {
-                info!("I'm a leader and I got a batch to process");
                 leader.starts_batch(batch.clone().into());
             }
         }
diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs
index 1fc7a8ef9..e06751310 100644
--- a/meilisearch/src/lib.rs
+++ b/meilisearch/src/lib.rs
@@ -11,10 +11,10 @@ pub mod routes;
 pub mod search;
 
 use std::fs::File;
-use std::io::{BufReader, BufWriter};
+use std::io::{BufReader, BufWriter, Write};
 use std::net::ToSocketAddrs;
 use std::path::Path;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
 
@@ -145,7 +145,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, AuthController)> {
             // the db is empty and the snapshot exists, import it
             if empty_db && snapshot_path_exists {
                 match compression::from_tar_gz(snapshot_path, &opt.db_path) {
-                    Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?,
+                    Ok(()) => open_or_create_database_unchecked(opt, None, OnFailure::RemoveDb)?,
                    Err(e) => {
                        std::fs::remove_dir_all(&opt.db_path)?;
                        return Err(e);
@@ -162,14 +162,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, AuthController)> {
                 bail!("snapshot doesn't exist at {}", snapshot_path.display())
             // the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
             } else {
-                open_or_create_database(opt, empty_db)?
+                open_or_create_database(opt, empty_db, None)?
             }
         } else if let Some(ref path) = opt.import_dump {
             let src_path_exists = path.exists();
             // the db is empty and the dump exists, import it
             if empty_db && src_path_exists {
                 let (mut index_scheduler, mut auth_controller) =
-                    open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?;
+                    open_or_create_database_unchecked(opt, None, OnFailure::RemoveDb)?;
                 match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
                     Ok(()) => (index_scheduler, auth_controller),
                     Err(e) => {
@@ -189,10 +189,58 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, AuthController)> {
             // the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
             // or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
             } else {
-                open_or_create_database(opt, empty_db)?
+                open_or_create_database(opt, empty_db, None)?
+            }
+        } else if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha {
+            match cluster.as_str() {
+                "leader" => {
+                    info!("Starting as a leader");
+                    let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap();
+                    addr.set_port(6666);
+                    open_or_create_database(opt, empty_db, Some(Cluster::Leader(Leader::new(addr))))?
+                }
+                "follower" => {
+                    info!("Starting as a follower");
+                    if !empty_db {
+                        panic!("Can't start as a follower with an already existing data.ms");
+                    }
+                    let mut addr = opt
+                        .cluster_configuration
+                        .leader
+                        .as_ref()
+                        .expect("Can't be a follower without a leader")
+                        .to_socket_addrs()
+                        .unwrap()
+                        .next()
+                        .unwrap();
+                    addr.set_port(6666);
+
+                    let (follower, dump) = Follower::join(addr);
+                    let mut dump_file = tempfile::NamedTempFile::new().unwrap();
+                    dump_file.write_all(&dump).unwrap();
+
+                    let (mut index_scheduler, mut auth_controller) = open_or_create_database_unchecked(
+                        opt,
+                        Some(Cluster::Follower(follower)),
+                        OnFailure::RemoveDb,
+                    )?;
+                    match import_dump(
+                        &opt.db_path,
+                        dump_file.path(),
+                        &mut index_scheduler,
+                        &mut auth_controller,
+                    ) {
+                        Ok(()) => (index_scheduler, auth_controller),
+                        Err(e) => {
+                            std::fs::remove_dir_all(&opt.db_path)?;
+                            return Err(e);
+                        }
+                    }
+                }
+                _ => panic!("Available values for the cluster mode are leader and follower"),
             }
         } else {
-            open_or_create_database(opt, empty_db)?
+            open_or_create_database(opt, empty_db, None)?
        };
 
     // We create a loop in a thread that registers snapshotCreation tasks
@@ -217,40 +265,13 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, AuthController)> {
 /// Try to start the IndexScheduler and AuthController without checking the VERSION file or anything.
 fn open_or_create_database_unchecked(
     opt: &Opt,
+    cluster: Option<Cluster>,
     on_failure: OnFailure,
 ) -> anyhow::Result<(IndexScheduler, AuthController)> {
     // we don't want to create anything in the data.ms yet, thus we
     // wrap our two builders in a closure that'll be executed later.
     let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
 
-    let cluster = if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha {
-        match cluster.as_str() {
-            "leader" => {
-                info!("Starting as a leader");
-                let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap();
-                addr.set_port(6666);
-                Some(Cluster::Leader(Leader::new(addr)))
-            }
-            "follower" => {
-                info!("Starting as a follower");
-                let mut addr = opt
-                    .cluster_configuration
-                    .leader
-                    .as_ref()
-                    .expect("Can't be a follower without a leader")
-                    .to_socket_addrs()
-                    .unwrap()
-                    .next()
-                    .unwrap();
-                addr.set_port(6666);
-                Some(Cluster::Follower(Follower::join(addr)))
-            }
-            _ => panic!("Available values for the cluster mode are leader and follower"),
-        }
-    } else {
-        None
-    };
-
     let index_scheduler_builder = || -> anyhow::Result<_> {
         Ok(IndexScheduler::new(
             IndexSchedulerOptions {
@@ -292,12 +313,13 @@ fn open_or_create_database_unchecked(
 fn open_or_create_database(
     opt: &Opt,
     empty_db: bool,
+    cluster: Option<Cluster>,
 ) -> anyhow::Result<(IndexScheduler, AuthController)> {
     if !empty_db {
         check_version_file(&opt.db_path)?;
     }
 
-    open_or_create_database_unchecked(opt, OnFailure::KeepDb)
+    open_or_create_database_unchecked(opt, cluster, OnFailure::KeepDb)
 }
 
 fn import_dump(
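From the call sites in this file, the `Cluster` handle threaded through `open_or_create_database` appears to be a simple two-variant enum; an inferred sketch (the real definition lives with the index scheduler, not in this diff):

```rust
/// Inferred from `Some(Cluster::Leader(..))` / `Some(Cluster::Follower(..))` above.
pub enum Cluster {
    Leader(cluster::Leader),
    Follower(cluster::Follower),
}
```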