let the follower join the leader at any time

This commit is contained in:
Tamo 2023-03-21 17:56:18 +01:00
parent 498b59ac84
commit 5ecfa3570f
7 changed files with 299 additions and 166 deletions

Cargo.lock generated
View File

@@ -814,6 +814,7 @@ dependencies = [
"roaring",
"serde",
"serde_json",
"synchronoise",
"thiserror",
"time",
"uuid 1.3.0",

View File

@@ -22,3 +22,4 @@ crossbeam = "0.8.2"
bus = "2.3.0"
time = "0.3.20"
uuid = { version = "1.3.0", features = ["v4"] }
synchronoise = "1.0.1"
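The new synchronoise dependency provides the SignalEvent primitive the leader relies on below. A minimal sketch, not taken from this commit, of how an auto-reset event behaves: wait() blocks until another thread calls signal(), then the event re-arms itself.

use std::sync::Arc;
use synchronoise::SignalEvent;

fn main() {
    // `auto` re-arms the event after releasing waiters; `false` starts unsignaled
    let event = Arc::new(SignalEvent::auto(false));

    let waiter = {
        let event = Arc::clone(&event);
        std::thread::spawn(move || {
            event.wait(); // blocks until `signal()` is called
            println!("woken up");
        })
    };

    event.signal();
    waiter.join().unwrap();
}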

View File

@@ -7,44 +7,62 @@ use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{ChannelReceiver, ChannelSender, ChannelServer};
use log::info;
use meilisearch_types::tasks::Task;
use synchronoise::SignalEvent;
use crate::batch::Batch;
use crate::{Consistency, FollowerMsg, LeaderMsg};
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct Leader {
task_ready_to_commit: Receiver<u32>,
broadcast_to_follower: Sender<LeaderMsg>,
cluster_size: Arc<AtomicUsize>,
pub wake_up: Arc<SignalEvent>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
batch_id: Arc<RwLock<u32>>,
}
impl Leader {
pub fn new(listen_on: impl ToSocketAddrs + Send + 'static) -> Leader {
let cluster_size = Arc::new(AtomicUsize::new(1));
let (process_batch_sender, process_batch_receiver) = unbounded();
let new_followers = Arc::new(AtomicUsize::new(0));
let active_followers = Arc::new(AtomicUsize::new(1));
let wake_up = Arc::new(SignalEvent::auto(true));
let (broadcast_to_follower, process_batch_receiver) = unbounded();
let (task_finished_sender, task_finished_receiver) = unbounded();
let cs = cluster_size.clone();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
std::thread::spawn(move || {
Self::listener(listen_on, cs, process_batch_receiver, task_finished_sender)
Self::listener(listen_on, nf, af, wu, process_batch_receiver, task_finished_sender)
});
Leader {
task_ready_to_commit: task_finished_receiver,
broadcast_to_follower: process_batch_sender,
cluster_size,
broadcast_to_follower,
wake_up,
new_followers,
active_followers,
batch_id: Arc::default(),
}
}
pub fn has_new_followers(&self) -> bool {
self.new_followers.load(Ordering::Relaxed) != 0
}
/// Takes all the necessary channels to chat with the scheduler and gives them
/// to each new follower
fn listener(
listen_on: impl ToSocketAddrs,
cluster_size: Arc<AtomicUsize>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
broadcast_to_follower: Receiver<LeaderMsg>,
task_finished: Sender<u32>,
) {
@@ -63,12 +81,14 @@ impl Leader {
for (sender, receiver, _addr) in listener {
let task_finished = task_finished.clone();
let cs = cluster_size.clone();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
let process_batch = bus.lock().unwrap().add_rx();
std::thread::spawn(move || {
Self::follower(sender, receiver, cs, process_batch, task_finished)
Self::follower(sender, receiver, nf, af, wu, process_batch, task_finished)
});
}
}
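For context, the listener gives every follower its own reader on a bus broadcast channel, so one message from the scheduler reaches all connected followers. A self-contained sketch of that primitive, illustrative only and not part of the diff:

use bus::Bus;

fn main() {
    // each follower thread gets its own reader via `add_rx()`;
    // a single `broadcast` is observed by every reader
    let mut bus = Bus::new(10);
    let mut follower_a = bus.add_rx();
    let mut follower_b = bus.add_rx();

    bus.broadcast("start batch 0");

    assert_eq!(follower_a.recv().unwrap(), "start batch 0");
    assert_eq!(follower_b.recv().unwrap(), "start batch 0");
}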
@@ -77,20 +97,43 @@ impl Leader {
fn follower(
sender: ChannelSender<LeaderMsg>,
receiver: ChannelReceiver<FollowerMsg>,
cluster_size: Arc<AtomicUsize>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
mut broadcast_to_follower: BusReader<LeaderMsg>,
task_finished: Sender<u32>,
) {
let size = cluster_size.fetch_add(1, atomic::Ordering::Relaxed) + 1;
let size = new_followers.fetch_add(1, Ordering::Relaxed) + 1;
wake_up.signal();
info!("A new follower joined the cluster. {} members.", size);
loop {
if let msg @ LeaderMsg::JoinFromDump(_) =
broadcast_to_follower.recv().expect("Main thread died")
{
// we exit the new_follower state and become an active follower even though
// the dump will take some time to index
new_followers.fetch_sub(1, Ordering::Relaxed);
let size = active_followers.fetch_add(1, Ordering::Relaxed) + 1;
info!("A new follower became active. {} active members.", size);
sender.send(msg).unwrap();
break;
}
}
// send messages to the follower
std::thread::spawn(move || loop {
let msg = broadcast_to_follower.recv().expect("Main thread died");
if sender.send(msg).is_err() {
// the follower died, the logging and cluster size update should be done
// in the other thread
break;
match msg {
LeaderMsg::JoinFromDump(_) => (),
msg => {
if sender.send(msg).is_err() {
// the follower died, the logging and cluster size update should be done
// in the other thread
break;
}
}
}
});
@@ -109,10 +152,20 @@ impl Leader {
// if we exited from the previous loop it means the follower is down and should
// be removed from the cluster
let size = cluster_size.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
let size = active_followers.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
info!("A follower left the cluster. {} members.", size);
}
pub fn wake_up(&self) {
todo!()
}
pub fn join_me(&self, dump: Vec<u8>) {
self.broadcast_to_follower
.send(LeaderMsg::JoinFromDump(dump))
.expect("Lost the link with the followers");
}
pub fn starts_batch(&self, batch: Batch) {
let mut batch_id = self.batch_id.write().unwrap();
@@ -127,7 +180,7 @@ impl Leader {
pub fn commit(&self, consistency_level: Consistency) {
info!("Wait until enough followers are ready to commit a batch");
let mut batch_id = self.batch_id.write().unwrap();
let batch_id = self.batch_id.write().unwrap();
// if no other node needs to be in sync we can commit right away and exit early
if consistency_level != Consistency::One {
@@ -144,13 +197,13 @@ impl Leader {
// TODO: if the last node dies we're stuck on the iterator
// we need to reload the cluster size every time in case a node dies
let cluster_size = self.cluster_size.load(atomic::Ordering::Relaxed);
let size = self.active_followers.load(atomic::Ordering::Relaxed);
info!("{ready_to_commit} nodes are ready to commit for a cluster size of {cluster_size}");
info!("{ready_to_commit} nodes are ready to commit for a cluster size of {size}");
match consistency_level {
Consistency::Two if ready_to_commit >= 1 => break,
Consistency::Quorum if ready_to_commit >= (cluster_size / 2) => break,
Consistency::All if ready_to_commit == cluster_size => break,
Consistency::Quorum if ready_to_commit >= (size / 2) => break,
Consistency::All if ready_to_commit == size => break,
_ => (),
}
}
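Restated outside the diff, the commit gate above boils down to a small predicate. The helper below is hypothetical (the name can_commit and the standalone enum are assumptions for illustration), but it mirrors the match arms:

enum Consistency {
    One,
    Two,
    Quorum,
    All,
}

// have enough followers acknowledged the batch for the requested level?
fn can_commit(level: &Consistency, ready_to_commit: usize, active_followers: usize) -> bool {
    match level {
        // the leader alone is enough, no follower ack needed
        Consistency::One => true,
        Consistency::Two => ready_to_commit >= 1,
        Consistency::Quorum => ready_to_commit >= active_followers / 2,
        Consistency::All => ready_to_commit == active_followers,
    }
}

fn main() {
    assert!(can_commit(&Consistency::Quorum, 2, 4));
    assert!(!can_commit(&Consistency::All, 2, 4));
}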

View File

@@ -23,6 +23,8 @@ pub enum Error {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LeaderMsg {
// A dump to join the cluster
JoinFromDump(Vec<u8>),
// Starts a new batch
StartBatch { id: u32, batch: Batch },
// Tell the follower to commit the update asap
@@ -58,11 +60,19 @@ pub struct Follower {
}
impl Follower {
pub fn join(leader: impl ToSocketAddrs) -> Follower {
pub fn join(leader: impl ToSocketAddrs) -> (Follower, Vec<u8>) {
let (sender, receiver) = connect_channel(leader).unwrap();
info!("Connection to the leader established");
info!("Waiting for the leader to contact us");
let state = receiver.recv().unwrap();
let dump = match state {
LeaderMsg::JoinFromDump(dump) => dump,
msg => panic!("Received unexpected message {msg:?}"),
};
let (get_batch_sender, get_batch_receiver) = unbounded();
let (must_commit_sender, must_commit_receiver) = unbounded();
let (register_task_sender, register_task_receiver) = unbounded();
@@ -71,13 +81,16 @@ impl Follower {
Self::router(receiver, get_batch_sender, must_commit_sender, register_task_sender);
});
Follower {
sender,
get_batch: get_batch_receiver,
must_commit: must_commit_receiver,
register_new_task: register_task_receiver,
batch_id: Arc::default(),
}
(
Follower {
sender,
get_batch: get_batch_receiver,
must_commit: must_commit_receiver,
register_new_task: register_task_receiver,
batch_id: Arc::default(),
},
dump,
)
}
fn router(
@@ -88,6 +101,9 @@ impl Follower {
) {
loop {
match receiver.recv().expect("Lost connection to the leader") {
LeaderMsg::JoinFromDump(_) => {
panic!("Received a join from dump msg but Im already running")
}
LeaderMsg::StartBatch { id, batch } => {
info!("Starting to process a new batch");
get_batch.send((id, batch)).expect("Lost connection to the main thread")
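The router arm above is the follower-side half of a single rule: JoinFromDump must only reach a follower that is still joining. On the leader side the forwarding thread drops it for active followers, as sketched here with an assumed, stripped-down message type (illustration only):

use std::sync::mpsc::{channel, Sender};

// assumed minimal message type, not the real LeaderMsg
#[derive(Debug)]
enum Msg {
    JoinFromDump(Vec<u8>),
    Commit(u32),
}

// returns false once the follower's socket is gone, mirroring the
// `sender.send(msg).is_err()` break in the diff
fn forward_to_active_follower(msg: Msg, socket: &Sender<Msg>) -> bool {
    match msg {
        // an active follower must never receive another join dump
        Msg::JoinFromDump(_) => true,
        msg => socket.send(msg).is_ok(),
    }
}

fn main() {
    let (tx, rx) = channel();
    assert!(forward_to_active_follower(Msg::JoinFromDump(vec![]), &tx));
    assert!(forward_to_active_follower(Msg::Commit(0), &tx));
    // only the commit was actually forwarded
    assert!(matches!(rx.recv().unwrap(), Msg::Commit(0)));
}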

View File

@@ -24,7 +24,7 @@ use std::io::BufWriter;
use cluster::Consistency;
use crossbeam::utils::Backoff;
use dump::IndexMetadata;
use dump::{DumpWriter, IndexMetadata};
use log::{debug, error, info};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@@ -738,96 +738,9 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::Dump(mut task) => {
// TODO: It would be better to use the started_at from the task instead of generating a new one
let started_at = OffsetDateTime::now_utc();
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished
// so that we don't loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fails, everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) =
cursor.next_document().map_err(milli::Error::from)?
{
dump_content_file.push_document(&obkv_to_object(
&doc,
&documents_batch_index,
)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
let dump = self.create_dump(&task, &started_at)?;
let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
@@ -974,6 +887,99 @@ impl IndexScheduler {
}
}
pub(crate) fn create_dump(
&self,
task: &Task,
started_at: &OffsetDateTime,
) -> Result<DumpWriter> {
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished
// so that we don't loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fails, everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(*started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? {
dump_content_file
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
Ok(dump)
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
// 1. Verify that both lhs and rhs are existing indexes

View File

@@ -551,17 +551,27 @@ impl IndexScheduler {
}
// if we're a follower we start a thread to register the tasks coming from the leader
if let Some(Cluster::Follower(follower)) = self.cluster.clone() {
if let Some(Cluster::Follower(ref follower)) = self.cluster {
let this = self.private_clone();
if this.cluster.is_some() {
log::warn!("this in a cluster");
} else {
log::warn!("this not in a cluster");
}
let follower = follower.clone();
std::thread::spawn(move || loop {
let (task, content) = follower.get_new_task();
this.register_raw_task(task, content);
});
} else if let Some(Cluster::Leader(ref leader)) = self.cluster {
// we need a way to let the leader come out of its loop if a new follower joins the cluster
let cluster = leader.wake_up.clone();
let scheduler = self.wake_up.clone();
std::thread::spawn(move || loop {
cluster.wait();
scheduler.signal();
});
}
std::thread::Builder::new()
@@ -1307,16 +1317,40 @@ impl IndexScheduler {
}
};
if self.cluster.is_some() {
println!("HERE: Part of a cluster");
} else if self.cluster.is_none() {
println!("HERE: Not part of a cluster");
}
info!("before checking if im a leader");
if let Some(Cluster::Leader(leader)) = &self.cluster {
info!("I'm a leader");
if leader.has_new_followers() {
info!("New followers are trying to join the cluster");
let started_at = OffsetDateTime::now_utc();
let dump = self
.create_dump(
&Task {
uid: TaskId::MAX,
enqueued_at: started_at,
started_at: Some(started_at),
finished_at: Some(started_at),
error: None,
canceled_by: None,
details: None,
status: Status::Enqueued,
kind: KindWithContent::DumpCreation {
// TODO cluster: handle the keys
keys: vec![],
// TODO cluster: should we unify the instance_uid across every instance?
instance_uid: None,
},
},
&started_at,
)
.unwrap();
let mut buffer = Vec::new();
// TODO cluster: stop writing everything in RAM
dump.persist_to(&mut buffer).unwrap();
leader.join_me(buffer);
}
if let Some(ref batch) = batch {
info!("I'm a leader and I got a batch to process");
leader.starts_batch(batch.clone().into());
}
}
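The bridge spawned above can be shown in isolation. A minimal sketch, assuming only the synchronoise API: every signal on the cluster-side event is forwarded to the scheduler's own wake-up event so its main loop runs another tick.

use std::sync::Arc;
use synchronoise::SignalEvent;

fn main() {
    let cluster = Arc::new(SignalEvent::auto(false));
    let scheduler = Arc::new(SignalEvent::auto(false));

    // the bridge thread: a follower joining fires `cluster`,
    // which in turn wakes the scheduler
    let (c, s) = (Arc::clone(&cluster), Arc::clone(&scheduler));
    std::thread::spawn(move || loop {
        c.wait();
        s.signal();
    });

    cluster.signal(); // a follower joins
    scheduler.wait(); // the scheduler wakes up and can build the join dump
}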

View File

@@ -11,10 +11,10 @@ pub mod routes;
pub mod search;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::io::{BufReader, BufWriter, Write};
use std::net::ToSocketAddrs;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
@@ -145,7 +145,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
// the db is empty and the snapshot exists, import it
if empty_db && snapshot_path_exists {
match compression::from_tar_gz(snapshot_path, &opt.db_path) {
Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?,
Ok(()) => open_or_create_database_unchecked(opt, None, OnFailure::RemoveDb)?,
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
return Err(e);
@@ -162,14 +162,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
bail!("snapshot doesn't exist at {}", snapshot_path.display())
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, None)?
}
} else if let Some(ref path) = opt.import_dump {
let src_path_exists = path.exists();
// the db is empty and the dump exists, import it
if empty_db && src_path_exists {
let (mut index_scheduler, mut auth_controller) =
open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?;
open_or_create_database_unchecked(opt, None, OnFailure::RemoveDb)?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
Ok(()) => (index_scheduler, auth_controller),
Err(e) => {
@@ -189,10 +189,58 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, None)?
}
} else if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha {
match cluster.as_str() {
"leader" => {
info!("Starting as a leader");
let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap();
addr.set_port(6666);
open_or_create_database(opt, empty_db, Some(Cluster::Leader(Leader::new(addr))))?
}
"follower" => {
info!("Starting as a follower");
if !empty_db {
panic!("Can't start as a follower with an already existing data.ms");
}
let mut addr = opt
.cluster_configuration
.leader
.as_ref()
.expect("Can't be a follower without a leader")
.to_socket_addrs()
.unwrap()
.next()
.unwrap();
addr.set_port(6666);
let (follower, dump) = Follower::join(addr);
let mut dump_file = tempfile::NamedTempFile::new().unwrap();
dump_file.write_all(&dump).unwrap();
let (mut index_scheduler, mut auth_controller) = open_or_create_database_unchecked(
opt,
Some(Cluster::Follower(follower)),
OnFailure::RemoveDb,
)?;
match import_dump(
&opt.db_path,
dump_file.path(),
&mut index_scheduler,
&mut auth_controller,
) {
Ok(()) => (index_scheduler, auth_controller),
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
return Err(e);
}
}
}
_ => panic!("Available values for the cluster mode are leader and follower"),
}
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, None)?
};
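One detail of the block above worth isolating: both modes derive the cluster address by resolving whatever address they already have and forcing the hard-coded port 6666. A standalone sketch:

use std::net::{SocketAddr, ToSocketAddrs};

// resolve the configured address, then force the prototype's fixed cluster port
fn cluster_addr(configured: &str) -> SocketAddr {
    let mut addr = configured.to_socket_addrs().unwrap().next().unwrap();
    addr.set_port(6666);
    addr
}

fn main() {
    assert_eq!(cluster_addr("127.0.0.1:7700").port(), 6666);
}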
// We create a loop in a thread that registers snapshotCreation tasks
@@ -217,40 +265,13 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
/// Try to start the IndexScheduler and AuthController without checking the VERSION file or anything.
fn open_or_create_database_unchecked(
opt: &Opt,
cluster: Option<Cluster>,
on_failure: OnFailure,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
let cluster = if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha {
match cluster.as_str() {
"leader" => {
info!("Starting as a leader");
let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap();
addr.set_port(6666);
Some(Cluster::Leader(Leader::new(addr)))
}
"follower" => {
info!("Starting as a follower");
let mut addr = opt
.cluster_configuration
.leader
.as_ref()
.expect("Can't be a follower without a leader")
.to_socket_addrs()
.unwrap()
.next()
.unwrap();
addr.set_port(6666);
Some(Cluster::Follower(Follower::join(addr)))
}
_ => panic!("Available values for the cluster mode are leader and follower"),
}
} else {
None
};
let index_scheduler_builder = || -> anyhow::Result<_> {
Ok(IndexScheduler::new(
IndexSchedulerOptions {
@@ -292,12 +313,13 @@ fn open_or_create_database_unchecked(
fn open_or_create_database(
opt: &Opt,
empty_db: bool,
cluster: Option<Cluster>,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
if !empty_db {
check_version_file(&opt.db_path)?;
}
open_or_create_database_unchecked(opt, OnFailure::KeepDb)
open_or_create_database_unchecked(opt, cluster, OnFailure::KeepDb)
}
fn import_dump(