fix the register task watcher

This commit is contained in:
Tamo 2023-08-08 13:18:55 +02:00
parent 0d20d08daf
commit 1191ec5939

View File

@ -600,7 +600,39 @@ impl IndexScheduler {
match zk.create("/tasks", &[], &options).await {
Ok(_) => (),
Err(zk::Error::NodeExists) => {
todo!("Syncronize with the cluster")
log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");
let children = zk
.list_children("/tasks")
.await
.expect("Internal, the /tasks directory was deleted during execution.");
log::info!("Importing {} tasks", children.len());
for path in children {
log::info!(" Importing {}", path);
match zk.get_data(&format!("/tasks/{}", &path)).await {
Ok((task, _stat)) => {
if task.is_empty() {
log::info!(" Task {} was empty, skipping.", path);
continue;
}
let task = serde_json::from_slice(&task).unwrap();
let this = self.private_clone();
tokio::task::spawn_blocking(move || {
let mut wtxn = this.env.write_txn().unwrap();
this.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
})
.await
.unwrap();
}
Err(e) => panic!("{e}"),
}
// else the file was deleted while we were inserting the key. We ignore it.
// TODO: What happens if someone updates the files before we have the time
// to setup the watcher
}
}
Err(e) => panic!("{e}"),
}
@ -608,20 +640,33 @@ impl IndexScheduler {
// TODO: fix unwrap by returning a clear error.
let mut watcher =
zk.watch("/tasks", zk::AddWatchMode::PersistentRecursive).await.unwrap();
let czk = zk.clone();
let this = self.private_clone();
tokio::spawn(async move {
let zk = czk;
loop {
let zk::WatchedEvent { event_type, session_state, path } =
dbg!(watcher.changed().await);
watcher.changed().await;
match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
// A task as been added
zk::EventType::NodeDataChanged => {
// Add raw task content in local DB
log::info!("Received a new task from the cluster at {}", path);
let (data, _stat) =
this.zk.as_ref().unwrap().get_data(&path).await.unwrap();
let task = serde_json::from_slice(&data).unwrap();
let this = this.private_clone();
tokio::task::spawn_blocking(move || {
let mut wtxn = this.env.write_txn().unwrap();
this.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
})
.await
.unwrap();
}
_ => (),
}
this.wake_up.signal();
}
});
}
@ -1054,7 +1099,7 @@ impl IndexScheduler {
}
pub fn register_raw_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
self.all_tasks.append(wtxn, &BEU32::new(task.uid), &task)?;
self.all_tasks.put(wtxn, &BEU32::new(task.uid), &task)?;
for index in task.indexes() {
self.update_index(wtxn, index, |bitmap| {