diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 43701d653..1be2df932 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -414,6 +414,29 @@ impl IndexScheduler { }) .unwrap(); + { + // TODO we must lock the IndexSchedulerInner here to make sure that we don't + // load this snapshot after the upper watcher find a more recent one. + let mut snapshots = zookeeper.get_children("/snapshots", false).unwrap(); + snapshots.sort_unstable(); + for snapshot_name in dbg!(snapshots).iter().rev() { + let (_, snapshot_id) = snapshot_name.rsplit_once('-').unwrap(); + let snapshot_path = format!("/snapshots/{snapshot_name}"); + match zookeeper.get_data(&snapshot_path, false) { + Ok((data, _stat)) => { + if data == b"ok" { + eprintln!("Loading snapshot {snapshot_path}"); + let s3_snapshot_path = format!("snapshots/{snapshot_id:0>10}"); + load_snapshot(self, &s3_snapshot_path).unwrap(); + break; + } + } + Err(ZkError::NoNode) => (), + Err(e) => panic!("Impossible to get data: {e}"), + } + } + } + match zookeeper.create( "/tasks", vec![], @@ -615,7 +638,7 @@ impl IndexScheduler { // we must notify everyone that we dropped a new snapshot on the s3 let _stat = zookeeper.set_data( &format!("/snapshots/snapshot-{:0>10?}", snapshot_id), - vec![], + b"ok".to_vec(), None, ); log::info!( @@ -632,7 +655,7 @@ impl IndexScheduler { let _ = zookeeper // we don't want to crash if we can't delete an update file. .delete(&node, None) .unwrap(); - s3.delete_object(format!("/tasks/{:0>10?}", task_id as u32)) + s3.delete_object(format!("tasks/{:0>10?}", task_id as u32)) .unwrap(); // TODO: Delete the update files associated with the deleted tasks if let Some(content_uuid) = inner @@ -640,7 +663,7 @@ impl IndexScheduler { .unwrap() .and_then(|t| t.content_uuid()) { - s3.delete_object(format!("/update-files/{content_uuid}")) + s3.delete_object(format!("update-files/{content_uuid}")) .unwrap(); } } @@ -909,7 +932,8 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> { std::fs::create_dir_all(indexes_files.path().join(&uuid).with_extension(""))?; let path = indexes_files.path().join(&uuid).with_extension("").join("data.mdb"); let mut file = File::create(path)?; - s3.get_object_to_writer(format!("{src}/{uuid}.mdb"), &mut file)?; + let status = s3.get_object_to_writer(format!("{src}/{uuid}"), &mut file)?; + assert!(matches!(status, 200 | 202)); } // 3. Lock the index-mapper and close all the env