mirror of
synced 2025-03-27 01:50:30 +01:00
414 lines
14 KiB
414 lines
14 KiB
use serde::Deserialize;
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
use super::errors::ResponseError;
use super::meta::IndexUid;
use super::settings::{Settings, Unchecked};
pub type TaskId = u32;
pub type BatchId = u32;
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct Task {
pub id: TaskId,
/// The name of the index the task is targeting. If it isn't targeting any index (i.e Dump task)
/// then this is None
// TODO: when next forward breaking dumps, it would be a good idea to move this field inside of
// the TaskContent.
pub content: TaskContent,
pub events: Vec<TaskEvent>,
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum TaskContent {
DocumentAddition {
index_uid: IndexUid,
content_uuid: Uuid,
merge_strategy: IndexDocumentsMethod,
primary_key: Option<String>,
documents_count: usize,
allow_index_creation: bool,
DocumentDeletion {
index_uid: IndexUid,
deletion: DocumentDeletion,
SettingsUpdate {
index_uid: IndexUid,
settings: Settings<Unchecked>,
/// Indicates whether the task was a deletion
is_deletion: bool,
allow_index_creation: bool,
IndexDeletion {
index_uid: IndexUid,
IndexCreation {
index_uid: IndexUid,
primary_key: Option<String>,
IndexUpdate {
index_uid: IndexUid,
primary_key: Option<String>,
Dump {
uid: String,
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum IndexDocumentsMethod {
/// Replace the previous document with the new one,
/// removing all the already known attributes.
/// Merge the previous version of the document with the new version,
/// replacing old attributes values with the new ones and add the new attributes.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum DocumentDeletion {
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum TaskEvent {
Created(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
Batched {
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
batch_id: BatchId,
Processing(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
Succeeded {
result: TaskResult,
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
Failed {
error: ResponseError,
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum TaskResult {
DocumentAddition { indexed_documents: u64 },
DocumentDeletion { deleted_documents: u64 },
ClearAll { deleted_documents: u64 },
impl Task {
/// Return true when a task is finished.
/// A task is finished when its last state is either `Succeeded` or `Failed`.
pub fn is_finished(&self) -> bool {
self.events.last().map_or(false, |event| {
matches!(event, TaskEvent::Succeeded { .. } | TaskEvent::Failed { .. })
/// Return the content_uuid of the `Task` if there is one.
pub fn get_content_uuid(&self) -> Option<Uuid> {
match self {
Task { content: TaskContent::DocumentAddition { content_uuid, .. }, .. } => {
_ => None,
pub fn index_uid(&self) -> Option<&str> {
match &self.content {
TaskContent::DocumentAddition { index_uid, .. }
| TaskContent::DocumentDeletion { index_uid, .. }
| TaskContent::SettingsUpdate { index_uid, .. }
| TaskContent::IndexDeletion { index_uid }
| TaskContent::IndexCreation { index_uid, .. }
| TaskContent::IndexUpdate { index_uid, .. } => Some(index_uid.as_str()),
TaskContent::Dump { .. } => None,
impl IndexUid {
pub fn into_inner(self) -> String {
/// Return a reference over the inner str.
pub fn as_str(&self) -> &str {
impl std::ops::Deref for IndexUid {
type Target = str;
fn deref(&self) -> &Self::Target {
#[cfg_attr(test, derive(serde::Serialize))]
#[cfg_attr(test, serde(rename_all = "camelCase"))]
pub struct TaskView {
pub uid: TaskId,
pub index_uid: Option<String>,
pub status: TaskStatus,
#[cfg_attr(test, serde(rename = "type"))]
pub task_type: TaskType,
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub details: Option<TaskDetails>,
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub error: Option<ResponseError>,
#[cfg_attr(test, serde(serialize_with = "serialize_duration"))]
pub duration: Option<Duration>,
#[cfg_attr(test, serde(serialize_with = "time::serde::rfc3339::serialize"))]
pub enqueued_at: OffsetDateTime,
#[cfg_attr(test, serde(serialize_with = "time::serde::rfc3339::option::serialize"))]
pub started_at: Option<OffsetDateTime>,
#[cfg_attr(test, serde(serialize_with = "time::serde::rfc3339::option::serialize"))]
pub finished_at: Option<OffsetDateTime>,
impl From<Task> for TaskView {
fn from(task: Task) -> Self {
let index_uid = task.index_uid().map(String::from);
let Task { id, content, events } = task;
let (task_type, mut details) = match content {
TaskContent::DocumentAddition { documents_count, .. } => {
let details = TaskDetails::DocumentAddition {
received_documents: documents_count,
indexed_documents: None,
(TaskType::DocumentAdditionOrUpdate, Some(details))
TaskContent::DocumentDeletion { deletion: DocumentDeletion::Ids(ids), .. } => (
Some(TaskDetails::DocumentDeletion {
received_document_ids: ids.len(),
deleted_documents: None,
TaskContent::DocumentDeletion { deletion: DocumentDeletion::Clear, .. } => (
Some(TaskDetails::ClearAll { deleted_documents: None }),
TaskContent::IndexDeletion { .. } => {
(TaskType::IndexDeletion, Some(TaskDetails::ClearAll { deleted_documents: None }))
TaskContent::SettingsUpdate { settings, .. } => {
(TaskType::SettingsUpdate, Some(TaskDetails::Settings { settings }))
TaskContent::IndexCreation { primary_key, .. } => {
(TaskType::IndexCreation, Some(TaskDetails::IndexInfo { primary_key }))
TaskContent::IndexUpdate { primary_key, .. } => {
(TaskType::IndexUpdate, Some(TaskDetails::IndexInfo { primary_key }))
TaskContent::Dump { uid } => {
(TaskType::DumpCreation, Some(TaskDetails::Dump { dump_uid: uid }))
// An event always has at least one event: "Created"
let (status, error, finished_at) = match events.last().unwrap() {
TaskEvent::Created(_) => (TaskStatus::Enqueued, None, None),
TaskEvent::Batched { .. } => (TaskStatus::Enqueued, None, None),
TaskEvent::Processing(_) => (TaskStatus::Processing, None, None),
TaskEvent::Succeeded { timestamp, result } => {
match (result, &mut details) {
TaskResult::DocumentAddition { indexed_documents: num, .. },
Some(TaskDetails::DocumentAddition { ref mut indexed_documents, .. }),
) => {
TaskResult::DocumentDeletion { deleted_documents: docs, .. },
Some(TaskDetails::DocumentDeletion { ref mut deleted_documents, .. }),
) => {
TaskResult::ClearAll { deleted_documents: docs },
Some(TaskDetails::ClearAll { ref mut deleted_documents }),
) => {
_ => (),
(TaskStatus::Succeeded, None, Some(*timestamp))
TaskEvent::Failed { timestamp, error } => {
match details {
Some(TaskDetails::DocumentDeletion { ref mut deleted_documents, .. }) => {
Some(TaskDetails::ClearAll { ref mut deleted_documents, .. }) => {
Some(TaskDetails::DocumentAddition { ref mut indexed_documents, .. }) => {
_ => (),
(TaskStatus::Failed, Some(error.clone()), Some(*timestamp))
let enqueued_at = match events.first() {
Some(TaskEvent::Created(ts)) => *ts,
_ => unreachable!("A task must always have a creation event."),
let started_at = events.iter().find_map(|e| match e {
TaskEvent::Processing(ts) => Some(*ts),
_ => None,
let duration = finished_at.zip(started_at).map(|(tf, ts)| (tf - ts));
Self {
uid: id,
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(serde::Serialize))]
#[serde(rename_all = "camelCase")]
pub enum TaskType {
impl From<TaskContent> for TaskType {
fn from(other: TaskContent) -> Self {
match other {
TaskContent::IndexCreation { .. } => TaskType::IndexCreation,
TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
TaskContent::IndexDeletion { .. } => TaskType::IndexDeletion,
TaskContent::DocumentAddition { .. } => TaskType::DocumentAdditionOrUpdate,
TaskContent::DocumentDeletion { .. } => TaskType::DocumentDeletion,
TaskContent::SettingsUpdate { .. } => TaskType::SettingsUpdate,
TaskContent::Dump { .. } => TaskType::DumpCreation,
#[derive(Debug, PartialEq, Eq, Deserialize)]
#[cfg_attr(test, derive(serde::Serialize))]
#[serde(rename_all = "camelCase")]
pub enum TaskStatus {
#[cfg_attr(test, derive(serde::Serialize))]
#[cfg_attr(test, serde(untagged))]
pub enum TaskDetails {
#[cfg_attr(test, serde(rename_all = "camelCase"))]
DocumentAddition { received_documents: usize, indexed_documents: Option<u64> },
#[cfg_attr(test, serde(rename_all = "camelCase"))]
Settings {
#[cfg_attr(test, serde(flatten))]
settings: Settings<Unchecked>,
#[cfg_attr(test, serde(rename_all = "camelCase"))]
IndexInfo { primary_key: Option<String> },
#[cfg_attr(test, serde(rename_all = "camelCase"))]
DocumentDeletion { received_document_ids: usize, deleted_documents: Option<u64> },
#[cfg_attr(test, serde(rename_all = "camelCase"))]
ClearAll { deleted_documents: Option<u64> },
#[cfg_attr(test, serde(rename_all = "camelCase"))]
Dump { dump_uid: String },
/// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for
/// https://github.com/time-rs/time/issues/378.
/// This code is a port of the old code of time that was removed in 0.2.
fn serialize_duration<S: serde::Serializer>(
duration: &Option<Duration>,
serializer: S,
) -> Result<S::Ok, S::Error> {
use std::fmt::Write;
match duration {
Some(duration) => {
// technically speaking, negative duration is not valid ISO 8601
if duration.is_negative() {
return serializer.serialize_none();
const SECS_PER_DAY: i64 = Duration::DAY.whole_seconds();
let secs = duration.whole_seconds();
let days = secs / SECS_PER_DAY;
let secs = secs - days * SECS_PER_DAY;
let hasdate = days != 0;
let nanos = duration.subsec_nanoseconds();
let hastime = (secs != 0 || nanos != 0) || !hasdate;
// all the following unwrap can't fail
let mut res = String::new();
write!(&mut res, "P").unwrap();
if hasdate {
write!(&mut res, "{}D", days).unwrap();
const NANOS_PER_MILLI: i32 = Duration::MILLISECOND.subsec_nanoseconds();
const NANOS_PER_MICRO: i32 = Duration::MICROSECOND.subsec_nanoseconds();
if hastime {
if nanos == 0 {
write!(&mut res, "T{}S", secs).unwrap();
} else if nanos % NANOS_PER_MILLI == 0 {
write!(&mut res, "T{}.{:03}S", secs, nanos / NANOS_PER_MILLI).unwrap();
} else if nanos % NANOS_PER_MICRO == 0 {
write!(&mut res, "T{}.{:06}S", secs, nanos / NANOS_PER_MICRO).unwrap();
} else {
write!(&mut res, "T{}.{:09}S", secs, nanos).unwrap();
None => serializer.serialize_none(),