From 6cc14feb51414ba5fd42732b762db78bb81b7983 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 16 Mar 2023 16:31:16 +0100 Subject: [PATCH] synchronize most of the operations --- Cargo.lock | 28 ++-- cluster/Cargo.toml | 4 +- cluster/src/batch.rs | 105 +++++++++++++ cluster/src/leader.rs | 3 +- cluster/src/lib.rs | 6 +- index-scheduler/src/batch.rs | 293 ++++++++++++++++++++++++++++++++++- index-scheduler/src/lib.rs | 45 ++++-- 7 files changed, 447 insertions(+), 37 deletions(-) create mode 100644 cluster/src/batch.rs diff --git a/Cargo.lock b/Cargo.lock index 3bac445d5..701156b46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -815,6 +815,8 @@ dependencies = [ "serde", "serde_json", "thiserror", + "time", + "uuid 1.3.0", ] [[package]] @@ -1232,7 +1234,7 @@ dependencies = [ "tempfile", "thiserror", "time", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -1438,7 +1440,7 @@ dependencies = [ "faux", "tempfile", "thiserror", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -1978,7 +1980,7 @@ dependencies = [ "tempfile", "thiserror", "time", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -2605,7 +2607,7 @@ dependencies = [ "tokio-stream", "toml", "urlencoding", - "uuid 1.2.2", + "uuid 1.3.0", "vergen", "walkdir", "yaup", @@ -2628,7 +2630,7 @@ dependencies = [ "sha2", "thiserror", "time", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -2658,7 +2660,7 @@ dependencies = [ "thiserror", "time", "tokio", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -2733,7 +2735,7 @@ dependencies = [ "tempfile", "thiserror", "time", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -3908,9 +3910,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.17" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" +checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" dependencies = [ "itoa 1.0.5", "serde", @@ -3926,9 +3928,9 @@ checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" [[package]] name = "time-macros" -version = "0.2.6" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" +checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36" dependencies = [ "time-core", ] @@ -4173,9 +4175,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.2.2" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" +checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" dependencies = [ "getrandom", "serde", diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml index 3a2f125e6..529c4212b 100644 --- a/cluster/Cargo.toml +++ b/cluster/Cargo.toml @@ -16,7 +16,9 @@ serde = { version = "1.0.155", features = ["derive"] } serde_json = "1.0.94" thiserror = "1.0.39" meilisearch-types = { path = "../meilisearch-types" } -roaring = "0.10.1" +roaring = { version = "0.10.1", features = ["serde"] } log = "0.4.17" crossbeam = "0.8.2" bus = "2.3.0" +time = "0.3.20" +uuid = { version = "1.3.0", features = ["v4"] } diff --git a/cluster/src/batch.rs b/cluster/src/batch.rs new file mode 100644 index 000000000..778e1aff9 --- /dev/null +++ b/cluster/src/batch.rs @@ -0,0 +1,105 @@ +use meilisearch_types::milli::update::IndexDocumentsMethod; +use meilisearch_types::settings::{Settings, Unchecked}; +use 
meilisearch_types::tasks::TaskId; +use roaring::RoaringBitmap; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +/// Represents a combination of tasks that can all be processed at the same time. +/// +/// A batch contains the set of tasks that it represents (accessible through +/// [`self.ids()`](Batch::ids)), as well as additional information on how to +/// be processed. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Batch { + TaskCancelation { + /// The task cancelation itself. + task: TaskId, + /// The date and time at which the previously processing tasks started. + previous_started_at: OffsetDateTime, + /// The list of tasks that were processing when this task cancelation appeared. + previous_processing_tasks: RoaringBitmap, + }, + TaskDeletion(TaskId), + SnapshotCreation(Vec), + Dump(TaskId), + IndexOperation { + op: IndexOperation, + must_create_index: bool, + }, + IndexCreation { + index_uid: String, + primary_key: Option, + task: TaskId, + }, + IndexUpdate { + index_uid: String, + primary_key: Option, + task: TaskId, + }, + IndexDeletion { + index_uid: String, + tasks: Vec, + index_has_been_created: bool, + }, + IndexSwap { + task: TaskId, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DocumentOperation { + Add(Uuid), + Delete(Vec), +} + +/// A [batch](Batch) that combines multiple tasks operating on an index. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum IndexOperation { + DocumentOperation { + index_uid: String, + primary_key: Option, + method: IndexDocumentsMethod, + documents_counts: Vec, + operations: Vec, + tasks: Vec, + }, + DocumentDeletion { + index_uid: String, + // The vec associated with each document deletion tasks. + documents: Vec>, + tasks: Vec, + }, + DocumentClear { + index_uid: String, + tasks: Vec, + }, + Settings { + index_uid: String, + // The boolean indicates if it's a settings deletion or creation. + settings: Vec<(bool, Settings)>, + tasks: Vec, + }, + DocumentClearAndSetting { + index_uid: String, + cleared_tasks: Vec, + + // The boolean indicates if it's a settings deletion or creation. + settings: Vec<(bool, Settings)>, + settings_tasks: Vec, + }, + SettingsAndDocumentOperation { + index_uid: String, + + primary_key: Option, + method: IndexDocumentsMethod, + documents_counts: Vec, + operations: Vec, + document_import_tasks: Vec, + + // The boolean indicates if it's a settings deletion or creation. + settings: Vec<(bool, Settings)>, + settings_tasks: Vec, + }, +} diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs index aa749a996..8159de4f3 100644 --- a/cluster/src/leader.rs +++ b/cluster/src/leader.rs @@ -7,6 +7,7 @@ use crossbeam::channel::{unbounded, Receiver, Sender}; use ductile::{ChannelReceiver, ChannelSender, ChannelServer}; use log::info; +use crate::batch::Batch; use crate::{Consistency, FollowerMsg, LeaderMsg}; pub struct Leader { @@ -110,7 +111,7 @@ impl Leader { info!("A follower left the cluster. 
{} members.", size); } - pub fn starts_batch(&mut self, batch: Vec) { + pub fn starts_batch(&mut self, batch: Batch) { assert!( self.batch_id % 2 == 0, "Tried to start processing a batch before commiting the previous one" diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index ac595fe95..625ebc4ac 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -1,9 +1,11 @@ use std::net::ToSocketAddrs; +use batch::Batch; use ductile::{connect_channel, ChannelReceiver, ChannelSender}; use meilisearch_types::tasks::KindWithContent; use serde::{Deserialize, Serialize}; +pub mod batch; mod leader; pub use leader::Leader; @@ -19,7 +21,7 @@ pub enum Error { #[derive(Debug, Clone, Serialize, Deserialize)] pub enum LeaderMsg { // Starts a new batch - StartBatch { id: u32, batch: Vec }, + StartBatch { id: u32, batch: Batch }, // Tell the follower to commit the update asap Commit(u32), } @@ -52,7 +54,7 @@ impl Follower { Follower { sender, receiver, batch_id: 0 } } - pub fn get_new_batch(&mut self) -> Vec { + pub fn get_new_batch(&mut self) -> Batch { loop { match self.receiver.recv() { Ok(LeaderMsg::StartBatch { id, batch }) if id == self.batch_id => { diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index c103bccc5..b1d69de4a 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -22,6 +22,7 @@ use std::ffi::OsStr; use std::fs::{self, File}; use std::io::BufWriter; +use cluster::Consistency; use dump::IndexMetadata; use log::{debug, error, info}; use meilisearch_types::heed::{RoTxn, RwTxn}; @@ -41,14 +42,14 @@ use uuid::Uuid; use crate::autobatcher::{self, BatchKind}; use crate::utils::{self, swap_index_uid_in_task}; -use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId}; +use crate::{Cluster, Error, IndexScheduler, ProcessingTasks, Result, TaskId}; /// Represents a combination of tasks that can all be processed at the same time. /// /// A batch contains the set of tasks that it represents (accessible through /// [`self.ids()`](Batch::ids)), as well as additional information on how to /// be processed. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum Batch { TaskCancelation { /// The task cancelation itself. @@ -85,14 +86,14 @@ pub(crate) enum Batch { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum DocumentOperation { Add(Uuid), Delete(Vec), } /// A [batch](Batch) that combines multiple tasks operating on an index. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum IndexOperation { DocumentOperation { index_uid: String, @@ -586,6 +587,16 @@ impl IndexScheduler { _ => unreachable!(), } + match &self.cluster { + Some(Cluster::Leader(leader)) => { + leader.write().unwrap().commit(Consistency::All) + } + Some(Cluster::Follower(follower)) => { + follower.write().unwrap().ready_to_commit() + } + None => (), + } + // We must only remove the content files if the transaction is successfully committed // and if errors occurs when we are deleting files we must do our best to delete // everything. 
We do not return the encountered errors when deleting the content @@ -629,6 +640,17 @@ impl IndexScheduler { } _ => unreachable!(), } + + match &self.cluster { + Some(Cluster::Leader(leader)) => { + leader.write().unwrap().commit(Consistency::All) + } + Some(Cluster::Follower(follower)) => { + follower.write().unwrap().ready_to_commit() + } + None => (), + } + wtxn.commit()?; Ok(vec![task]) } @@ -840,7 +862,17 @@ impl IndexScheduler { let mut index_wtxn = index.write_txn()?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; - // TODO cluster: ready to commit + + match &self.cluster { + Some(Cluster::Leader(leader)) => { + leader.write().unwrap().commit(Consistency::All) + } + Some(Cluster::Follower(follower)) => { + follower.write().unwrap().ready_to_commit() + } + None => (), + } + index_wtxn.commit()?; Ok(tasks) @@ -939,6 +971,17 @@ impl IndexScheduler { for swap in swaps { self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?; } + + match &self.cluster { + Some(Cluster::Leader(leader)) => { + leader.write().unwrap().commit(Consistency::All) + } + Some(Cluster::Follower(follower)) => { + follower.write().unwrap().ready_to_commit() + } + None => (), + } + wtxn.commit()?; task.status = Status::Succeeded; Ok(vec![task]) @@ -1376,4 +1419,244 @@ impl IndexScheduler { Ok(content_files_to_delete) } + + pub(crate) fn get_batch_from_cluster_batch( + &self, + rtxn: &RoTxn, + batch: cluster::batch::Batch, + ) -> Result { + use cluster::batch::Batch as CBatch; + + Ok(match batch { + CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => { + Batch::TaskCancelation { + task: self.get_existing_tasks(rtxn, Some(task))?[0], + previous_started_at, + previous_processing_tasks, + } + } + CBatch::TaskDeletion(task) => { + Batch::TaskDeletion(self.get_existing_tasks(rtxn, Some(task))?[0]) + } + CBatch::SnapshotCreation(tasks) => { + Batch::SnapshotCreation(self.get_existing_tasks(rtxn, tasks)?) 
+ } + CBatch::Dump(task) => Batch::Dump(self.get_existing_tasks(rtxn, Some(task))?[0]), + CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation { + op: self.get_index_op_from_cluster_index_op(rtxn, op)?, + must_create_index, + }, + CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation { + index_uid, + primary_key, + task: self.get_existing_tasks(rtxn, Some(task))?[0], + }, + CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate { + index_uid, + primary_key, + task: self.get_existing_tasks(rtxn, Some(task))?[0], + }, + CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => { + Batch::IndexDeletion { + index_uid, + tasks: self.get_existing_tasks(rtxn, tasks)?, + index_has_been_created, + } + } + CBatch::IndexSwap { task } => { + Batch::IndexSwap { task: self.get_existing_tasks(rtxn, Some(task))?[0] } + } + }) + } + + pub(crate) fn get_index_op_from_cluster_index_op( + &self, + rtxn: &RoTxn, + op: cluster::batch::IndexOperation, + ) -> Result { + use cluster::batch::IndexOperation as COp; + + Ok(match op { + COp::DocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations, + tasks, + } => IndexOperation::DocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations: operations.into_iter().map(|op| op.into()).collect(), + tasks: self.get_existing_tasks(rtxn, tasks)?, + }, + COp::DocumentDeletion { index_uid, documents, tasks } => { + IndexOperation::DocumentDeletion { + index_uid, + documents, + tasks: self.get_existing_tasks(rtxn, tasks)?, + } + } + COp::DocumentClear { index_uid, tasks } => IndexOperation::DocumentClear { + index_uid, + tasks: self.get_existing_tasks(rtxn, tasks)?, + }, + COp::Settings { index_uid, settings, tasks } => IndexOperation::Settings { + index_uid, + settings, + tasks: self.get_existing_tasks(rtxn, tasks)?, + }, + COp::DocumentClearAndSetting { index_uid, cleared_tasks, settings, settings_tasks } => { + IndexOperation::DocumentClearAndSetting { + index_uid, + cleared_tasks: self.get_existing_tasks(rtxn, cleared_tasks)?, + settings, + settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?, + } + } + COp::SettingsAndDocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations, + document_import_tasks, + settings, + settings_tasks, + } => IndexOperation::SettingsAndDocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations: operations.into_iter().map(|op| op.into()).collect(), + document_import_tasks: self.get_existing_tasks(rtxn, document_import_tasks)?, + settings, + settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?, + }, + }) + } +} + +impl From for cluster::batch::Batch { + fn from(batch: Batch) -> Self { + use cluster::batch::Batch as CBatch; + + match batch { + Batch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => { + CBatch::TaskCancelation { + task: task.uid, + previous_started_at, + previous_processing_tasks, + } + } + Batch::TaskDeletion(task) => CBatch::TaskDeletion(task.uid), + Batch::SnapshotCreation(task) => { + CBatch::SnapshotCreation(task.into_iter().map(|task| task.uid).collect()) + } + Batch::Dump(task) => CBatch::Dump(task.uid), + Batch::IndexOperation { op, must_create_index } => { + CBatch::IndexOperation { op: op.into(), must_create_index } + } + Batch::IndexCreation { index_uid, primary_key, task } => todo!(), + Batch::IndexUpdate { index_uid, primary_key, task } => todo!(), + 
Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => todo!(), + Batch::IndexSwap { task } => todo!(), + } + } +} + +impl From for cluster::batch::IndexOperation { + fn from(op: IndexOperation) -> Self { + use cluster::batch::IndexOperation as COp; + match op { + IndexOperation::DocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations, + tasks, + } => COp::DocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations: operations.into_iter().map(|op| op.into()).collect(), + tasks: tasks.into_iter().map(|task| task.uid).collect(), + }, + IndexOperation::DocumentDeletion { index_uid, documents, tasks } => { + COp::DocumentDeletion { + index_uid, + documents, + tasks: tasks.into_iter().map(|task| task.uid).collect(), + } + } + IndexOperation::DocumentClear { index_uid, tasks } => COp::DocumentClear { + index_uid, + tasks: tasks.into_iter().map(|task| task.uid).collect(), + }, + IndexOperation::Settings { index_uid, settings, tasks } => COp::Settings { + index_uid, + settings, + tasks: tasks.into_iter().map(|task| task.uid).collect(), + }, + IndexOperation::DocumentClearAndSetting { + index_uid, + cleared_tasks, + settings, + settings_tasks, + } => COp::DocumentClearAndSetting { + index_uid, + cleared_tasks: cleared_tasks.into_iter().map(|task| task.uid).collect(), + settings, + settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(), + }, + IndexOperation::SettingsAndDocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations, + document_import_tasks, + settings, + settings_tasks, + } => COp::SettingsAndDocumentOperation { + index_uid, + primary_key, + method, + documents_counts, + operations: operations.into_iter().map(|op| op.into()).collect(), + document_import_tasks: document_import_tasks + .into_iter() + .map(|task| task.uid) + .collect(), + settings, + settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(), + }, + } + } +} + +impl From for cluster::batch::DocumentOperation { + fn from(op: DocumentOperation) -> Self { + use cluster::batch::DocumentOperation as COp; + + match op { + DocumentOperation::Add(uuid) => COp::Add(uuid), + DocumentOperation::Delete(docs) => COp::Delete(docs), + } + } +} + +impl From for DocumentOperation { + fn from(op: cluster::batch::DocumentOperation) -> Self { + use cluster::batch::DocumentOperation as COp; + + match op { + COp::Add(uuid) => DocumentOperation::Add(uuid), + COp::Delete(docs) => DocumentOperation::Delete(docs), + } + } } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 19b402bbc..0ef7195ca 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -38,6 +38,7 @@ use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, RwLock}; use std::time::Duration; +use batch::Batch; use cluster::{Consistency, Follower, Leader}; use dump::{KindDump, TaskDump, UpdateFile}; pub use error::Error; @@ -326,8 +327,8 @@ pub struct IndexScheduler { } enum Cluster { - Leader(Leader), - Follower(Follower), + Leader(RwLock), + Follower(RwLock), } impl IndexScheduler { @@ -1061,17 +1062,11 @@ impl IndexScheduler { self.breakpoint(Breakpoint::Start); } - // TODO cluster: If - // - I'm a leader => create the batch and send it to everyone - // - I'm a follower => get the batch from the leader and gather the tasks from my task queue - let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; - let batch = - match self.create_next_batch(&rtxn).map_err(|e| 
Error::CreateBatch(Box::new(e)))? { - Some(batch) => batch, - None => return Ok(TickOutcome::WaitForSignal), - }; + let batch = match self.get_or_create_next_batch()? { + Some(batch) => batch, + None => return Ok(TickOutcome::WaitForSignal), + }; let index_uid = batch.index_uid().map(ToOwned::to_owned); - drop(rtxn); // TODO cluster: Should we send the starting date as well so everyone is in sync? @@ -1089,9 +1084,6 @@ impl IndexScheduler { #[cfg(test)] self.breakpoint(Breakpoint::BatchCreated); - // TODO cluster: Inside the processing of the tasks we need to check if we should commit - // the batch or not - // 2. Process the tasks let res = { let cloned_index_scheduler = self.private_clone(); @@ -1205,6 +1197,29 @@ impl IndexScheduler { Ok(TickOutcome::TickAgain(processed_tasks)) } + /// If there is no cluster or if leader -> create a new batch + /// If follower -> wait till the leader gives us a batch to process + fn get_or_create_next_batch(&self) -> Result> { + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; + + let batch = match &self.cluster { + None | Some(Cluster::Leader(_)) => { + self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? + } + Some(Cluster::Follower(follower)) => { + let batch = follower.write().unwrap().get_new_batch(); + Some(self.get_batch_from_cluster_batch(&rtxn, batch)?) + } + }; + + if let Some(Cluster::Leader(leader)) = &self.cluster { + if let Some(ref batch) = batch { + leader.write().unwrap().starts_batch(batch.clone().into()); + } + } + Ok(batch) + } + pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> { match task.content_uuid() { Some(content_file) => self.delete_update_file(content_file),
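

The commit handshake wired in above (the leader builds and broadcasts a batch, every member processes it, and no node commits its local transaction until all members report ready) can be modelled in isolation with plain std channels. The sketch below only illustrates that flow under Consistency::All; the Leader/Follower roles and the LeaderMsg/FollowerMsg shapes here are simplified stand-ins for this example, not the cluster crate's actual ductile/bus-based implementation.

// Simplified model of the leader/follower commit handshake introduced by this patch.
// The types below are illustrative stand-ins, not the real `cluster` crate API.
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Messages the leader sends to every follower.
#[derive(Clone, Debug)]
enum LeaderMsg {
    StartBatch { id: u32, tasks: Vec<u32> },
    Commit(u32),
}

/// Messages a follower sends back to the leader.
#[derive(Debug)]
enum FollowerMsg {
    ReadyToCommit { batch_id: u32 },
}

fn follower(id: usize, rx: Receiver<LeaderMsg>, tx: Sender<FollowerMsg>) {
    while let Ok(msg) = rx.recv() {
        match msg {
            LeaderMsg::StartBatch { id: batch_id, tasks } => {
                // Apply the batch locally (the scheduler's process_batch step),
                // then tell the leader we are ready to commit.
                println!("follower {id}: processed batch {batch_id} ({} tasks)", tasks.len());
                tx.send(FollowerMsg::ReadyToCommit { batch_id }).unwrap();
            }
            LeaderMsg::Commit(batch_id) => {
                // Only now is the local transaction committed.
                println!("follower {id}: committed batch {batch_id}");
                return;
            }
        }
    }
}

fn main() {
    let follower_count = 2;
    let (to_leader, from_followers) = channel::<FollowerMsg>();
    let mut to_followers = Vec::new();
    let mut handles = Vec::new();

    for i in 0..follower_count {
        let (tx, rx) = channel::<LeaderMsg>();
        to_followers.push(tx);
        let ack = to_leader.clone();
        handles.push(thread::spawn(move || follower(i, rx, ack)));
    }

    // Leader side: create the next batch and broadcast it (starts_batch).
    let batch_id = 0;
    for tx in &to_followers {
        tx.send(LeaderMsg::StartBatch { id: batch_id, tasks: vec![1, 2, 3] }).unwrap();
    }

    // Consistency::All: wait for every follower before committing anything.
    for _ in 0..follower_count {
        match from_followers.recv().unwrap() {
            FollowerMsg::ReadyToCommit { batch_id: id } => assert_eq!(id, batch_id),
        }
    }

    // Everyone is ready: order the commit and commit locally as well.
    for tx in &to_followers {
        tx.send(LeaderMsg::Commit(batch_id)).unwrap();
    }
    println!("leader: committed batch {batch_id}");

    for h in handles {
        h.join().unwrap();
    }
}

In the patch itself the same rendez-vous is expressed through Leader::commit(Consistency::All) on the leader and Follower::ready_to_commit() on followers, inserted just before each index_wtxn.commit()? / wtxn.commit()? call, while get_or_create_next_batch() decides whether a node creates the batch itself (no cluster, or leader) or waits for the leader to hand one over (follower).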