diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index e1ca58a6b..7e285f2a0 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -676,7 +676,31 @@ impl IndexScheduler { zk::WatchedEvent { event_type, session_state, path } = snapshot_watcher.changed() => match event_type { zk::EventType::Session => panic!("Session error {:?}", session_state), zk::EventType::NodeCreated => { - println!("I should load a snapshot - {}", path); + log::info!("The snapshot {} is in preparation", path); + } + zk::EventType::NodeDataChanged => { + log::info!("Importing snapshot {}", path); + + let snapshot_id = path.strip_prefix("/snapshots/snapshot-").unwrap(); + let snapshot_dir = + PathBuf::from(format!("~/zk-snapshots/{}", snapshot_id)); + + // TODO: everything + + // 1. TODO: Ensure the snapshot version file is the same as our version. + + // 2. Download and import the index-scheduler database + log::info!("Importing the index scheduler."); + let tasks = + snapshot_dir.join("tasks.mdb"); + + // 3. Snapshot every indexes + log::info!("Importing the indexes"); + let dst = snapshot_dir.join("indexes"); + let mut indexes = tokio::fs::read_dir(dst).await.unwrap(); + while let Some(uuid) = indexes.next_entry().await.unwrap() { + // TODO: Import the index + } } _ => (), }, @@ -702,17 +726,88 @@ impl IndexScheduler { continue; } - // TODO: - // - create a new snapshot on disk/s3 - - // we must notify everyone that we dropped a new snapshot on the s3 let options = zk::CreateMode::EphemeralSequential .with_acls(zk::Acls::anyone_all()); - let (_stat, id) = zk + let (_stat, snapshot_id) = zk .create("/snapshots/snapshot-", &[], &options) .await .unwrap(); - log::info!("Notified that there was a new snapshot {id}"); + + tokio::fs::create_dir_all("~/zk-snapshots").await.unwrap(); + let snapshot_dir = + PathBuf::from(format!("~/zk-snapshots/{snapshot_id}")); + tokio::fs::create_dir(&snapshot_dir).await.unwrap(); + + // 1. Snapshot the version file. + let dst = + snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME); + tokio::fs::copy(&run.version_file_path, dst).await.unwrap(); + + // 2. Snapshot the index-scheduler LMDB env + let dst = snapshot_dir.join("tasks"); + tokio::fs::create_dir_all(&dst).await.unwrap(); + + log::info!("Snapshotting the tasks"); + let env = run.env.clone(); + tokio::task::spawn_blocking(move || { + env.copy_to_path( + dst.join("tasks.mdb"), + heed::CompactionOption::Enabled, + ) + .unwrap(); + }) + .await + .unwrap(); + + // 3. Snapshot every indexes + log::info!("Snapshotting the indexes"); + let dst = snapshot_dir.join("indexes"); + tokio::fs::create_dir_all(&dst).await.unwrap(); + + let this = run.private_clone(); + let indexes = tokio::task::spawn_blocking(move || { + let rtxn = this.env.read_txn().unwrap(); + this.index_mapper + .index_mapping + .iter(&rtxn) + .unwrap() + .map(|ret| ret.unwrap()) + .map(|(name, uuid)| (name.to_string(), uuid)) + .collect::>() + }) + .await + .unwrap(); + for (name, uuid) in indexes { + log::info!(" Snapshotting index {name}"); + let this = run.private_clone(); + let dst = dst.clone(); + tokio::task::spawn_blocking(move || { + let rtxn = this.env.read_txn().unwrap(); + let index = + this.index_mapper.index(&rtxn, &name).unwrap(); + index + .copy_to_path( + dst.join(format!("{uuid}.mdb")), + heed::CompactionOption::Enabled, + ) + .unwrap(); + }) + .await + .unwrap(); + } + + // we must notify everyone that we dropped a new snapshot on the s3 + let _stat = zk + .set_data( + &format!("/snapshots/snapshot-{}", snapshot_id), + &[], + None, + ) + .await + .unwrap(); + log::info!( + "Notified everyone about the new snapshot {snapshot_id}" + ); // We can now delete all the tasks that has been processed let processed = run @@ -732,6 +827,7 @@ impl IndexScheduler { None, ) .await; + // TODO: Delete the update files associated with the deleted tasks } } }