Correctly batch tasks with different index creation rights

This commit is contained in:
Clément Renault 2022-10-06 15:55:48 +02:00
parent 87212cfd20
commit 068a4b2884
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
4 changed files with 255 additions and 176 deletions

View file

@ -4,7 +4,7 @@ use crate::{
Error, IndexScheduler, Result, TaskId,
};
use index::apply_settings_to_builder;
use index::error::{IndexError, MilliError};
use index::error::IndexError;
use index::{Settings, Unchecked};
use log::{debug, info};
use milli::documents::DocumentsBatchReader;
@ -39,6 +39,7 @@ pub(crate) enum IndexOperation {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
allow_index_creation: bool,
documents_counts: Vec<u64>,
content_files: Vec<Uuid>,
tasks: Vec<Task>,
@ -56,6 +57,7 @@ pub(crate) enum IndexOperation {
index_uid: String,
// TODO what's that boolean, does it mean that it removes things or what?
settings: Vec<(bool, Settings<Unchecked>)>,
allow_index_creation: bool,
tasks: Vec<Task>,
},
DocumentClearAndSetting {
@ -64,6 +66,7 @@ pub(crate) enum IndexOperation {
// TODO what's that boolean, does it mean that it removes things or what?
settings: Vec<(bool, Settings<Unchecked>)>,
allow_index_creation: bool,
settings_tasks: Vec<Task>,
},
SettingsAndDocumentImport {
@ -71,6 +74,7 @@ pub(crate) enum IndexOperation {
primary_key: Option<String>,
method: IndexDocumentsMethod,
allow_index_creation: bool,
documents_counts: Vec<u64>,
content_files: Vec<Uuid>,
document_import_tasks: Vec<Task>,
@ -126,7 +130,11 @@ impl IndexScheduler {
index_uid,
})))
}
BatchKind::DocumentImport { method, import_ids } => {
BatchKind::DocumentImport {
method,
import_ids,
allow_index_creation,
} => {
let tasks = self.get_existing_tasks(rtxn, import_ids)?;
let primary_key = match &tasks[0].kind {
KindWithContent::DocumentImport { primary_key, .. } => primary_key.clone(),
@ -154,6 +162,7 @@ impl IndexScheduler {
index_uid,
primary_key,
method,
allow_index_creation,
documents_counts,
content_files,
tasks,
@ -181,7 +190,10 @@ impl IndexScheduler {
},
)))
}
BatchKind::Settings { settings_ids } => {
BatchKind::Settings {
settings_ids,
allow_index_creation,
} => {
let tasks = self.get_existing_tasks(rtxn, settings_ids)?;
let mut settings = Vec::new();
@ -199,20 +211,30 @@ impl IndexScheduler {
Ok(Some(Batch::IndexOperation(IndexOperation::Settings {
index_uid,
settings,
allow_index_creation,
tasks,
})))
}
BatchKind::ClearAndSettings {
other,
settings_ids,
allow_index_creation,
} => {
let (index_uid, settings, settings_tasks) = match self
.create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })?
.create_next_batch_index(
rtxn,
index_uid,
BatchKind::Settings {
settings_ids,
allow_index_creation,
},
)?
.unwrap()
{
Batch::IndexOperation(IndexOperation::Settings {
index_uid,
settings,
allow_index_creation,
tasks,
}) => (index_uid, settings, tasks),
_ => unreachable!(),
@ -235,6 +257,7 @@ impl IndexScheduler {
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks,
allow_index_creation,
settings,
settings_tasks,
},
@ -243,18 +266,26 @@ impl IndexScheduler {
BatchKind::SettingsAndDocumentImport {
settings_ids,
method,
allow_index_creation,
import_ids,
} => {
let settings = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::Settings { settings_ids },
BatchKind::Settings {
settings_ids,
allow_index_creation,
},
)?;
let document_import = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::DocumentImport { method, import_ids },
BatchKind::DocumentImport {
method,
allow_index_creation,
import_ids,
},
)?;
match (document_import, settings) {
@ -276,6 +307,7 @@ impl IndexScheduler {
index_uid,
primary_key,
method,
allow_index_creation,
documents_counts,
content_files,
document_import_tasks,
@ -521,6 +553,7 @@ impl IndexScheduler {
index_uid: _,
primary_key,
method,
allow_index_creation,
documents_counts,
content_files,
mut tasks,
@ -628,6 +661,7 @@ impl IndexScheduler {
IndexOperation::Settings {
index_uid: _,
settings,
allow_index_creation,
mut tasks,
} => {
let indexer_config = self.index_mapper.indexer_config();
@ -652,6 +686,7 @@ impl IndexScheduler {
index_uid,
primary_key,
method,
allow_index_creation,
documents_counts,
content_files,
document_import_tasks,
@ -664,6 +699,7 @@ impl IndexScheduler {
IndexOperation::Settings {
index_uid: index_uid.clone(),
settings,
allow_index_creation,
tasks: settings_tasks,
},
)?;
@ -675,6 +711,7 @@ impl IndexScheduler {
index_uid,
primary_key,
method,
allow_index_creation,
documents_counts,
content_files,
tasks: document_import_tasks,
@ -689,6 +726,7 @@ impl IndexScheduler {
index_uid,
cleared_tasks,
settings,
allow_index_creation,
settings_tasks,
} => {
let mut import_tasks = self.apply_index_operation(
@ -706,6 +744,7 @@ impl IndexScheduler {
IndexOperation::Settings {
index_uid,
settings,
allow_index_creation,
tasks: settings_tasks,
},
)?;