synchronize most of the operations

Tamo 2023-03-16 16:31:16 +01:00
parent 145f0e753c
commit 6cc14feb51
7 changed files with 447 additions and 37 deletions


@@ -22,6 +22,7 @@ use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::BufWriter;
use cluster::Consistency;
use dump::IndexMetadata;
use log::{debug, error, info};
use meilisearch_types::heed::{RoTxn, RwTxn};
@@ -41,14 +42,14 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
use crate::{Cluster, Error, IndexScheduler, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
/// [`self.ids()`](Batch::ids)), as well as additional information on how to
/// be processed.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum Batch {
TaskCancelation {
/// The task cancelation itself.
@@ -85,14 +86,14 @@ pub(crate) enum Batch {
},
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum DocumentOperation {
Add(Uuid),
Delete(Vec<String>),
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum IndexOperation {
DocumentOperation {
index_uid: String,
@@ -586,6 +587,16 @@ impl IndexScheduler {
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => {
leader.write().unwrap().commit(Consistency::All)
}
Some(Cluster::Follower(follower)) => {
follower.write().unwrap().ready_to_commit()
}
None => (),
}
// We must only remove the content files if the transaction is successfully committed
// and if errors occur when we are deleting files we must do our best to delete
// everything. We do not return the encountered errors when deleting the content
@@ -629,6 +640,17 @@ impl IndexScheduler {
}
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => {
leader.write().unwrap().commit(Consistency::All)
}
Some(Cluster::Follower(follower)) => {
follower.write().unwrap().ready_to_commit()
}
None => (),
}
wtxn.commit()?;
Ok(vec![task])
}
@@ -840,7 +862,17 @@ impl IndexScheduler {
let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
// TODO cluster: ready to commit
match &self.cluster {
Some(Cluster::Leader(leader)) => {
leader.write().unwrap().commit(Consistency::All)
}
Some(Cluster::Follower(follower)) => {
follower.write().unwrap().ready_to_commit()
}
None => (),
}
index_wtxn.commit()?;
Ok(tasks)
@@ -939,6 +971,17 @@ impl IndexScheduler {
for swap in swaps {
self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?;
}
match &self.cluster {
Some(Cluster::Leader(leader)) => {
leader.write().unwrap().commit(Consistency::All)
}
Some(Cluster::Follower(follower)) => {
follower.write().unwrap().ready_to_commit()
}
None => (),
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok(vec![task])
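
The same three-way match now guards every write-transaction commit above: a leader holds the batch until the whole cluster acknowledges it (Consistency::All), a follower only signals that it is ready, and a standalone node does nothing. Since the block is repeated verbatim four times, it could plausibly be factored into a helper; below is a minimal self-contained sketch of that rendezvous, where the Leader/Follower stubs and the sync_before_commit helper are illustrative assumptions rather than the real cluster crate API.

use std::sync::RwLock;

enum Consistency { All }

struct Leader { acks: usize, followers: usize }
impl Leader {
    // Conceptually blocks until every follower has acknowledged; stubbed as an assert here.
    fn commit(&mut self, consistency: Consistency) {
        match consistency {
            Consistency::All => assert_eq!(self.acks, self.followers, "some follower is not ready"),
        }
    }
}

struct Follower { ready: bool }
impl Follower {
    // Report that this node has applied the batch and can commit.
    fn ready_to_commit(&mut self) { self.ready = true; }
}

enum Cluster { Leader(RwLock<Leader>), Follower(RwLock<Follower>) }

// The exact shape repeated before each wtxn.commit() in this diff.
fn sync_before_commit(cluster: &Option<Cluster>) {
    match cluster {
        Some(Cluster::Leader(leader)) => leader.write().unwrap().commit(Consistency::All),
        Some(Cluster::Follower(follower)) => follower.write().unwrap().ready_to_commit(),
        None => (),
    }
}

fn main() {
    let cluster = Some(Cluster::Follower(RwLock::new(Follower { ready: false })));
    sync_before_commit(&cluster); // ...and only then run the local wtxn.commit()
}
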
@@ -1376,4 +1419,244 @@ impl IndexScheduler {
Ok(content_files_to_delete)
}
pub(crate) fn get_batch_from_cluster_batch(
&self,
rtxn: &RoTxn,
batch: cluster::batch::Batch,
) -> Result<Batch> {
use cluster::batch::Batch as CBatch;
Ok(match batch {
CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
Batch::TaskCancelation {
task: self.get_existing_tasks(rtxn, Some(task))?[0],
previous_started_at,
previous_processing_tasks,
}
}
CBatch::TaskDeletion(task) => {
Batch::TaskDeletion(self.get_existing_tasks(rtxn, Some(task))?[0])
}
CBatch::SnapshotCreation(tasks) => {
Batch::SnapshotCreation(self.get_existing_tasks(rtxn, tasks)?)
}
CBatch::Dump(task) => Batch::Dump(self.get_existing_tasks(rtxn, Some(task))?[0]),
CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation {
op: self.get_index_op_from_cluster_index_op(rtxn, op)?,
must_create_index,
},
CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation {
index_uid,
primary_key,
task: self.get_existing_tasks(rtxn, Some(task))?[0],
},
CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate {
index_uid,
primary_key,
task: self.get_existing_tasks(rtxn, Some(task))?[0],
},
CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
Batch::IndexDeletion {
index_uid,
tasks: self.get_existing_tasks(rtxn, tasks)?,
index_has_been_created,
}
}
CBatch::IndexSwap { task } => {
Batch::IndexSwap { task: self.get_existing_tasks(rtxn, Some(task))?[0] }
}
})
}
pub(crate) fn get_index_op_from_cluster_index_op(
&self,
rtxn: &RoTxn,
op: cluster::batch::IndexOperation,
) -> Result<IndexOperation> {
use cluster::batch::IndexOperation as COp;
Ok(match op {
COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentDeletion { index_uid, documents, tasks } => {
IndexOperation::DocumentDeletion {
index_uid,
documents,
tasks: self.get_existing_tasks(rtxn, tasks)?,
}
}
COp::DocumentClear { index_uid, tasks } => IndexOperation::DocumentClear {
index_uid,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::Settings { index_uid, settings, tasks } => IndexOperation::Settings {
index_uid,
settings,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentClearAndSetting { index_uid, cleared_tasks, settings, settings_tasks } => {
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks: self.get_existing_tasks(rtxn, cleared_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
}
}
COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: self.get_existing_tasks(rtxn, document_import_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
},
})
}
}
impl From<Batch> for cluster::batch::Batch {
fn from(batch: Batch) -> Self {
use cluster::batch::Batch as CBatch;
match batch {
Batch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
CBatch::TaskCancelation {
task: task.uid,
previous_started_at,
previous_processing_tasks,
}
}
Batch::TaskDeletion(task) => CBatch::TaskDeletion(task.uid),
Batch::SnapshotCreation(task) => {
CBatch::SnapshotCreation(task.into_iter().map(|task| task.uid).collect())
}
Batch::Dump(task) => CBatch::Dump(task.uid),
Batch::IndexOperation { op, must_create_index } => {
CBatch::IndexOperation { op: op.into(), must_create_index }
}
Batch::IndexCreation { index_uid, primary_key, task } => todo!(),
Batch::IndexUpdate { index_uid, primary_key, task } => todo!(),
Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => todo!(),
Batch::IndexSwap { task } => todo!(),
}
}
}
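
The four todo!() arms cover exactly the variants that get_batch_from_cluster_batch decodes above, so they can presumably be completed symmetrically by shipping task uids instead of full tasks. A self-contained sketch under that assumption, with stand-in types (a trimmed Task and only two of the four variants):

struct Task { uid: u32 }

enum Batch {
    IndexCreation { index_uid: String, primary_key: Option<String>, task: Task },
    IndexSwap { task: Task },
}

enum CBatch {
    IndexCreation { index_uid: String, primary_key: Option<String>, task: u32 },
    IndexSwap { task: u32 },
}

impl From<Batch> for CBatch {
    fn from(batch: Batch) -> Self {
        match batch {
            // Keep the metadata as-is and reduce the task to its uid.
            Batch::IndexCreation { index_uid, primary_key, task } => {
                CBatch::IndexCreation { index_uid, primary_key, task: task.uid }
            }
            Batch::IndexSwap { task } => CBatch::IndexSwap { task: task.uid },
        }
    }
}

fn main() {
    let batch = Batch::IndexSwap { task: Task { uid: 42 } };
    match CBatch::from(batch) {
        CBatch::IndexSwap { task } => assert_eq!(task, 42),
        CBatch::IndexCreation { .. } => unreachable!(),
    }
}

IndexUpdate and IndexDeletion would follow the same shape, with IndexDeletion mapping its Vec of tasks to a Vec of uids.
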
impl From<IndexOperation> for cluster::batch::IndexOperation {
fn from(op: IndexOperation) -> Self {
use cluster::batch::IndexOperation as COp;
match op {
IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentDeletion { index_uid, documents, tasks } => {
COp::DocumentDeletion {
index_uid,
documents,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
}
}
IndexOperation::DocumentClear { index_uid, tasks } => COp::DocumentClear {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::Settings { index_uid, settings, tasks } => COp::Settings {
index_uid,
settings,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks,
settings,
settings_tasks,
} => COp::DocumentClearAndSetting {
index_uid,
cleared_tasks: cleared_tasks.into_iter().map(|task| task.uid).collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: document_import_tasks
.into_iter()
.map(|task| task.uid)
.collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
}
}
}
impl From<DocumentOperation> for cluster::batch::DocumentOperation {
fn from(op: DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
DocumentOperation::Add(uuid) => COp::Add(uuid),
DocumentOperation::Delete(docs) => COp::Delete(docs),
}
}
}
impl From<cluster::batch::DocumentOperation> for DocumentOperation {
fn from(op: cluster::batch::DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
COp::Add(uuid) => DocumentOperation::Add(uuid),
COp::Delete(docs) => DocumentOperation::Delete(docs),
}
}
}
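
Taken together, these From impls pin down the wire format: a batch leaves the leader as task uids plus operation metadata, and each follower re-inflates those uids against its own task queue via get_existing_tasks. A minimal sketch of that roundtrip, where Task, TaskStore, and WireBatch are illustrative stand-ins rather than the real types:

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Task { uid: u32, details: String }

// Uid-only representation: cheap to serialize and identical on every node.
struct WireBatch { tasks: Vec<u32> }

struct TaskStore { tasks: HashMap<u32, Task> }
impl TaskStore {
    // Stand-in for IndexScheduler::get_existing_tasks.
    fn get_existing_tasks(&self, uids: &[u32]) -> Option<Vec<Task>> {
        uids.iter().map(|uid| self.tasks.get(uid).cloned()).collect()
    }
}

fn main() {
    let task = Task { uid: 7, details: "add documents".into() };
    let store = TaskStore { tasks: HashMap::from([(7, task.clone())]) };

    // Leader side: shrink the batch down to uids before broadcasting it.
    let wire = WireBatch { tasks: vec![task.uid] };
    // Follower side: rebuild the full tasks from the local queue.
    let rebuilt = store.get_existing_tasks(&wire.tasks).unwrap();
    assert_eq!(rebuilt, vec![task]);
}

This also explains the new Clone derives on Batch, IndexOperation, and DocumentOperation: the scheduler keeps its own copy of the batch while a clone is converted and handed to the cluster.
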


@@ -38,6 +38,7 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use batch::Batch;
use cluster::{Consistency, Follower, Leader};
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
@@ -326,8 +327,8 @@ pub struct IndexScheduler {
}
enum Cluster {
Leader(Leader),
Follower(Follower),
Leader(RwLock<Leader>),
Follower(RwLock<Follower>),
}
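
Wrapping Leader and Follower in an RwLock reads as a concession to the scheduler's API: IndexScheduler methods take &self (and the scheduler is shared across threads), while commit, ready_to_commit, starts_batch, and get_new_batch all need mutable access. A minimal sketch of that interior-mutability pattern, with simplified stand-in types:

use std::sync::RwLock;

struct Leader { batches_committed: u64 }
impl Leader {
    // Needs &mut self, but the scheduler only ever holds &self.
    fn commit(&mut self) { self.batches_committed += 1; }
}

struct Scheduler { cluster: Option<RwLock<Leader>> }
impl Scheduler {
    fn tick(&self) { // note: &self, not &mut self
        if let Some(leader) = &self.cluster {
            leader.write().unwrap().commit();
        }
    }
}

fn main() {
    let scheduler = Scheduler { cluster: Some(RwLock::new(Leader { batches_committed: 0 })) };
    scheduler.tick();
    assert_eq!(scheduler.cluster.as_ref().unwrap().read().unwrap().batches_committed, 1);
}
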
impl IndexScheduler {
@@ -1061,17 +1062,11 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start);
}
// TODO cluster: If
// - I'm a leader => create the batch and send it to everyone
// - I'm a follower => get the batch from the leader and gather the tasks from my task queue
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
let batch = match self.get_or_create_next_batch()? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
let index_uid = batch.index_uid().map(ToOwned::to_owned);
drop(rtxn);
// TODO cluster: Should we send the starting date as well so everyone is in sync?
@@ -1089,9 +1084,6 @@
#[cfg(test)]
self.breakpoint(Breakpoint::BatchCreated);
// TODO cluster: Inside the processing of the tasks we need to check if we should commit
// the batch or not
// 2. Process the tasks
let res = {
let cloned_index_scheduler = self.private_clone();
@@ -1205,6 +1197,29 @@ impl IndexScheduler {
Ok(TickOutcome::TickAgain(processed_tasks))
}
/// If there is no cluster, or if we're the leader -> create a new batch.
/// If we're a follower -> wait until the leader gives us a batch to process.
fn get_or_create_next_batch(&self) -> Result<Option<Batch>> {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch = match &self.cluster {
None | Some(Cluster::Leader(_)) => {
self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))?
}
Some(Cluster::Follower(follower)) => {
let batch = follower.write().unwrap().get_new_batch();
Some(self.get_batch_from_cluster_batch(&rtxn, batch)?)
}
};
if let Some(Cluster::Leader(leader)) = &self.cluster {
if let Some(ref batch) = batch {
leader.write().unwrap().starts_batch(batch.clone().into());
}
}
Ok(batch)
}
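
get_or_create_next_batch is where the two roles diverge: a standalone node or a leader builds the next batch locally, and the leader then broadcasts it with starts_batch, while a follower blocks inside get_new_batch until the leader hands one over. The transport inside the cluster crate is not shown in this diff; the following is a plausible sketch of that blocking handoff over std channels, where WireBatch and both struct layouts are assumptions:

use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical wire type: a batch reduced to task uids, like cluster::batch::Batch.
#[derive(Clone, Debug)]
struct WireBatch { tasks: Vec<u32> }

struct Leader { followers: Vec<Sender<WireBatch>> }
impl Leader {
    // Broadcast the freshly created batch to every follower before processing it.
    fn starts_batch(&mut self, batch: WireBatch) {
        for follower in &self.followers {
            let _ = follower.send(batch.clone());
        }
    }
}

struct Follower { inbox: Receiver<WireBatch> }
impl Follower {
    // Block until the leader sends the next batch to process.
    fn get_new_batch(&mut self) -> WireBatch {
        self.inbox.recv().expect("the leader disconnected")
    }
}

fn main() {
    let (sender, receiver) = channel();
    let mut leader = Leader { followers: vec![sender] };
    let mut follower = Follower { inbox: receiver };
    leader.starts_batch(WireBatch { tasks: vec![1, 2, 3] });
    println!("{:?}", follower.get_new_batch());
}
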
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
match task.content_uuid() {
Some(content_file) => self.delete_update_file(content_file),