diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 5a8ddc1d8..36f12dc25 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -570,25 +570,122 @@ impl IndexScheduler { /// only once per index scheduler. async fn run(&self) { let run = self.private_clone(); + let zk = self.zk.clone(); + let mut self_node_id = zk::CreateSequence(0); tokio::task::spawn(async move { #[cfg(test)] run.breakpoint(Breakpoint::Init); - let wake_up = run.wake_up.clone(); - let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await; + // potentialy create /leader-q folder + // join the leader q + // subscribe a watcher to the node-1 in the leader q + let mut watchers = if let Some(ref zk) = zk { + let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()); + match zk.create("/election", &[], &options).await { + Ok(_) | Err(zk::Error::NodeExists) => (), + Err(e) => panic!("{e}"), + } + + match zk.create("/snapshots", &[], &options).await { + Ok(_) | Err(zk::Error::NodeExists) => (), + Err(e) => panic!("{e}"), + } + + let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all()); + // TODO: ugly unwrap + let (_stat, id) = zk.create("/election/node-", &[], &options).await.unwrap(); + self_node_id = id; + let previous_path = { + let list = zk.list_children("/election").await.unwrap(); + + let self_node_path = format!("node-{}", self_node_id); + let previous_path = + list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last(); + previous_path.map(|path| format!("/election/{}", path)) + }; + + if let Some(previous_path) = previous_path { + Some(( + zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(), + zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive) + .await + .unwrap(), + )) + } else { + log::warn!("I'm the leader"); + None + } + } else { + log::warn!("I don't have any ZK cluster"); + None + }; loop { - match run.tick().await { - Ok(TickOutcome::TickAgain(_)) => (), - Ok(TickOutcome::WaitForSignal) => { + match watchers.as_mut() { + Some((lw, sw)) => { + tokio::select! { + zk::WatchedEvent { event_type, session_state, .. } = lw.changed() => match event_type { + zk::EventType::Session => panic!("Session error {:?}", session_state), + zk::EventType::NodeDeleted => { + let zk = zk.as_ref().unwrap(); + let previous_path = { + let list = zk.list_children("/election").await.unwrap(); + + let self_node_path = format!("node-{}", self_node_id); + let previous_path = + list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last(); + previous_path.map(|path| format!("/election/{}", path)) + }; + + let (lw, sw) = watchers.take().unwrap(); + lw.remove().await.unwrap(); + watchers = if let Some(previous_path) = previous_path { + Some(( + zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(), + zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive) + .await + .unwrap(), + )) + } else { + log::warn!("I'm the new leader"); + sw.remove().await.unwrap(); + None + } + } + _ => (), + }, + zk::WatchedEvent { event_type, session_state, path } = sw.changed() => match event_type { + zk::EventType::Session => panic!("Session error {:?}", session_state), + zk::EventType::NodeCreated => { + println!("I should load a snapshot - {}", path); + } + _ => (), + }, + else => break, + } + } + None => { let wake_up = run.wake_up.clone(); let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await; - } - Err(e) => { - log::error!("{}", e); - // Wait one second when an irrecoverable error occurs. - if !e.is_recoverable() { - std::thread::sleep(Duration::from_secs(1)); + + match run.tick().await { + Ok(TickOutcome::TickAgain(_)) => { + run.wake_up.signal(); + // TODO: + // - create a new snapshot + // - create snapshot in ZK + // - delete task in ZK + + println!("I should create a snapshot"); + } + Ok(TickOutcome::WaitForSignal) => (), + Err(e) => { + log::error!("{}", e); + // Wait one second when an irrecoverable error occurs. + if !e.is_recoverable() { + std::thread::sleep(Duration::from_secs(1)); + } + } } } }