diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index fb865a98b..aa95b7075 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -480,7 +480,7 @@ impl IndexScheduler {
         if let Some(task_id) = to_cancel.max() {
             // We retrieve the tasks that were processing before this tasks cancelation started.
             // We must *not* reset the processing tasks before calling this method.
-            let ProcessingTasks { started_at, processing } =
+            let ProcessingTasks { started_at, processing, .. } =
                 &*self.processing_tasks.read().unwrap();
             return Ok(Some(Batch::TaskCancelation {
                 task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index c19a005ed..df2dd1429 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -153,23 +153,34 @@ struct ProcessingTasks {
     started_at: OffsetDateTime,
     /// The list of tasks ids that are currently running.
     processing: RoaringBitmap,
+    /// The list of task ids that were processed in the last batch.
+    processed_previously: RoaringBitmap,
 }
 
 impl ProcessingTasks {
     /// Creates an empty `ProcessingAt` struct.
     fn new() -> ProcessingTasks {
-        ProcessingTasks { started_at: OffsetDateTime::now_utc(), processing: RoaringBitmap::new() }
+        ProcessingTasks {
+            started_at: OffsetDateTime::now_utc(),
+            processing: RoaringBitmap::new(),
+            processed_previously: RoaringBitmap::new(),
+        }
     }
 
     /// Stores the currently processing tasks, and the date time at which it started.
     fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
         self.started_at = started_at;
-        self.processing = processing;
+        self.processed_previously = std::mem::replace(&mut self.processing, processing);
     }
 
     /// Set the processing tasks to an empty list
     fn stop_processing(&mut self) {
-        self.processing = RoaringBitmap::new();
+        self.processed_previously = std::mem::take(&mut self.processing);
+    }
+
+    /// Returns the tasks that were processed in the previous tick.
+    fn processed_previously(&self) -> &RoaringBitmap {
+        &self.processed_previously
     }
 
     /// Returns `true` if there, at least, is one task that is currently processing that we must stop.
@@ -480,6 +491,19 @@ impl IndexScheduler {
             features,
         };
 
+        // Initialize the directories we need to process batches.
+        if let Some(ref zk) = this.zk {
+            let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
+            match zk.create("/election", &[], &options).await {
+                Ok(_) | Err(zk::Error::NodeExists) => (),
+                Err(e) => panic!("{e}"),
+            }
+
+            match zk.create("/snapshots", &[], &options).await {
+                Ok(_) | Err(zk::Error::NodeExists) => (),
+                Err(e) => panic!("{e}"),
+            }
+        }
         this.run().await;
         Ok(this)
     }
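The `std::mem::replace` and `std::mem::take` calls above are the core of this change: starting a new batch rotates the current `processing` bitmap into `processed_previously` without cloning, so the scheduler always remembers exactly which tasks the previous tick handled. A minimal, self-contained sketch of the same rotation, assuming the `roaring` crate (the `Slots` type and `main` are illustrative, not part of the patch):

```rust
use roaring::RoaringBitmap;

/// Two-slot rotation: `processing` holds the current batch,
/// `processed_previously` the batch that just finished.
struct Slots {
    processing: RoaringBitmap,
    processed_previously: RoaringBitmap,
}

impl Slots {
    /// Starting a new batch moves the old `processing` set into
    /// `processed_previously`; no bitmap is copied.
    fn start(&mut self, next: RoaringBitmap) {
        self.processed_previously = std::mem::replace(&mut self.processing, next);
    }

    /// Stopping is the same rotation with an empty "next" batch:
    /// `mem::take` leaves a fresh, empty bitmap behind.
    fn stop(&mut self) {
        self.processed_previously = std::mem::take(&mut self.processing);
    }
}

fn main() {
    let mut slots =
        Slots { processing: (0..3).collect(), processed_previously: RoaringBitmap::new() };
    slots.start((3..5).collect());
    assert_eq!(slots.processed_previously.len(), 3); // tasks 0, 1, 2 rotated out
    slots.stop();
    assert_eq!(slots.processed_previously.len(), 2); // tasks 3, 4 rotated out
    assert!(slots.processing.is_empty());
}
```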
@@ -576,23 +600,12 @@ impl IndexScheduler {
         #[cfg(test)]
         run.breakpoint(Breakpoint::Init);
 
-        // potentialy create /leader-q folder
-        // join the leader q
-        // subscribe a watcher to the node-1 in the leader q
+        // Join the potential leaders list.
+        // The lowest node in the list is the leader, and if we're not the leader,
+        // we watch the node right before us to be notified if it dies.
+        // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
         let mut watchers = if let Some(ref zk) = zk {
-            let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
-            match zk.create("/election", &[], &options).await {
-                Ok(_) | Err(zk::Error::NodeExists) => (),
-                Err(e) => panic!("{e}"),
-            }
-
-            match zk.create("/snapshots", &[], &options).await {
-                Ok(_) | Err(zk::Error::NodeExists) => (),
-                Err(e) => panic!("{e}"),
-            }
-
             let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
-            // TODO: ugly unwrap
             let (_stat, id) = zk.create("/election/node-", &[], &options).await.unwrap();
             self_node_id = id;
             let previous_path = {
@@ -601,11 +614,12 @@
                 let self_node_path = format!("node-{}", self_node_id);
                 let previous_path =
-                    list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last();
+                    list.into_iter().take_while(|path| path < &self_node_path).last();
                 previous_path.map(|path| format!("/election/{}", path))
             };
 
             if let Some(previous_path) = previous_path {
+                log::warn!("I am the follower {}", self_node_id);
                 Some((
                     zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
                     zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
                         .await
                         .unwrap(),
@@ -613,6 +627,7 @@
                 ))
             } else {
+                // If there was no node before ourselves, then we're the leader.
                 log::warn!("I'm the leader");
                 None
             }
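This is the standard ZooKeeper leader-election recipe linked above: every candidate creates an ephemeral sequential node under `/election`, the lowest sequence number is the leader, and each follower watches only its immediate predecessor, so a dying leader wakes up exactly one node instead of the whole herd. A sketch of the join-and-find-predecessor step, reusing the client calls already present in the patch (the `join_election` helper and the `zk::Client` type name are assumptions; errors are unwrapped as in the prototype):

```rust
/// Joins /election and returns the path of the node to watch,
/// or `None` if we are the leader.
async fn join_election(zk: &zk::Client) -> Option<String> {
    // Create our own ephemeral sequential candidate node; it disappears
    // automatically if our session dies.
    let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
    let (_stat, id) = zk.create("/election/node-", &[], &options).await.unwrap();

    // Children come back in no guaranteed order; sort so that the
    // sequence numbers line up.
    let mut list = zk.list_children("/election").await.unwrap();
    list.sort();

    // Our own name, e.g. "node-0000000007"; every candidate with a smaller
    // sequence number sorts before it.
    let self_node_path = format!("node-{}", id);

    // The greatest node still smaller than us is our predecessor; if there
    // is none, we are the lowest node and therefore the leader.
    list.into_iter()
        .take_while(|path| path < &self_node_path)
        .last()
        .map(|path| format!("/election/{}", path))
}
```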
@@ -623,11 +638,14 @@
         loop {
             match watchers.as_mut() {
-                Some((lw, sw)) => {
+                Some((leader_watcher, snapshot_watcher)) => {
+                    // We wait for a new batch processed by the leader OR a disconnection from the leader.
                     tokio::select! {
-                        zk::WatchedEvent { event_type, session_state, .. } = lw.changed() => match event_type {
+                        zk::WatchedEvent { event_type, session_state, .. } = leader_watcher.changed() => match event_type {
                             zk::EventType::Session => panic!("Session error {:?}", session_state),
                             zk::EventType::NodeDeleted => {
+                                // The node we were watching has been deleted:
+                                // am I the new leader, or is there still someone before me?
                                 let zk = zk.as_ref().unwrap();
                                 let previous_path = {
                                     let mut list = zk.list_children("/election").await.unwrap();
@@ -635,28 +653,27 @@
                                     let self_node_path = format!("node-{}", self_node_id);
                                     let previous_path =
-                                        list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last();
+                                        list.into_iter().take_while(|path| path < &self_node_path).last();
                                     previous_path.map(|path| format!("/election/{}", path))
                                 };
 
-                                let (lw, sw) = watchers.take().unwrap();
-                                lw.remove().await.unwrap();
+                                let (leader_watcher, snapshot_watcher) = watchers.take().unwrap();
+                                leader_watcher.remove().await.unwrap();
                                 watchers = if let Some(previous_path) = previous_path {
+                                    log::warn!("I stay a follower {}", self_node_id);
                                     Some((
                                         zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
-                                        zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
-                                            .await
-                                            .unwrap(),
+                                        snapshot_watcher,
                                     ))
                                 } else {
                                     log::warn!("I'm the new leader");
-                                    sw.remove().await.unwrap();
+                                    snapshot_watcher.remove().await.unwrap();
                                     None
                                 }
                             }
                             _ => (),
                         },
-                        zk::WatchedEvent { event_type, session_state, path } = sw.changed() => match event_type {
+                        zk::WatchedEvent { event_type, session_state, path } = snapshot_watcher.changed() => match event_type {
                             zk::EventType::Session => panic!("Session error {:?}", session_state),
                             zk::EventType::NodeCreated => {
                                 println!("I should load a snapshot - {}", path);
@@ -667,18 +684,56 @@
                             }
                             _ => (),
                         }
                     }
                 }
                 None => {
+                    // We're either the leader or not running in a cluster;
+                    // either way, we should wait until we receive a task.
                     let wake_up = run.wake_up.clone();
                     let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await;
 
                     match run.tick().await {
-                        Ok(TickOutcome::TickAgain(_)) => {
+                        Ok(TickOutcome::TickAgain(n)) => {
+                            // We must tick again.
                             run.wake_up.signal();
-                            // TODO:
-                            // - create a new snapshot
-                            // - create snapshot in ZK
-                            // - delete task in ZK
-                            println!("I should create a snapshot");
+                            // If we're in a cluster, that means we're the leader
+                            // and should share a snapshot of what we've done.
+                            if let Some(ref zk) = run.zk {
+                                // If nothing was processed, we have nothing to do.
+                                if n == 0 {
+                                    continue;
+                                }
+
+                                // TODO:
+                                // - create a new snapshot on disk/s3
+
+                                // We must notify everyone that we dropped a new snapshot on the S3.
+                                let options = zk::CreateMode::EphemeralSequential
+                                    .with_acls(zk::Acls::anyone_all());
+                                let (_stat, id) = zk
+                                    .create("/snapshots/snapshot-", &[], &options)
+                                    .await
+                                    .unwrap();
+                                log::info!("Notified that there was a new snapshot {id}");
+
+                                // We can now delete all the tasks that have been processed.
+                                let processed = run
+                                    .processing_tasks
+                                    .read()
+                                    .unwrap()
+                                    .processed_previously()
+                                    .clone(); // we don't want to hold the lock
+                                log::info!("Deleting {} processed tasks", processed.len());
+                                for task in processed {
+                                    let _ = zk // we don't want to crash if we can't delete a task's node.
+                                        .delete(
+                                            &format!(
+                                                "/tasks/task-{}",
+                                                zk::CreateSequence(task as i32)
+                                            ),
+                                            None,
+                                        )
+                                        .await;
+                                }
+                            }
                         }
                         Ok(TickOutcome::WaitForSignal) => (),
                         Err(e) => {
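In the leader branch above, replication is driven entirely by ZooKeeper: the leader drops an ephemeral sequential marker under `/snapshots` (which followers watch with `PersistentRecursive`, see the `NodeCreated` arm earlier), then prunes the task nodes that the finished batch covered. A condensed sketch of that flow (the `publish_snapshot` helper is hypothetical; the client calls and znode layout are the ones used in the patch):

```rust
/// Announces a new snapshot and prunes the task nodes it covers.
async fn publish_snapshot(zk: &zk::Client, processed: &roaring::RoaringBitmap) {
    // Notify the followers: they watch /snapshots recursively and load the
    // snapshot when this marker appears.
    let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
    let (_stat, id) = zk.create("/snapshots/snapshot-", &[], &options).await.unwrap();
    log::info!("Notified that there was a new snapshot {id}");

    // The processed tasks are part of the snapshot now, so their znodes can
    // go. Deletion failures are ignored on purpose: a missing node is fine.
    for task in processed {
        let path = format!("/tasks/task-{}", zk::CreateSequence(task as i32));
        let _ = zk.delete(&path, None).await;
    }
}
```

Note that the marker is created with no payload (`&[]`): it carries no data and merely signals that a new snapshot exists, and being ephemeral it only lives as long as the leader's session.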