2446: rename Succeded to Succeeded r=irevoire a=MarinPostma

this pr renames `TaskEvent::Succeded` to `TaskEvent::Succeeded` and apply the migration to the dumps


Co-authored-by: ad hoc <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2022-05-31 18:27:02 +00:00 committed by GitHub
commit 47007fa71b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 111 additions and 17 deletions

View File

@ -46,7 +46,7 @@ fn task_status_matches_events(status: &TaskStatus, events: &[TaskEvent]) -> bool
matches!((status, event), matches!((status, event),
(TaskStatus::Enqueued, TaskEvent::Created(_)) (TaskStatus::Enqueued, TaskEvent::Created(_))
| (TaskStatus::Processing, TaskEvent::Processing(_) | TaskEvent::Batched { .. }) | (TaskStatus::Processing, TaskEvent::Processing(_) | TaskEvent::Batched { .. })
| (TaskStatus::Succeeded, TaskEvent::Succeded { .. }) | (TaskStatus::Succeeded, TaskEvent::Succeeded { .. })
| (TaskStatus::Failed, TaskEvent::Failed { .. }), | (TaskStatus::Failed, TaskEvent::Failed { .. }),
) )
}) })

View File

@ -263,7 +263,7 @@ impl From<Task> for TaskView {
TaskEvent::Created(_) => (TaskStatus::Enqueued, None, None), TaskEvent::Created(_) => (TaskStatus::Enqueued, None, None),
TaskEvent::Batched { .. } => (TaskStatus::Enqueued, None, None), TaskEvent::Batched { .. } => (TaskStatus::Enqueued, None, None),
TaskEvent::Processing(_) => (TaskStatus::Processing, None, None), TaskEvent::Processing(_) => (TaskStatus::Processing, None, None),
TaskEvent::Succeded { timestamp, result } => { TaskEvent::Succeeded { timestamp, result } => {
match (result, &mut details) { match (result, &mut details) {
( (
TaskResult::DocumentAddition { TaskResult::DocumentAddition {

View File

@ -496,7 +496,6 @@ async fn search_facet_distribution() {
|response, code| { |response, code| {
assert_eq!(code, 200, "{}", response); assert_eq!(code, 200, "{}", response);
let dist = response["facetDistribution"].as_object().unwrap(); let dist = response["facetDistribution"].as_object().unwrap();
dbg!(&dist);
assert_eq!(dist.len(), 3); assert_eq!(dist.len(), 3);
assert_eq!( assert_eq!(
dist["doggos.name"], dist["doggos.name"],

View File

@ -4,9 +4,10 @@ use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use super::v4::{Task, TaskEvent};
use crate::index::{Settings, Unchecked}; use crate::index::{Settings, Unchecked};
use crate::index_resolver::IndexUid; use crate::index_resolver::IndexUid;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult}; use crate::tasks::task::{DocumentDeletion, TaskContent, TaskId, TaskResult};
use super::v2; use super::v2;
@ -187,7 +188,7 @@ impl From<(UpdateStatus, String, TaskId)> for Task {
// Dummy task // Dummy task
let mut task = Task { let mut task = Task {
id: task_id, id: task_id,
index_uid: Some(IndexUid::new(uid).unwrap()), index_uid: IndexUid::new(uid).unwrap(),
content: TaskContent::IndexDeletion, content: TaskContent::IndexDeletion,
events: Vec::new(), events: Vec::new(),
}; };

View File

@ -1 +1,67 @@
use meilisearch_error::ResponseError;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::tasks::batch::BatchId;
use crate::tasks::task::{TaskContent, TaskEvent as NewTaskEvent, TaskId, TaskResult};
use crate::IndexUid;
#[derive(Debug, Serialize, Deserialize)]
pub struct Task {
pub id: TaskId,
pub index_uid: IndexUid,
pub content: TaskContent,
pub events: Vec<TaskEvent>,
}
impl From<Task> for crate::tasks::task::Task {
fn from(other: Task) -> Self {
Self {
id: other.id,
index_uid: Some(other.index_uid),
content: other.content,
events: other.events.into_iter().map(Into::into).collect(),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum TaskEvent {
Created(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
Batched {
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
batch_id: BatchId,
},
Processing(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
Succeded {
result: TaskResult,
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
},
Failed {
error: ResponseError,
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
},
}
impl From<TaskEvent> for NewTaskEvent {
fn from(other: TaskEvent) -> Self {
match other {
TaskEvent::Created(x) => NewTaskEvent::Created(x),
TaskEvent::Batched {
timestamp,
batch_id,
} => NewTaskEvent::Batched {
timestamp,
batch_id,
},
TaskEvent::Processing(x) => NewTaskEvent::Processing(x),
TaskEvent::Succeded { result, timestamp } => {
NewTaskEvent::Succeeded { result, timestamp }
}
TaskEvent::Failed { error, timestamp } => NewTaskEvent::Failed { error, timestamp },
}
}
}

View File

@ -9,11 +9,11 @@ use log::info;
use tempfile::tempdir; use tempfile::tempdir;
use uuid::Uuid; use uuid::Uuid;
use crate::dump::compat::v3; use crate::dump::compat::{self, v3};
use crate::dump::Metadata; use crate::dump::Metadata;
use crate::index_resolver::meta_store::{DumpEntry, IndexMeta}; use crate::index_resolver::meta_store::{DumpEntry, IndexMeta};
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
use crate::tasks::task::{Task, TaskId}; use crate::tasks::task::TaskId;
/// dump structure for V3: /// dump structure for V3:
/// . /// .
@ -124,7 +124,7 @@ fn patch_updates(
.clone(); .clone();
serde_json::to_writer( serde_json::to_writer(
&mut dst_file, &mut dst_file,
&Task::from((entry.update, name, task_id as TaskId)), &compat::v4::Task::from((entry.update, name, task_id as TaskId)),
)?; )?;
dst_file.write_all(b"\n")?; dst_file.write_all(b"\n")?;
Ok(()) Ok(())

View File

@ -1,12 +1,14 @@
use std::fs; use std::fs::{self, create_dir_all, File};
use std::io::Write;
use std::path::Path; use std::path::Path;
use fs_extra::dir::{self, CopyOptions}; use fs_extra::dir::{self, CopyOptions};
use log::info; use log::info;
use tempfile::tempdir; use tempfile::tempdir;
use crate::dump::Metadata; use crate::dump::{compat, Metadata};
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
use crate::tasks::task::Task;
pub fn load_dump( pub fn load_dump(
meta: Metadata, meta: Metadata,
@ -38,7 +40,7 @@ pub fn load_dump(
)?; )?;
// Updates // Updates
dir::copy(src.as_ref().join("updates"), patched_dir.path(), &options)?; patch_updates(&src, &patched_dir)?;
// Keys // Keys
if src.as_ref().join("keys").exists() { if src.as_ref().join("keys").exists() {
@ -54,3 +56,26 @@ pub fn load_dump(
indexing_options, indexing_options,
) )
} }
fn patch_updates(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let updates_path = src.as_ref().join("updates/data.jsonl");
let output_updates_path = dst.as_ref().join("updates/data.jsonl");
create_dir_all(output_updates_path.parent().unwrap())?;
let udpates_file = File::open(updates_path)?;
let mut output_update_file = File::create(output_updates_path)?;
serde_json::Deserializer::from_reader(udpates_file)
.into_iter::<compat::v4::Task>()
.try_for_each(|task| -> anyhow::Result<()> {
let task: Task = task?.into();
serde_json::to_writer(&mut output_update_file, &task)?;
output_update_file.write_all(b"\n")?;
Ok(())
})?;
output_update_file.flush()?;
Ok(())
}

View File

@ -200,7 +200,7 @@ where
.await; .await;
let event = match result { let event = match result {
Ok(Ok(result)) => TaskEvent::Succeded { Ok(Ok(result)) => TaskEvent::Succeeded {
timestamp: OffsetDateTime::now_utc(), timestamp: OffsetDateTime::now_utc(),
result: TaskResult::DocumentAddition { result: TaskResult::DocumentAddition {
indexed_documents: result.indexed_documents, indexed_documents: result.indexed_documents,
@ -594,7 +594,7 @@ mod test {
{ {
assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result); assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result);
} else { } else {
assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeded { .. }), "{:?}", result); assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeeded { .. }), "{:?}", result);
} }
} }
}); });

View File

@ -91,7 +91,7 @@ mod test {
if accept { if accept {
let batch = dump_handler.process_batch(batch).await; let batch = dump_handler.process_batch(batch).await;
let last_event = batch.content.first().unwrap().events.last().unwrap(); let last_event = batch.content.first().unwrap().events.last().unwrap();
assert!(matches!(last_event, TaskEvent::Succeded { .. })); assert!(matches!(last_event, TaskEvent::Succeeded { .. }));
} }
}); });

View File

@ -48,7 +48,7 @@ pub enum TaskEvent {
#[serde(with = "time::serde::rfc3339")] #[serde(with = "time::serde::rfc3339")]
OffsetDateTime, OffsetDateTime,
), ),
Succeded { Succeeded {
result: TaskResult, result: TaskResult,
#[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))] #[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))]
#[serde(with = "time::serde::rfc3339")] #[serde(with = "time::serde::rfc3339")]
@ -64,7 +64,7 @@ pub enum TaskEvent {
impl TaskEvent { impl TaskEvent {
pub fn succeeded(result: TaskResult) -> Self { pub fn succeeded(result: TaskResult) -> Self {
Self::Succeded { Self::Succeeded {
result, result,
timestamp: OffsetDateTime::now_utc(), timestamp: OffsetDateTime::now_utc(),
} }
@ -106,7 +106,10 @@ impl Task {
/// A task is finished when its last state is either `Succeeded` or `Failed`. /// A task is finished when its last state is either `Succeeded` or `Failed`.
pub fn is_finished(&self) -> bool { pub fn is_finished(&self) -> bool {
self.events.last().map_or(false, |event| { self.events.last().map_or(false, |event| {
matches!(event, TaskEvent::Succeded { .. } | TaskEvent::Failed { .. }) matches!(
event,
TaskEvent::Succeeded { .. } | TaskEvent::Failed { .. }
)
}) })
} }