From 0972587cfccd2fdb4bc2ba99603487dec78d2d4c Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 5 Oct 2022 20:11:07 +0200 Subject: [PATCH] start writting the compat layer between v5 and v6 --- dump/Cargo.toml | 1 + dump/src/lib.rs | 2 +- dump/src/reader/compat/mod.rs | 12 +- dump/src/reader/compat/v5_to_v6.rs | 497 +++++++++++++++++++++++++++++ dump/src/reader/mod.rs | 49 ++- dump/src/reader/v4/mod.rs | 1 + dump/src/reader/v5/mod.rs | 15 +- dump/src/reader/v5/tasks.rs | 320 ++++++++++++++++++- dump/src/reader/v6.rs | 42 ++- index-scheduler/src/lib.rs | 2 +- meilisearch-types/src/error.rs | 6 + 11 files changed, 909 insertions(+), 38 deletions(-) create mode 100644 dump/src/reader/compat/v5_to_v6.rs create mode 100644 dump/src/reader/v4/mod.rs diff --git a/dump/Cargo.toml b/dump/Cargo.toml index 5350ecd8f..8cb8b028d 100644 --- a/dump/Cargo.toml +++ b/dump/Cargo.toml @@ -19,6 +19,7 @@ anyhow = "1.0.65" log = "0.4.17" index-scheduler = { path = "../index-scheduler" } meilisearch-auth = { path = "../meilisearch-auth" } +meilisearch-types = { path = "../meilisearch-types" } [dev-dependencies] big_s = "1.0.2" diff --git a/dump/src/lib.rs b/dump/src/lib.rs index a0b3e8ea2..b9e44b5c3 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -14,7 +14,7 @@ type Result = std::result::Result; #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -struct Metadata { +pub struct Metadata { pub dump_version: Version, pub db_version: String, #[serde(with = "time::serde::rfc3339")] diff --git a/dump/src/reader/compat/mod.rs b/dump/src/reader/compat/mod.rs index 9abac24c7..08fa97cc1 100644 --- a/dump/src/reader/compat/mod.rs +++ b/dump/src/reader/compat/mod.rs @@ -1,6 +1,12 @@ -pub mod v2; -pub mod v3; -pub mod v4; +// pub mod v2; +// pub mod v3; +// pub mod v4; + +pub mod v5_to_v6; + +pub struct Compat { + from: Box, +} /// Parses the v1 version of the Asc ranking rules `asc(price)`and returns the field name. pub fn asc_ranking_rule(text: &str) -> Option<&str> { diff --git a/dump/src/reader/compat/v5_to_v6.rs b/dump/src/reader/compat/v5_to_v6.rs new file mode 100644 index 000000000..8f994a1c7 --- /dev/null +++ b/dump/src/reader/compat/v5_to_v6.rs @@ -0,0 +1,497 @@ +use std::fs::File; + +use crate::reader::{v5, v6, DumpReader, IndexReader}; +use crate::Result; + +use super::Compat; + +impl + Compat< + dyn DumpReader< + Document = v5::Document, + Settings = v5::Settings, + Task = v5::Task, + UpdateFile = v5::UpdateFile, + Key = v5::Key, + >, + > +{ + pub fn new( + v5: Box< + dyn DumpReader< + Document = v5::Document, + Settings = v5::Settings, + Task = v5::Task, + UpdateFile = v5::UpdateFile, + Key = v5::Key, + >, + >, + ) -> Compat< + dyn DumpReader< + Document = v5::Document, + Settings = v5::Settings, + Task = v5::Task, + UpdateFile = v5::UpdateFile, + Key = v5::Key, + >, + > { + Compat { from: v5 } + } +} + +impl DumpReader + for Compat< + dyn DumpReader< + Document = v5::Document, + Settings = v5::Settings, + Task = v5::Task, + UpdateFile = v5::UpdateFile, + Key = v5::Key, + >, + > +{ + type Document = v6::Document; + type Settings = v6::Settings; + + type Task = v6::Task; + type UpdateFile = File; + + type Key = v6::Key; + + fn version(&self) -> crate::Version { + self.from.version() + } + + fn date(&self) -> Option { + self.from.date() + } + + fn instance_uid(&self) -> Result> { + self.from.instance_uid() + } + + fn indexes( + &self, + ) -> Result< + Box< + dyn Iterator< + Item = Result< + Box< + dyn crate::reader::IndexReader< + Document = Self::Document, + Settings = Self::Settings, + > + '_, + >, + >, + > + '_, + >, + > { + Ok(Box::new(self.from.indexes()?.map( + |index_reader| -> Result<_> { + let compat = Box::new(Compat::< + dyn IndexReader>, + >::new(index_reader?)) + as Box< + dyn crate::reader::IndexReader< + Document = Self::Document, + Settings = Self::Settings, + > + '_, + >; + Ok(compat) + }, + ))) + } + + fn tasks( + &mut self, + ) -> Box)>> + '_> { + Box::new(self.from.tasks().map(|task| { + task.map(|(task, content_file)| { + let task_view: v5::TaskView = task.into(); + + let task = v6::Task { + uid: task_view.uid, + index_uid: task_view.index_uid, + status: match task_view.status { + v5::TaskStatus::Enqueued => v6::Status::Enqueued, + v5::TaskStatus::Processing => v6::Status::Enqueued, + v5::TaskStatus::Succeeded => v6::Status::Succeeded, + v5::TaskStatus::Failed => v6::Status::Failed, + }, + kind: match task_view.task_type { + v5::TaskType::IndexCreation => v6::Kind::IndexCreation, + v5::TaskType::IndexUpdate => v6::Kind::IndexUpdate, + v5::TaskType::IndexDeletion => v6::Kind::IndexDeletion, + // TODO: this is a `DocumentAdditionOrUpdate` but we still don't have this type in the `TaskView`. + v5::TaskType::DocumentAdditionOrUpdate => v6::Kind::DocumentAddition, + v5::TaskType::DocumentDeletion => v6::Kind::DocumentDeletion, + v5::TaskType::SettingsUpdate => v6::Kind::Settings, + v5::TaskType::DumpCreation => v6::Kind::DumpExport, + }, + details: task_view.details.map(|details| match details { + v5::TaskDetails::DocumentAddition { + received_documents, + indexed_documents, + } => v6::Details::DocumentAddition { + received_documents: received_documents as u64, + indexed_documents: indexed_documents.map_or(0, |i| i as u64), + }, + v5::TaskDetails::Settings { settings } => v6::Details::Settings { + settings: settings.into(), + }, + v5::TaskDetails::IndexInfo { primary_key } => { + v6::Details::IndexInfo { primary_key } + } + v5::TaskDetails::DocumentDeletion { + received_document_ids, + deleted_documents, + } => v6::Details::DocumentDeletion { + received_document_ids, + deleted_documents, + }, + v5::TaskDetails::ClearAll { deleted_documents } => { + v6::Details::ClearAll { deleted_documents } + } + v5::TaskDetails::Dump { dump_uid } => v6::Details::Dump { dump_uid }, + }), + error: task_view.error.map(|e| e.into()), + duration: task_view.duration, + enqueued_at: task_view.enqueued_at, + started_at: task_view.started_at, + finished_at: task_view.finished_at, + }; + + (task, content_file) + }) + })) + } + + fn keys(&mut self) -> Box> + '_> { + Box::new(self.from.keys().map(|key| { + key.map(|key| v6::Key { + description: key.description, + name: key.name, + uid: key.uid, + actions: key + .actions + .into_iter() + .map(|action| action.into()) + .collect(), + indexes: key + .indexes + .into_iter() + .map(|index| match index { + v5::StarOr::Star => v6::StarOr::Star, + v5::StarOr::Other(uid) => { + v6::StarOr::Other(v6::IndexUid::new_unchecked(uid.as_str())) + } + }) + .collect(), + expires_at: key.expires_at, + created_at: key.created_at, + updated_at: key.updated_at, + }) + })) + } +} + +impl Compat>> { + pub fn new( + v5: Box>>, + ) -> Compat>> + { + Compat { from: v5 } + } +} + +impl IndexReader + for Compat>> +{ + type Document = v6::Document; + type Settings = v6::Settings; + + fn metadata(&self) -> &crate::IndexMetadata { + self.from.metadata() + } + + fn documents(&mut self) -> Result> + '_>> { + self.from.documents() + } + + fn settings(&mut self) -> Result { + Ok(v6::Settings::::from(self.from.settings()?).check()) + } +} + +impl From> for v6::Setting { + fn from(setting: v5::Setting) -> Self { + match setting { + v5::Setting::Set(t) => v6::Setting::Set(t), + v5::Setting::Reset => v6::Setting::Reset, + v5::Setting::NotSet => v6::Setting::NotSet, + } + } +} + +impl From for v6::ResponseError { + fn from(error: v5::ResponseError) -> Self { + let code = match error.error_code.as_ref() { + "CreateIndex" => v6::Code::CreateIndex, + "IndexAlreadyExists" => v6::Code::IndexAlreadyExists, + "IndexNotFound" => v6::Code::IndexNotFound, + "InvalidIndexUid" => v6::Code::InvalidIndexUid, + "InvalidMinWordLengthForTypo" => v6::Code::InvalidMinWordLengthForTypo, + "InvalidState" => v6::Code::InvalidState, + "MissingPrimaryKey" => v6::Code::MissingPrimaryKey, + "PrimaryKeyAlreadyPresent" => v6::Code::PrimaryKeyAlreadyPresent, + "MaxFieldsLimitExceeded" => v6::Code::MaxFieldsLimitExceeded, + "MissingDocumentId" => v6::Code::MissingDocumentId, + "InvalidDocumentId" => v6::Code::InvalidDocumentId, + "Filter" => v6::Code::Filter, + "Sort" => v6::Code::Sort, + "BadParameter" => v6::Code::BadParameter, + "BadRequest" => v6::Code::BadRequest, + "DatabaseSizeLimitReached" => v6::Code::DatabaseSizeLimitReached, + "DocumentNotFound" => v6::Code::DocumentNotFound, + "Internal" => v6::Code::Internal, + "InvalidGeoField" => v6::Code::InvalidGeoField, + "InvalidRankingRule" => v6::Code::InvalidRankingRule, + "InvalidStore" => v6::Code::InvalidStore, + "InvalidToken" => v6::Code::InvalidToken, + "MissingAuthorizationHeader" => v6::Code::MissingAuthorizationHeader, + "NoSpaceLeftOnDevice" => v6::Code::NoSpaceLeftOnDevice, + "DumpNotFound" => v6::Code::DumpNotFound, + "TaskNotFound" => v6::Code::TaskNotFound, + "PayloadTooLarge" => v6::Code::PayloadTooLarge, + "RetrieveDocument" => v6::Code::RetrieveDocument, + "SearchDocuments" => v6::Code::SearchDocuments, + "UnsupportedMediaType" => v6::Code::UnsupportedMediaType, + "DumpAlreadyInProgress" => v6::Code::DumpAlreadyInProgress, + "DumpProcessFailed" => v6::Code::DumpProcessFailed, + "InvalidContentType" => v6::Code::InvalidContentType, + "MissingContentType" => v6::Code::MissingContentType, + "MalformedPayload" => v6::Code::MalformedPayload, + "MissingPayload" => v6::Code::MissingPayload, + "ApiKeyNotFound" => v6::Code::ApiKeyNotFound, + "MissingParameter" => v6::Code::MissingParameter, + "InvalidApiKeyActions" => v6::Code::InvalidApiKeyActions, + "InvalidApiKeyIndexes" => v6::Code::InvalidApiKeyIndexes, + "InvalidApiKeyExpiresAt" => v6::Code::InvalidApiKeyExpiresAt, + "InvalidApiKeyDescription" => v6::Code::InvalidApiKeyDescription, + "InvalidApiKeyName" => v6::Code::InvalidApiKeyName, + "InvalidApiKeyUid" => v6::Code::InvalidApiKeyUid, + "ImmutableField" => v6::Code::ImmutableField, + "ApiKeyAlreadyExists" => v6::Code::ApiKeyAlreadyExists, + other => { + log::warn!("Unknown error code {}", other); + v6::Code::UnretrievableErrorCode + } + }; + v6::ResponseError::from_msg(error.message, code) + } +} + +impl From> for v6::Settings { + fn from(settings: v5::Settings) -> Self { + v6::Settings { + displayed_attributes: settings.displayed_attributes.into(), + searchable_attributes: settings.searchable_attributes.into(), + filterable_attributes: settings.filterable_attributes.into(), + sortable_attributes: settings.sortable_attributes.into(), + ranking_rules: settings.ranking_rules.into(), + stop_words: settings.stop_words.into(), + synonyms: settings.synonyms.into(), + distinct_attribute: settings.distinct_attribute.into(), + typo_tolerance: match settings.typo_tolerance { + v5::Setting::Set(typo) => v6::Setting::Set(v6::TypoTolerance { + enabled: typo.enabled.into(), + min_word_size_for_typos: match typo.min_word_size_for_typos { + v5::Setting::Set(t) => v6::Setting::Set(v6::MinWordSizeForTypos { + one_typo: t.one_typo.into(), + two_typos: t.two_typos.into(), + }), + v5::Setting::Reset => v6::Setting::Reset, + v5::Setting::NotSet => v6::Setting::NotSet, + }, + disable_on_words: typo.disable_on_words.into(), + disable_on_attributes: typo.disable_on_attributes.into(), + }), + v5::Setting::Reset => v6::Setting::Reset, + v5::Setting::NotSet => v6::Setting::NotSet, + }, + faceting: match settings.faceting { + v5::Setting::Set(faceting) => v6::Setting::Set(v6::FacetingSettings { + max_values_per_facet: faceting.max_values_per_facet.into(), + }), + v5::Setting::Reset => v6::Setting::Reset, + v5::Setting::NotSet => v6::Setting::NotSet, + }, + pagination: match settings.pagination { + v5::Setting::Set(pagination) => v6::Setting::Set(v6::PaginationSettings { + max_total_hits: pagination.max_total_hits.into(), + }), + v5::Setting::Reset => v6::Setting::Reset, + v5::Setting::NotSet => v6::Setting::NotSet, + }, + _kind: std::marker::PhantomData, + } + } +} + +impl From for v6::Action { + fn from(key: v5::Action) -> Self { + match key { + v5::Action::All => v6::Action::All, + v5::Action::Search => v6::Action::Search, + v5::Action::DocumentsAll => v6::Action::DocumentsAll, + v5::Action::DocumentsAdd => v6::Action::DocumentsAdd, + v5::Action::DocumentsGet => v6::Action::DocumentsGet, + v5::Action::DocumentsDelete => v6::Action::DocumentsDelete, + v5::Action::IndexesAll => v6::Action::IndexesAll, + v5::Action::IndexesAdd => v6::Action::IndexesAdd, + v5::Action::IndexesGet => v6::Action::IndexesGet, + v5::Action::IndexesUpdate => v6::Action::IndexesUpdate, + v5::Action::IndexesDelete => v6::Action::IndexesDelete, + v5::Action::TasksAll => v6::Action::TasksAll, + v5::Action::TasksGet => v6::Action::TasksGet, + v5::Action::SettingsAll => v6::Action::SettingsAll, + v5::Action::SettingsGet => v6::Action::SettingsGet, + v5::Action::SettingsUpdate => v6::Action::SettingsUpdate, + v5::Action::StatsAll => v6::Action::StatsAll, + v5::Action::StatsGet => v6::Action::StatsGet, + v5::Action::MetricsAll => v6::Action::MetricsAll, + v5::Action::MetricsGet => v6::Action::MetricsGet, + v5::Action::DumpsAll => v6::Action::DumpsAll, + v5::Action::DumpsCreate => v6::Action::DumpsCreate, + v5::Action::Version => v6::Action::Version, + v5::Action::KeysAdd => v6::Action::KeysAdd, + v5::Action::KeysGet => v6::Action::KeysGet, + v5::Action::KeysUpdate => v6::Action::KeysUpdate, + v5::Action::KeysDelete => v6::Action::KeysDelete, + } + } +} + +#[cfg(test)] +pub(crate) mod test { + use std::{fs::File, io::BufReader}; + + use flate2::bufread::GzDecoder; + use tempfile::TempDir; + + use super::*; + + #[test] + fn compat_v5_v6() { + let dump = File::open("tests/assets/v5.dump").unwrap(); + let dir = TempDir::new().unwrap(); + let mut dump = BufReader::new(dump); + let gz = GzDecoder::new(&mut dump); + let mut archive = tar::Archive::new(gz); + archive.unpack(dir.path()).unwrap(); + + let dump = Box::new(v5::V5Reader::open(dir).unwrap()); + let mut dump = Box::new(Compat::< + dyn DumpReader< + Document = v5::Document, + Settings = v5::Settings, + Task = v5::Task, + UpdateFile = v5::UpdateFile, + Key = v5::Key, + >, + >::new(dump)) + as Box< + dyn DumpReader< + Document = v6::Document, + Settings = v6::Settings, + Task = v6::Task, + UpdateFile = v6::UpdateFile, + Key = v6::Key, + >, + >; + + // top level infos + insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-04 15:55:10.344982459 +00:00:00"); + insta::assert_display_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d"); + + // tasks + let tasks = dump.tasks().collect::>>().unwrap(); + let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); + insta::assert_json_snapshot!(tasks); + assert_eq!(update_files.len(), 22); + assert!(update_files[0].is_none()); // the dump creation + assert!(update_files[1].is_some()); // the enqueued document addition + assert!(update_files[2..].iter().all(|u| u.is_none())); // everything already processed + + // keys + let keys = dump.keys().collect::>>().unwrap(); + insta::assert_json_snapshot!(keys); + + // indexes + let mut indexes = dump.indexes().unwrap().collect::>>().unwrap(); + // the index are not ordered in any way by default + indexes.sort_by_key(|index| index.metadata().uid.to_string()); + + let mut products = indexes.pop().unwrap(); + let mut movies = indexes.pop().unwrap(); + let mut spells = indexes.pop().unwrap(); + assert!(indexes.is_empty()); + + // products + insta::assert_json_snapshot!(products.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" + { + "uid": "products", + "primaryKey": "sku", + "createdAt": "[now]", + "updatedAt": "[now]" + } + "###); + + insta::assert_debug_snapshot!(products.settings()); + let documents = products + .documents() + .unwrap() + .collect::>>() + .unwrap(); + assert_eq!(documents.len(), 10); + insta::assert_json_snapshot!(documents); + + // movies + insta::assert_json_snapshot!(movies.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" + { + "uid": "movies", + "primaryKey": "id", + "createdAt": "[now]", + "updatedAt": "[now]" + } + "###); + + insta::assert_debug_snapshot!(movies.settings()); + let documents = movies + .documents() + .unwrap() + .collect::>>() + .unwrap(); + assert_eq!(documents.len(), 200); + insta::assert_debug_snapshot!(documents); + + // spells + insta::assert_json_snapshot!(spells.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" + { + "uid": "dnd_spells", + "primaryKey": "index", + "createdAt": "[now]", + "updatedAt": "[now]" + } + "###); + + insta::assert_debug_snapshot!(spells.settings()); + let documents = spells + .documents() + .unwrap() + .collect::>>() + .unwrap(); + assert_eq!(documents.len(), 10); + insta::assert_json_snapshot!(documents); + } +} diff --git a/dump/src/reader/mod.rs b/dump/src/reader/mod.rs index 57e2fa12d..5a44c2aac 100644 --- a/dump/src/reader/mod.rs +++ b/dump/src/reader/mod.rs @@ -1,27 +1,27 @@ use std::io::Read; -use std::path::Path; use std::{fs::File, io::BufReader}; -use flate2::{bufread::GzDecoder, Compression}; -use index::{Checked, Settings, Unchecked}; +use flate2::bufread::GzDecoder; use index_scheduler::TaskView; use meilisearch_auth::Key; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use tempfile::TempDir; use time::OffsetDateTime; use uuid::Uuid; +use crate::reader::compat::Compat; use crate::{IndexMetadata, Result, Version}; // use self::loaders::{v2, v3, v4, v5}; // pub mod error; -// mod compat; +mod compat; // mod loaders; // mod v1; -mod v5; -mod v6; +pub(self) mod v4; +pub(self) mod v5; +pub(self) mod v6; pub fn open( dump: impl Read, @@ -29,7 +29,7 @@ pub fn open( Box< dyn DumpReader< Document = serde_json::Map, - Settings = Settings, + Settings = v6::Settings, Task = TaskView, UpdateFile = File, Key = Key, @@ -56,16 +56,37 @@ pub fn open( Version::V2 => todo!(), Version::V3 => todo!(), Version::V4 => todo!(), - Version::V5 => todo!(), + Version::V5 => { + let dump_reader = Box::new(v5::V5Reader::open(path)?); + let dump_reader = Box::new(Compat::< + dyn DumpReader< + Document = v5::Document, + Settings = v5::Settings, + Task = v5::Task, + UpdateFile = v5::UpdateFile, + Key = v5::Key, + >, + >::new(dump_reader)) + as Box< + dyn DumpReader< + Document = v6::Document, + Settings = v6::Settings, + Task = v6::Task, + UpdateFile = v6::UpdateFile, + Key = v6::Key, + >, + >; + Ok(dump_reader) + } Version::V6 => { let dump_reader = Box::new(v6::V6Reader::open(path)?) as Box< dyn DumpReader< - Document = serde_json::Map, - Settings = Settings, - Task = TaskView, - UpdateFile = File, - Key = Key, + Document = v6::Document, + Settings = v6::Settings, + Task = v6::Task, + UpdateFile = v6::UpdateFile, + Key = v6::Key, >, >; diff --git a/dump/src/reader/v4/mod.rs b/dump/src/reader/v4/mod.rs new file mode 100644 index 000000000..82ccf67ed --- /dev/null +++ b/dump/src/reader/v4/mod.rs @@ -0,0 +1 @@ +// hello diff --git a/dump/src/reader/v5/mod.rs b/dump/src/reader/v5/mod.rs index bebf7a312..984850ffc 100644 --- a/dump/src/reader/v5/mod.rs +++ b/dump/src/reader/v5/mod.rs @@ -45,13 +45,6 @@ use uuid::Uuid; use crate::{IndexMetadata, Result, Version}; -use self::{ - keys::Key, - meta::{DumpMeta, IndexUuid}, - settings::{Checked, Settings, Unchecked}, - tasks::Task, -}; - use super::{DumpReader, IndexReader}; mod keys; @@ -59,6 +52,14 @@ mod meta; mod settings; mod tasks; +pub use keys::*; +pub use meta::*; +pub use settings::*; +pub use tasks::*; + +pub type Document = serde_json::Map; +pub type UpdateFile = File; + #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Metadata { diff --git a/dump/src/reader/v5/tasks.rs b/dump/src/reader/v5/tasks.rs index 562412e81..ce245b536 100644 --- a/dump/src/reader/v5/tasks.rs +++ b/dump/src/reader/v5/tasks.rs @@ -1,5 +1,7 @@ -use serde::Deserialize; -use time::OffsetDateTime; +use std::fmt::Write; + +use serde::{Deserialize, Serializer}; +use time::{Duration, OffsetDateTime}; use uuid::Uuid; use super::{ @@ -115,13 +117,13 @@ pub enum TaskResult { #[cfg_attr(test, derive(serde::Serialize))] #[serde(rename_all = "camelCase")] pub struct ResponseError { - message: String, + pub message: String, #[serde(rename = "code")] - error_code: String, + pub error_code: String, #[serde(rename = "type")] - error_type: String, + pub error_type: String, #[serde(rename = "link")] - error_link: String, + pub error_link: String, } impl Task { @@ -178,3 +180,309 @@ impl std::ops::Deref for IndexUid { &self.0 } } + +#[derive(Debug)] +#[cfg_attr(test, derive(serde::Serialize))] +#[cfg_attr(test, serde(rename_all = "camelCase"))] +pub struct TaskView { + pub uid: TaskId, + pub index_uid: Option, + pub status: TaskStatus, + #[cfg_attr(test, serde(rename = "type"))] + pub task_type: TaskType, + #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))] + pub details: Option, + #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))] + pub error: Option, + #[cfg_attr(test, serde(serialize_with = "serialize_duration"))] + pub duration: Option, + #[cfg_attr(test, serde(serialize_with = "time::serde::rfc3339::serialize"))] + pub enqueued_at: OffsetDateTime, + #[cfg_attr( + test, + serde(serialize_with = "time::serde::rfc3339::option::serialize") + )] + pub started_at: Option, + #[cfg_attr( + test, + serde(serialize_with = "time::serde::rfc3339::option::serialize") + )] + pub finished_at: Option, +} + +impl From for TaskView { + fn from(task: Task) -> Self { + let index_uid = task.index_uid().map(String::from); + let Task { + id, + content, + events, + } = task; + + let (task_type, mut details) = match content { + TaskContent::DocumentAddition { + documents_count, .. + } => { + let details = TaskDetails::DocumentAddition { + received_documents: documents_count, + indexed_documents: None, + }; + + (TaskType::DocumentAdditionOrUpdate, Some(details)) + } + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Ids(ids), + .. + } => ( + TaskType::DocumentDeletion, + Some(TaskDetails::DocumentDeletion { + received_document_ids: ids.len(), + deleted_documents: None, + }), + ), + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Clear, + .. + } => ( + TaskType::DocumentDeletion, + Some(TaskDetails::ClearAll { + deleted_documents: None, + }), + ), + TaskContent::IndexDeletion { .. } => ( + TaskType::IndexDeletion, + Some(TaskDetails::ClearAll { + deleted_documents: None, + }), + ), + TaskContent::SettingsUpdate { settings, .. } => ( + TaskType::SettingsUpdate, + Some(TaskDetails::Settings { settings }), + ), + TaskContent::IndexCreation { primary_key, .. } => ( + TaskType::IndexCreation, + Some(TaskDetails::IndexInfo { primary_key }), + ), + TaskContent::IndexUpdate { primary_key, .. } => ( + TaskType::IndexUpdate, + Some(TaskDetails::IndexInfo { primary_key }), + ), + TaskContent::Dump { uid } => ( + TaskType::DumpCreation, + Some(TaskDetails::Dump { dump_uid: uid }), + ), + }; + + // An event always has at least one event: "Created" + let (status, error, finished_at) = match events.last().unwrap() { + TaskEvent::Created(_) => (TaskStatus::Enqueued, None, None), + TaskEvent::Batched { .. } => (TaskStatus::Enqueued, None, None), + TaskEvent::Processing(_) => (TaskStatus::Processing, None, None), + TaskEvent::Succeeded { timestamp, result } => { + match (result, &mut details) { + ( + TaskResult::DocumentAddition { + indexed_documents: num, + .. + }, + Some(TaskDetails::DocumentAddition { + ref mut indexed_documents, + .. + }), + ) => { + indexed_documents.replace(*num); + } + ( + TaskResult::DocumentDeletion { + deleted_documents: docs, + .. + }, + Some(TaskDetails::DocumentDeletion { + ref mut deleted_documents, + .. + }), + ) => { + deleted_documents.replace(*docs); + } + ( + TaskResult::ClearAll { + deleted_documents: docs, + }, + Some(TaskDetails::ClearAll { + ref mut deleted_documents, + }), + ) => { + deleted_documents.replace(*docs); + } + _ => (), + } + (TaskStatus::Succeeded, None, Some(*timestamp)) + } + TaskEvent::Failed { timestamp, error } => { + match details { + Some(TaskDetails::DocumentDeletion { + ref mut deleted_documents, + .. + }) => { + deleted_documents.replace(0); + } + Some(TaskDetails::ClearAll { + ref mut deleted_documents, + .. + }) => { + deleted_documents.replace(0); + } + Some(TaskDetails::DocumentAddition { + ref mut indexed_documents, + .. + }) => { + indexed_documents.replace(0); + } + _ => (), + } + (TaskStatus::Failed, Some(error.clone()), Some(*timestamp)) + } + }; + + let enqueued_at = match events.first() { + Some(TaskEvent::Created(ts)) => *ts, + _ => unreachable!("A task must always have a creation event."), + }; + + let started_at = events.iter().find_map(|e| match e { + TaskEvent::Processing(ts) => Some(*ts), + _ => None, + }); + + let duration = finished_at.zip(started_at).map(|(tf, ts)| (tf - ts)); + + Self { + uid: id, + index_uid, + status, + task_type, + details, + error, + duration, + enqueued_at, + started_at, + finished_at, + } + } +} + +#[derive(Debug, Deserialize)] +#[cfg_attr(test, derive(serde::Serialize))] +#[serde(rename_all = "camelCase")] +pub enum TaskType { + IndexCreation, + IndexUpdate, + IndexDeletion, + DocumentAdditionOrUpdate, + DocumentDeletion, + SettingsUpdate, + DumpCreation, +} + +impl From for TaskType { + fn from(other: TaskContent) -> Self { + match other { + TaskContent::IndexCreation { .. } => TaskType::IndexCreation, + TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, + TaskContent::IndexDeletion { .. } => TaskType::IndexDeletion, + TaskContent::DocumentAddition { .. } => TaskType::DocumentAdditionOrUpdate, + TaskContent::DocumentDeletion { .. } => TaskType::DocumentDeletion, + TaskContent::SettingsUpdate { .. } => TaskType::SettingsUpdate, + TaskContent::Dump { .. } => TaskType::DumpCreation, + } + } +} + +#[derive(Debug, Deserialize)] +#[cfg_attr(test, derive(serde::Serialize))] +#[serde(rename_all = "camelCase")] +pub enum TaskStatus { + Enqueued, + Processing, + Succeeded, + Failed, +} + +#[derive(Debug)] +#[cfg_attr(test, derive(serde::Serialize))] +#[cfg_attr(test, serde(untagged))] +#[allow(clippy::large_enum_variant)] +pub enum TaskDetails { + #[cfg_attr(test, serde(rename_all = "camelCase"))] + DocumentAddition { + received_documents: usize, + indexed_documents: Option, + }, + #[cfg_attr(test, serde(rename_all = "camelCase"))] + Settings { + #[cfg_attr(test, serde(flatten))] + settings: Settings, + }, + #[cfg_attr(test, serde(rename_all = "camelCase"))] + IndexInfo { primary_key: Option }, + #[cfg_attr(test, serde(rename_all = "camelCase"))] + DocumentDeletion { + received_document_ids: usize, + deleted_documents: Option, + }, + #[cfg_attr(test, serde(rename_all = "camelCase"))] + ClearAll { deleted_documents: Option }, + #[cfg_attr(test, serde(rename_all = "camelCase"))] + Dump { dump_uid: String }, +} + +/// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for +/// https://github.com/time-rs/time/issues/378. +/// This code is a port of the old code of time that was removed in 0.2. +fn serialize_duration( + duration: &Option, + serializer: S, +) -> Result { + match duration { + Some(duration) => { + // technically speaking, negative duration is not valid ISO 8601 + if duration.is_negative() { + return serializer.serialize_none(); + } + + const SECS_PER_DAY: i64 = Duration::DAY.whole_seconds(); + let secs = duration.whole_seconds(); + let days = secs / SECS_PER_DAY; + let secs = secs - days * SECS_PER_DAY; + let hasdate = days != 0; + let nanos = duration.subsec_nanoseconds(); + let hastime = (secs != 0 || nanos != 0) || !hasdate; + + // all the following unwrap can't fail + let mut res = String::new(); + write!(&mut res, "P").unwrap(); + + if hasdate { + write!(&mut res, "{}D", days).unwrap(); + } + + const NANOS_PER_MILLI: i32 = Duration::MILLISECOND.subsec_nanoseconds(); + const NANOS_PER_MICRO: i32 = Duration::MICROSECOND.subsec_nanoseconds(); + + if hastime { + if nanos == 0 { + write!(&mut res, "T{}S", secs).unwrap(); + } else if nanos % NANOS_PER_MILLI == 0 { + write!(&mut res, "T{}.{:03}S", secs, nanos / NANOS_PER_MILLI).unwrap(); + } else if nanos % NANOS_PER_MICRO == 0 { + write!(&mut res, "T{}.{:06}S", secs, nanos / NANOS_PER_MICRO).unwrap(); + } else { + write!(&mut res, "T{}.{:09}S", secs, nanos).unwrap(); + } + } + + serializer.serialize_str(&res) + } + None => serializer.serialize_none(), + } +} diff --git a/dump/src/reader/v6.rs b/dump/src/reader/v6.rs index 57cd8d523..d031cfec7 100644 --- a/dump/src/reader/v6.rs +++ b/dump/src/reader/v6.rs @@ -5,7 +5,6 @@ use std::{ str::FromStr, }; -use index::{Checked, Unchecked}; use tempfile::TempDir; use time::OffsetDateTime; use uuid::Uuid; @@ -14,7 +13,38 @@ use crate::{Error, IndexMetadata, Result, Version}; use super::{DumpReader, IndexReader}; -type Metadata = crate::Metadata; +pub type Metadata = crate::Metadata; + +pub type Document = serde_json::Map; +pub type Settings = index::Settings; +pub type Checked = index::Checked; +pub type Unchecked = index::Unchecked; + +pub type Task = index_scheduler::TaskView; +pub type UpdateFile = File; +pub type Key = meilisearch_auth::Key; + +// ===== Other types to clarify the code of the compat module +// everything related to the tasks +pub type Status = index_scheduler::Status; +pub type Kind = index_scheduler::Kind; +pub type Details = index_scheduler::Details; + +// everything related to the settings +pub type Setting = index::Setting; +pub type TypoTolerance = index::updates::TypoSettings; +pub type MinWordSizeForTypos = index::updates::MinWordSizeTyposSetting; +pub type FacetingSettings = index::updates::FacetingSettings; +pub type PaginationSettings = index::updates::PaginationSettings; + +// everything related to the api keys +pub type Action = meilisearch_auth::Action; +pub type StarOr = meilisearch_types::star_or::StarOr; +pub type IndexUid = meilisearch_types::index_uid::IndexUid; + +// everything related to the errors +pub type ResponseError = meilisearch_types::error::ResponseError; +pub type Code = meilisearch_types::error::Code; pub struct V6Reader { dump: TempDir, @@ -62,12 +92,12 @@ impl V6IndexReader { impl DumpReader for V6Reader { type Document = serde_json::Map; - type Settings = index::Settings; + type Settings = Settings; - type Task = index_scheduler::TaskView; + type Task = Task; type UpdateFile = File; - type Key = meilisearch_auth::Key; + type Key = Key; fn version(&self) -> Version { Version::V6 @@ -161,7 +191,7 @@ impl DumpReader for V6Reader { impl IndexReader for V6IndexReader { type Document = serde_json::Map; - type Settings = index::Settings; + type Settings = Settings; fn metadata(&self) -> &IndexMetadata { &self.metadata diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 7f1ba3d5b..7ac84b880 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -11,7 +11,7 @@ pub type Result = std::result::Result; pub type TaskId = u32; pub use error::Error; -pub use task::{Kind, KindWithContent, Status, TaskView}; +pub use task::{Details, Kind, KindWithContent, Status, TaskView}; use std::path::PathBuf; use std::sync::{Arc, RwLock}; diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 725fc8360..29705cee0 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -157,6 +157,8 @@ pub enum Code { DumpAlreadyInProgress, DumpProcessFailed, + // Only used when importing a dump + UnretrievableErrorCode, InvalidContentType, MissingContentType, @@ -266,6 +268,10 @@ impl Code { ErrCode::invalid("invalid_content_type", StatusCode::UNSUPPORTED_MEDIA_TYPE) } MissingPayload => ErrCode::invalid("missing_payload", StatusCode::BAD_REQUEST), + // This one can only happen when importing a dump and encountering an unknown code in the task queue. + UnretrievableErrorCode => { + ErrCode::invalid("unretrievable_error_code", StatusCode::BAD_REQUEST) + } // error related to keys ApiKeyNotFound => ErrCode::invalid("api_key_not_found", StatusCode::NOT_FOUND),