diff --git a/cluster/src/batch.rs b/cluster/src/batch.rs
index 778e1aff9..800df204b 100644
--- a/cluster/src/batch.rs
+++ b/cluster/src/batch.rs
@@ -103,3 +103,46 @@ pub enum IndexOperation {
         settings_tasks: Vec<u32>,
     },
 }
+
+impl Batch {
+    pub fn ids(&self) -> impl Iterator<Item = u32> {
+        type Ret = Box<dyn Iterator<Item = u32>>;
+
+        match self {
+            Batch::TaskCancelation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
+            Batch::TaskDeletion(task) => Box::new(std::iter::once(*task)) as Ret,
+            Batch::SnapshotCreation(tasks) => Box::new(tasks.clone().into_iter()) as Ret,
+            Batch::Dump(task) => Box::new(std::iter::once(*task)) as Ret,
+            Batch::IndexOperation { op, .. } => match op {
+                IndexOperation::DocumentOperation { tasks, .. } => {
+                    Box::new(tasks.clone().into_iter()) as Ret
+                }
+                IndexOperation::DocumentDeletion { tasks, .. } => {
+                    Box::new(tasks.clone().into_iter()) as Ret
+                }
+                IndexOperation::DocumentClear { tasks, .. } => {
+                    Box::new(tasks.clone().into_iter()) as Ret
+                }
+                IndexOperation::Settings { tasks, .. } => {
+                    Box::new(tasks.clone().into_iter()) as Ret
+                }
+                IndexOperation::DocumentClearAndSetting {
+                    cleared_tasks, settings_tasks, ..
+                } => {
+                    Box::new(cleared_tasks.clone().into_iter().chain(settings_tasks.clone())) as Ret
+                }
+                IndexOperation::SettingsAndDocumentOperation {
+                    document_import_tasks,
+                    settings_tasks,
+                    ..
+                } => Box::new(
+                    document_import_tasks.clone().into_iter().chain(settings_tasks.clone()),
+                ) as Ret,
+            },
+            Batch::IndexCreation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
+            Batch::IndexUpdate { task, .. } => Box::new(std::iter::once(*task)) as Ret,
+            Batch::IndexDeletion { tasks, .. } => Box::new(tasks.clone().into_iter()) as Ret,
+            Batch::IndexSwap { task } => Box::new(std::iter::once(*task)) as Ret,
+        }
+    }
+}
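Note on the new method: every `Batch` variant stores its task ids in a different shape (a single id, one `Vec<u32>`, or two lists chained together), so each match arm yields a distinct concrete iterator type, and the arms are unified behind a boxed trait object. A minimal sketch of the same pattern on a hypothetical two-variant enum, not part of the patch:

```rust
// Hypothetical two-variant enum; only the boxing trick mirrors the patch.
enum Work {
    Single(u32),
    Many(Vec<u32>),
}

impl Work {
    fn ids(&self) -> impl Iterator<Item = u32> {
        // Each arm builds a different concrete iterator type, so both are
        // coerced into the same `Box<dyn Iterator>` before being returned.
        type Ret = Box<dyn Iterator<Item = u32>>;
        match self {
            Work::Single(id) => Box::new(std::iter::once(*id)) as Ret,
            Work::Many(ids) => Box::new(ids.clone().into_iter()) as Ret,
        }
    }
}

fn main() {
    assert_eq!(Work::Single(7).ids().sum::<u32>(), 7);
    assert_eq!(Work::Many(vec![1, 2]).ids().collect::<Vec<_>>(), vec![1, 2]);
}
```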
diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs
index 377c3a786..78bc15eb3 100644
--- a/cluster/src/leader.rs
+++ b/cluster/src/leader.rs
@@ -1,6 +1,6 @@
 use std::net::ToSocketAddrs;
-use std::sync::atomic::AtomicUsize;
-use std::sync::{atomic, Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{atomic, Arc, Mutex, RwLock};
 
 use bus::{Bus, BusReader};
 use crossbeam::channel::{unbounded, Receiver, Sender};
@@ -11,14 +11,14 @@ use meilisearch_types::tasks::Task;
 use crate::batch::Batch;
 use crate::{Consistency, FollowerMsg, LeaderMsg};
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct Leader {
     task_ready_to_commit: Receiver<u32>,
     broadcast_to_follower: Sender<LeaderMsg>,
     cluster_size: Arc<AtomicUsize>,
 
-    batch_id: u32,
+    batch_id: Arc<RwLock<u32>>,
 }
 
 impl Leader {
@@ -36,7 +36,7 @@ impl Leader {
             task_ready_to_commit: task_finished_receiver,
             broadcast_to_follower: process_batch_sender,
             cluster_size,
-            batch_id: 0,
+            batch_id: Arc::default(),
         }
     }
 
@@ -113,46 +113,62 @@ impl Leader {
         info!("A follower left the cluster. {} members.", size);
     }
 
-    pub fn starts_batch(&mut self, batch: Batch) {
+    pub fn starts_batch(&self, batch: Batch) {
+        let mut batch_id = self.batch_id.write().unwrap();
+
         assert!(
-            self.batch_id % 2 == 0,
+            *batch_id % 2 == 0,
             "Tried to start processing a batch before committing the previous one"
         );
-        self.batch_id += 1;
+
+        info!("Send the batch to process to the followers");
+        *batch_id += 1;
 
         self.broadcast_to_follower
-            .send(LeaderMsg::StartBatch { id: self.batch_id, batch })
+            .send(LeaderMsg::StartBatch { id: *batch_id, batch })
             .expect("Can't reach the cluster");
     }
 
-    pub fn commit(&mut self, consistency_level: Consistency) {
+    pub fn commit(&self, consistency_level: Consistency) {
+        info!("Wait until enough followers are ready to commit a batch");
+
+        let mut batch_id = self.batch_id.write().unwrap();
+
         // if no other node needs to be in sync we can commit right away and early exit
-        if consistency_level != Consistency::Zero {
+        if consistency_level != Consistency::One {
             // else, we wait till enough nodes are ready to commit
-            for (ready_to_commit, _id) in self
+            for ready_to_commit in self
                 .task_ready_to_commit
                 .iter()
                 // we need to filter out the messages from the old batches
-                .filter(|id| *id == self.batch_id)
+                .filter(|id| *id == *batch_id)
                 .enumerate()
+                // we do a +2 because enumerate starts at 0 and we must include ourselves in the count
+                .map(|(id, _)| id + 2)
             {
+                // TODO: if the last node dies we're stuck on the iterator
+
+                // we need to reload the cluster size every time in case a node dies
                 let cluster_size = self.cluster_size.load(atomic::Ordering::Relaxed);
+                info!("{ready_to_commit} nodes are ready to commit for a cluster size of {cluster_size}");
 
                 match consistency_level {
-                    Consistency::One if ready_to_commit >= 1 => break,
-                    Consistency::Two if ready_to_commit >= 2 => break,
+                    Consistency::Two if ready_to_commit >= 2 => break,
                     Consistency::Quorum if ready_to_commit >= (cluster_size / 2) => break,
+                    Consistency::All if ready_to_commit == cluster_size => break,
                     _ => (),
                 }
             }
         }
 
-        self.broadcast_to_follower.send(LeaderMsg::Commit(self.batch_id)).unwrap();
+        info!("Tell all the followers to commit");
 
-        self.batch_id += 1;
+        self.broadcast_to_follower.send(LeaderMsg::Commit(*batch_id)).unwrap();
+
+        *batch_id += 1;
     }
 
-    pub fn register_new_task(&mut self, task: Task, update_file: Option<Vec<u8>>) {
+    pub fn register_new_task(&self, task: Task, update_file: Option<Vec<u8>>) {
+        info!("Tell all the followers to register a new task");
         self.broadcast_to_follower
             .send(LeaderMsg::RegisterNewTask { task, update_file })
             .expect("Main thread is dead");
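The counting in `commit` is easy to misread, so here is the same decision reduced to a pure function (a sketch with illustrative names, not code from the patch): the `enumerate` plus `+2` mapping turns "follower acks received so far" into "nodes ready, leader included", which is then compared against the requested consistency level.

```rust
// Illustrative names; the patch inlines this logic in the `for` loop.
enum Consistency {
    One,
    Two,
    Quorum,
    All,
}

/// `follower_acks` is the number of `ReadyToCommit` messages received for
/// the current batch; the leader always counts itself, hence the `+ 1`.
fn can_commit(level: &Consistency, follower_acks: usize, cluster_size: usize) -> bool {
    let ready = follower_acks + 1;
    match level {
        Consistency::One => true, // only the leader itself needs to be ready
        Consistency::Two => ready >= 2,
        Consistency::Quorum => ready >= cluster_size / 2,
        Consistency::All => ready == cluster_size,
    }
}

fn main() {
    assert!(can_commit(&Consistency::One, 0, 3)); // the early exit in the patch
    assert!(!can_commit(&Consistency::Two, 0, 3)); // needs one follower ack
    assert!(can_commit(&Consistency::Quorum, 1, 3)); // 2 >= 3 / 2
    assert!(!can_commit(&Consistency::All, 1, 3)); // only 2 of 3 nodes ready
}
```

Note that `Quorum` as written accepts `cluster_size / 2` ready nodes, which for odd cluster sizes is one short of a strict majority (`cluster_size / 2 + 1`).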
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs
index 243b016e5..8e55f2080 100644
--- a/cluster/src/lib.rs
+++ b/cluster/src/lib.rs
@@ -1,4 +1,5 @@
 use std::net::ToSocketAddrs;
+use std::sync::{Arc, RwLock};
 
 use batch::Batch;
 use crossbeam::channel::{unbounded, Receiver, Sender};
@@ -39,7 +40,6 @@ pub enum FollowerMsg {
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum Consistency {
-    Zero,
     One,
     Two,
     Quorum,
@@ -54,7 +54,7 @@ pub struct Follower {
     must_commit: Receiver<u32>,
     register_new_task: Receiver<(Task, Option<Vec<u8>>)>,
 
-    batch_id: u32,
+    batch_id: Arc<RwLock<u32>>,
 }
 
 impl Follower {
@@ -76,7 +76,7 @@ impl Follower {
             get_batch: get_batch_receiver,
             must_commit: must_commit_receiver,
             register_new_task: register_task_receiver,
-            batch_id: 0,
+            batch_id: Arc::default(),
         }
     }
 
@@ -106,31 +106,33 @@ impl Follower {
         }
     }
 
-    pub fn get_new_batch(&mut self) -> Batch {
+    pub fn get_new_batch(&self) -> Batch {
         info!("Get new batch called");
         let (id, batch) = self.get_batch.recv().expect("Lost connection to the leader");
         info!("Got a new batch");
-        self.batch_id = id;
+        *self.batch_id.write().unwrap() = id;
         batch
     }
 
-    pub fn ready_to_commit(&mut self) {
+    pub fn ready_to_commit(&self) {
         info!("I'm ready to commit");
-        self.sender.send(FollowerMsg::ReadyToCommit(self.batch_id)).unwrap();
+        let batch_id = self.batch_id.read().unwrap();
+
+        self.sender.send(FollowerMsg::ReadyToCommit(*batch_id)).unwrap();
 
         loop {
             let id = self.must_commit.recv().expect("Lost connection to the leader");
             #[allow(clippy::comparison_chain)]
-            if id == self.batch_id {
+            if id == *batch_id {
                 break;
-            } else if id > self.batch_id {
+            } else if id > *batch_id {
                 panic!("We missed a batch");
             }
         }
         info!("I got the right to commit");
     }
 
-    pub fn get_new_task(&mut self) -> (Task, Option<Vec<u8>>) {
+    pub fn get_new_task(&self) -> (Task, Option<Vec<u8>>) {
         self.register_new_task.recv().unwrap()
     }
 }
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index bea3c7d63..b0be7dfa3 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -14,6 +14,7 @@ license.workspace = true
 anyhow = "1.0.64"
 bincode = "1.3.3"
 cluster = { path = "../cluster" }
+crossbeam = "0.8.2"
 csv = "1.1.6"
 derive_builder = "0.11.2"
 dump = { path = "../dump" }
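Read together with `Leader::commit` above, the follower side of the handshake is: announce readiness for a specific batch id, then block until the leader broadcasts a commit for that exact id, ignoring stale ids from earlier batches. A self-contained sketch with the leader stubbed out as plain crossbeam channels (`wait_for_commit` is a hypothetical helper, not part of the patch):

```rust
use crossbeam::channel::{unbounded, Receiver, Sender};

// Hypothetical helper mirroring `Follower::ready_to_commit` above.
fn wait_for_commit(acks: &Sender<u32>, commits: &Receiver<u32>, batch_id: u32) {
    // 1. tell the leader we are ready to commit `batch_id`
    acks.send(batch_id).unwrap();

    // 2. block until the leader lets exactly this batch through
    loop {
        let id = commits.recv().expect("Lost connection to the leader");
        if id == batch_id {
            break; // our turn to commit
        } else if id > batch_id {
            panic!("We missed a batch"); // the cluster moved on without us
        }
        // id < batch_id: commit for an old batch, keep waiting
    }
}

fn main() {
    let (ack_tx, _ack_rx) = unbounded();
    let (commit_tx, commit_rx) = unbounded();
    commit_tx.send(1).unwrap(); // stale commit, ignored
    commit_tx.send(2).unwrap(); // the batch we are waiting for
    wait_for_commit(&ack_tx, &commit_rx, 2);
}
```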
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index e55654e1f..d08290c17 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -23,6 +23,7 @@ use std::fs::{self, File};
 use std::io::BufWriter;
 
 use cluster::Consistency;
+use crossbeam::utils::Backoff;
 use dump::IndexMetadata;
 use log::{debug, error, info};
 use meilisearch_types::heed::{RoTxn, RwTxn};
@@ -588,12 +589,8 @@ impl IndexScheduler {
             }
 
             match &self.cluster {
-                Some(Cluster::Leader(leader)) => {
-                    leader.write().unwrap().commit(Consistency::All)
-                }
-                Some(Cluster::Follower(follower)) => {
-                    follower.write().unwrap().ready_to_commit()
-                }
+                Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+                Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
                 None => (),
             }
 
@@ -642,12 +639,8 @@ impl IndexScheduler {
             }
 
             match &self.cluster {
-                Some(Cluster::Leader(leader)) => {
-                    leader.write().unwrap().commit(Consistency::All)
-                }
-                Some(Cluster::Follower(follower)) => {
-                    follower.write().unwrap().ready_to_commit()
-                }
+                Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+                Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
                 None => (),
             }
 
@@ -864,12 +857,8 @@ impl IndexScheduler {
         let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
 
         match &self.cluster {
-            Some(Cluster::Leader(leader)) => {
-                leader.write().unwrap().commit(Consistency::All)
-            }
-            Some(Cluster::Follower(follower)) => {
-                follower.write().unwrap().ready_to_commit()
-            }
+            Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+            Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
             None => (),
         }
 
@@ -973,12 +962,8 @@ impl IndexScheduler {
             }
 
             match &self.cluster {
-                Some(Cluster::Leader(leader)) => {
-                    leader.write().unwrap().commit(Consistency::All)
-                }
-                Some(Cluster::Follower(follower)) => {
-                    follower.write().unwrap().ready_to_commit()
-                }
+                Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+                Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
                 None => (),
             }
 
@@ -1422,51 +1407,69 @@ impl IndexScheduler {
 
     pub(crate) fn get_batch_from_cluster_batch(
         &self,
-        rtxn: &RoTxn,
         batch: cluster::batch::Batch,
     ) -> Result<Batch> {
         use cluster::batch::Batch as CBatch;
 
+        let mut rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
+
+        for id in batch.ids() {
+            let backoff = Backoff::new();
+            let id = BEU32::new(id);
+
+            loop {
+                if self.all_tasks.get(&rtxn, &id)?.is_some() {
+                    info!("Found the task_id");
+                    break;
+                }
+                info!("The task is not present in the task queue, we wait");
+                // we need to drop the txn to make a write visible
+                drop(rtxn);
+                backoff.spin();
+                rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
+            }
+        }
+
         Ok(match batch {
             CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
                 Batch::TaskCancelation {
-                    task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(),
+                    task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
                     previous_started_at,
                     previous_processing_tasks,
                 }
             }
             CBatch::TaskDeletion(task) => {
-                Batch::TaskDeletion(self.get_existing_tasks(rtxn, Some(task))?[0].clone())
+                Batch::TaskDeletion(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
             }
             CBatch::SnapshotCreation(tasks) => {
-                Batch::SnapshotCreation(self.get_existing_tasks(rtxn, tasks)?)
+                Batch::SnapshotCreation(self.get_existing_tasks(&rtxn, tasks)?)
             }
             CBatch::Dump(task) => {
-                Batch::Dump(self.get_existing_tasks(rtxn, Some(task))?[0].clone())
+                Batch::Dump(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
             }
             CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation {
-                op: self.get_index_op_from_cluster_index_op(rtxn, op)?,
+                op: self.get_index_op_from_cluster_index_op(&rtxn, op)?,
                 must_create_index,
             },
             CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation {
                 index_uid,
                 primary_key,
-                task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(),
+                task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
             },
             CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate {
                 index_uid,
                 primary_key,
-                task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(),
+                task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
             },
             CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
                 Batch::IndexDeletion {
                     index_uid,
-                    tasks: self.get_existing_tasks(rtxn, tasks)?,
+                    tasks: self.get_existing_tasks(&rtxn, tasks)?,
                     index_has_been_created,
                 }
             }
             CBatch::IndexSwap { task } => {
-                Batch::IndexSwap { task: self.get_existing_tasks(rtxn, Some(task))?[0].clone() }
+                Batch::IndexSwap { task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone() }
             }
         })
    }
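The new loop in `get_batch_from_cluster_batch` spins until the follower's local task queue contains every id referenced by the leader's batch; the heed read transaction has to be reopened between spins because an open snapshot would never observe the writes made by the registration thread. A minimal sketch of the same `Backoff` spin-wait shape against a plain atomic flag:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use crossbeam::utils::Backoff;

fn main() {
    let ready = Arc::new(AtomicBool::new(false));

    // Writer side: stands in for the thread registering tasks from the leader.
    let writer = {
        let ready = Arc::clone(&ready);
        std::thread::spawn(move || ready.store(true, Ordering::Release))
    };

    // Reader side: spin, backing off a little more on each retry, until the
    // condition holds (in the patch: until the id shows up in `all_tasks`,
    // reopening the read transaction between spins).
    let backoff = Backoff::new();
    while !ready.load(Ordering::Acquire) {
        backoff.spin();
    }

    writer.join().unwrap();
}
```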
log::warn!("this not in a cluster"); + } std::thread::spawn(move || loop { - let (task, content) = follower.write().unwrap().get_new_task(); + let (task, content) = follower.get_new_task(); this.register_raw_task(task, content); }); } @@ -917,7 +928,7 @@ impl IndexScheduler { } else { None }; - leader.write().unwrap().register_new_task(task.clone(), update_file); + leader.register_new_task(task.clone(), update_file); } // If the registered task is a task cancelation @@ -1283,21 +1294,30 @@ impl IndexScheduler { /// If there is no cluster or if leader -> create a new batch /// If follower -> wait till the leader gives us a batch to process fn get_or_create_next_batch(&self) -> Result> { - let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; + info!("inside get or create next batch"); let batch = match &self.cluster { None | Some(Cluster::Leader(_)) => { + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? } Some(Cluster::Follower(follower)) => { - let batch = follower.write().unwrap().get_new_batch(); - Some(self.get_batch_from_cluster_batch(&rtxn, batch)?) + let batch = follower.get_new_batch(); + Some(self.get_batch_from_cluster_batch(batch)?) } }; + if self.cluster.is_some() { + println!("HERE: Part of a cluster"); + } else if self.cluster.is_none() { + println!("HERE: Not part of a cluster"); + } + info!("before checking if i’m a leader"); if let Some(Cluster::Leader(leader)) = &self.cluster { + info!("I'm a leader"); if let Some(ref batch) = batch { - leader.write().unwrap().starts_batch(batch.clone().into()); + info!("I'm a leader and I got a batch to process"); + leader.starts_batch(batch.clone().into()); } } Ok(batch) diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index a8c15a698..1fc7a8ef9 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -31,7 +31,7 @@ use error::PayloadError; use extractors::payload::PayloadConfig; use http::header::CONTENT_TYPE; use index_scheduler::{Cluster, IndexScheduler, IndexSchedulerOptions}; -use log::error; +use log::{error, info}; use meilisearch_auth::AuthController; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; @@ -226,11 +226,13 @@ fn open_or_create_database_unchecked( let cluster = if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha { match cluster.as_str() { "leader" => { + info!("Starting as a leader"); let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap(); addr.set_port(6666); - Some(Cluster::Leader(Arc::new(RwLock::new(Leader::new(addr))))) + Some(Cluster::Leader(Leader::new(addr))) } "follower" => { + info!("Starting as a follower"); let mut addr = opt .cluster_configuration .leader @@ -241,7 +243,7 @@ fn open_or_create_database_unchecked( .next() .unwrap(); addr.set_port(6666); - Some(Cluster::Follower(Arc::new(RwLock::new(Follower::join(addr))))) + Some(Cluster::Follower(Follower::join(addr))) } _ => panic!("Available values for the cluster mode are leader and follower"), }