Introduce a rustfmt file

This commit is contained in:
Clément Renault 2022-10-20 18:00:07 +02:00
parent 52e858a588
commit 80b2e70ee7
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
92 changed files with 1250 additions and 2855 deletions

View file

@ -5,11 +5,12 @@ tasks affecting a single index into a [batch](crate::batch::Batch).
The main function of the autobatcher is [`autobatch`].
*/
use std::ops::ControlFlow::{self, Break, Continue};
use meilisearch_types::milli::update::IndexDocumentsMethod::{
self, ReplaceDocuments, UpdateDocuments,
};
use meilisearch_types::tasks::TaskId;
use std::ops::ControlFlow::{self, Break, Continue};
use crate::KindWithContent;
@ -18,15 +19,10 @@ use crate::KindWithContent;
///
/// Only the non-prioritised tasks that can be grouped in a batch have a corresponding [`AutobatchKind`]
enum AutobatchKind {
DocumentImport {
method: IndexDocumentsMethod,
allow_index_creation: bool,
},
DocumentImport { method: IndexDocumentsMethod, allow_index_creation: bool },
DocumentDeletion,
DocumentClear,
Settings {
allow_index_creation: bool,
},
Settings { allow_index_creation: bool },
IndexCreation,
IndexDeletion,
IndexUpdate,
@ -47,23 +43,16 @@ impl AutobatchKind {
impl From<KindWithContent> for AutobatchKind {
fn from(kind: KindWithContent) -> Self {
match kind {
KindWithContent::DocumentImport {
method,
allow_index_creation,
..
} => AutobatchKind::DocumentImport {
method,
allow_index_creation,
},
KindWithContent::DocumentImport { method, allow_index_creation, .. } => {
AutobatchKind::DocumentImport { method, allow_index_creation }
}
KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion,
KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear,
KindWithContent::Settings {
allow_index_creation,
is_deletion,
..
} => AutobatchKind::Settings {
allow_index_creation: allow_index_creation && !is_deletion,
},
KindWithContent::Settings { allow_index_creation, is_deletion, .. } => {
AutobatchKind::Settings {
allow_index_creation: allow_index_creation && !is_deletion,
}
}
KindWithContent::IndexDeletion { .. } => AutobatchKind::IndexDeletion,
KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation,
KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate,
@ -147,20 +136,11 @@ impl BatchKind {
match AutobatchKind::from(kind) {
K::IndexCreation => (Break(BatchKind::IndexCreation { id: task_id }), true),
K::IndexDeletion => (
Break(BatchKind::IndexDeletion { ids: vec![task_id] }),
false,
),
K::IndexDeletion => (Break(BatchKind::IndexDeletion { ids: vec![task_id] }), false),
K::IndexUpdate => (Break(BatchKind::IndexUpdate { id: task_id }), false),
K::IndexSwap => (Break(BatchKind::IndexSwap { id: task_id }), false),
K::DocumentClear => (
Continue(BatchKind::DocumentClear { ids: vec![task_id] }),
false,
),
K::DocumentImport {
method,
allow_index_creation,
} => (
K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),
K::DocumentImport { method, allow_index_creation } => (
Continue(BatchKind::DocumentImport {
method,
allow_index_creation,
@ -168,19 +148,11 @@ impl BatchKind {
}),
allow_index_creation,
),
K::DocumentDeletion => (
Continue(BatchKind::DocumentDeletion {
deletion_ids: vec![task_id],
}),
false,
),
K::Settings {
allow_index_creation,
} => (
Continue(BatchKind::Settings {
allow_index_creation,
settings_ids: vec![task_id],
}),
K::DocumentDeletion => {
(Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false)
}
K::Settings { allow_index_creation } => (
Continue(BatchKind::Settings { allow_index_creation, settings_ids: vec![task_id] }),
allow_index_creation,
),
}
@ -461,21 +433,17 @@ pub fn autobatch(
#[cfg(test)]
mod tests {
use crate::debug_snapshot;
use uuid::Uuid;
use super::*;
use uuid::Uuid;
use crate::debug_snapshot;
fn autobatch_from(
index_already_exists: bool,
input: impl IntoIterator<Item = KindWithContent>,
) -> Option<(BatchKind, bool)> {
autobatch(
input
.into_iter()
.enumerate()
.map(|(id, kind)| (id as TaskId, kind.into()))
.collect(),
input.into_iter().enumerate().map(|(id, kind)| (id as TaskId, kind.into())).collect(),
index_already_exists,
)
}
@ -499,9 +467,7 @@ mod tests {
}
fn doc_clr() -> KindWithContent {
KindWithContent::DocumentClear {
index_uid: String::from("doggo"),
}
KindWithContent::DocumentClear { index_uid: String::from("doggo") }
}
fn settings(allow_index_creation: bool) -> KindWithContent {
@ -514,29 +480,19 @@ mod tests {
}
fn idx_create() -> KindWithContent {
KindWithContent::IndexCreation {
index_uid: String::from("doggo"),
primary_key: None,
}
KindWithContent::IndexCreation { index_uid: String::from("doggo"), primary_key: None }
}
fn idx_update() -> KindWithContent {
KindWithContent::IndexUpdate {
index_uid: String::from("doggo"),
primary_key: None,
}
KindWithContent::IndexUpdate { index_uid: String::from("doggo"), primary_key: None }
}
fn idx_del() -> KindWithContent {
KindWithContent::IndexDeletion {
index_uid: String::from("doggo"),
}
KindWithContent::IndexDeletion { index_uid: String::from("doggo") }
}
fn idx_swap() -> KindWithContent {
KindWithContent::IndexSwap {
swaps: vec![(String::from("doggo"), String::from("catto"))],
}
KindWithContent::IndexSwap { swaps: vec![(String::from("doggo"), String::from("catto"))] }
}
#[test]

View file

@ -21,31 +21,26 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
use crate::utils::{self, swap_index_uid_in_task};
use crate::Query;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata;
use log::{debug, error, info};
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::milli::update::IndexDocumentsConfig;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod,
};
use meilisearch_types::milli::{
self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32,
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Settings as MilliSettings,
};
use meilisearch_types::milli::{self, BEU32};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use meilisearch_types::{
heed::{RoTxn, RwTxn},
Index,
};
use meilisearch_types::Index;
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use uuid::Uuid;
use crate::autobatcher::BatchKind;
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, Query, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
@ -57,28 +52,11 @@ pub(crate) enum Batch {
TaskDeletion(Task),
Snapshot(Vec<Task>),
Dump(Task),
IndexOperation {
op: IndexOperation,
must_create_index: bool,
},
IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexDeletion {
index_uid: String,
tasks: Vec<Task>,
index_has_been_created: bool,
},
IndexSwap {
task: Task,
},
IndexOperation { op: IndexOperation, must_create_index: bool },
IndexCreation { index_uid: String, primary_key: Option<String>, task: Task },
IndexUpdate { index_uid: String, primary_key: Option<String>, task: Task },
IndexDeletion { index_uid: String, tasks: Vec<Task>, index_has_been_created: bool },
IndexSwap { task: Task },
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
@ -212,9 +190,7 @@ impl IndexScheduler {
for task in &tasks {
match task.kind {
KindWithContent::DocumentImport {
content_file,
documents_count,
..
content_file, documents_count, ..
} => {
documents_counts.push(documents_count);
content_files.push(content_file);
@ -241,19 +217,15 @@ impl IndexScheduler {
let mut documents = Vec::new();
for task in &tasks {
match task.kind {
KindWithContent::DocumentDeletion {
ref documents_ids, ..
} => documents.extend_from_slice(documents_ids),
KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
documents.extend_from_slice(documents_ids)
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentDeletion {
index_uid,
documents,
tasks,
},
op: IndexOperation::DocumentDeletion { index_uid, documents, tasks },
must_create_index,
}))
}
@ -263,49 +235,30 @@ impl IndexScheduler {
let mut settings = Vec::new();
for task in &tasks {
match task.kind {
KindWithContent::Settings {
ref new_settings,
is_deletion,
..
} => settings.push((is_deletion, new_settings.clone())),
KindWithContent::Settings { ref new_settings, is_deletion, .. } => {
settings.push((is_deletion, new_settings.clone()))
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
op: IndexOperation::Settings {
index_uid,
settings,
tasks,
},
op: IndexOperation::Settings { index_uid, settings, tasks },
must_create_index,
}))
}
BatchKind::ClearAndSettings {
other,
settings_ids,
allow_index_creation,
} => {
BatchKind::ClearAndSettings { other, settings_ids, allow_index_creation } => {
let (index_uid, settings, settings_tasks) = match self
.create_next_batch_index(
rtxn,
index_uid,
BatchKind::Settings {
settings_ids,
allow_index_creation,
},
BatchKind::Settings { settings_ids, allow_index_creation },
must_create_index,
)?
.unwrap()
{
Batch::IndexOperation {
op:
IndexOperation::Settings {
index_uid,
settings,
tasks,
..
},
op: IndexOperation::Settings { index_uid, settings, tasks, .. },
..
} => (index_uid, settings, tasks),
_ => unreachable!(),
@ -345,21 +298,14 @@ impl IndexScheduler {
let settings = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::Settings {
settings_ids,
allow_index_creation,
},
BatchKind::Settings { settings_ids, allow_index_creation },
must_create_index,
)?;
let document_import = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::DocumentImport {
method,
allow_index_creation,
import_ids,
},
BatchKind::DocumentImport { method, allow_index_creation, import_ids },
must_create_index,
)?;
@ -377,12 +323,7 @@ impl IndexScheduler {
..
}),
Some(Batch::IndexOperation {
op:
IndexOperation::Settings {
settings,
tasks: settings_tasks,
..
},
op: IndexOperation::Settings { settings, tasks: settings_tasks, .. },
..
}),
) => Ok(Some(Batch::IndexOperation {
@ -404,17 +345,12 @@ impl IndexScheduler {
BatchKind::IndexCreation { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let (index_uid, primary_key) = match &task.kind {
KindWithContent::IndexCreation {
index_uid,
primary_key,
} => (index_uid.clone(), primary_key.clone()),
KindWithContent::IndexCreation { index_uid, primary_key } => {
(index_uid.clone(), primary_key.clone())
}
_ => unreachable!(),
};
Ok(Some(Batch::IndexCreation {
index_uid,
primary_key,
task,
}))
Ok(Some(Batch::IndexCreation { index_uid, primary_key, task }))
}
BatchKind::IndexUpdate { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
@ -422,11 +358,7 @@ impl IndexScheduler {
KindWithContent::IndexUpdate { primary_key, .. } => primary_key.clone(),
_ => unreachable!(),
};
Ok(Some(Batch::IndexUpdate {
index_uid,
primary_key,
task,
}))
Ok(Some(Batch::IndexUpdate { index_uid, primary_key, task }))
}
BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion {
index_uid,
@ -453,17 +385,14 @@ impl IndexScheduler {
// 1. we get the last task to cancel.
if let Some(task_id) = to_cancel.max() {
return Ok(Some(Batch::TaskCancelation(
self.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?,
self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
)));
}
// 2. we get the next task to delete
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
if let Some(task_id) = to_delete.min() {
let task = self
.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
return Ok(Some(Batch::TaskDeletion(task)));
}
@ -471,25 +400,20 @@ impl IndexScheduler {
// 3. we batch the snapshot.
let to_snapshot = self.get_kind(rtxn, Kind::Snapshot)? & enqueued;
if !to_snapshot.is_empty() {
return Ok(Some(Batch::Snapshot(
self.get_existing_tasks(rtxn, to_snapshot)?,
)));
return Ok(Some(Batch::Snapshot(self.get_existing_tasks(rtxn, to_snapshot)?)));
}
// 4. we batch the dumps.
let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued;
if let Some(to_dump) = to_dump.min() {
return Ok(Some(Batch::Dump(
self.get_task(rtxn, to_dump)?
.ok_or(Error::CorruptedTaskQueue)?,
self.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?,
)));
}
// 5. We take the next task and try to batch all the tasks associated with this index.
if let Some(task_id) = enqueued.min() {
let task = self
.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
// This is safe because all the remaining tasks are associated with
// AT LEAST one index. We can use the right or left one it doesn't
@ -500,11 +424,7 @@ impl IndexScheduler {
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
// If autobatching is disabled we only take one task at a time.
let tasks_limit = if self.autobatching_enabled {
usize::MAX
} else {
1
};
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
let enqueued = index_tasks
.into_iter()
@ -716,10 +636,7 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok(vec![task])
}
Batch::IndexOperation {
op,
must_create_index,
} => {
Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid();
let index = if must_create_index {
// create the index if it doesn't already exist
@ -738,26 +655,14 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::IndexCreation {
index_uid,
primary_key,
task,
} => {
Batch::IndexCreation { index_uid, primary_key, task } => {
let mut wtxn = self.env.write_txn()?;
self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
self.process_batch(Batch::IndexUpdate {
index_uid,
primary_key,
task,
})
self.process_batch(Batch::IndexUpdate { index_uid, primary_key, task })
}
Batch::IndexUpdate {
index_uid,
primary_key,
mut task,
} => {
Batch::IndexUpdate { index_uid, primary_key, mut task } => {
let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?;
@ -781,11 +686,7 @@ impl IndexScheduler {
Ok(vec![task])
}
Batch::IndexDeletion {
index_uid,
index_has_been_created,
mut tasks,
} => {
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
let wtxn = self.env.write_txn()?;
// it's possible that the index doesn't exist
@ -807,9 +708,9 @@ impl IndexScheduler {
for task in &mut tasks {
task.status = Status::Succeeded;
task.details = match &task.kind {
KindWithContent::IndexDeletion { .. } => Some(Details::ClearAll {
deleted_documents: Some(number_of_documents),
}),
KindWithContent::IndexDeletion { .. } => {
Some(Details::ClearAll { deleted_documents: Some(number_of_documents) })
}
otherwise => otherwise.default_finished_details(),
};
}
@ -855,9 +756,7 @@ impl IndexScheduler {
// 3. before_name -> new_name in the task's KindWithContent
for task_id in &index_lhs_task_ids | &index_rhs_task_ids {
let mut task = self
.get_task(&wtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let mut task = self.get_task(&wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
swap_index_uid_in_task(&mut task, (lhs, rhs));
self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?;
}
@ -902,9 +801,7 @@ impl IndexScheduler {
KindWithContent::DocumentClear { .. } => {
let count = if first_clear_found { 0 } else { count };
first_clear_found = true;
Some(Details::ClearAll {
deleted_documents: Some(count),
})
Some(Details::ClearAll { deleted_documents: Some(count) })
}
otherwise => otherwise.default_details(),
};
@ -935,10 +832,7 @@ impl IndexScheduler {
}
}
let config = IndexDocumentsConfig {
update_method: method,
..Default::default()
};
let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
let mut builder = milli::update::IndexDocuments::new(
index_wtxn,
@ -973,15 +867,11 @@ impl IndexScheduler {
info!("document addition done: {:?}", addition);
}
for (task, (ret, count)) in tasks
.iter_mut()
.zip(results.into_iter().zip(documents_counts))
for (task, (ret, count)) in
tasks.iter_mut().zip(results.into_iter().zip(documents_counts))
{
match ret {
Ok(DocumentAdditionResult {
indexed_documents,
number_of_documents,
}) => {
Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents,
@ -1001,19 +891,13 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::DocumentDeletion {
index_uid: _,
documents,
mut tasks,
} => {
IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => {
let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?;
documents.iter().for_each(|id| {
builder.delete_external_id(id);
});
let DocumentDeletionResult {
deleted_documents, ..
} = builder.execute()?;
let DocumentDeletionResult { deleted_documents, .. } = builder.execute()?;
for (task, documents) in tasks.iter_mut().zip(documents) {
task.status = Status::Succeeded;
@ -1025,11 +909,7 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::Settings {
index_uid: _,
settings,
mut tasks,
} => {
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
let indexer_config = self.index_mapper.indexer_config();
// TODO merge the settings to only do *one* reindexation.
for (task, (_, settings)) in tasks.iter_mut().zip(settings) {
@ -1105,11 +985,7 @@ impl IndexScheduler {
let settings_tasks = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::Settings {
index_uid,
settings,
tasks: settings_tasks,
},
IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
)?;
let mut tasks = settings_tasks;
@ -1139,9 +1015,7 @@ impl IndexScheduler {
let mut affected_kinds = HashSet::new();
for task_id in to_delete_tasks.iter() {
let task = self
.get_task(wtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(task_indexes) = task.indexes() {
affected_indexes.extend(task_indexes.into_iter().map(|x| x.to_owned()));
}

View file

@ -1,6 +1,5 @@
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::heed;
use meilisearch_types::milli;
use meilisearch_types::{heed, milli};
use thiserror::Error;
use crate::TaskId;

View file

@ -122,10 +122,7 @@ impl IndexMapper {
}
// Finally we remove the entry from the index map.
assert!(matches!(
index_map.write().unwrap().remove(&uuid),
Some(BeingDeleted)
));
assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted)));
});
Ok(())
@ -183,8 +180,7 @@ impl IndexMapper {
.iter(rtxn)?
.map(|ret| {
ret.map_err(Error::from).and_then(|(name, _)| {
self.index(rtxn, name)
.map(|index| (name.to_string(), index))
self.index(rtxn, name).map(|index| (name.to_string(), index))
})
})
.collect()

View file

@ -29,29 +29,28 @@ mod utils;
pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32;
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use utils::keep_tasks_within_datetimes;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use file_store::FileStore;
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::milli;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use utils::keep_tasks_within_datetimes;
use uuid::Uuid;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use crate::index_mapper::IndexMapper;
type BEI128 = meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
@ -124,10 +123,7 @@ impl Query {
pub fn with_index(self, index_uid: String) -> Self {
let mut index_vec = self.index_uid.unwrap_or_default();
index_vec.push(index_uid);
Self {
index_uid: Some(index_vec),
..self
}
Self { index_uid: Some(index_vec), ..self }
}
}
@ -142,10 +138,7 @@ struct ProcessingTasks {
impl ProcessingTasks {
/// Creates an empty `ProcessingTasks` struct.
fn new() -> ProcessingTasks {
ProcessingTasks {
started_at: OffsetDateTime::now_utc(),
processing: RoaringBitmap::new(),
}
ProcessingTasks { started_at: OffsetDateTime::now_utc(), processing: RoaringBitmap::new() }
}
/// Stores the currently processing tasks, and the date time at which it started.
@ -447,21 +440,11 @@ impl IndexScheduler {
let tasks = self.get_existing_tasks(
&rtxn,
tasks
.into_iter()
.rev()
.take(query.limit.unwrap_or(u32::MAX) as usize),
tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
)?;
let ProcessingTasks {
started_at,
processing,
..
} = self
.processing_tasks
.read()
.map_err(|_| Error::CorruptedTaskQueue)?
.clone();
let ProcessingTasks { started_at, processing, .. } =
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
let ret = tasks.into_iter();
if processing.is_empty() {
@ -469,11 +452,9 @@ impl IndexScheduler {
} else {
Ok(ret
.map(|task| match processing.contains(task.uid) {
true => Task {
status: Status::Processing,
started_at: Some(started_at),
..task
},
true => {
Task { status: Status::Processing, started_at: Some(started_at), ..task }
}
false => task,
})
.collect())
@ -497,8 +478,7 @@ impl IndexScheduler {
status: Status::Enqueued,
kind: kind.clone(),
};
self.all_tasks
.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
@ -527,11 +507,7 @@ impl IndexScheduler {
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
if self
.processing_tasks
.read()
.unwrap()
.must_cancel_processing_tasks(&tasks_to_cancel)
if self.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel)
{
self.must_stop_processing.must_stop();
}
@ -601,16 +577,14 @@ impl IndexScheduler {
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings {
settings,
is_deletion,
allow_index_creation,
} => KindWithContent::Settings {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
},
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
KindWithContent::Settings {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
}
}
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
@ -629,21 +603,14 @@ impl IndexScheduler {
KindDump::TasksDeletion { query, tasks } => {
KindWithContent::TaskDeletion { query, tasks }
}
KindDump::DumpExport {
dump_uid,
keys,
instance_uid,
} => KindWithContent::DumpExport {
dump_uid,
keys,
instance_uid,
},
KindDump::DumpExport { dump_uid, keys, instance_uid } => {
KindWithContent::DumpExport { dump_uid, keys, instance_uid }
}
KindDump::Snapshot => KindWithContent::Snapshot,
},
};
self.all_tasks
.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
@ -729,19 +696,12 @@ impl IndexScheduler {
// We reset the must_stop flag to be sure that we don't stop processing tasks
self.must_stop_processing.reset();
self.processing_tasks
.write()
.unwrap()
.start_processing_at(started_at, processing_tasks);
self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks);
#[cfg(test)]
{
self.test_breakpoint_sdr
.send(Breakpoint::BatchCreated)
.unwrap();
self.test_breakpoint_sdr
.send(Breakpoint::BeforeProcessing)
.unwrap();
self.test_breakpoint_sdr.send(Breakpoint::BatchCreated).unwrap();
self.test_breakpoint_sdr.send(Breakpoint::BeforeProcessing).unwrap();
}
// 2. Process the tasks
@ -781,16 +741,11 @@ impl IndexScheduler {
}
}
}
self.processing_tasks
.write()
.unwrap()
.stop_processing_at(finished_at);
self.processing_tasks.write().unwrap().stop_processing_at(finished_at);
wtxn.commit()?;
#[cfg(test)]
self.test_breakpoint_sdr
.send(Breakpoint::AfterProcessing)
.unwrap();
self.test_breakpoint_sdr.send(Breakpoint::AfterProcessing).unwrap();
Ok(processed_tasks)
}
@ -812,16 +767,12 @@ mod tests {
use tempfile::TempDir;
use uuid::Uuid;
use crate::snapshot::snapshot_index_scheduler;
use super::*;
use crate::snapshot::snapshot_index_scheduler;
/// Return a `KindWithContent::IndexCreation` task
fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent {
KindWithContent::IndexCreation {
index_uid: S(index),
primary_key: Some(S(primary_key)),
}
KindWithContent::IndexCreation { index_uid: S(index), primary_key: Some(S(primary_key)) }
}
/// Create a `KindWithContent::DocumentImport` task that imports documents.
///
@ -864,9 +815,7 @@ mod tests {
}}"#
);
let (_uuid, mut file) = index_scheduler
.create_update_file_with_uuid(file_uuid)
.unwrap();
let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap();
let documents_count =
meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
.unwrap() as u64;
@ -890,10 +839,8 @@ mod tests {
)
.unwrap();
let index_scheduler_handle = IndexSchedulerHandle {
_tempdir: tempdir,
test_breakpoint_rcv: receiver,
};
let index_scheduler_handle =
IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver };
(index_scheduler, index_scheduler_handle)
}
@ -952,18 +899,12 @@ mod tests {
fn insert_task_while_another_task_is_processing() {
let (index_scheduler, handle) = IndexScheduler::test(true);
index_scheduler
.register(index_creation_task("index_a", "id"))
.unwrap();
index_scheduler.register(index_creation_task("index_a", "id")).unwrap();
handle.wait_till(Breakpoint::BatchCreated);
// while the task is processing can we register another task?
index_scheduler.register(index_creation_task("index_b", "id")).unwrap();
index_scheduler
.register(index_creation_task("index_b", "id"))
.unwrap();
index_scheduler
.register(KindWithContent::IndexDeletion {
index_uid: S("index_a"),
})
.register(KindWithContent::IndexDeletion { index_uid: S("index_a") })
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler));
@ -976,21 +917,13 @@ mod tests {
let (index_scheduler, handle) = IndexScheduler::test(true);
index_scheduler
.register(KindWithContent::IndexCreation {
index_uid: S("doggos"),
primary_key: None,
})
.register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None })
.unwrap();
index_scheduler
.register(KindWithContent::IndexCreation {
index_uid: S("cattos"),
primary_key: None,
})
.register(KindWithContent::IndexCreation { index_uid: S("cattos"), primary_key: None })
.unwrap();
index_scheduler
.register(KindWithContent::IndexDeletion {
index_uid: S("doggos"),
})
.register(KindWithContent::IndexDeletion { index_uid: S("doggos") })
.unwrap();
handle.wait_till(Breakpoint::Start);
@ -1011,25 +944,16 @@ mod tests {
let (index_scheduler, handle) = IndexScheduler::test(false);
index_scheduler
.register(KindWithContent::IndexCreation {
index_uid: S("doggos"),
primary_key: None,
})
.register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None })
.unwrap();
index_scheduler
.register(KindWithContent::DocumentClear {
index_uid: S("doggos"),
})
.register(KindWithContent::DocumentClear { index_uid: S("doggos") })
.unwrap();
index_scheduler
.register(KindWithContent::DocumentClear {
index_uid: S("doggos"),
})
.register(KindWithContent::DocumentClear { index_uid: S("doggos") })
.unwrap();
index_scheduler
.register(KindWithContent::DocumentClear {
index_uid: S("doggos"),
})
.register(KindWithContent::DocumentClear { index_uid: S("doggos") })
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
@ -1211,10 +1135,7 @@ mod tests {
}"#;
index_scheduler
.register(KindWithContent::IndexCreation {
index_uid: S("doggos"),
primary_key: None,
})
.register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None })
.unwrap();
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
@ -1233,9 +1154,7 @@ mod tests {
})
.unwrap();
index_scheduler
.register(KindWithContent::IndexDeletion {
index_uid: S("doggos"),
})
.register(KindWithContent::IndexDeletion { index_uid: S("doggos") })
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler));
@ -1263,9 +1182,7 @@ mod tests {
for name in index_names {
index_scheduler
.register(KindWithContent::DocumentClear {
index_uid: name.to_string(),
})
.register(KindWithContent::DocumentClear { index_uid: name.to_string() })
.unwrap();
}
@ -1308,10 +1225,7 @@ mod tests {
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![
("a".to_owned(), "b".to_owned()),
("c".to_owned(), "d".to_owned()),
],
swaps: vec![("a".to_owned(), "b".to_owned()), ("c".to_owned(), "d".to_owned())],
})
.unwrap();
@ -1319,9 +1233,7 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_processed");
index_scheduler
.register(KindWithContent::IndexSwap {
swaps: vec![("a".to_owned(), "c".to_owned())],
})
.register(KindWithContent::IndexSwap { swaps: vec![("a".to_owned(), "c".to_owned())] })
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed");
@ -1353,9 +1265,7 @@ mod tests {
})
.unwrap();
index_scheduler
.register(KindWithContent::IndexDeletion {
index_uid: S("doggos"),
})
.register(KindWithContent::IndexDeletion { index_uid: S("doggos") })
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler));

View file

@ -1,16 +1,11 @@
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::Details;
use meilisearch_types::{
heed::{
types::{OwnedType, SerdeBincode, SerdeJson, Str},
Database, RoTxn,
},
tasks::Task,
};
use meilisearch_types::tasks::{Details, Task};
use roaring::RoaringBitmap;
use crate::BEI128;
use crate::{index_mapper::IndexMapper, IndexScheduler, Kind, Status};
use crate::index_mapper::IndexMapper;
use crate::{IndexScheduler, Kind, Status, BEI128};
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler {
@ -37,9 +32,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let mut snap = String::new();
let processing_tasks = processing_tasks.read().unwrap().processing.clone();
snap.push_str(&format!(
"### Autobatching Enabled = {autobatching_enabled}\n"
));
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
snap.push_str("### Processing Tasks:\n");
snap.push_str(&snapshot_bitmap(&processing_tasks));
snap.push_str("\n----------------------------------------------------------------------\n");
@ -151,6 +144,7 @@ fn snapshot_task(task: &Task) -> String {
snap.push('}');
snap
}
fn snaphsot_details(d: &Details) -> String {
match d {
Details::DocumentAddition {
@ -191,8 +185,7 @@ fn snaphsot_details(d: &Details) -> String {
},
Details::IndexSwap { swaps } => {
format!("{{ indexes: {swaps:?} }}")
},
}
}
}
@ -205,6 +198,7 @@ fn snapshot_status(rtxn: &RoTxn, db: Database<SerdeBincode<Status>, RoaringBitma
}
snap
}
fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let mut iter = db.iter(rtxn).unwrap();
@ -227,11 +221,6 @@ fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>) ->
}
/// Renders the index mapper's contents as a debug-formatted list for snapshots.
///
/// Calls `mapper.indexes(rtxn)`, keeps only the first element of every
/// returned pair (bound to `names`, so presumably the index name) and formats
/// the resulting `Vec` with `{:?}`.
///
/// # Panics
///
/// Panics if `mapper.indexes(rtxn)` does not yield a value, because the
/// result is unwrapped.
fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String {
// NOTE(review): the multi-line chain and the single-line chain below appear
// to be the same statement before and after rustfmt (diff view); only one of
// them belongs in the compiled source.
let names = mapper
.indexes(rtxn)
.unwrap()
.into_iter()
.map(|(n, _)| n)
.collect::<Vec<_>>();
let names = mapper.indexes(rtxn).unwrap().into_iter().map(|(n, _)| n).collect::<Vec<_>>();
format!("{names:?}")
}

View file

@ -2,29 +2,22 @@
use std::ops::Bound;
use meilisearch_types::heed::types::OwnedType;
use meilisearch_types::heed::Database;
use meilisearch_types::heed::{types::DecodeIgnore, RoTxn, RwTxn};
use meilisearch_types::heed::types::{DecodeIgnore, OwnedType};
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status};
use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
use meilisearch_types::tasks::{Kind, KindWithContent, Status};
impl IndexScheduler {
/// Returns the union of the task-id bitmaps of every status.
///
/// Iterates over all values produced by `enum_iterator::all()` (presumably
/// every `Status` variant, since each one is handed to `get_status`), fetches
/// the bitmap for each, and merges them with `union()`.
///
/// NOTE(review): `rtxn` is already a reference, so `&rtxn` passes a
/// `&&RoTxn`; the extra borrow looks unnecessary — confirm against the
/// signature of `get_status`.
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
// NOTE(review): the three-line chain and the one-line chain below appear to
// be the same expression before and after rustfmt (diff view).
enum_iterator::all()
.map(|s| self.get_status(&rtxn, s))
.union()
enum_iterator::all().map(|s| self.get_status(&rtxn, s)).union()
}
/// Reads the last entry of `all_tasks` and returns its key plus one.
///
/// The stored value is not decoded (`DecodeIgnore`); only the key is used.
/// Yields `None` when `last` finds no entry, and propagates any error from
/// the lookup through `?`.
///
/// NOTE(review): despite the name, the returned id is the last key + 1, not
/// the last key itself — confirm that callers rely on this.
pub(crate) fn last_task_id(&self, rtxn: &RoTxn) -> Result<Option<TaskId>> {
// NOTE(review): the multi-line `Ok(...)` and the single-line `Ok(...)` below
// appear to be the same expression before and after rustfmt (diff view).
Ok(self
.all_tasks
.remap_data_type::<DecodeIgnore>()
.last(rtxn)?
.map(|(k, _)| k.get() + 1))
Ok(self.all_tasks.remap_data_type::<DecodeIgnore>().last(rtxn)?.map(|(k, _)| k.get() + 1))
}
pub(crate) fn next_task_id(&self, rtxn: &RoTxn) -> Result<TaskId> {
@ -45,16 +38,13 @@ impl IndexScheduler {
tasks
.into_iter()
.map(|task_id| {
self.get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
self.get_task(rtxn, task_id).and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
})
.collect::<Result<_>>()
}
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
let old_task = self
.get_task(wtxn, task.uid)?
.ok_or(Error::CorruptedTaskQueue)?;
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
debug_assert_eq!(old_task.uid, task.uid);
@ -85,19 +75,13 @@ impl IndexScheduler {
"Cannot update a task's enqueued_at time"
);
if old_task.started_at != task.started_at {
assert!(
old_task.started_at.is_none(),
"Cannot update a task's started_at time"
);
assert!(old_task.started_at.is_none(), "Cannot update a task's started_at time");
if let Some(started_at) = task.started_at {
insert_task_datetime(wtxn, self.started_at, started_at, task.uid)?;
}
}
if old_task.finished_at != task.finished_at {
assert!(
old_task.finished_at.is_none(),
"Cannot update a task's finished_at time"
);
assert!(old_task.finished_at.is_none(), "Cannot update a task's finished_at time");
if let Some(finished_at) = task.finished_at {
insert_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?;
}
@ -269,7 +253,9 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
}
}
}
K::TaskCancelation { .. } | K::TaskDeletion { .. } | K::DumpExport { .. } | K::Snapshot => (),
K::TaskCancelation { .. } | K::TaskDeletion { .. } | K::DumpExport { .. } | K::Snapshot => {
()
}
};
for index_uid in index_uids {
if index_uid == &swap.0 {