From 1702b5cf44192764625725692cb93b337c4779fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 8 May 2024 15:26:21 +0200 Subject: [PATCH] Prepare for processing documents edition --- dump/src/lib.rs | 1 + index-scheduler/src/autobatcher.rs | 9 ++++++- index-scheduler/src/batch.rs | 29 +++++++++++++++++++++- index-scheduler/src/insta_snapshot.rs | 6 +++++ index-scheduler/src/utils.rs | 22 ++++++++++++++++- meilisearch-types/src/task_view.rs | 9 +++++++ meilisearch-types/src/tasks.rs | 35 ++++++++++++++++++++++++++- 7 files changed, 107 insertions(+), 4 deletions(-) diff --git a/dump/src/lib.rs b/dump/src/lib.rs index 42cb0e444..4c511b28e 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -166,6 +166,7 @@ impl From for KindDump { documents_count, allow_index_creation, }, + KindWithContent::DocumentEdition { .. } => todo!(), KindWithContent::DocumentDeletion { documents_ids, .. } => { KindDump::DocumentDeletion { documents_ids } } diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index dc184947c..96201bebb 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -24,6 +24,7 @@ enum AutobatchKind { allow_index_creation: bool, primary_key: Option, }, + DocumentEdition, DocumentDeletion, DocumentDeletionByFilter, DocumentClear, @@ -63,6 +64,7 @@ impl From for AutobatchKind { primary_key, .. } => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key }, + KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition, KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion, KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear, KindWithContent::DocumentDeletionByFilter { .. } => { @@ -98,6 +100,9 @@ pub enum BatchKind { primary_key: Option, operation_ids: Vec, }, + DocumentEdition { + id: TaskId, + }, DocumentDeletion { deletion_ids: Vec, }, @@ -199,6 +204,7 @@ impl BatchKind { }), allow_index_creation, ), + K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false), K::DocumentDeletion => { (Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false) } @@ -222,7 +228,7 @@ impl BatchKind { match (self, kind) { // We don't batch any of these operations - (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this), + (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::DocumentDeletionByFilter) => Break(this), // We must not batch tasks that don't have the same index creation rights if the index doesn't already exists. (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { Break(this) @@ -519,6 +525,7 @@ impl BatchKind { | BatchKind::IndexDeletion { .. } | BatchKind::IndexUpdate { .. } | BatchKind::IndexSwap { .. } + | BatchKind::DocumentEdition { .. } | BatchKind::DocumentDeletionByFilter { .. }, _, ) => { diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index cd5525eea..d996881c2 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -106,6 +106,10 @@ pub(crate) enum IndexOperation { operations: Vec, tasks: Vec, }, + DocumentEdition { + index_uid: String, + task: Task, + }, IndexDocumentDeletionByFilter { index_uid: String, task: Task, @@ -164,7 +168,8 @@ impl Batch { | IndexOperation::DocumentClear { tasks, .. } => { RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid)) } - IndexOperation::IndexDocumentDeletionByFilter { task, .. } => { + IndexOperation::DocumentEdition { task, .. } + | IndexOperation::IndexDocumentDeletionByFilter { task, .. } => { RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap() } IndexOperation::SettingsAndDocumentOperation { @@ -228,6 +233,7 @@ impl IndexOperation { pub fn index_uid(&self) -> &str { match self { IndexOperation::DocumentOperation { index_uid, .. } + | IndexOperation::DocumentEdition { index_uid, .. } | IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. } | IndexOperation::DocumentClear { index_uid, .. } | IndexOperation::Settings { index_uid, .. } @@ -243,6 +249,9 @@ impl fmt::Display for IndexOperation { IndexOperation::DocumentOperation { .. } => { f.write_str("IndexOperation::DocumentOperation") } + IndexOperation::DocumentEdition { .. } => { + f.write_str("IndexOperation::DocumentEdition") + } IndexOperation::IndexDocumentDeletionByFilter { .. } => { f.write_str("IndexOperation::IndexDocumentDeletionByFilter") } @@ -295,6 +304,21 @@ impl IndexScheduler { _ => unreachable!(), } } + BatchKind::DocumentEdition { id } => { + let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + match &task.kind { + KindWithContent::DocumentEdition { index_uid, .. } => { + Ok(Some(Batch::IndexOperation { + op: IndexOperation::DocumentEdition { + index_uid: index_uid.clone(), + task, + }, + must_create_index: false, + })) + } + _ => unreachable!(), + } + } BatchKind::DocumentOperation { method, operation_ids, .. } => { let tasks = self.get_existing_tasks(rtxn, operation_ids)?; let primary_key = tasks @@ -1386,6 +1410,9 @@ impl IndexScheduler { Ok(tasks) } + IndexOperation::DocumentEdition { .. } => { + todo!() + } IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { let filter = if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } = diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index d8625a2c7..915f1b5dd 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -177,6 +177,12 @@ fn snapshot_details(d: &Details) -> String { } => { format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents:?} }}") } + Details::DocumentEdition { + edited_documents, + edition_code, + } => { + format!("{{ edited_documents: {edited_documents:?}, edition_code: {edition_code:?} }}") + } Details::SettingsUpdate { settings } => { format!("{{ settings: {settings:?} }}") } diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 260ff6ee4..788a70fb8 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -238,6 +238,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { let mut index_uids = vec![]; match &mut task.kind { K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid), + K::DocumentEdition { index_uid, .. } => index_uids.push(index_uid), K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid), K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid), K::DocumentClear { index_uid } => index_uids.push(index_uid), @@ -408,7 +409,26 @@ impl IndexScheduler { match status { Status::Succeeded => assert!(indexed_documents <= received_documents), Status::Failed | Status::Canceled => assert_eq!(indexed_documents, 0), - status => panic!("DocumentAddition can't have an indexed_document set if it's {}", status), + status => panic!("DocumentAddition can't have an indexed_documents set if it's {}", status), + } + } + None => { + assert!(matches!(status, Status::Enqueued | Status::Processing)) + } + } + } + Details::DocumentEdition { edited_documents, .. } => { + assert_eq!(kind.as_kind(), Kind::DocumentEdition); + match edited_documents { + Some(edited_documents) => { + assert!(matches!( + status, + Status::Succeeded | Status::Failed | Status::Canceled + )); + match status { + Status::Succeeded => (), + Status::Failed | Status::Canceled => assert_eq!(edited_documents, 0), + status => panic!("DocumentEdition can't have an edited_documents set if it's {}", status), } } None => { diff --git a/meilisearch-types/src/task_view.rs b/meilisearch-types/src/task_view.rs index 659427c9d..f09ed5f45 100644 --- a/meilisearch-types/src/task_view.rs +++ b/meilisearch-types/src/task_view.rs @@ -54,6 +54,8 @@ pub struct DetailsView { #[serde(skip_serializing_if = "Option::is_none")] pub indexed_documents: Option>, #[serde(skip_serializing_if = "Option::is_none")] + pub edited_documents: Option>, + #[serde(skip_serializing_if = "Option::is_none")] pub primary_key: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub provided_ids: Option, @@ -70,6 +72,8 @@ pub struct DetailsView { #[serde(skip_serializing_if = "Option::is_none")] pub dump_uid: Option>, #[serde(skip_serializing_if = "Option::is_none")] + pub edition_code: Option, + #[serde(skip_serializing_if = "Option::is_none")] #[serde(flatten)] pub settings: Option>>, #[serde(skip_serializing_if = "Option::is_none")] @@ -86,6 +90,11 @@ impl From
for DetailsView { ..DetailsView::default() } } + Details::DocumentEdition { edited_documents, edition_code } => DetailsView { + edited_documents: Some(edited_documents), + edition_code: Some(edition_code), + ..DetailsView::default() + }, Details::SettingsUpdate { mut settings } => { settings.hide_secrets(); DetailsView { settings: Some(settings), ..DetailsView::default() } diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 693ee4242..e722d15da 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -48,6 +48,7 @@ impl Task { | TaskDeletion { .. } | IndexSwap { .. } => None, DocumentAdditionOrUpdate { index_uid, .. } + | DocumentEdition { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentDeletionByFilter { index_uid, .. } | DocumentClear { index_uid } @@ -67,7 +68,8 @@ impl Task { pub fn content_uuid(&self) -> Option { match self.kind { KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => Some(content_file), - KindWithContent::DocumentDeletion { .. } + KindWithContent::DocumentEdition { .. } + | KindWithContent::DocumentDeletion { .. } | KindWithContent::DocumentDeletionByFilter { .. } | KindWithContent::DocumentClear { .. } | KindWithContent::SettingsUpdate { .. } @@ -94,6 +96,10 @@ pub enum KindWithContent { documents_count: u64, allow_index_creation: bool, }, + DocumentEdition { + index_uid: String, + edition_code: String, + }, DocumentDeletion { index_uid: String, documents_ids: Vec, @@ -150,6 +156,7 @@ impl KindWithContent { pub fn as_kind(&self) -> Kind { match self { KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate, + KindWithContent::DocumentEdition { .. } => Kind::DocumentEdition, KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, KindWithContent::DocumentDeletionByFilter { .. } => Kind::DocumentDeletion, KindWithContent::DocumentClear { .. } => Kind::DocumentDeletion, @@ -174,6 +181,7 @@ impl KindWithContent { | TaskCancelation { .. } | TaskDeletion { .. } => vec![], DocumentAdditionOrUpdate { index_uid, .. } + | DocumentEdition { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentDeletionByFilter { index_uid, .. } | DocumentClear { index_uid } @@ -202,6 +210,12 @@ impl KindWithContent { indexed_documents: None, }) } + KindWithContent::DocumentEdition { edition_code, .. } => { + Some(Details::DocumentEdition { + edited_documents: None, + edition_code: edition_code.clone(), + }) + } KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => { Some(Details::DocumentDeletion { provided_ids: documents_ids.len(), @@ -250,6 +264,12 @@ impl KindWithContent { indexed_documents: Some(0), }) } + KindWithContent::DocumentEdition { edition_code, .. } => { + Some(Details::DocumentEdition { + edited_documents: Some(0), + edition_code: edition_code.clone(), + }) + } KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => { Some(Details::DocumentDeletion { provided_ids: documents_ids.len(), @@ -301,6 +321,12 @@ impl From<&KindWithContent> for Option
{ indexed_documents: None, }) } + KindWithContent::DocumentEdition { edition_code, .. } => { + Some(Details::DocumentEdition { + edited_documents: None, + edition_code: edition_code.clone(), + }) + } KindWithContent::DocumentDeletion { .. } => None, KindWithContent::DocumentDeletionByFilter { .. } => None, KindWithContent::DocumentClear { .. } => None, @@ -394,6 +420,7 @@ impl std::error::Error for ParseTaskStatusError {} #[serde(rename_all = "camelCase")] pub enum Kind { DocumentAdditionOrUpdate, + DocumentEdition, DocumentDeletion, SettingsUpdate, IndexCreation, @@ -410,6 +437,7 @@ impl Kind { pub fn related_to_one_index(&self) -> bool { match self { Kind::DocumentAdditionOrUpdate + | Kind::DocumentEdition | Kind::DocumentDeletion | Kind::SettingsUpdate | Kind::IndexCreation @@ -427,6 +455,7 @@ impl Display for Kind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Kind::DocumentAdditionOrUpdate => write!(f, "documentAdditionOrUpdate"), + Kind::DocumentEdition => write!(f, "documentEdition"), Kind::DocumentDeletion => write!(f, "documentDeletion"), Kind::SettingsUpdate => write!(f, "settingsUpdate"), Kind::IndexCreation => write!(f, "indexCreation"), @@ -454,6 +483,8 @@ impl FromStr for Kind { Ok(Kind::IndexDeletion) } else if kind.eq_ignore_ascii_case("documentAdditionOrUpdate") { Ok(Kind::DocumentAdditionOrUpdate) + } else if kind.eq_ignore_ascii_case("documentEdition") { + Ok(Kind::DocumentEdition) } else if kind.eq_ignore_ascii_case("documentDeletion") { Ok(Kind::DocumentDeletion) } else if kind.eq_ignore_ascii_case("settingsUpdate") { @@ -496,6 +527,7 @@ impl std::error::Error for ParseTaskKindError {} #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub enum Details { DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option }, + DocumentEdition { edited_documents: Option, edition_code: String }, SettingsUpdate { settings: Box> }, IndexInfo { primary_key: Option }, DocumentDeletion { provided_ids: usize, deleted_documents: Option }, @@ -514,6 +546,7 @@ impl Details { Self::DocumentAdditionOrUpdate { indexed_documents, .. } => { *indexed_documents = Some(0) } + Self::DocumentEdition { edited_documents, .. } => *edited_documents = Some(0), Self::DocumentDeletion { deleted_documents, .. } => *deleted_documents = Some(0), Self::DocumentDeletionByFilter { deleted_documents, .. } => { *deleted_documents = Some(0)