mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-04 10:35:46 +01:00
re-implement the snapshot import at startup
This commit is contained in:
parent
c48f72e6b8
commit
1585b3ed2f
@ -477,7 +477,7 @@ impl IndexScheduler {
|
|||||||
log::info!("Entering the election game with the ID: {id}");
|
log::info!("Entering the election game with the ID: {id}");
|
||||||
|
|
||||||
// there is at least us in the childrens of election
|
// there is at least us in the childrens of election
|
||||||
if childrens[0].ends_with(&string_id) {
|
let cluster = if childrens[0].ends_with(&string_id) {
|
||||||
log::warn!("I'm the leader");
|
log::warn!("I'm the leader");
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
@ -495,8 +495,29 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]);
|
let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]);
|
||||||
|
|
||||||
Some((zk, string_id, watchers))
|
Some((zk.clone(), string_id, watchers))
|
||||||
|
};
|
||||||
|
|
||||||
|
log::info!("Checking if there is are snapshots already available to load");
|
||||||
|
let mut snapshots = zk.list_children("/snapshots").unwrap();
|
||||||
|
snapshots.sort_unstable();
|
||||||
|
let mut loaded_something = false;
|
||||||
|
for snapshot in snapshots.into_iter().rev() {
|
||||||
|
log::info!("Checking if the snapshot `{snapshot}` is ready to be imported");
|
||||||
|
if zk.get_data(&format!("/snapshots/{snapshot}")).unwrap().0 == b"ok" {
|
||||||
|
log::info!("It is ready to import");
|
||||||
|
loaded_something = true;
|
||||||
|
load_snapshot(self, format!("snapshots/{snapshot}")).unwrap();
|
||||||
|
} else {
|
||||||
|
log::info!("Not ready");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !loaded_something {
|
||||||
|
log::info!("No snapshots to import");
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster
|
||||||
}
|
}
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
@ -905,7 +926,8 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
|
fn load_snapshot(this: &IndexScheduler, path: impl AsRef<str>) -> anyhow::Result<()> {
|
||||||
|
let path = path.as_ref();
|
||||||
log::info!("Importing snapshot {}", path);
|
log::info!("Importing snapshot {}", path);
|
||||||
|
|
||||||
let inner = this.inner();
|
let inner = this.inner();
|
||||||
@ -954,6 +976,7 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
|
|||||||
pfcs.push(raw_inner.env.prepare_for_closing());
|
pfcs.push(raw_inner.env.prepare_for_closing());
|
||||||
pfcs.into_iter().for_each(|pfc| pfc.wait());
|
pfcs.into_iter().for_each(|pfc| pfc.wait());
|
||||||
|
|
||||||
|
log::info!("Replacing all the databases in place");
|
||||||
// Let's replace all the folders/files.
|
// Let's replace all the folders/files.
|
||||||
std::fs::rename(&tasks_file, base_path.join("tasks").join("data.mdb"))?;
|
std::fs::rename(&tasks_file, base_path.join("tasks").join("data.mdb"))?;
|
||||||
let dst_indexes = base_path.join("indexes");
|
let dst_indexes = base_path.join("indexes");
|
||||||
@ -973,6 +996,8 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
|
|||||||
inner.wake_up = raw_inner.wake_up;
|
inner.wake_up = raw_inner.wake_up;
|
||||||
*lock = Some(inner);
|
*lock = Some(inner);
|
||||||
|
|
||||||
|
log::info!("Snapshot import finished");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user