plug the cluster thingy into meilisearch

This commit is contained in:
Tamo 2023-03-16 18:53:22 +01:00
parent 6cc14feb51
commit de1b939ca0
9 changed files with 254 additions and 50 deletions

1
Cargo.lock generated
View File

@ -2547,6 +2547,7 @@ dependencies = [
"bytes",
"cargo_toml",
"clap 4.0.32",
"cluster",
"crossbeam-channel",
"deserr",
"dump",

View File

@ -6,10 +6,12 @@ use bus::{Bus, BusReader};
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{ChannelReceiver, ChannelSender, ChannelServer};
use log::info;
use meilisearch_types::tasks::Task;
use crate::batch::Batch;
use crate::{Consistency, FollowerMsg, LeaderMsg};
#[derive(Clone)]
pub struct Leader {
task_ready_to_commit: Receiver<u32>,
broadcast_to_follower: Sender<LeaderMsg>,
@ -149,4 +151,10 @@ impl Leader {
self.batch_id += 1;
}
/// Publishes a newly registered task on the `broadcast_to_follower` channel.
///
/// The task and the optional raw bytes of its update file are wrapped in a
/// `LeaderMsg::RegisterNewTask` message before being sent.
///
/// # Panics
///
/// Panics if the message cannot be sent on the channel.
pub fn register_new_task(&mut self, task: Task, update_file: Option<Vec<u8>>) {
    let message = LeaderMsg::RegisterNewTask { task, update_file };
    let outcome = self.broadcast_to_follower.send(message);
    outcome.expect("Main thread is dead");
}
}

View File

@ -1,8 +1,10 @@
use std::net::ToSocketAddrs;
use batch::Batch;
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{connect_channel, ChannelReceiver, ChannelSender};
use meilisearch_types::tasks::KindWithContent;
use log::info;
use meilisearch_types::tasks::{KindWithContent, Task};
use serde::{Deserialize, Serialize};
pub mod batch;
@ -24,6 +26,8 @@ pub enum LeaderMsg {
StartBatch { id: u32, batch: Batch },
//Tell the follower to commit the update asap
Commit(u32),
// Tell the follower to register a new task, along with the content of its update file if any
RegisterNewTask { task: Task, update_file: Option<Vec<u8>> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -42,40 +46,87 @@ pub enum Consistency {
All,
}
/// Follower-side handle on the connection to the cluster leader.
///
/// Built by `Follower::join`, which spawns a router thread that splits the leader's
/// messages into the three in-process channels stored here.
#[derive(Clone)]
pub struct Follower {
/// Network channel used to send `FollowerMsg`s (such as `ReadyToCommit`) to the leader.
sender: ChannelSender<FollowerMsg>,
// NOTE(review): `join` moves the network receiver into the router thread and its final
// struct literal does not set this field; it looks like a pre-change line kept by the
// diff view. Confirm.
receiver: ChannelReceiver<LeaderMsg>,
/// Batches, with their id, forwarded by the router from `LeaderMsg::StartBatch`.
get_batch: Receiver<(u32, Batch)>,
/// Batch ids forwarded by the router from `LeaderMsg::Commit`.
must_commit: Receiver<u32>,
/// Tasks and their optional update-file content forwarded by the router from
/// `LeaderMsg::RegisterNewTask`.
register_new_task: Receiver<(Task, Option<Vec<u8>>)>,
/// Id of the last batch returned by `get_new_batch`; initialised to 0 by `join`.
batch_id: u32,
}
impl Follower {
/// Connects to the leader at the given address and returns the follower handle.
///
/// A background thread running `Self::router` is spawned; it owns the network receiver
/// and forwards each kind of leader message to its own unbounded channel.
///
/// # Panics
///
/// Panics if the connection to the leader cannot be established.
pub fn join(leader: impl ToSocketAddrs) -> Follower {
let (sender, receiver) = connect_channel(leader).unwrap();
// NOTE(review): this early struct literal looks like the pre-change return value kept by
// the diff view; the router-based construction below supersedes it. Confirm.
Follower { sender, receiver, batch_id: 0 }
info!("Connection to the leader established");
// One unbounded channel per kind of message coming from the leader.
let (get_batch_sender, get_batch_receiver) = unbounded();
let (must_commit_sender, must_commit_receiver) = unbounded();
let (register_task_sender, register_task_receiver) = unbounded();
// The router thread takes ownership of the network receiver and of the sending halves.
std::thread::spawn(move || {
Self::router(receiver, get_batch_sender, must_commit_sender, register_task_sender);
});
Follower {
sender,
get_batch: get_batch_receiver,
must_commit: must_commit_receiver,
register_new_task: register_task_receiver,
batch_id: 0,
}
}
/// Dispatches every message received from the leader to the matching in-process channel.
///
/// Runs forever: `StartBatch` goes to `get_batch`, `Commit` goes to `must_commit` and
/// `RegisterNewTask` goes to `register_new_task`. A log line is emitted before each
/// forward.
///
/// # Panics
///
/// Panics when receiving from the leader fails or when the target channel rejects the
/// forwarded message.
fn router(
    receiver: ChannelReceiver<LeaderMsg>,
    get_batch: Sender<(u32, Batch)>,
    must_commit: Sender<u32>,
    register_new_task: Sender<(Task, Option<Vec<u8>>)>,
) {
    loop {
        let message = receiver.recv().expect("Lost connection to the leader");

        match message {
            LeaderMsg::StartBatch { id, batch } => {
                info!("Starting to process a new batch");
                let forwarded = get_batch.send((id, batch));
                forwarded.expect("Lost connection to the main thread");
            }
            LeaderMsg::Commit(id) => {
                info!("Must commit");
                let forwarded = must_commit.send(id);
                forwarded.expect("Lost connection to the main thread");
            }
            LeaderMsg::RegisterNewTask { task, update_file } => {
                info!("Registered a new task");
                let forwarded = register_new_task.send((task, update_file));
                forwarded.expect("Lost connection to the main thread");
            }
        }
    }
}
/// Waits for the next batch sent by the leader and returns it.
///
/// The id received with the batch is stored in `self.batch_id`.
///
/// # Panics
///
/// Panics if the `get_batch` channel is disconnected.
pub fn get_new_batch(&mut self) -> Batch {
// NOTE(review): the loop reading `self.receiver` directly looks like the pre-change body
// kept by the diff view; the channel-based code after it is the router-era version. Confirm.
loop {
match self.receiver.recv() {
Ok(LeaderMsg::StartBatch { id, batch }) if id == self.batch_id => {
self.batch_id = id;
break batch;
}
Err(_) => log::error!("lost connection to the leader"),
_ => (),
}
}
// Batches are forwarded by the router thread through the `get_batch` channel.
let (id, batch) = self.get_batch.recv().expect("Lost connection to the leader");
self.batch_id = id;
batch
}
/// Tells the leader this follower is ready to commit the current batch, then waits for
/// the commit order carrying the same batch id.
///
/// # Panics
///
/// Panics if sending to the leader fails, if the `must_commit` channel is disconnected,
/// or if a commit id greater than `self.batch_id` is received.
pub fn ready_to_commit(&mut self) {
self.sender.send(FollowerMsg::ReadyToCommit(self.batch_id)).unwrap();
loop {
// NOTE(review): the `match` on `self.receiver` below looks like the pre-change body kept
// by the diff view; the channel-based code follows it. Confirm.
match self.receiver.recv() {
Ok(LeaderMsg::Commit(id)) if id == self.batch_id => break,
Err(_) => panic!("lost connection to the leader"),
_ => (),
// Commit ids are forwarded by the router thread through the `must_commit` channel.
let id = self.must_commit.recv().expect("Lost connection to the leader");
// The lint is silenced so the if/else chain can stay: ids lower than the current batch
// match neither branch and the loop simply waits for the next one.
#[allow(clippy::comparison_chain)]
if id == self.batch_id {
break;
} else if id > self.batch_id {
panic!("We missed a batch");
}
}
}
/// Waits for the next task registered by the leader and returns it together with the
/// optional content of its update file.
///
/// # Panics
///
/// Panics if the `register_new_task` channel is disconnected.
pub fn get_new_task(&mut self) -> (Task, Option<Vec<u8>>) {
    // Same failure message as the other receiving methods of `Follower`, instead of a
    // bare `unwrap()` that would give no hint about what went wrong.
    self.register_new_task.recv().expect("Lost connection to the leader")
}
}

View File

@ -1430,18 +1430,20 @@ impl IndexScheduler {
Ok(match batch {
CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
Batch::TaskCancelation {
task: self.get_existing_tasks(rtxn, Some(task))?[0],
task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(),
previous_started_at,
previous_processing_tasks,
}
}
CBatch::TaskDeletion(task) => {
Batch::TaskDeletion(self.get_existing_tasks(rtxn, Some(task))?[0])
Batch::TaskDeletion(self.get_existing_tasks(rtxn, Some(task))?[0].clone())
}
CBatch::SnapshotCreation(tasks) => {
Batch::SnapshotCreation(self.get_existing_tasks(rtxn, tasks)?)
}
CBatch::Dump(task) => Batch::Dump(self.get_existing_tasks(rtxn, Some(task))?[0]),
CBatch::Dump(task) => {
Batch::Dump(self.get_existing_tasks(rtxn, Some(task))?[0].clone())
}
CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation {
op: self.get_index_op_from_cluster_index_op(rtxn, op)?,
must_create_index,
@ -1449,12 +1451,12 @@ impl IndexScheduler {
CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation {
index_uid,
primary_key,
task: self.get_existing_tasks(rtxn, Some(task))?[0],
task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(),
},
CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate {
index_uid,
primary_key,
task: self.get_existing_tasks(rtxn, Some(task))?[0],
task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(),
},
CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
Batch::IndexDeletion {
@ -1464,7 +1466,7 @@ impl IndexScheduler {
}
}
CBatch::IndexSwap { task } => {
Batch::IndexSwap { task: self.get_existing_tasks(rtxn, Some(task))?[0] }
Batch::IndexSwap { task: self.get_existing_tasks(rtxn, Some(task))?[0].clone() }
}
})
}
@ -1559,10 +1561,20 @@ impl From<Batch> for cluster::batch::Batch {
Batch::IndexOperation { op, must_create_index } => {
CBatch::IndexOperation { op: op.into(), must_create_index }
}
Batch::IndexCreation { index_uid, primary_key, task } => todo!(),
Batch::IndexUpdate { index_uid, primary_key, task } => todo!(),
Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => todo!(),
Batch::IndexSwap { task } => todo!(),
Batch::IndexCreation { index_uid, primary_key, task } => {
CBatch::IndexCreation { index_uid, primary_key, task: task.uid }
}
Batch::IndexUpdate { index_uid, primary_key, task } => {
CBatch::IndexUpdate { index_uid, primary_key, task: task.uid }
}
Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
CBatch::IndexDeletion {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
index_has_been_created,
}
}
Batch::IndexSwap { task } => CBatch::IndexSwap { task: task.uid },
}
}
}

View File

@ -33,6 +33,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snapshots_path: _,
auth_path: _,
version_file_path: _,
cluster: _,
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,

View File

@ -31,6 +31,7 @@ mod uuid_codec;
pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32;
use std::io::Write;
use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
@ -39,7 +40,7 @@ use std::sync::{Arc, RwLock};
use std::time::Duration;
use batch::Batch;
use cluster::{Consistency, Follower, Leader};
use cluster::{Follower, Leader};
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use file_store::FileStore;
@ -52,6 +53,7 @@ use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
use serde::Deserialize;
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
@ -326,9 +328,28 @@ pub struct IndexScheduler {
run_loop_iteration: Arc<RwLock<usize>>,
}
enum Cluster {
Leader(RwLock<Leader>),
Follower(RwLock<Follower>),
/// The role an instance can take in a cluster.
///
/// Can be parsed from the strings `"leader"` and `"follower"` through its `FromStr`
/// implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum ClusterMode {
/// The instance acts as the leader.
Leader,
/// The instance acts as a follower.
Follower,
}
/// Parses a cluster mode from its lowercase name.
///
/// Only the exact strings `"leader"` and `"follower"` are accepted; anything else
/// yields `Err(())`.
impl std::str::FromStr for ClusterMode {
    type Err = ();

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        if s == "leader" {
            return Ok(ClusterMode::Leader);
        }
        if s == "follower" {
            return Ok(ClusterMode::Follower);
        }
        Err(())
    }
}
/// Cluster handle optionally given to `IndexScheduler::new`.
///
/// Each variant wraps its handle in `Arc<RwLock<_>>`, so cloning a `Cluster` clones the
/// `Arc` rather than the underlying `Leader` or `Follower`.
#[derive(Clone)]
pub enum Cluster {
/// This instance is the leader.
Leader(Arc<RwLock<Leader>>),
/// This instance is a follower.
Follower(Arc<RwLock<Follower>>),
}
impl IndexScheduler {
@ -368,6 +389,7 @@ impl IndexScheduler {
/// Create an index scheduler and start its run loop.
pub fn new(
options: IndexSchedulerOptions,
cluster: Option<Cluster>,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> {
@ -427,7 +449,7 @@ impl IndexScheduler {
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
version_file_path: options.version_file_path,
cluster: None,
cluster,
#[cfg(test)]
test_breakpoint_sdr,
@ -520,6 +542,16 @@ impl IndexScheduler {
/// only once per index scheduler.
fn run(&self) {
let run = self.private_clone();
// If we're a follower, we start a thread to register the tasks coming from the leader.
if let Some(Cluster::Follower(follower)) = self.cluster.clone() {
let this = self.private_clone();
std::thread::spawn(move || loop {
let (task, content) = follower.write().unwrap().get_new_task();
this.register_raw_task(task, content);
});
}
std::thread::Builder::new()
.name(String::from("scheduler"))
.spawn(move || {
@ -877,6 +909,16 @@ impl IndexScheduler {
return Err(e.into());
}
if let Some(Cluster::Leader(leader)) = &self.cluster {
let update_file = if let Some(uuid) = task.content_uuid() {
let path = self.file_store.get_update_path(uuid);
Some(std::fs::read(path).unwrap())
} else {
None
};
leader.write().unwrap().register_new_task(task.clone(), update_file);
}
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
@ -1006,6 +1048,44 @@ impl IndexScheduler {
Ok(task)
}
/// /!\ should only be used when you're a follower in cluster mode
///
/// Stores `task` exactly as received, without allocating a new task id:
/// 1. when `content_file` is provided, its bytes are persisted in the file store under
///    the task's content uuid,
/// 2. the task is written in `all_tasks` and referenced in the index, status, kind and
///    enqueued-at databases within a single write transaction,
/// 3. the scheduler is woken up.
///
/// # Panics
///
/// Panics on any I/O or database error, and when a content file is given for a task that
/// has no content uuid.
pub fn register_raw_task(&self, task: Task, content_file: Option<Vec<u8>>) {
    let uid = task.uid;

    // The update file must exist on disk before the task becomes visible.
    if let Some(bytes) = content_file {
        let content_uuid = task.content_uuid().expect("bad task");
        let (_, mut update_file) =
            self.file_store.new_update_with_uuid(content_uuid.as_u128()).unwrap();
        update_file.write_all(&bytes).unwrap();
        update_file.persist().unwrap();
    }

    let mut txn = self.env.write_txn().unwrap();

    self.all_tasks.put(&mut txn, &BEU32::new(uid), &task).unwrap();

    for index_uid in task.indexes() {
        let updated = self.update_index(&mut txn, index_uid, |tasks| {
            tasks.insert(uid);
        });
        updated.unwrap();
    }

    let status_updated = self.update_status(&mut txn, task.status, |tasks| {
        tasks.insert(uid);
    });
    status_updated.unwrap();

    let kind_updated = self.update_kind(&mut txn, task.kind.as_kind(), |tasks| {
        tasks.insert(uid);
    });
    kind_updated.unwrap();

    let dated = utils::insert_task_datetime(&mut txn, self.enqueued_at, task.enqueued_at, uid);
    dated.unwrap();

    txn.commit().unwrap();

    // Let the scheduler know there is something new to process.
    self.wake_up.signal();
}
/// Create a new index without any associated task.
pub fn create_raw_index(
&self,
@ -1335,7 +1415,7 @@ mod tests {
autobatching_enabled,
};
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
let index_scheduler = Self::new(options, None, sender, planned_failures).unwrap();
// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.

View File

@ -24,6 +24,7 @@ bstr = "1.0.1"
byte-unit = { version = "4.0.14", default-features = false, features = ["std", "serde"] }
bytes = "1.2.1"
clap = { version = "4.0.9", features = ["derive", "env"] }
cluster = { path = "../cluster" }
crossbeam-channel = "0.5.6"
deserr = "0.5.0"
dump = { path = "../dump" }

View File

@ -12,8 +12,9 @@ pub mod search;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::net::ToSocketAddrs;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
@ -25,10 +26,11 @@ use actix_web::web::Data;
use actix_web::{web, HttpRequest};
use analytics::Analytics;
use anyhow::bail;
use cluster::{Follower, Leader};
use error::PayloadError;
use extractors::payload::PayloadConfig;
use http::header::CONTENT_TYPE;
use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use index_scheduler::{Cluster, IndexScheduler, IndexSchedulerOptions};
use log::error;
use meilisearch_auth::AuthController;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
@ -220,22 +222,53 @@ fn open_or_create_database_unchecked(
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
let cluster = if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha {
match cluster.as_str() {
"leader" => {
let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap();
addr.set_port(6666);
Some(Cluster::Leader(Arc::new(RwLock::new(Leader::new(addr)))))
}
"follower" => {
let mut addr = opt
.cluster_configuration
.leader
.as_ref()
.expect("Can't be a follower without a leader")
.to_socket_addrs()
.unwrap()
.next()
.unwrap();
addr.set_port(6666);
Some(Cluster::Follower(Arc::new(RwLock::new(Follower::join(addr)))))
}
_ => panic!("Available values for the cluster mode are leader and follower"),
}
} else {
None
};
let index_scheduler_builder = || -> anyhow::Result<_> {
Ok(IndexScheduler::new(IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
tasks_path: opt.db_path.join("tasks"),
update_file_path: opt.db_path.join("update_files"),
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,
})?)
Ok(IndexScheduler::new(
IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
tasks_path: opt.db_path.join("tasks"),
update_file_path: opt.db_path.join("update_files"),
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes()
as usize,
index_count: DEFAULT_INDEX_COUNT,
},
cluster,
)?)
};
match (

View File

@ -12,6 +12,7 @@ use std::{env, fmt, fs};
use byte_unit::{Byte, ByteError};
use clap::Parser;
use index_scheduler::ClusterMode;
use meilisearch_types::milli::update::IndexerConfig;
use rustls::server::{
AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, ServerSessionMemoryCache,
@ -297,6 +298,10 @@ pub struct Opt {
#[clap(flatten)]
pub indexer_options: IndexerOpts,
#[serde(flatten)]
#[clap(flatten)]
pub cluster_configuration: ClusterOpts,
/// Set the path to a configuration file that should be used to setup the engine.
/// Format must be TOML.
#[clap(long)]
@ -385,6 +390,7 @@ impl Opt {
#[cfg(all(not(debug_assertions), feature = "analytics"))]
no_analytics,
experimental_enable_metrics: enable_metrics_route,
cluster_configuration,
} = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@ -518,6 +524,17 @@ impl IndexerOpts {
}
}
// Options of the experimental high-availability (cluster) mode.
//
// Kept as a regular comment on purpose: this struct derives `Parser` and is flattened
// into `Opt`, so it should not contribute an `about` text of its own.
#[derive(Debug, Default, Clone, Parser, Deserialize)]
pub struct ClusterOpts {
    /// Enables the experimental high-availability mode and selects the role of this
    /// instance. Accepted values are `leader` and `follower`.
    #[clap(long)]
    #[serde(default)]
    pub experimental_enable_ha: Option<String>,

    /// Address of the leader to join. Required when this instance runs as a `follower`.
    #[clap(long)]
    #[serde(default)]
    pub leader: Option<String>,
}
impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error;