Introduce a rustfmt file

Clément Renault 2022-10-20 18:00:07 +02:00
parent dd57e051d7
commit 3f6bd7fb11
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
92 changed files with 1251 additions and 2857 deletions
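The rustfmt configuration itself is not shown in this excerpt, but every hunk below follows the same pattern: use statements are merged per module and regrouped into std / external / crate blocks, and short struct literals, patterns, and match arms are collapsed onto a single line. A configuration along the following lines would produce that style; this is a sketch inferred from the diff, not the verbatim file introduced by the commit.

    # Hypothetical .rustfmt.toml, reconstructed from the formatting visible in this diff
    unstable_features = true            # imports_granularity and group_imports are unstable options
    use_small_heuristics = "Max"        # let short structs, patterns, and calls fit on one line
    imports_granularity = "Module"      # merge use items that share a module path
    group_imports = "StdExternalCrate"  # order imports: std, external crates, then crate-local

Because two of these options are unstable, such a configuration is applied with the nightly toolchain, for example cargo +nightly fmt --all.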

@@ -21,31 +21,26 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
use crate::utils::{self, swap_index_uid_in_task};
use crate::Query;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata;
use log::{debug, error, info};
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::milli::update::IndexDocumentsConfig;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod,
};
use meilisearch_types::milli::{
self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32,
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Settings as MilliSettings,
};
use meilisearch_types::milli::{self, BEU32};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use meilisearch_types::{
heed::{RoTxn, RwTxn},
Index,
};
use meilisearch_types::Index;
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use uuid::Uuid;
use crate::autobatcher::BatchKind;
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, Query, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
@@ -57,28 +52,11 @@ pub(crate) enum Batch {
TaskDeletion(Task),
Snapshot(Vec<Task>),
Dump(Task),
IndexOperation {
op: IndexOperation,
must_create_index: bool,
},
IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexDeletion {
index_uid: String,
tasks: Vec<Task>,
index_has_been_created: bool,
},
IndexSwap {
task: Task,
},
IndexOperation { op: IndexOperation, must_create_index: bool },
IndexCreation { index_uid: String, primary_key: Option<String>, task: Task },
IndexUpdate { index_uid: String, primary_key: Option<String>, task: Task },
IndexDeletion { index_uid: String, tasks: Vec<Task>, index_has_been_created: bool },
IndexSwap { task: Task },
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
@@ -212,9 +190,7 @@ impl IndexScheduler {
for task in &tasks {
match task.kind {
KindWithContent::DocumentImport {
content_file,
documents_count,
..
content_file, documents_count, ..
} => {
documents_counts.push(documents_count);
content_files.push(content_file);
@@ -241,19 +217,15 @@ impl IndexScheduler {
let mut documents = Vec::new();
for task in &tasks {
match task.kind {
KindWithContent::DocumentDeletion {
ref documents_ids, ..
} => documents.extend_from_slice(documents_ids),
KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
documents.extend_from_slice(documents_ids)
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentDeletion {
index_uid,
documents,
tasks,
},
op: IndexOperation::DocumentDeletion { index_uid, documents, tasks },
must_create_index,
}))
}
@@ -263,49 +235,30 @@ impl IndexScheduler {
let mut settings = Vec::new();
for task in &tasks {
match task.kind {
KindWithContent::Settings {
ref new_settings,
is_deletion,
..
} => settings.push((is_deletion, new_settings.clone())),
KindWithContent::Settings { ref new_settings, is_deletion, .. } => {
settings.push((is_deletion, new_settings.clone()))
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
op: IndexOperation::Settings {
index_uid,
settings,
tasks,
},
op: IndexOperation::Settings { index_uid, settings, tasks },
must_create_index,
}))
}
BatchKind::ClearAndSettings {
other,
settings_ids,
allow_index_creation,
} => {
BatchKind::ClearAndSettings { other, settings_ids, allow_index_creation } => {
let (index_uid, settings, settings_tasks) = match self
.create_next_batch_index(
rtxn,
index_uid,
BatchKind::Settings {
settings_ids,
allow_index_creation,
},
BatchKind::Settings { settings_ids, allow_index_creation },
must_create_index,
)?
.unwrap()
{
Batch::IndexOperation {
op:
IndexOperation::Settings {
index_uid,
settings,
tasks,
..
},
op: IndexOperation::Settings { index_uid, settings, tasks, .. },
..
} => (index_uid, settings, tasks),
_ => unreachable!(),
@@ -345,21 +298,14 @@ impl IndexScheduler {
let settings = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::Settings {
settings_ids,
allow_index_creation,
},
BatchKind::Settings { settings_ids, allow_index_creation },
must_create_index,
)?;
let document_import = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::DocumentImport {
method,
allow_index_creation,
import_ids,
},
BatchKind::DocumentImport { method, allow_index_creation, import_ids },
must_create_index,
)?;
@@ -377,12 +323,7 @@ impl IndexScheduler {
..
}),
Some(Batch::IndexOperation {
op:
IndexOperation::Settings {
settings,
tasks: settings_tasks,
..
},
op: IndexOperation::Settings { settings, tasks: settings_tasks, .. },
..
}),
) => Ok(Some(Batch::IndexOperation {
@@ -404,17 +345,12 @@ impl IndexScheduler {
BatchKind::IndexCreation { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let (index_uid, primary_key) = match &task.kind {
KindWithContent::IndexCreation {
index_uid,
primary_key,
} => (index_uid.clone(), primary_key.clone()),
KindWithContent::IndexCreation { index_uid, primary_key } => {
(index_uid.clone(), primary_key.clone())
}
_ => unreachable!(),
};
Ok(Some(Batch::IndexCreation {
index_uid,
primary_key,
task,
}))
Ok(Some(Batch::IndexCreation { index_uid, primary_key, task }))
}
BatchKind::IndexUpdate { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
@@ -422,11 +358,7 @@ impl IndexScheduler {
KindWithContent::IndexUpdate { primary_key, .. } => primary_key.clone(),
_ => unreachable!(),
};
Ok(Some(Batch::IndexUpdate {
index_uid,
primary_key,
task,
}))
Ok(Some(Batch::IndexUpdate { index_uid, primary_key, task }))
}
BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion {
index_uid,
@@ -453,17 +385,14 @@ impl IndexScheduler {
// 1. we get the last task to cancel.
if let Some(task_id) = to_cancel.max() {
return Ok(Some(Batch::TaskCancelation(
self.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?,
self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
)));
}
// 2. we get the next task to delete
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
if let Some(task_id) = to_delete.min() {
let task = self
.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
return Ok(Some(Batch::TaskDeletion(task)));
}
@@ -471,25 +400,20 @@ impl IndexScheduler {
// 3. we batch the snapshot.
let to_snapshot = self.get_kind(rtxn, Kind::Snapshot)? & enqueued;
if !to_snapshot.is_empty() {
return Ok(Some(Batch::Snapshot(
self.get_existing_tasks(rtxn, to_snapshot)?,
)));
return Ok(Some(Batch::Snapshot(self.get_existing_tasks(rtxn, to_snapshot)?)));
}
// 4. we batch the dumps.
let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued;
if let Some(to_dump) = to_dump.min() {
return Ok(Some(Batch::Dump(
self.get_task(rtxn, to_dump)?
.ok_or(Error::CorruptedTaskQueue)?,
self.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?,
)));
}
// 5. We take the next task and try to batch all the tasks associated with this index.
if let Some(task_id) = enqueued.min() {
let task = self
.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
// This is safe because all the remaining task are associated with
// AT LEAST one index. We can use the right or left one it doesn't
@@ -500,11 +424,7 @@ impl IndexScheduler {
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
// If autobatching is disabled we only take one task at a time.
let tasks_limit = if self.autobatching_enabled {
usize::MAX
} else {
1
};
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
let enqueued = index_tasks
.into_iter()
@@ -716,10 +636,7 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok(vec![task])
}
Batch::IndexOperation {
op,
must_create_index,
} => {
Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid();
let index = if must_create_index {
// create the index if it doesn't already exist
@@ -738,26 +655,14 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::IndexCreation {
index_uid,
primary_key,
task,
} => {
Batch::IndexCreation { index_uid, primary_key, task } => {
let mut wtxn = self.env.write_txn()?;
self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
self.process_batch(Batch::IndexUpdate {
index_uid,
primary_key,
task,
})
self.process_batch(Batch::IndexUpdate { index_uid, primary_key, task })
}
Batch::IndexUpdate {
index_uid,
primary_key,
mut task,
} => {
Batch::IndexUpdate { index_uid, primary_key, mut task } => {
let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?;
@@ -781,11 +686,7 @@ impl IndexScheduler {
Ok(vec![task])
}
Batch::IndexDeletion {
index_uid,
index_has_been_created,
mut tasks,
} => {
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
let wtxn = self.env.write_txn()?;
// it's possible that the index doesn't exist
@@ -807,9 +708,9 @@ impl IndexScheduler {
for task in &mut tasks {
task.status = Status::Succeeded;
task.details = match &task.kind {
KindWithContent::IndexDeletion { .. } => Some(Details::ClearAll {
deleted_documents: Some(number_of_documents),
}),
KindWithContent::IndexDeletion { .. } => {
Some(Details::ClearAll { deleted_documents: Some(number_of_documents) })
}
otherwise => otherwise.default_finished_details(),
};
}
@@ -855,9 +756,7 @@ impl IndexScheduler {
// 3. before_name -> new_name in the task's KindWithContent
for task_id in &index_lhs_task_ids | &index_rhs_task_ids {
let mut task = self
.get_task(&wtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let mut task = self.get_task(&wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
swap_index_uid_in_task(&mut task, (lhs, rhs));
self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?;
}
@@ -902,9 +801,7 @@ impl IndexScheduler {
KindWithContent::DocumentClear { .. } => {
let count = if first_clear_found { 0 } else { count };
first_clear_found = true;
Some(Details::ClearAll {
deleted_documents: Some(count),
})
Some(Details::ClearAll { deleted_documents: Some(count) })
}
otherwise => otherwise.default_details(),
};
@@ -935,10 +832,7 @@ impl IndexScheduler {
}
}
let config = IndexDocumentsConfig {
update_method: method,
..Default::default()
};
let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
let mut builder = milli::update::IndexDocuments::new(
index_wtxn,
@@ -973,15 +867,11 @@ impl IndexScheduler {
info!("document addition done: {:?}", addition);
}
for (task, (ret, count)) in tasks
.iter_mut()
.zip(results.into_iter().zip(documents_counts))
for (task, (ret, count)) in
tasks.iter_mut().zip(results.into_iter().zip(documents_counts))
{
match ret {
Ok(DocumentAdditionResult {
indexed_documents,
number_of_documents,
}) => {
Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents,
@@ -1001,19 +891,13 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::DocumentDeletion {
index_uid: _,
documents,
mut tasks,
} => {
IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => {
let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?;
documents.iter().for_each(|id| {
builder.delete_external_id(id);
});
let DocumentDeletionResult {
deleted_documents, ..
} = builder.execute()?;
let DocumentDeletionResult { deleted_documents, .. } = builder.execute()?;
for (task, documents) in tasks.iter_mut().zip(documents) {
task.status = Status::Succeeded;
@@ -1025,11 +909,7 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::Settings {
index_uid: _,
settings,
mut tasks,
} => {
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
let indexer_config = self.index_mapper.indexer_config();
// TODO merge the settings to only do *one* reindexation.
for (task, (_, settings)) in tasks.iter_mut().zip(settings) {
@@ -1105,11 +985,7 @@ impl IndexScheduler {
let settings_tasks = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::Settings {
index_uid,
settings,
tasks: settings_tasks,
},
IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
)?;
let mut tasks = settings_tasks;
@@ -1139,9 +1015,7 @@ impl IndexScheduler {
let mut affected_kinds = HashSet::new();
for task_id in to_delete_tasks.iter() {
let task = self
.get_task(wtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(task_indexes) = task.indexes() {
affected_indexes.extend(task_indexes.into_iter().map(|x| x.to_owned()));
}