Start leader election and task processing (WIP)

This commit is contained in:
ManyTheFish 2023-08-09 16:52:38 +02:00
parent 1191ec5939
commit 8e437ed76c

View File

@ -570,25 +570,122 @@ impl IndexScheduler {
/// only once per index scheduler. /// only once per index scheduler.
async fn run(&self) { async fn run(&self) {
let run = self.private_clone(); let run = self.private_clone();
let zk = self.zk.clone();
let mut self_node_id = zk::CreateSequence(0);
tokio::task::spawn(async move { tokio::task::spawn(async move {
#[cfg(test)] #[cfg(test)]
run.breakpoint(Breakpoint::Init); run.breakpoint(Breakpoint::Init);
let wake_up = run.wake_up.clone(); // potentialy create /leader-q folder
let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await; // join the leader q
// subscribe a watcher to the node-1 in the leader q
let mut watchers = if let Some(ref zk) = zk {
let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
match zk.create("/election", &[], &options).await {
Ok(_) | Err(zk::Error::NodeExists) => (),
Err(e) => panic!("{e}"),
}
match zk.create("/snapshots", &[], &options).await {
Ok(_) | Err(zk::Error::NodeExists) => (),
Err(e) => panic!("{e}"),
}
let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
// TODO: ugly unwrap
let (_stat, id) = zk.create("/election/node-", &[], &options).await.unwrap();
self_node_id = id;
let previous_path = {
let list = zk.list_children("/election").await.unwrap();
let self_node_path = format!("node-{}", self_node_id);
let previous_path =
list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last();
previous_path.map(|path| format!("/election/{}", path))
};
if let Some(previous_path) = previous_path {
Some((
zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
.await
.unwrap(),
))
} else {
log::warn!("I'm the leader");
None
}
} else {
log::warn!("I don't have any ZK cluster");
None
};
loop { loop {
match run.tick().await { match watchers.as_mut() {
Ok(TickOutcome::TickAgain(_)) => (), Some((lw, sw)) => {
Ok(TickOutcome::WaitForSignal) => { tokio::select! {
zk::WatchedEvent { event_type, session_state, .. } = lw.changed() => match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
zk::EventType::NodeDeleted => {
let zk = zk.as_ref().unwrap();
let previous_path = {
let list = zk.list_children("/election").await.unwrap();
let self_node_path = format!("node-{}", self_node_id);
let previous_path =
list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last();
previous_path.map(|path| format!("/election/{}", path))
};
let (lw, sw) = watchers.take().unwrap();
lw.remove().await.unwrap();
watchers = if let Some(previous_path) = previous_path {
Some((
zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
.await
.unwrap(),
))
} else {
log::warn!("I'm the new leader");
sw.remove().await.unwrap();
None
}
}
_ => (),
},
zk::WatchedEvent { event_type, session_state, path } = sw.changed() => match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
zk::EventType::NodeCreated => {
println!("I should load a snapshot - {}", path);
}
_ => (),
},
else => break,
}
}
None => {
let wake_up = run.wake_up.clone(); let wake_up = run.wake_up.clone();
let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await; let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await;
}
Err(e) => { match run.tick().await {
log::error!("{}", e); Ok(TickOutcome::TickAgain(_)) => {
// Wait one second when an irrecoverable error occurs. run.wake_up.signal();
if !e.is_recoverable() { // TODO:
std::thread::sleep(Duration::from_secs(1)); // - create a new snapshot
// - create snapshot in ZK
// - delete task in ZK
println!("I should create a snapshot");
}
Ok(TickOutcome::WaitForSignal) => (),
Err(e) => {
log::error!("{}", e);
// Wait one second when an irrecoverable error occurs.
if !e.is_recoverable() {
std::thread::sleep(Duration::from_secs(1));
}
}
} }
} }
} }