convert UpdateStatus to legacy meilisearch format

This commit is contained in:
Marin Postma 2021-04-29 19:31:58 +02:00 committed by marin postma
parent d46a2713d2
commit e8bd5ea4e0
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
3 changed files with 231 additions and 6 deletions

View File

@ -84,6 +84,10 @@ impl Processed {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn meta(&self) -> &UpdateMeta {
self.from.meta()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -132,21 +136,29 @@ impl Aborted {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn meta(&self) -> &UpdateMeta {
self.from.meta()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Failed {
#[serde(flatten)]
from: Processing,
error: UpdateError,
failed_at: DateTime<Utc>,
pub from: Processing,
pub error: UpdateError,
pub failed_at: DateTime<Utc>,
}
impl Failed {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn meta(&self) -> &UpdateMeta {
self.from.meta()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -170,6 +182,16 @@ impl UpdateStatus {
}
}
pub fn meta(&self) -> &UpdateMeta {
match self {
UpdateStatus::Processing(u) => u.meta(),
UpdateStatus::Enqueued(u) => u.meta(),
UpdateStatus::Processed(u) => u.meta(),
UpdateStatus::Aborted(u) => u.meta(),
UpdateStatus::Failed(u) => u.meta(),
}
}
pub fn processed(&self) -> Option<&Processed> {
match self {
UpdateStatus::Processed(p) => Some(p),

View File

@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::error::ResponseError;
use crate::helpers::Authentication;
use crate::routes::IndexParam;
use super::{UpdateStatusResponse, IndexParam};
use crate::Data;
pub fn services(cfg: &mut web::ServiceConfig) {
@ -129,7 +129,10 @@ async fn get_update_status(
.get_update_status(params.index_uid, params.update_id)
.await;
match result {
Ok(meta) => Ok(HttpResponse::Ok().json(meta)),
Ok(meta) => {
let meta = UpdateStatusResponse::from(meta);
Ok(HttpResponse::Ok().json(meta))
},
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}
@ -143,7 +146,14 @@ async fn get_all_updates_status(
) -> Result<HttpResponse, ResponseError> {
let result = data.get_updates_status(path.into_inner().index_uid).await;
match result {
Ok(metas) => Ok(HttpResponse::Ok().json(metas)),
Ok(metas) => {
let metas = metas
.into_iter()
.map(UpdateStatusResponse::from)
.collect::<Vec<_>>();
Ok(HttpResponse::Ok().json(metas))
},
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}

View File

@ -1,6 +1,12 @@
use std::time::Duration;
use actix_web::{get, HttpResponse};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::index::Settings;
use crate::index_controller::{UpdateMeta, UpdateResult, UpdateStatus};
pub mod document;
pub mod dump;
pub mod health;
@ -11,6 +17,193 @@ pub mod settings;
pub mod stats;
pub mod synonym;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "name")]
pub enum UpdateType {
ClearAll,
Customs,
DocumentsAddition {
#[serde(skip_serializing_if = "Option::is_none")]
number: Option<usize>
},
DocumentsPartial {
#[serde(skip_serializing_if = "Option::is_none")]
number: Option<usize>
},
DocumentsDeletion {
#[serde(skip_serializing_if = "Option::is_none")]
number: Option<usize>
},
Settings { settings: Settings },
}
impl From<&UpdateStatus> for UpdateType {
fn from(other: &UpdateStatus) -> Self {
use milli::update::IndexDocumentsMethod::*;
match other.meta() {
UpdateMeta::DocumentsAddition { method, .. } => {
let number = match other {
UpdateStatus::Processed(processed) => match processed.success {
UpdateResult::DocumentsAddition(ref addition) => {
Some(addition.nb_documents)
}
_ => None,
},
_ => None,
};
match method {
ReplaceDocuments => UpdateType::DocumentsAddition { number },
UpdateDocuments => UpdateType::DocumentsPartial { number },
_ => unreachable!(),
}
}
UpdateMeta::ClearDocuments => UpdateType::ClearAll,
UpdateMeta::DeleteDocuments => {
let number = match other {
UpdateStatus::Processed(processed) => match processed.success {
UpdateResult::DocumentDeletion { deleted } => Some(deleted as usize),
_ => None,
},
_ => None,
};
UpdateType::DocumentsDeletion { number }
}
UpdateMeta::Settings(settings) => UpdateType::Settings {
settings: settings.clone(),
},
_ => unreachable!(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProcessedUpdateResult {
pub update_id: u64,
#[serde(rename = "type")]
pub update_type: UpdateType,
pub duration: f64, // in seconds
pub enqueued_at: DateTime<Utc>,
pub processed_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FailedUpdateResult {
pub update_id: u64,
#[serde(rename = "type")]
pub update_type: UpdateType,
pub error: String,
pub error_type: String,
pub error_code: String,
pub error_link: String,
pub duration: f64, // in seconds
pub enqueued_at: DateTime<Utc>,
pub processed_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EnqueuedUpdateResult {
pub update_id: u64,
#[serde(rename = "type")]
pub update_type: UpdateType,
pub enqueued_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
pub started_processing_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
pub enum UpdateStatusResponse {
Enqueued {
#[serde(flatten)]
content: EnqueuedUpdateResult,
},
Processing {
#[serde(flatten)]
content: EnqueuedUpdateResult,
},
Failed {
#[serde(flatten)]
content: FailedUpdateResult,
},
Processed {
#[serde(flatten)]
content: ProcessedUpdateResult,
},
}
impl From<UpdateStatus> for UpdateStatusResponse {
fn from(other: UpdateStatus) -> Self {
let update_type = UpdateType::from(&other);
match other {
UpdateStatus::Processing(processing) => {
let content = EnqueuedUpdateResult {
update_id: processing.id(),
update_type,
enqueued_at: processing.from.enqueued_at,
started_processing_at: Some(processing.started_processing_at),
};
UpdateStatusResponse::Processing { content }
}
UpdateStatus::Enqueued(enqueued) => {
let content = EnqueuedUpdateResult {
update_id: enqueued.id(),
update_type,
enqueued_at: enqueued.enqueued_at,
started_processing_at: None,
};
UpdateStatusResponse::Enqueued { content }
}
UpdateStatus::Processed(processed) => {
let duration = processed
.processed_at
.signed_duration_since(processed.from.started_processing_at)
.num_milliseconds();
// necessary since chrono::duration don't expose a f64 secs method.
let duration = Duration::from_millis(duration as u64).as_secs_f64();
let content = ProcessedUpdateResult {
update_id: processed.id(),
update_type,
duration,
enqueued_at: processed.from.from.enqueued_at,
processed_at: processed.processed_at,
};
UpdateStatusResponse::Processed { content }
}
UpdateStatus::Aborted(_) => unreachable!(),
UpdateStatus::Failed(failed) => {
let duration = failed
.failed_at
.signed_duration_since(failed.from.started_processing_at)
.num_milliseconds();
// necessary since chrono::duration don't expose a f64 secs method.
let duration = Duration::from_millis(duration as u64).as_secs_f64();
let content = FailedUpdateResult {
update_id: failed.id(),
update_type,
error: failed.error,
error_type: String::from("todo"),
error_code: String::from("todo"),
error_link: String::from("todo"),
duration,
enqueued_at: failed.from.from.enqueued_at,
processed_at: failed.failed_at,
};
UpdateStatusResponse::Failed { content }
}
}
}
}
#[derive(Deserialize)]
pub struct IndexParam {
index_uid: String,