From 1585b3ed2f85194ea8c360968825a17c0d38dcfc Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 7 Nov 2023 14:15:57 +0100 Subject: [PATCH] re-implement the snapshot import at startup --- index-scheduler/src/lib.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index e9edfea71..cd71890a1 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -477,7 +477,7 @@ impl IndexScheduler { log::info!("Entering the election game with the ID: {id}"); // there is at least us in the childrens of election - if childrens[0].ends_with(&string_id) { + let cluster = if childrens[0].ends_with(&string_id) { log::warn!("I'm the leader"); None } else { @@ -495,8 +495,29 @@ impl IndexScheduler { let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]); - Some((zk, string_id, watchers)) + Some((zk.clone(), string_id, watchers)) + }; + + log::info!("Checking if there is are snapshots already available to load"); + let mut snapshots = zk.list_children("/snapshots").unwrap(); + snapshots.sort_unstable(); + let mut loaded_something = false; + for snapshot in snapshots.into_iter().rev() { + log::info!("Checking if the snapshot `{snapshot}` is ready to be imported"); + if zk.get_data(&format!("/snapshots/{snapshot}")).unwrap().0 == b"ok" { + log::info!("It is ready to import"); + loaded_something = true; + load_snapshot(self, format!("snapshots/{snapshot}")).unwrap(); + } else { + log::info!("Not ready"); + } } + + if !loaded_something { + log::info!("No snapshots to import"); + } + + cluster } None => None, }; @@ -905,7 +926,8 @@ impl IndexScheduler { } } -fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> { +fn load_snapshot(this: &IndexScheduler, path: impl AsRef) -> anyhow::Result<()> { + let path = path.as_ref(); log::info!("Importing snapshot {}", path); let inner = this.inner(); @@ -954,6 +976,7 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> { pfcs.push(raw_inner.env.prepare_for_closing()); pfcs.into_iter().for_each(|pfc| pfc.wait()); + log::info!("Replacing all the databases in place"); // Let's replace all the folders/files. std::fs::rename(&tasks_file, base_path.join("tasks").join("data.mdb"))?; let dst_indexes = base_path.join("indexes"); @@ -973,6 +996,8 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> { inner.wake_up = raw_inner.wake_up; *lock = Some(inner); + log::info!("Snapshot import finished"); + Ok(()) }