From 1191ec5939015a86b424363e4462dc91acb1cc8a Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 8 Aug 2023 13:18:55 +0200 Subject: [PATCH] fix the register task watcher --- index-scheduler/src/lib.rs | 55 ++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index cf0121ef8..5a8ddc1d8 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -600,7 +600,39 @@ impl IndexScheduler { match zk.create("/tasks", &[], &options).await { Ok(_) => (), Err(zk::Error::NodeExists) => { - todo!("Syncronize with the cluster") + log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk."); + + let children = zk + .list_children("/tasks") + .await + .expect("Internal, the /tasks directory was deleted during execution."); + + log::info!("Importing {} tasks", children.len()); + for path in children { + log::info!(" Importing {}", path); + match zk.get_data(&format!("/tasks/{}", &path)).await { + Ok((task, _stat)) => { + if task.is_empty() { + log::info!(" Task {} was empty, skipping.", path); + continue; + } + let task = serde_json::from_slice(&task).unwrap(); + + let this = self.private_clone(); + tokio::task::spawn_blocking(move || { + let mut wtxn = this.env.write_txn().unwrap(); + this.register_raw_task(&mut wtxn, &task).unwrap(); + wtxn.commit().unwrap(); + }) + .await + .unwrap(); + } + Err(e) => panic!("{e}"), + } + // else the file was deleted while we were inserting the key. We ignore it. + // TODO: What happens if someone updates the files before we have the time + // to setup the watcher + } } Err(e) => panic!("{e}"), } @@ -608,20 +640,33 @@ impl IndexScheduler { // TODO: fix unwrap by returning a clear error. let mut watcher = zk.watch("/tasks", zk::AddWatchMode::PersistentRecursive).await.unwrap(); - let czk = zk.clone(); + let this = self.private_clone(); tokio::spawn(async move { - let zk = czk; loop { let zk::WatchedEvent { event_type, session_state, path } = - dbg!(watcher.changed().await); + watcher.changed().await; match event_type { zk::EventType::Session => panic!("Session error {:?}", session_state), // A task as been added zk::EventType::NodeDataChanged => { // Add raw task content in local DB + log::info!("Received a new task from the cluster at {}", path); + let (data, _stat) = + this.zk.as_ref().unwrap().get_data(&path).await.unwrap(); + let task = serde_json::from_slice(&data).unwrap(); + + let this = this.private_clone(); + tokio::task::spawn_blocking(move || { + let mut wtxn = this.env.write_txn().unwrap(); + this.register_raw_task(&mut wtxn, &task).unwrap(); + wtxn.commit().unwrap(); + }) + .await + .unwrap(); } _ => (), } + this.wake_up.signal(); } }); } @@ -1054,7 +1099,7 @@ impl IndexScheduler { } pub fn register_raw_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { - self.all_tasks.append(wtxn, &BEU32::new(task.uid), &task)?; + self.all_tasks.put(wtxn, &BEU32::new(task.uid), &task)?; for index in task.indexes() { self.update_index(wtxn, index, |bitmap| {