diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index e085bc25e..91483d390 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -33,6 +33,7 @@ pub type Result = std::result::Result; pub type TaskId = u32; use std::collections::{BTreeMap, HashMap}; +use std::fs::File; use std::ops::{Bound, RangeBounds}; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; @@ -359,6 +360,9 @@ impl IndexScheduler { let latch = match self.zookeeper { Some(ref zookeeper) => { + let zk_tasks = format!("{}/zk-tasks", env!("HOME")); + std::fs::create_dir_all(&zk_tasks).unwrap(); + let id = Uuid::new_v4().to_string(); let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string()); let wake_up = self.wake_up.clone(); @@ -517,11 +521,19 @@ impl IndexScheduler { for path in children { log::info!(" Importing {}", path); match zookeeper.get_data(&format!("/tasks/{}", &path), false) { - Ok((task, _stat)) => { - if task.is_empty() { - log::info!(" Task {} was empty, skipping.", path); + Ok((data, _stat)) => { + if data != b"ok" { + log::info!(" Task {} was not \"ok\", skipping.", path); } else { - let task = serde_json::from_slice(&task).unwrap(); + let id = path + .rsplit_once('-') + .map(|(_, id)| id.parse::().unwrap()) + .unwrap(); + let task_path = Path::new(std::env!("HOME")) + .join("zk-tasks") + .join(format!("{:0>10}", id)); + let file = File::open(task_path).unwrap(); + let task = serde_json::from_reader(file).unwrap(); inner.register_raw_task(&mut wtxn, &task).unwrap(); // we received a new tasks, we must wake up self.wake_up.signal(); @@ -550,11 +562,21 @@ impl IndexScheduler { // Add raw task content in local DB log::info!("Received a new task from the cluster at {}", path); let inner = this.inner(); - let mut wtxn = inner.env.write_txn().unwrap(); let (data, _stat) = zookeeperc.get_data(&path, false).unwrap(); - let task = serde_json::from_slice(data.as_slice()).unwrap(); - inner.register_raw_task(&mut wtxn, &task).unwrap(); - wtxn.commit().unwrap(); + if data == b"ok" { + let mut wtxn = inner.env.write_txn().unwrap(); + let id = path + .rsplit_once('-') + .map(|(_, id)| id.parse::().unwrap()) + .unwrap(); + let path = Path::new(env!("HOME")) + .join("zk-tasks") + .join(format!("{:0>10?}", id)); + let file = File::open(path).unwrap(); + let task = serde_json::from_reader(file).unwrap(); + inner.register_raw_task(&mut wtxn, &task).unwrap(); + wtxn.commit().unwrap(); + } } WatchedEventType::None | WatchedEventType::NodeCreated @@ -1454,14 +1476,13 @@ impl IndexSchedulerInner { // TODO: send task to ZK in raw json. if let Some(zookeeper) = &self.zookeeper { + std::fs::write( + Path::new(std::env!("HOME")).join("zk-tasks").join(format!("{:0>10?}", id)), + serde_json::to_vec_pretty(&task).unwrap(), + )?; + // TODO: ugly unwrap - zookeeper - .set_data( - &format!("/tasks/task-{:0>10?}", id), - serde_json::to_vec_pretty(&task).unwrap(), - None, - ) - .unwrap(); + zookeeper.set_data(&format!("/tasks/task-{:0>10?}", id), b"ok".to_vec(), None).unwrap(); } Ok(task)