Fix autobatching of documents and settings

This commit is contained in:
Louis Dureuil 2024-12-02 12:25:01 +01:00
parent 8d33af1dff
commit d78f4666a0
No known key found for this signature in database
3 changed files with 5 additions and 140 deletions

View File

@ -115,13 +115,6 @@ pub enum BatchKind {
allow_index_creation: bool,
settings_ids: Vec<TaskId>,
},
SettingsAndDocumentOperation {
settings_ids: Vec<TaskId>,
method: IndexDocumentsMethod,
allow_index_creation: bool,
primary_key: Option<String>,
operation_ids: Vec<TaskId>,
},
Settings {
allow_index_creation: bool,
settings_ids: Vec<TaskId>,
@ -146,7 +139,6 @@ impl BatchKind {
match self {
BatchKind::DocumentOperation { allow_index_creation, .. }
| BatchKind::ClearAndSettings { allow_index_creation, .. }
| BatchKind::SettingsAndDocumentOperation { allow_index_creation, .. }
| BatchKind::Settings { allow_index_creation, .. } => Some(*allow_index_creation),
_ => None,
}
@ -154,10 +146,7 @@ impl BatchKind {
fn primary_key(&self) -> Option<Option<&str>> {
match self {
BatchKind::DocumentOperation { primary_key, .. }
| BatchKind::SettingsAndDocumentOperation { primary_key, .. } => {
Some(primary_key.as_deref())
}
BatchKind::DocumentOperation { primary_key, .. } => Some(primary_key.as_deref()),
_ => None,
}
}
@ -275,8 +264,7 @@ impl BatchKind {
Break(BatchKind::IndexDeletion { ids })
}
(
BatchKind::ClearAndSettings { settings_ids: mut ids, allow_index_creation: _, mut other }
| BatchKind::SettingsAndDocumentOperation { operation_ids: mut ids, method: _, allow_index_creation: _, primary_key: _, settings_ids: mut other },
BatchKind::ClearAndSettings { settings_ids: mut ids, allow_index_creation: _, mut other },
K::IndexDeletion,
) => {
ids.push(id);
@ -356,15 +344,9 @@ impl BatchKind {
) => Break(this),
(
BatchKind::DocumentOperation { method, allow_index_creation, primary_key, operation_ids },
this @ BatchKind::DocumentOperation { .. },
K::Settings { .. },
) => Continue(BatchKind::SettingsAndDocumentOperation {
settings_ids: vec![id],
method,
allow_index_creation,
primary_key,
operation_ids,
}),
) => Break(this),
(BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: _ }, K::DocumentClear) => {
deletion_ids.push(id);
@ -477,63 +459,7 @@ impl BatchKind {
allow_index_creation,
})
}
(
BatchKind::SettingsAndDocumentOperation { settings_ids, method: _, mut operation_ids, allow_index_creation, primary_key: _ },
K::DocumentClear,
) => {
operation_ids.push(id);
Continue(BatchKind::ClearAndSettings {
settings_ids,
other: operation_ids,
allow_index_creation,
})
}
(
BatchKind::SettingsAndDocumentOperation { settings_ids, method: ReplaceDocuments, mut operation_ids, allow_index_creation, primary_key: _},
K::DocumentImport { method: ReplaceDocuments, primary_key: pk2, .. },
) => {
operation_ids.push(id);
Continue(BatchKind::SettingsAndDocumentOperation {
settings_ids,
method: ReplaceDocuments,
allow_index_creation,
primary_key: pk2,
operation_ids,
})
}
(
BatchKind::SettingsAndDocumentOperation { settings_ids, method: UpdateDocuments, allow_index_creation, primary_key: _, mut operation_ids },
K::DocumentImport { method: UpdateDocuments, primary_key: pk2, .. },
) => {
operation_ids.push(id);
Continue(BatchKind::SettingsAndDocumentOperation {
settings_ids,
method: UpdateDocuments,
allow_index_creation,
primary_key: pk2,
operation_ids,
})
}
// But we can't batch a settings and a doc op with another doc op
// this MUST be AFTER the two previous branch
(
this @ BatchKind::SettingsAndDocumentOperation { .. },
K::DocumentDeletion { .. } | K::DocumentImport { .. },
) => Break(this),
(
BatchKind::SettingsAndDocumentOperation { mut settings_ids, method, allow_index_creation,primary_key, operation_ids },
K::Settings { .. },
) => {
settings_ids.push(id);
Continue(BatchKind::SettingsAndDocumentOperation {
settings_ids,
method,
allow_index_creation,
primary_key,
operation_ids,
})
}
(
BatchKind::IndexCreation { .. }
| BatchKind::IndexDeletion { .. }

View File

@ -441,67 +441,6 @@ impl IndexScheduler {
must_create_index,
}))
}
BatchKind::SettingsAndDocumentOperation {
settings_ids,
method,
allow_index_creation,
primary_key,
operation_ids,
} => {
let settings = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::Settings { settings_ids, allow_index_creation },
current_batch,
must_create_index,
)?;
let document_import = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::DocumentOperation {
method,
allow_index_creation,
primary_key,
operation_ids,
},
current_batch,
must_create_index,
)?;
match (document_import, settings) {
(
Some(Batch::IndexOperation {
op:
IndexOperation::DocumentOperation {
primary_key,
documents_counts,
operations,
tasks: document_import_tasks,
..
},
..
}),
Some(Batch::IndexOperation {
op: IndexOperation::Settings { settings, tasks: settings_tasks, .. },
..
}),
) => Ok(Some(Batch::IndexOperation {
op: IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
},
must_create_index,
})),
_ => unreachable!(),
}
}
BatchKind::IndexCreation { id } => {
let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task));

View File

@ -106,7 +106,7 @@ impl ProcessingBatch {
self.stats.total_nb_tasks = 0;
}
/// Update the timestamp of the tasks and the inner structure of this sturcture.
/// Update the timestamp of the tasks and the inner structure of this structure.
pub fn update(&mut self, task: &mut Task) {
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state