From 2d1434da81895cc207fa00aecbfcf52752568d57 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 30 Aug 2023 17:15:15 +0200
Subject: [PATCH] Keep the ZK flow when enqueuing tasks

---
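Note on the flow this patch keeps: a task enqueued on any instance is not
committed locally first; it is pushed to ZooKeeper, and every instance
(including the one that received it) stores it through its `/tasks` watch,
while only the elected leader processes it. The enqueue side itself is
truncated at the end of this patch, so the sketch below is illustrative
only — it reuses the `create` call and the `/tasks/task-` naming visible in
the diff, but it is not the committed code:

    // Push the raw task JSON to the cluster; the `/tasks` watch of every
    // instance (this one included) mirrors it into the local LMDB env.
    let data = serde_json::to_vec(&task).unwrap();
    let path = zookeeper
        .create(
            "/tasks/task-",
            data,
            Acl::open_unsafe().clone(),
            CreateMode::PersistentSequential,
        )
        .unwrap();
    log::info!("Pushed task to the cluster under {}", path);
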
 index-scheduler/src/lib.rs | 503 +++++++++++++++++++------------------
 1 file changed, 262 insertions(+), 241 deletions(-)

diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index dcb64a788..35339271b 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -609,271 +609,288 @@ impl IndexScheduler {
         #[cfg(test)]
         run.breakpoint(Breakpoint::Init);
 
-        if let Some(zookeeper) = self.zookeeper.clone() {
-            let id = Uuid::new_v4().to_string();
-            let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
-            latch.start().unwrap();
+        let latch = match self.zookeeper.clone() {
+            Some(zookeeper) => {
+                let id = Uuid::new_v4().to_string();
+                let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
+                latch.start().unwrap();
 
-            // Join the potential leaders list.
-            // The lowest in the list is the leader. And if we're not the leader
-            // we watch the node right before us to be notified if he dies.
-            // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
-            let latchc = latch.clone();
-            let this = self.private_clone();
-            zookeeper
-                .add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| {
-                    if !latchc.has_leadership() {
+                // Join the potential leaders list.
+                // The lowest in the list is the leader. And if we're not the leader
+                // we watch the node right before us to be notified if it dies.
+                // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
+                let latchc = latch.clone();
+                let this = self.private_clone();
+                zookeeper
+                    .add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| {
+                        if !latchc.has_leadership() {
+                            let WatchedEvent { event_type, path, keeper_state: _ } = event;
+                            match event_type {
+                                WatchedEventType::NodeCreated => {
+                                    let path = path.unwrap();
+                                    log::info!("The snapshot {} is in preparation", path);
+                                }
+                                WatchedEventType::NodeDataChanged => {
+                                    let path = path.unwrap();
+                                    log::info!("Importing snapshot {}", path);
+                                    let snapshot_id =
+                                        path.strip_prefix("/snapshots/snapshot-").unwrap();
+                                    let snapshot_dir = PathBuf::from(format!(
+                                        "{}/zk-snapshots/{}",
+                                        env!("HOME"),
+                                        snapshot_id
+                                    ));
+
+                                    // 1. TODO: Ensure the snapshot version file is the same as our version.
+
+                                    // 2. Download all the databases
+                                    let tasks_file =
+                                        tempfile::NamedTempFile::new_in(this.env.path()).unwrap();
+
+                                    log::info!("Downloading the index scheduler database.");
+                                    let tasks_snapshot = snapshot_dir.join("tasks.mdb");
+                                    std::fs::copy(tasks_snapshot, tasks_file).unwrap();
+
+                                    log::info!("Downloading the index databases");
+                                    let indexes_files =
+                                        tempfile::TempDir::new_in(&this.index_mapper.base_path)
+                                            .unwrap();
+                                    let mut indexes = Vec::new();
+
+                                    let dst = snapshot_dir.join("indexes");
+                                    for result in std::fs::read_dir(&dst).unwrap() {
+                                        let entry = result.unwrap();
+                                        let uuid = entry
+                                            .file_name()
+                                            .as_os_str()
+                                            .to_str()
+                                            .unwrap()
+                                            .to_string();
+                                        log::info!("\tDownloading the index {}", uuid.to_string());
+                                        std::fs::copy(
+                                            dst.join(&uuid),
+                                            indexes_files.path().join(&uuid),
+                                        )
+                                        .unwrap();
+                                        indexes.push(uuid);
+                                    }
+
+                                    // 3. Lock the index-mapper and close all the envs
+                                    // TODO: continue here
+
+                                    // run.env.close();
+
+                                    // 4. Move all the databases
+
+                                    // 5. Unlock the index-mapper
+
+                                    // 2. Download and import the index-scheduler database
+
+                                    // 3. Snapshot every index
+                                }
+                                otherwise => panic!("{otherwise:?}"),
+                            }
+                        }
+                    })
+                    .unwrap();
+
+                match zookeeper.create(
+                    "/tasks",
+                    vec![],
+                    Acl::open_unsafe().clone(),
+                    CreateMode::Persistent,
+                ) {
+                    Ok(_) => (),
+                    Err(ZkError::NodeExists) => {
+                        log::warn!("The tasks directory already exists; importing all the tasks from ZooKeeper without altering the tasks already on disk.");
+
+                        let children = zookeeper
+                            .get_children("/tasks", false)
+                            .expect("Internal, the /tasks directory was deleted during execution."); // TODO change me
+
+                        log::info!("Importing {} tasks", children.len());
+                        for path in children {
+                            log::info!(" Importing {}", path);
+                            match zookeeper.get_data(&format!("/tasks/{}", &path), false) {
+                                Ok((task, _stat)) => {
+                                    if task.is_empty() {
+                                        log::info!(" Task {} was empty, skipping.", path);
+                                    } else {
+                                        let task = serde_json::from_slice(&task).unwrap();
+                                        let mut wtxn = self.env.write_txn().unwrap();
+                                        self.register_raw_task(&mut wtxn, &task).unwrap();
+                                        wtxn.commit().unwrap();
+                                        // we received a new task, we must wake up
+                                        self.wake_up.signal();
+                                    }
+                                }
+                                Err(e) => panic!("{e}"),
+                            }
+                            // else the file was deleted while we were inserting the key. We ignore it.
+                            // TODO: What happens if someone updates the files before we have the time
+                            // to set up the watcher?
+                        }
+                    }
+                    Err(e) => panic!("{e}"),
+                }
+
+                // TODO: fix unwrap by returning a clear error.
+                let this = self.private_clone();
+                zookeeper
+                    .add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| {
                        let WatchedEvent { event_type, path, keeper_state: _ } = event;
                        match event_type {
-                            WatchedEventType::NodeCreated => {
-                                let path = path.unwrap();
-                                log::info!("The snapshot {} is in preparation", path);
-                            }
                            WatchedEventType::NodeDataChanged => {
                                let path = path.unwrap();
-                                log::info!("Importing snapshot {}", path);
-                                let snapshot_id =
-                                    path.strip_prefix("/snapshots/snapshot-").unwrap();
-                                let snapshot_dir = PathBuf::from(format!(
-                                    "{}/zk-snapshots/{}",
-                                    env!("HOME"),
-                                    snapshot_id
-                                ));
-
-                                // 1. TODO: Ensure the snapshot version file is the same as our version.
-
-                                // 2. Download all the databases
-                                let tasks_file =
-                                    tempfile::NamedTempFile::new_in(this.env.path()).unwrap();
-
-                                log::info!("Downloading the index scheduler database.");
-                                let tasks_snapshot = snapshot_dir.join("tasks.mdb");
-                                std::fs::copy(tasks_snapshot, tasks_file).unwrap();
-
-                                log::info!("Downloading the indexes databases");
-                                let indexes_files =
-                                    tempfile::TempDir::new_in(&this.index_mapper.base_path)
-                                        .unwrap();
-                                let mut indexes = Vec::new();
-
-                                let dst = snapshot_dir.join("indexes");
-                                for result in std::fs::read_dir(&dst).unwrap() {
-                                    let entry = result.unwrap();
-                                    let uuid =
-                                        entry.file_name().as_os_str().to_str().unwrap().to_string();
-                                    log::info!("\tDownloading the index {}", uuid.to_string());
-                                    std::fs::copy(
-                                        dst.join(&uuid),
-                                        indexes_files.path().join(&uuid),
-                                    )
+                                // Add raw task content in local DB
+                                log::info!("Received a new task from the cluster at {}", path);
+                                let (data, _stat) = this
+                                    .zookeeper
+                                    .as_ref()
+                                    .unwrap()
+                                    .get_data(&path, false)
                                    .unwrap();
-                                    indexes.push(uuid);
-                                }
-
-                                // 3. Lock the index-mapper and close all the env
-                                // TODO: continue here
-
-                                // run.env.close();
-
-                                // 4. Move all the databases
-
-                                // 5. Unlock the index-mapper
-
-                                // 2. Download and import the index-scheduler database
-
-                                // 3. Snapshot every indexes
+                                let task = serde_json::from_slice(data.as_slice()).unwrap();
+                                let mut wtxn = this.env.write_txn().unwrap();
+                                this.register_raw_task(&mut wtxn, &task).unwrap();
+                                wtxn.commit().unwrap();
                            }
-                            otherwise => panic!("{otherwise:?}"),
+                            WatchedEventType::None
+                            | WatchedEventType::NodeCreated
+                            | WatchedEventType::NodeDeleted => (),
+                            WatchedEventType::NodeChildrenChanged
+                            | WatchedEventType::DataWatchRemoved
+                            | WatchedEventType::ChildWatchRemoved => panic!("{event_type:?}"),
                        }
 
-                    }
-                })
-                .unwrap();
-
-            match zookeeper.create(
-                "/tasks",
-                vec![],
-                Acl::open_unsafe().clone(),
-                CreateMode::Persistent,
-            ) {
-                Ok(_) => (),
-                Err(ZkError::NodeExists) => {
-                    log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");
+                        this.wake_up.signal();
+                    })
+                    .unwrap();
 
-                    let children = zookeeper
-                        .get_children("/tasks", false)
-                        .expect("Internal, the /tasks directory was deleted during execution."); // TODO change me
-
-                    log::info!("Importing {} tasks", children.len());
-                    for path in children {
-                        log::info!(" Importing {}", path);
-                        match zookeeper.get_data(&format!("/tasks/{}", &path), false) {
-                            Ok((task, _stat)) => {
-                                if task.is_empty() {
-                                    log::info!(" Task {} was empty, skipping.", path);
-                                } else {
-                                    let task = serde_json::from_slice(&task).unwrap();
-                                    let mut wtxn = self.env.write_txn().unwrap();
-                                    self.register_raw_task(&mut wtxn, &task).unwrap();
-                                    wtxn.commit().unwrap();
-                                    // we received a new tasks, we must wake up
-                                    self.wake_up.signal();
-                                }
-                            }
-                            Err(e) => panic!("{e}"),
-                        }
-                        // else the file was deleted while we were inserting the key. We ignore it.
-                        // TODO: What happens if someone updates the files before we have the time
-                        // to setup the watcher
-                    }
-                }
-                Err(e) => panic!("{e}"),
+                Some(latch)
            }
-
-            // TODO: fix unwrap by returning a clear error.
-            let this = self.private_clone();
-            zookeeper
-                .add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| {
-                    let WatchedEvent { event_type, path, keeper_state: _ } = event;
-                    match event_type {
-                        WatchedEventType::NodeDataChanged => {
-                            let path = path.unwrap();
-                            // Add raw task content in local DB
-                            log::info!("Received a new task from the cluster at {}", path);
-                            let (data, _stat) =
-                                this.zookeeper.as_ref().unwrap().get_data(&path, false).unwrap();
-                            let task = serde_json::from_slice(data.as_slice()).unwrap();
-                            let mut wtxn = this.env.write_txn().unwrap();
-                            this.register_raw_task(&mut wtxn, &task).unwrap();
-                            wtxn.commit().unwrap();
-                        }
-                        WatchedEventType::None
-                        | WatchedEventType::NodeCreated
-                        | WatchedEventType::NodeDeleted => (),
-                        WatchedEventType::NodeChildrenChanged
-                        | WatchedEventType::DataWatchRemoved
-                        | WatchedEventType::ChildWatchRemoved => panic!("{event_type:?}"),
-                    }
-
-                    this.wake_up.signal();
-                })
-                .unwrap();
-        }
+            None => None,
+        };
 
         let this = self.private_clone();
         std::thread::spawn(move || {
             loop {
                 // we're either a leader or not running in a cluster,
                 // either way we should wait until we receive a task.
-                let wake_up = this.wake_up.clone();
-                let _ = wake_up.wait();
+                this.wake_up.wait();
 
-                match this.tick() {
-                    Ok(TickOutcome::TickAgain(n)) => {
-                        // We must tick again.
-                        this.wake_up.signal();
+                // TODO watch the /election node and send a signal once it changes (be careful about atomics ordering)
+                if latch.as_ref().map_or(true, |latch| latch.has_leadership()) {
+                    match this.tick() {
+                        Ok(TickOutcome::TickAgain(n)) => {
+                            // We must tick again.
+                            this.wake_up.signal();
 
-                        // if we're in a cluster that means we're the leader
-                        // and should share a snapshot of what we've done.
-                        if let Some(ref zookeeper) = this.zookeeper {
-                            // if nothing was processed we have nothing to do.
-                            if n == 0 {
-                                continue;
-                            }
+                            // if we're in a cluster, that means we're the leader
+                            // and should share a snapshot of what we've done.
+                            if let Some(ref zookeeper) = this.zookeeper {
+                                // if nothing was processed we have nothing to do.
+                                if n == 0 {
+                                    continue;
+                                }
 
-                            let snapshot_path = zookeeper
-                                .create(
-                                    "/snapshots/snapshot-",
-                                    vec![],
-                                    Acl::open_unsafe().clone(),
-                                    CreateMode::PersistentSequential,
-                                )
-                                .unwrap();
+                                let snapshot_path = zookeeper
+                                    .create(
+                                        "/snapshots/snapshot-",
+                                        vec![],
+                                        Acl::open_unsafe().clone(),
+                                        CreateMode::PersistentSequential,
+                                    )
+                                    .unwrap();
 
-                            let snapshot_id = snapshot_path
-                                .rsplit_once('-')
-                                .map(|(_, id)| id.parse::<u32>().unwrap())
-                                .unwrap();
+                                let snapshot_id = snapshot_path
+                                    .rsplit_once('-')
+                                    .map(|(_, id)| id.parse::<u32>().unwrap())
+                                    .unwrap();
 
-                            let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
-                            std::fs::create_dir_all(&zk_snapshots).unwrap();
-                            let snapshot_dir =
-                                PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}"));
-                            std::fs::create_dir(&snapshot_dir).unwrap();
+                                let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
+                                std::fs::create_dir_all(&zk_snapshots).unwrap();
+                                let snapshot_dir =
+                                    PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}"));
+                                std::fs::create_dir(&snapshot_dir).unwrap();
 
-                            // 1. Snapshot the version file.
-                            let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
-                            std::fs::copy(&this.version_file_path, dst).unwrap();
+                                // 1. Snapshot the version file.
+                                let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
+                                std::fs::copy(&this.version_file_path, dst).unwrap();
 
-                            // 2. Snapshot the index-scheduler LMDB env
-                            log::info!("Snapshotting the tasks");
-                            let env = this.env.clone();
-                            env.copy_to_path(
-                                snapshot_dir.join("tasks.mdb"),
-                                heed::CompactionOption::Enabled,
-                            )
-                            .unwrap();
+                                // 2. Snapshot the index-scheduler LMDB env
+                                log::info!("Snapshotting the tasks");
+                                let env = this.env.clone();
+                                env.copy_to_path(
+                                    snapshot_dir.join("tasks.mdb"),
+                                    heed::CompactionOption::Enabled,
+                                )
+                                .unwrap();
 
-                            // 3. Snapshot every indexes
-                            log::info!("Snapshotting the indexes");
-                            let dst = snapshot_dir.join("indexes");
-                            std::fs::create_dir_all(&dst).unwrap();
-
-                            let this = this.private_clone();
-                            let rtxn = this.env.read_txn().unwrap();
-                            let indexes = this
-                                .index_mapper
-                                .index_mapping
-                                .iter(&rtxn)
-                                .unwrap()
-                                .map(|ret| ret.unwrap())
-                                .map(|(name, uuid)| (name.to_string(), uuid))
-                                .collect::<Vec<_>>();
-
-                            for (name, uuid) in indexes {
-                                log::info!(" Snapshotting index {name}");
-                                let this = this.private_clone();
-                                let dst = dst.clone();
-                                let rtxn = this.env.read_txn().unwrap();
-                                let index = this.index_mapper.index(&rtxn, &name).unwrap();
-                                index
-                                    .copy_to_path(
-                                        dst.join(format!("{uuid}.mdb")),
-                                        heed::CompactionOption::Enabled,
-                                    )
-                                    .unwrap();
-                            }
+                                // 3. Snapshot every index
+                                log::info!("Snapshotting the indexes");
+                                let dst = snapshot_dir.join("indexes");
+                                std::fs::create_dir_all(&dst).unwrap();
+
+                                let this = this.private_clone();
+                                let rtxn = this.env.read_txn().unwrap();
+                                let indexes = this
+                                    .index_mapper
+                                    .index_mapping
+                                    .iter(&rtxn)
+                                    .unwrap()
+                                    .map(|ret| ret.unwrap())
+                                    .map(|(name, uuid)| (name.to_string(), uuid))
+                                    .collect::<Vec<_>>();
+
+                                for (name, uuid) in indexes {
+                                    log::info!(" Snapshotting index {name}");
+                                    let this = this.private_clone();
+                                    let dst = dst.clone();
+                                    let rtxn = this.env.read_txn().unwrap();
+                                    let index = this.index_mapper.index(&rtxn, &name).unwrap();
+                                    index
+                                        .copy_to_path(
+                                            dst.join(format!("{uuid}.mdb")),
+                                            heed::CompactionOption::Enabled,
+                                        )
+                                        .unwrap();
+                                }
 
-                            // we must notify everyone that we dropped a new snapshot on the s3
-                            let _stat = zookeeper.set_data(
-                                &format!("/snapshots/snapshot-{:0>10?}", snapshot_id),
-                                vec![],
-                                None,
-                            );
-                            log::info!("Notified everyone about the new snapshot {snapshot_id}");
+                                // we must notify everyone that we dropped a new snapshot on the S3
+                                let _stat = zookeeper.set_data(
+                                    &format!("/snapshots/snapshot-{:0>10?}", snapshot_id),
+                                    vec![],
+                                    None,
+                                );
+                                log::info!(
+                                    "Notified everyone about the new snapshot {snapshot_id}"
+                                );
 
-                            // We can now delete all the tasks that has been processed
-                            let processed = this
-                                .processing_tasks
-                                .read()
-                                .unwrap()
-                                .processed_previously()
-                                .clone(); // we don't want to hold the mutex
-                            log::info!("Deleting {} processed tasks", processed.len());
-                            for task in processed {
-                                let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32));
-                                let _ = zookeeper // we don't want to crash if we can't delete an update file.
-                                    .delete(&node, None)
-                                    .unwrap();
-                                // TODO: Delete the update files associated with the deleted tasks
+                                // We can now delete all the tasks that have been processed
+                                let processed = this
+                                    .processing_tasks
+                                    .read()
+                                    .unwrap()
+                                    .processed_previously()
+                                    .clone(); // we don't want to hold the mutex
+                                log::info!("Deleting {} processed tasks", processed.len());
+                                for task in processed {
+                                    let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32));
+                                    let _ = zookeeper // we don't want to crash if we can't delete an update file.
+                                        .delete(&node, None)
+                                        .unwrap();
+                                    // TODO: Delete the update files associated with the deleted tasks
+                                }
                            }
                        }
-                    }
-                    Ok(TickOutcome::WaitForSignal) => (),
-                    Err(e) => {
-                        log::error!("{}", e);
-                        // Wait one second when an irrecoverable error occurs.
-                        if !e.is_recoverable() {
-                            std::thread::sleep(Duration::from_secs(1));
+                        Ok(TickOutcome::WaitForSignal) => (),
+                        Err(e) => {
+                            log::error!("{}", e);
+                            // Wait one second when an irrecoverable error occurs.
+                            if !e.is_recoverable() {
+                                std::thread::sleep(Duration::from_secs(1));
+                            }
                        }
                    }
                }
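Note on the `{snapshot_id:0>10?}` and `{:0>10?}` formatting above: ZooKeeper
names sequential znodes with a zero-padded, 10-digit counter ("%010d"), so a
node name round-trips through its integer id. A self-contained check of that
round trip (`u32` here is an assumption; any integer type wide enough for ten
digits works):

    let path = "/snapshots/snapshot-0000000042";
    // parse the trailing sequence number, then rebuild the znode name
    let id: u32 = path.rsplit_once('-').map(|(_, id)| id.parse().unwrap()).unwrap();
    assert_eq!(format!("/snapshots/snapshot-{id:0>10?}"), path);
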
@@ -1264,11 +1281,13 @@ impl IndexScheduler {
         // (that it does not contain duplicate indexes).
         check_index_swap_validity(&task)?;
 
-        this.register_raw_task(&mut wtxn, &task)?;
+        if self.zookeeper.is_none() {
+            this.register_raw_task(&mut wtxn, &task)?;
 
-        if let Err(e) = wtxn.commit() {
-            this.delete_persisted_task_data(&task)?;
-            return Err(e.into());
+            if let Err(e) = wtxn.commit() {
+                this.delete_persisted_task_data(&task)?;
+                return Err(e.into());
+            }
         }
 
         // If the registered task is a task cancelation
@@ -1281,8 +1300,10 @@ impl IndexScheduler {
             }
         }
 
-        // notify the scheduler loop to execute a new tick
-        this.wake_up.signal();
+        if self.zookeeper.is_none() {
+            // notify the scheduler loop to execute a new tick
+            this.wake_up.signal();
+        }
 
         // TODO: send task to ZK in raw json.
         if let Some(zookeeper) = &self.zookeeper {
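The snapshot handshake from the first hunk, condensed (a sketch of the calls
that appear above, with `zk` standing for the ZooKeeper handle; this is a
summary, not additional committed code):

    // Leader, after a tick that processed n > 0 tasks:
    // 1. create an empty sequential znode; followers receive NodeCreated
    //    and log that the snapshot is in preparation.
    let path = zk
        .create(
            "/snapshots/snapshot-",
            vec![],
            Acl::open_unsafe().clone(),
            CreateMode::PersistentSequential,
        )
        .unwrap();
    // 2. copy the version file, tasks.mdb, and every index env under
    //    $HOME/zk-snapshots/<id>.
    // 3. touch the same znode; followers receive NodeDataChanged and
    //    download the databases (moving them into place is still a TODO
    //    in this patch).
    let _stat = zk.set_data(&path, vec![], None);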