Make it compile

This commit is contained in:
Louis Dureuil 2025-04-02 17:14:40 +02:00
parent 87547550f5
commit f3ab940776
No known key found for this signature in database
6 changed files with 36 additions and 27 deletions

View File

@ -182,6 +182,7 @@ impl BatchQueue {
started_at: batch.started_at,
finished_at: batch.finished_at,
enqueued_at: batch.enqueued_at,
stop_reason: batch.reason.to_string(),
},
)?;

View File

@ -146,12 +146,14 @@ impl BatchKind {
// TODO use an AutoBatchKind as input
pub fn new(
task_id: TaskId,
kind: KindWithContent,
kind_with_content: KindWithContent,
primary_key: Option<&str>,
) -> (ControlFlow<(BatchKind, BatchStopReason), BatchKind>, bool) {
use AutobatchKind as K;
match AutobatchKind::from(kind) {
let kind = kind_with_content.as_kind();
match AutobatchKind::from(kind_with_content) {
K::IndexCreation => (
Break((
BatchKind::IndexCreation { id: task_id },
@ -198,7 +200,7 @@ impl BatchKind {
Break((
BatchKind::DocumentOperation {
allow_index_creation,
primary_key: pk,
primary_key: pk.clone(),
operation_ids: vec![task_id],
},
BatchStopReason::PrimaryKeyIndexMismatch {
@ -235,17 +237,20 @@ impl BatchKind {
/// To ease the writing of the code. `true` can be returned when you don't need to create an index
/// but false can't be returned if you needs to create an index.
#[rustfmt::skip]
fn accumulate(self, id: TaskId, kind: AutobatchKind, index_already_exists: bool, primary_key: Option<&str>) -> ControlFlow<(BatchKind, BatchStopReason), BatchKind> {
fn accumulate(self, id: TaskId, kind_with_content: KindWithContent, index_already_exists: bool, primary_key: Option<&str>) -> ControlFlow<(BatchKind, BatchStopReason), BatchKind> {
use AutobatchKind as K;
let pk: Option<String> = match (self.primary_key(), kind.primary_key(), primary_key) {
// 1. If both task don't interact with primary key -> we can continue
let kind = kind_with_content.as_kind();
let autobatch_kind = AutobatchKind::from(kind_with_content);
let pk: Option<String> = match (self.primary_key(), autobatch_kind.primary_key(), primary_key) {
// 1. If incoming task don't interact with primary key -> we can continue
(batch_pk, None | Some(None), _) => {
batch_pk.flatten().map(ToOwned::to_owned)
},
// 2.1 If we already have a primary-key ->
// 2.1.1 If the task we're trying to accumulate have a pk it must be equal to our primary key
(batch_pk, Some(Some(task_pk)), Some(index_pk)) => if task_pk == index_pk {
(_batch_pk, Some(Some(task_pk)), Some(index_pk)) => if task_pk == index_pk {
Some(task_pk.to_owned())
} else {
return Break((self, BatchStopReason::PrimaryKeyMismatch {
@ -258,29 +263,29 @@ impl BatchKind {
},
// 2.2 If we don't have a primary-key ->
// 2.2.2 If the batch is set to Some(None), the task should be too
(Some(None), Some(None), None) => None,
(Some(None), Some(Some(task_pk)), None) => return Break((self, BatchStopReason::PrimaryKeyMismatch {
id,
reason: PrimaryKeyMismatchReason::CannotInterfereWithPrimaryKeyGuessing {
task_pk: task_pk.to_owned(),
},
})),
(Some(Some(batch_pk)), Some(None), None) => Some(batch_pk.to_owned()),
(Some(Some(batch_pk)), Some(Some(task_pk)), None) => if task_pk == batch_pk {
Some(task_pk.to_owned())
} else {
let batch_pk = batch_pk.to_owned();
let task_pk = task_pk.to_owned();
return Break((self, BatchStopReason::PrimaryKeyMismatch {
id,
reason: PrimaryKeyMismatchReason::TaskPrimaryKeyDifferFromCurrentBatchPrimaryKey {
batch_pk: batch_pk.to_owned(),
task_pk: task_pk.to_owned(),
batch_pk,
task_pk
},
}))
},
(None, Some(Some(task_pk)), None) => Some(task_pk.to_owned())
};
match (self, kind) {
match (self, autobatch_kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break((this, BatchStopReason::TaskCannotBeBatched { kind, id })),
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
@ -329,7 +334,7 @@ impl BatchKind {
// we can autobatch different kind of document operations and mix replacements with updates
(
BatchKind::DocumentOperation { allow_index_creation, primary_key: _, mut operation_ids },
K::DocumentImport { primary_key, .. },
K::DocumentImport { primary_key: _, .. },
) => {
operation_ids.push(id);
Continue(BatchKind::DocumentOperation {
@ -444,7 +449,7 @@ impl BatchKind {
allow_index_creation,
})
}
(this @ BatchKind::ClearAndSettings { .. }, K::DocumentImport { .. }) => Break(this),
(this @ BatchKind::ClearAndSettings { .. }, K::DocumentImport { .. }) => Break((this, BatchStopReason::SettingsWithDocumentOperation { id })),
(
BatchKind::ClearAndSettings {
mut other,
@ -517,8 +522,8 @@ pub fn autobatch(
// if an index has been created in the previous step we can consider it as existing.
index_exist |= must_create_index;
for (id, kind) in enqueued {
acc = match acc.accumulate(id, kind.into(), index_exist, primary_key) {
for (id, kind_with_content) in enqueued {
acc = match acc.accumulate(id, kind_with_content, index_exist, primary_key) {
Continue(acc) => acc,
Break((acc, batch_stop_reason)) => {
return Some((acc, must_create_index, Some(batch_stop_reason)))

View File

@ -456,7 +456,7 @@ impl IndexScheduler {
current_batch.processing(&mut tasks);
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::UpgradeDatabase,
id: tasks.last().unwrap(),
id: tasks.first().unwrap().uid,
});
return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch)));
}
@ -469,7 +469,7 @@ impl IndexScheduler {
current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::TaskCancelation,
id: tasks.last().unwrap(),
id: task_id,
});
return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
}
@ -481,7 +481,7 @@ impl IndexScheduler {
current_batch.processing(&mut tasks);
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::TaskDeletion,
id: tasks.last().unwrap(),
id: tasks.first().unwrap().uid,
});
return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
}
@ -493,7 +493,7 @@ impl IndexScheduler {
current_batch.processing(&mut tasks);
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::SnapshotCreation,
id: tasks.last().unwrap(),
id: tasks.first().unwrap().uid,
});
return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
}
@ -506,7 +506,7 @@ impl IndexScheduler {
current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::DumpCreation,
id: tasks.last().unwrap(),
id: task.uid,
});
return Ok(Some((Batch::Dump(task), current_batch)));
}
@ -527,7 +527,7 @@ impl IndexScheduler {
current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::IndexSwap,
id: tasks.last().unwrap(),
id: task.uid,
});
return Ok(Some((Batch::IndexSwap { task }, current_batch)));
};
@ -554,8 +554,8 @@ impl IndexScheduler {
let mut enqueued = Vec::new();
let mut total_size: u64 = 0;
for task_id in index_tasks.into_iter() {
if enqueued.len() >= task_limit {
stop_reason = BatchStopReason::ReachedTaskLimit { task_limit };
if enqueued.len() >= tasks_limit {
stop_reason = BatchStopReason::ReachedTaskLimit { task_limit: tasks_limit };
break;
}
let task = self

View File

@ -21,6 +21,8 @@ pub struct BatchView {
pub started_at: OffsetDateTime,
#[serde(with = "time::serde::rfc3339::option", default)]
pub finished_at: Option<OffsetDateTime>,
#[serde(default = "meilisearch_types::batches::default_stop_reason")]
pub stop_reason: String,
}
impl BatchView {
@ -33,6 +35,7 @@ impl BatchView {
duration: batch.finished_at.map(|finished_at| finished_at - batch.started_at),
started_at: batch.started_at,
finished_at: batch.finished_at,
stop_reason: batch.stop_reason.clone(),
}
}
}

View File

@ -32,7 +32,7 @@ pub struct Batch {
pub stop_reason: String,
}
fn default_stop_reason() -> String {
pub fn default_stop_reason() -> String {
BatchStopReason::default().to_string()
}

View File

@ -691,8 +691,8 @@ pub enum BatchStopReason {
task_limit: usize,
},
ReachedSizeLimit {
size_limit: usize,
size: usize,
size_limit: u64,
size: u64,
},
PrimaryKeyIndexMismatch {
id: TaskId,