MeiliSearch/meilisearch-types/src/tasks.rs
Clémentine Urquizar - curqui 457a473b72
Bring back release-v0.30.0 into release-v0.30.0-temp (final: into main) (#3145)
* Fix error code of the "duplicate index found" error

* Use the content of the ProcessingTasks in the tasks cancelation system

* Change the missing_filters error code into missing_task_filters

* WIP Introduce the invalid_task_uid error code

* Use more precise error codes/message for the task routes

+ Allow star operator in delete/cancel tasks
+ rename originalQuery to originalFilters
+ Display error/canceled_by in task view even when they are = null
+ Rename task filter fields by using their plural forms
+ Prepare an error code for canceledBy filter
+ Only return global tasks if the API key action `index.*` is there

* Add canceledBy task filter

* Update tests following task API changes

* Rename original_query to original_filters everywhere

* Update more insta-snap tests

* Make clippy happy

They're a happy clip now.

* Make rustfmt happy

>:-(

* Fix Index name parsing error message to fit the specification

* Bump milli version to 0.35.1

* Fix the new error messages

* fix the error messages and add tests

* rename the error codes for the sake of consistency

* refactor the way we send the cli informations + add the analytics for the config file and ssl usage

* Apply suggestions from code review

Co-authored-by: Clément Renault <clement@meilisearch.com>

* add a comment over the new infos structure

* reformat, sorry @kero

* Store analytics for the documents deletions

* Add analytics on all the settings

* Spawn threads with names

* Spawn rayon threads with names

* update the distinct attributes to the spec update

* update the analytics on the search route

* implements the analytics on the health and version routes

* Fix task details serialization

* Add the question mark to the task deletion query filter

* Add the question mark to the task cancelation query filter

* Fix tests

* add analytics on the task route

* Add all the missing fields of the new task query type
* Create a new analytics for the task deletion
* Create a new analytics for the task creation

* batch the tasks seen events

* Update the finite pagination analytics

* add the analytics of the swap-indexes route

* Stop removing the DB when failing to read it

* Rename originalFilters into originalFilters

* Rename matchedDocuments into providedIds

* Add `workflow_dispatch` to flaky.yml

* Bump grenad to 0.4.4

* Bump milli to version v0.37.0

* Don't multiply total memory returned by sysinfo anymore

sysinfo now returns bytes rather than KB

* Add a dispatch to the publish binaries workflow

* Fix publish release CI

* Don't use gold but the default linker

* Always display details for the indexDeletion task

* Fix the insta tests

* refactorize the whole test suite
1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice.
2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever.
3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general.
4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use.
5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock.

* rebase on release-v0.30

* makes clippy happy

* update the snapshots after a rebase

* try to remove the flakyness of the failing test

* Add more analytics on the ranking rules positions

* Update the dump test to check for the dumpUid dumpCreation task details

* send the ranking rules as a string because amplitude is too dumb to process an array as a single value

* Display a null dumpUid until we computed the dump itself on disk

* Update tests

* Check if the master key is missing before returning an error

Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com>
Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com>
Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00

564 lines
21 KiB
Rust

use std::collections::HashSet;
use std::fmt::{Display, Write};
use std::str::FromStr;
use enum_iterator::Sequence;
use milli::update::IndexDocumentsMethod;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize, Serializer};
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
use crate::error::{Code, ResponseError};
use crate::keys::Key;
use crate::settings::{Settings, Unchecked};
use crate::InstanceUid;
pub type TaskId = u32;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Task {
pub uid: TaskId,
#[serde(with = "time::serde::rfc3339")]
pub enqueued_at: OffsetDateTime,
#[serde(with = "time::serde::rfc3339::option")]
pub started_at: Option<OffsetDateTime>,
#[serde(with = "time::serde::rfc3339::option")]
pub finished_at: Option<OffsetDateTime>,
pub error: Option<ResponseError>,
pub canceled_by: Option<TaskId>,
pub details: Option<Details>,
pub status: Status,
pub kind: KindWithContent,
}
impl Task {
pub fn index_uid(&self) -> Option<&str> {
use KindWithContent::*;
match &self.kind {
DumpCreation { .. }
| SnapshotCreation
| TaskCancelation { .. }
| TaskDeletion { .. }
| IndexSwap { .. } => None,
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid }
| SettingsUpdate { index_uid, .. }
| IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid } => Some(index_uid),
}
}
/// Return the list of indexes updated by this tasks.
pub fn indexes(&self) -> Vec<&str> {
self.kind.indexes()
}
/// Return the content-uuid if there is one
pub fn content_uuid(&self) -> Option<Uuid> {
match self.kind {
KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => Some(content_file),
KindWithContent::DocumentDeletion { .. }
| KindWithContent::DocumentClear { .. }
| KindWithContent::SettingsUpdate { .. }
| KindWithContent::IndexDeletion { .. }
| KindWithContent::IndexCreation { .. }
| KindWithContent::IndexUpdate { .. }
| KindWithContent::IndexSwap { .. }
| KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
| KindWithContent::SnapshotCreation => None,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum KindWithContent {
DocumentAdditionOrUpdate {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
content_file: Uuid,
documents_count: u64,
allow_index_creation: bool,
},
DocumentDeletion {
index_uid: String,
documents_ids: Vec<String>,
},
DocumentClear {
index_uid: String,
},
SettingsUpdate {
index_uid: String,
new_settings: Box<Settings<Unchecked>>,
is_deletion: bool,
allow_index_creation: bool,
},
IndexDeletion {
index_uid: String,
},
IndexCreation {
index_uid: String,
primary_key: Option<String>,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
},
IndexSwap {
swaps: Vec<IndexSwap>,
},
TaskCancelation {
query: String,
tasks: RoaringBitmap,
},
TaskDeletion {
query: String,
tasks: RoaringBitmap,
},
DumpCreation {
keys: Vec<Key>,
instance_uid: Option<InstanceUid>,
},
SnapshotCreation,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexSwap {
pub indexes: (String, String),
}
impl KindWithContent {
pub fn as_kind(&self) -> Kind {
match self {
KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate,
KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion,
KindWithContent::DocumentClear { .. } => Kind::DocumentDeletion,
KindWithContent::SettingsUpdate { .. } => Kind::SettingsUpdate,
KindWithContent::IndexCreation { .. } => Kind::IndexCreation,
KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion,
KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate,
KindWithContent::IndexSwap { .. } => Kind::IndexSwap,
KindWithContent::TaskCancelation { .. } => Kind::TaskCancelation,
KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion,
KindWithContent::DumpCreation { .. } => Kind::DumpCreation,
KindWithContent::SnapshotCreation => Kind::SnapshotCreation,
}
}
pub fn indexes(&self) -> Vec<&str> {
use KindWithContent::*;
match self {
DumpCreation { .. }
| SnapshotCreation
| TaskCancelation { .. }
| TaskDeletion { .. } => vec![],
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid }
| SettingsUpdate { index_uid, .. }
| IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid } => vec![index_uid],
IndexSwap { swaps } => {
let mut indexes = HashSet::<&str>::default();
for swap in swaps {
indexes.insert(swap.indexes.0.as_str());
indexes.insert(swap.indexes.1.as_str());
}
indexes.into_iter().collect()
}
}
}
/// Returns the default `Details` that correspond to this `KindWithContent`,
/// `None` if it cannot be generated.
pub fn default_details(&self) -> Option<Details> {
match self {
KindWithContent::DocumentAdditionOrUpdate { documents_count, .. } => {
Some(Details::DocumentAdditionOrUpdate {
received_documents: *documents_count,
indexed_documents: None,
})
}
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
deleted_documents: None,
})
}
KindWithContent::DocumentClear { .. } | KindWithContent::IndexDeletion { .. } => {
Some(Details::ClearAll { deleted_documents: None })
}
KindWithContent::SettingsUpdate { new_settings, .. } => {
Some(Details::SettingsUpdate { settings: new_settings.clone() })
}
KindWithContent::IndexCreation { primary_key, .. }
| KindWithContent::IndexUpdate { primary_key, .. } => {
Some(Details::IndexInfo { primary_key: primary_key.clone() })
}
KindWithContent::IndexSwap { swaps } => {
Some(Details::IndexSwap { swaps: swaps.clone() })
}
KindWithContent::TaskCancelation { query, tasks } => Some(Details::TaskCancelation {
matched_tasks: tasks.len(),
canceled_tasks: None,
original_filter: query.clone(),
}),
KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion {
matched_tasks: tasks.len(),
deleted_tasks: None,
original_filter: query.clone(),
}),
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
KindWithContent::SnapshotCreation => None,
}
}
pub fn default_finished_details(&self) -> Option<Details> {
match self {
KindWithContent::DocumentAdditionOrUpdate { documents_count, .. } => {
Some(Details::DocumentAdditionOrUpdate {
received_documents: *documents_count,
indexed_documents: Some(0),
})
}
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
deleted_documents: Some(0),
})
}
KindWithContent::DocumentClear { .. } => {
Some(Details::ClearAll { deleted_documents: None })
}
KindWithContent::SettingsUpdate { new_settings, .. } => {
Some(Details::SettingsUpdate { settings: new_settings.clone() })
}
KindWithContent::IndexDeletion { .. } => None,
KindWithContent::IndexCreation { primary_key, .. }
| KindWithContent::IndexUpdate { primary_key, .. } => {
Some(Details::IndexInfo { primary_key: primary_key.clone() })
}
KindWithContent::IndexSwap { .. } => {
todo!()
}
KindWithContent::TaskCancelation { query, tasks } => Some(Details::TaskCancelation {
matched_tasks: tasks.len(),
canceled_tasks: Some(0),
original_filter: query.clone(),
}),
KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion {
matched_tasks: tasks.len(),
deleted_tasks: Some(0),
original_filter: query.clone(),
}),
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
KindWithContent::SnapshotCreation => None,
}
}
}
impl From<&KindWithContent> for Option<Details> {
fn from(kind: &KindWithContent) -> Self {
match kind {
KindWithContent::DocumentAdditionOrUpdate { documents_count, .. } => {
Some(Details::DocumentAdditionOrUpdate {
received_documents: *documents_count,
indexed_documents: None,
})
}
KindWithContent::DocumentDeletion { .. } => None,
KindWithContent::DocumentClear { .. } => None,
KindWithContent::SettingsUpdate { new_settings, .. } => {
Some(Details::SettingsUpdate { settings: new_settings.clone() })
}
KindWithContent::IndexDeletion { .. } => None,
KindWithContent::IndexCreation { primary_key, .. } => {
Some(Details::IndexInfo { primary_key: primary_key.clone() })
}
KindWithContent::IndexUpdate { primary_key, .. } => {
Some(Details::IndexInfo { primary_key: primary_key.clone() })
}
KindWithContent::IndexSwap { .. } => None,
KindWithContent::TaskCancelation { query, tasks } => Some(Details::TaskCancelation {
matched_tasks: tasks.len(),
canceled_tasks: None,
original_filter: query.clone(),
}),
KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion {
matched_tasks: tasks.len(),
deleted_tasks: None,
original_filter: query.clone(),
}),
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
KindWithContent::SnapshotCreation => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Sequence)]
#[serde(rename_all = "camelCase")]
pub enum Status {
Enqueued,
Processing,
Succeeded,
Failed,
Canceled,
}
impl Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Status::Enqueued => write!(f, "enqueued"),
Status::Processing => write!(f, "processing"),
Status::Succeeded => write!(f, "succeeded"),
Status::Failed => write!(f, "failed"),
Status::Canceled => write!(f, "canceled"),
}
}
}
impl FromStr for Status {
type Err = ResponseError;
fn from_str(status: &str) -> Result<Self, Self::Err> {
if status.eq_ignore_ascii_case("enqueued") {
Ok(Status::Enqueued)
} else if status.eq_ignore_ascii_case("processing") {
Ok(Status::Processing)
} else if status.eq_ignore_ascii_case("succeeded") {
Ok(Status::Succeeded)
} else if status.eq_ignore_ascii_case("failed") {
Ok(Status::Failed)
} else if status.eq_ignore_ascii_case("canceled") {
Ok(Status::Canceled)
} else {
Err(ResponseError::from_msg(
format!(
"`{}` is not a status. Available status are {}.",
status,
enum_iterator::all::<Status>()
.map(|s| format!("`{s}`"))
.collect::<Vec<String>>()
.join(", ")
),
Code::BadRequest,
))
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Sequence)]
#[serde(rename_all = "camelCase")]
pub enum Kind {
DocumentAdditionOrUpdate,
DocumentDeletion,
SettingsUpdate,
IndexCreation,
IndexDeletion,
IndexUpdate,
IndexSwap,
TaskCancelation,
TaskDeletion,
DumpCreation,
SnapshotCreation,
}
impl Kind {
pub fn related_to_one_index(&self) -> bool {
match self {
Kind::DocumentAdditionOrUpdate
| Kind::DocumentDeletion
| Kind::SettingsUpdate
| Kind::IndexCreation
| Kind::IndexDeletion
| Kind::IndexUpdate => true,
Kind::IndexSwap
| Kind::TaskCancelation
| Kind::TaskDeletion
| Kind::DumpCreation
| Kind::SnapshotCreation => false,
}
}
}
impl Display for Kind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Kind::DocumentAdditionOrUpdate => write!(f, "documentAdditionOrUpdate"),
Kind::DocumentDeletion => write!(f, "documentDeletion"),
Kind::SettingsUpdate => write!(f, "settingsUpdate"),
Kind::IndexCreation => write!(f, "indexCreation"),
Kind::IndexDeletion => write!(f, "indexDeletion"),
Kind::IndexUpdate => write!(f, "indexUpdate"),
Kind::IndexSwap => write!(f, "indexSwap"),
Kind::TaskCancelation => write!(f, "taskCancelation"),
Kind::TaskDeletion => write!(f, "taskDeletion"),
Kind::DumpCreation => write!(f, "dumpCreation"),
Kind::SnapshotCreation => write!(f, "snapshotCreation"),
}
}
}
impl FromStr for Kind {
type Err = ResponseError;
fn from_str(kind: &str) -> Result<Self, Self::Err> {
if kind.eq_ignore_ascii_case("indexCreation") {
Ok(Kind::IndexCreation)
} else if kind.eq_ignore_ascii_case("indexUpdate") {
Ok(Kind::IndexUpdate)
} else if kind.eq_ignore_ascii_case("indexSwap") {
Ok(Kind::IndexSwap)
} else if kind.eq_ignore_ascii_case("indexDeletion") {
Ok(Kind::IndexDeletion)
} else if kind.eq_ignore_ascii_case("documentAdditionOrUpdate") {
Ok(Kind::DocumentAdditionOrUpdate)
} else if kind.eq_ignore_ascii_case("documentDeletion") {
Ok(Kind::DocumentDeletion)
} else if kind.eq_ignore_ascii_case("settingsUpdate") {
Ok(Kind::SettingsUpdate)
} else if kind.eq_ignore_ascii_case("taskCancelation") {
Ok(Kind::TaskCancelation)
} else if kind.eq_ignore_ascii_case("taskDeletion") {
Ok(Kind::TaskDeletion)
} else if kind.eq_ignore_ascii_case("dumpCreation") {
Ok(Kind::DumpCreation)
} else if kind.eq_ignore_ascii_case("snapshotCreation") {
Ok(Kind::SnapshotCreation)
} else {
Err(ResponseError::from_msg(
format!(
"`{}` is not a type. Available types are {}.",
kind,
enum_iterator::all::<Kind>()
.map(|k| format!(
"`{}`",
// by default serde is going to insert `"` around the value.
serde_json::to_string(&k).unwrap().trim_matches('"')
))
.collect::<Vec<String>>()
.join(", ")
),
Code::BadRequest,
))
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Details {
DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option<u64> },
SettingsUpdate { settings: Box<Settings<Unchecked>> },
IndexInfo { primary_key: Option<String> },
DocumentDeletion { provided_ids: usize, deleted_documents: Option<u64> },
ClearAll { deleted_documents: Option<u64> },
TaskCancelation { matched_tasks: u64, canceled_tasks: Option<u64>, original_filter: String },
TaskDeletion { matched_tasks: u64, deleted_tasks: Option<u64>, original_filter: String },
Dump { dump_uid: Option<String> },
IndexSwap { swaps: Vec<IndexSwap> },
}
impl Details {
pub fn to_failed(&self) -> Self {
let mut details = self.clone();
match &mut details {
Self::DocumentAdditionOrUpdate { indexed_documents, .. } => {
*indexed_documents = Some(0)
}
Self::DocumentDeletion { deleted_documents, .. } => *deleted_documents = Some(0),
Self::ClearAll { deleted_documents } => *deleted_documents = Some(0),
Self::TaskCancelation { canceled_tasks, .. } => *canceled_tasks = Some(0),
Self::TaskDeletion { deleted_tasks, .. } => *deleted_tasks = Some(0),
Self::SettingsUpdate { .. }
| Self::IndexInfo { .. }
| Self::Dump { .. }
| Self::IndexSwap { .. } => (),
}
details
}
}
/// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for
/// https://github.com/time-rs/time/issues/378.
/// This code is a port of the old code of time that was removed in 0.2.
pub fn serialize_duration<S: Serializer>(
duration: &Option<Duration>,
serializer: S,
) -> Result<S::Ok, S::Error> {
match duration {
Some(duration) => {
// technically speaking, negative duration is not valid ISO 8601
if duration.is_negative() {
return serializer.serialize_none();
}
const SECS_PER_DAY: i64 = Duration::DAY.whole_seconds();
let secs = duration.whole_seconds();
let days = secs / SECS_PER_DAY;
let secs = secs - days * SECS_PER_DAY;
let hasdate = days != 0;
let nanos = duration.subsec_nanoseconds();
let hastime = (secs != 0 || nanos != 0) || !hasdate;
// all the following unwrap can't fail
let mut res = String::new();
write!(&mut res, "P").unwrap();
if hasdate {
write!(&mut res, "{}D", days).unwrap();
}
const NANOS_PER_MILLI: i32 = Duration::MILLISECOND.subsec_nanoseconds();
const NANOS_PER_MICRO: i32 = Duration::MICROSECOND.subsec_nanoseconds();
if hastime {
if nanos == 0 {
write!(&mut res, "T{}S", secs).unwrap();
} else if nanos % NANOS_PER_MILLI == 0 {
write!(&mut res, "T{}.{:03}S", secs, nanos / NANOS_PER_MILLI).unwrap();
} else if nanos % NANOS_PER_MICRO == 0 {
write!(&mut res, "T{}.{:06}S", secs, nanos / NANOS_PER_MICRO).unwrap();
} else {
write!(&mut res, "T{}.{:09}S", secs, nanos).unwrap();
}
}
serializer.serialize_str(&res)
}
None => serializer.serialize_none(),
}
}
#[cfg(test)]
mod tests {
use super::Details;
use crate::heed::types::SerdeJson;
use crate::heed::{BytesDecode, BytesEncode};
#[test]
fn bad_deser() {
let details = Details::TaskDeletion {
matched_tasks: 1,
deleted_tasks: None,
original_filter: "hello".to_owned(),
};
let serialised = SerdeJson::<Details>::bytes_encode(&details).unwrap();
let deserialised = SerdeJson::<Details>::bytes_decode(&serialised).unwrap();
meili_snap::snapshot!(format!("{:?}", details), @r###"TaskDeletion { matched_tasks: 1, deleted_tasks: None, original_filter: "hello" }"###);
meili_snap::snapshot!(format!("{:?}", deserialised), @r###"TaskDeletion { matched_tasks: 1, deleted_tasks: None, original_filter: "hello" }"###);
}
}