Load the latest snapshot when we start the engine

This commit is contained in:
Clément Renault 2023-09-12 17:52:02 +02:00
parent 309c33a418
commit 8a2e8a887f
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -414,6 +414,29 @@ impl IndexScheduler {
}) })
.unwrap(); .unwrap();
{
// TODO we must lock the IndexSchedulerInner here to make sure that we don't
// load this snapshot after the upper watcher find a more recent one.
let mut snapshots = zookeeper.get_children("/snapshots", false).unwrap();
snapshots.sort_unstable();
for snapshot_name in dbg!(snapshots).iter().rev() {
let (_, snapshot_id) = snapshot_name.rsplit_once('-').unwrap();
let snapshot_path = format!("/snapshots/{snapshot_name}");
match zookeeper.get_data(&snapshot_path, false) {
Ok((data, _stat)) => {
if data == b"ok" {
eprintln!("Loading snapshot {snapshot_path}");
let s3_snapshot_path = format!("snapshots/{snapshot_id:0>10}");
load_snapshot(self, &s3_snapshot_path).unwrap();
break;
}
}
Err(ZkError::NoNode) => (),
Err(e) => panic!("Impossible to get data: {e}"),
}
}
}
match zookeeper.create( match zookeeper.create(
"/tasks", "/tasks",
vec![], vec![],
@ -615,7 +638,7 @@ impl IndexScheduler {
// we must notify everyone that we dropped a new snapshot on the s3 // we must notify everyone that we dropped a new snapshot on the s3
let _stat = zookeeper.set_data( let _stat = zookeeper.set_data(
&format!("/snapshots/snapshot-{:0>10?}", snapshot_id), &format!("/snapshots/snapshot-{:0>10?}", snapshot_id),
vec![], b"ok".to_vec(),
None, None,
); );
log::info!( log::info!(
@ -632,7 +655,7 @@ impl IndexScheduler {
let _ = zookeeper // we don't want to crash if we can't delete an update file. let _ = zookeeper // we don't want to crash if we can't delete an update file.
.delete(&node, None) .delete(&node, None)
.unwrap(); .unwrap();
s3.delete_object(format!("/tasks/{:0>10?}", task_id as u32)) s3.delete_object(format!("tasks/{:0>10?}", task_id as u32))
.unwrap(); .unwrap();
// TODO: Delete the update files associated with the deleted tasks // TODO: Delete the update files associated with the deleted tasks
if let Some(content_uuid) = inner if let Some(content_uuid) = inner
@ -640,7 +663,7 @@ impl IndexScheduler {
.unwrap() .unwrap()
.and_then(|t| t.content_uuid()) .and_then(|t| t.content_uuid())
{ {
s3.delete_object(format!("/update-files/{content_uuid}")) s3.delete_object(format!("update-files/{content_uuid}"))
.unwrap(); .unwrap();
} }
} }
@ -909,7 +932,8 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
std::fs::create_dir_all(indexes_files.path().join(&uuid).with_extension(""))?; std::fs::create_dir_all(indexes_files.path().join(&uuid).with_extension(""))?;
let path = indexes_files.path().join(&uuid).with_extension("").join("data.mdb"); let path = indexes_files.path().join(&uuid).with_extension("").join("data.mdb");
let mut file = File::create(path)?; let mut file = File::create(path)?;
s3.get_object_to_writer(format!("{src}/{uuid}.mdb"), &mut file)?; let status = s3.get_object_to_writer(format!("{src}/{uuid}"), &mut file)?;
assert!(matches!(status, 200 | 202));
} }
// 3. Lock the index-mapper and close all the env // 3. Lock the index-mapper and close all the env