From 767cb725a5a354b18b47963ed3e3b9994f38ca81 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 23 Jan 2023 20:16:16 +0100 Subject: [PATCH] reimplement the batching of task with or without primary key in the autobatcher --- index-scheduler/src/autobatcher.rs | 256 ++++++++++++------ index-scheduler/src/batch.rs | 31 ++- index-scheduler/src/lib.rs | 16 +- .../second_and_third_tasks_fails.snap | 11 +- ...eds.snap => all_other_tasks_succeeds.snap} | 6 +- .../fourth_and_fifth_tasks_succeeds.snap | 57 ---- ...eds.snap => all_other_tasks_succeeds.snap} | 6 +- .../fourth_and_fifth_tasks_succeeds.snap | 57 ---- 8 files changed, 206 insertions(+), 234 deletions(-) rename index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/{sixth_task_succeeds.snap => all_other_tasks_succeeds.snap} (98%) delete mode 100644 index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/fourth_and_fifth_tasks_succeeds.snap rename index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/{sixth_task_succeeds.snap => all_other_tasks_succeeds.snap} (97%) delete mode 100644 index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/fourth_and_fifth_tasks_succeeds.snap diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index d1ed691c6..28ae886ad 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -19,10 +19,16 @@ use crate::KindWithContent; /// /// Only the non-prioritised tasks that can be grouped in a batch have a corresponding [`AutobatchKind`] enum AutobatchKind { - DocumentImport { method: IndexDocumentsMethod, allow_index_creation: bool }, + DocumentImport { + method: IndexDocumentsMethod, + allow_index_creation: bool, + primary_key: Option, + }, DocumentDeletion, DocumentClear, - Settings { allow_index_creation: bool }, + Settings { + allow_index_creation: bool, + }, IndexCreation, IndexDeletion, IndexUpdate, @@ -38,14 +44,24 @@ impl AutobatchKind { _ => None, } } + + fn primary_key(&self) -> Option> { + match self { + AutobatchKind::DocumentImport { primary_key, .. } => Some(primary_key.as_deref()), + _ => None, + } + } } impl From for AutobatchKind { fn from(kind: KindWithContent) -> Self { match kind { - KindWithContent::DocumentAdditionOrUpdate { method, allow_index_creation, .. } => { - AutobatchKind::DocumentImport { method, allow_index_creation } - } + KindWithContent::DocumentAdditionOrUpdate { + method, + allow_index_creation, + primary_key, + .. + } => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key }, KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion, KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear, KindWithContent::SettingsUpdate { allow_index_creation, is_deletion, .. } => { @@ -75,6 +91,7 @@ pub enum BatchKind { DocumentImport { method: IndexDocumentsMethod, allow_index_creation: bool, + primary_key: Option, import_ids: Vec, }, DocumentDeletion { @@ -89,6 +106,7 @@ pub enum BatchKind { settings_ids: Vec, method: IndexDocumentsMethod, allow_index_creation: bool, + primary_key: Option, import_ids: Vec, }, Settings { @@ -120,6 +138,16 @@ impl BatchKind { _ => None, } } + + fn primary_key(&self) -> Option> { + match self { + BatchKind::DocumentImport { primary_key, .. } + | BatchKind::SettingsAndDocumentImport { primary_key, .. } => { + Some(primary_key.as_deref()) + } + _ => None, + } + } } impl BatchKind { @@ -131,6 +159,7 @@ impl BatchKind { pub fn new( task_id: TaskId, kind: KindWithContent, + primary_key: Option<&str>, ) -> (ControlFlow, bool) { use AutobatchKind as K; @@ -140,10 +169,28 @@ impl BatchKind { K::IndexUpdate => (Break(BatchKind::IndexUpdate { id: task_id }), false), K::IndexSwap => (Break(BatchKind::IndexSwap { id: task_id }), false), K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false), - K::DocumentImport { method, allow_index_creation } => ( - Continue(BatchKind::DocumentImport { + K::DocumentImport { method, allow_index_creation, primary_key: pk } + if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() => + { + ( + Continue(BatchKind::DocumentImport { + method, + allow_index_creation, + primary_key: pk, + import_ids: vec![task_id], + }), + allow_index_creation, + ) + } + // if the primary key set in the task was different than ours we should stop and make this batch fail asap. + // TODO: maybe we could continue to batch tasks that'll fail? But that would mean we need to be extra + // cautious with the index deletion and document clear because we should remember that these tasks were + // supposed to fail even if we don't execute them. + K::DocumentImport { method, allow_index_creation, primary_key } => ( + Break(BatchKind::DocumentImport { method, allow_index_creation, + primary_key, import_ids: vec![task_id], }), allow_index_creation, @@ -163,7 +210,7 @@ impl BatchKind { /// To ease the writting of the code. `true` can be returned when you don't need to create an index /// but false can't be returned if you needs to create an index. #[rustfmt::skip] - fn accumulate(self, id: TaskId, kind: AutobatchKind, index_already_exists: bool) -> ControlFlow { + fn accumulate(self, id: TaskId, kind: AutobatchKind, index_already_exists: bool, primary_key: Option<&str>) -> ControlFlow { use AutobatchKind as K; match (self, kind) { @@ -173,11 +220,51 @@ impl BatchKind { (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { Break(this) }, + // + // 1. If both task don't interact with primary key -> we can continue + // 2. Else -> + // 2.1 If we already have a primary-key -> + // 2.1.1 If the task we're trying to accumulate have a pk -> it must be equal to our primary key to continue + // 2.1.2 If the task don't have a primary-key -> we can continue + // (We've already ensured that the current batch was correct according to our pk in a previous step of the autobatcher) + // 2.2 If we don't have a primary-key -> + // 2.2.1 If both the batch and the task have a primary key they should be equal + // 2.2.2 If the batch is set to Some(None), the task should be too + // 2.2.3 If the batch is set to None -> we can continue + // + // NOTE: We need to negate the whole condition since we're checking if we need to break instead of continue. + // I wrote it this way because it's easier to understand than the other way around. + (this, kind) if !( + // 1. If both task don't interact with primary key -> we can continue + (this.primary_key().is_none() && kind.primary_key().is_none()) || + // 2. Else -> + ( + // 2.1 If we already have a primary-key -> + ( + primary_key.is_some() && + // 2.1.1 If the task we're trying to accumulate have a pk it must be equal to our primary key + // 2.1.2 If the task don't have a primary-key -> we can continue + dbg!(kind.primary_key()).map_or(true, |pk| pk == primary_key) + ) || + // 2.2 If we don't have a primary-key -> + ( + // 2.2.1 If both the batch and the task have a primary key they should be equal + // 2.2.2 If the batch is set to Some(None), the task should be too + // 2.2.3 If the batch is set to None -> we can continue + dbg!(&this.primary_key()).zip(dbg!(kind.primary_key())).map_or(true, |(this, kind)| this == kind) + ) + ) + + ) // closing the negation + + => { + Break(this) + }, // The index deletion can batch with everything but must stop after ( BatchKind::DocumentClear { mut ids } | BatchKind::DocumentDeletion { deletion_ids: mut ids } - | BatchKind::DocumentImport { method: _, allow_index_creation: _, import_ids: mut ids } + | BatchKind::DocumentImport { method: _, allow_index_creation: _, primary_key: _, import_ids: mut ids } | BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids }, K::IndexDeletion, ) => { @@ -186,7 +273,7 @@ impl BatchKind { } ( BatchKind::ClearAndSettings { settings_ids: mut ids, allow_index_creation: _, mut other } - | BatchKind::SettingsAndDocumentImport { import_ids: mut ids, method: _, allow_index_creation: _, settings_ids: mut other }, + | BatchKind::SettingsAndDocumentImport { import_ids: mut ids, method: _, allow_index_creation: _, primary_key: _, settings_ids: mut other }, K::IndexDeletion, ) => { ids.push(id); @@ -206,7 +293,7 @@ impl BatchKind { K::DocumentImport { .. } | K::Settings { .. }, ) => Break(this), ( - BatchKind::DocumentImport { method: _, allow_index_creation: _, import_ids: mut ids }, + BatchKind::DocumentImport { method: _, allow_index_creation: _, primary_key: _, import_ids: mut ids }, K::DocumentClear, ) => { ids.push(id); @@ -215,24 +302,27 @@ impl BatchKind { // we can autobatch the same kind of document additions / updates ( - BatchKind::DocumentImport { method: ReplaceDocuments, allow_index_creation, mut import_ids }, - K::DocumentImport { method: ReplaceDocuments, .. }, + BatchKind::DocumentImport { method: ReplaceDocuments, allow_index_creation, primary_key: _, mut import_ids }, + K::DocumentImport { method: ReplaceDocuments, primary_key: pk, .. }, ) => { import_ids.push(id); Continue(BatchKind::DocumentImport { method: ReplaceDocuments, allow_index_creation, import_ids, + primary_key: pk, }) } ( - BatchKind::DocumentImport { method: UpdateDocuments, allow_index_creation, mut import_ids }, - K::DocumentImport { method: UpdateDocuments, .. }, + BatchKind::DocumentImport { method: UpdateDocuments, allow_index_creation, primary_key: _, mut import_ids }, + K::DocumentImport { method: UpdateDocuments, primary_key: pk, .. }, ) => { + import_ids.push(id); Continue(BatchKind::DocumentImport { method: UpdateDocuments, allow_index_creation, + primary_key: pk, import_ids, }) } @@ -245,12 +335,13 @@ impl BatchKind { ) => Break(this), ( - BatchKind::DocumentImport { method, allow_index_creation, import_ids }, + BatchKind::DocumentImport { method, allow_index_creation, primary_key, import_ids }, K::Settings { .. }, ) => Continue(BatchKind::SettingsAndDocumentImport { settings_ids: vec![id], method, allow_index_creation, + primary_key, import_ids, }), @@ -327,7 +418,7 @@ impl BatchKind { }) } ( - BatchKind::SettingsAndDocumentImport { settings_ids, method: _, import_ids: mut other, allow_index_creation }, + BatchKind::SettingsAndDocumentImport { settings_ids, method: _, import_ids: mut other, allow_index_creation, primary_key: _ }, K::DocumentClear, ) => { other.push(id); @@ -339,26 +430,28 @@ impl BatchKind { } ( - BatchKind::SettingsAndDocumentImport { settings_ids, method: ReplaceDocuments, mut import_ids, allow_index_creation }, - K::DocumentImport { method: ReplaceDocuments, .. }, + BatchKind::SettingsAndDocumentImport { settings_ids, method: ReplaceDocuments, mut import_ids, allow_index_creation, primary_key: _}, + K::DocumentImport { method: ReplaceDocuments, primary_key: pk2, .. }, ) => { import_ids.push(id); Continue(BatchKind::SettingsAndDocumentImport { settings_ids, method: ReplaceDocuments, allow_index_creation, + primary_key: pk2, import_ids, }) } ( - BatchKind::SettingsAndDocumentImport { settings_ids, method: UpdateDocuments, allow_index_creation, mut import_ids }, - K::DocumentImport { method: UpdateDocuments, .. }, + BatchKind::SettingsAndDocumentImport { settings_ids, method: UpdateDocuments, allow_index_creation, primary_key: _, mut import_ids }, + K::DocumentImport { method: UpdateDocuments, primary_key: pk2, .. }, ) => { import_ids.push(id); Continue(BatchKind::SettingsAndDocumentImport { settings_ids, method: UpdateDocuments, allow_index_creation, + primary_key: pk2, import_ids, }) } @@ -369,7 +462,7 @@ impl BatchKind { K::DocumentDeletion | K::DocumentImport { .. }, ) => Break(this), ( - BatchKind::SettingsAndDocumentImport { mut settings_ids, method, allow_index_creation, import_ids }, + BatchKind::SettingsAndDocumentImport { mut settings_ids, method, allow_index_creation,primary_key, import_ids }, K::Settings { .. }, ) => { settings_ids.push(id); @@ -377,6 +470,7 @@ impl BatchKind { settings_ids, method, allow_index_creation, + primary_key, import_ids, }) } @@ -406,6 +500,7 @@ impl BatchKind { pub fn autobatch( enqueued: Vec<(TaskId, KindWithContent)>, index_already_exists: bool, + primary_key: Option<&str>, ) -> Option<(BatchKind, bool)> { let mut enqueued = enqueued.into_iter(); let (id, kind) = enqueued.next()?; @@ -413,7 +508,7 @@ pub fn autobatch( // index_exist will keep track of if the index should exist at this point after the tasks we batched. let mut index_exist = index_already_exists; - let (mut acc, must_create_index) = match BatchKind::new(id, kind) { + let (mut acc, must_create_index) = match BatchKind::new(id, kind, primary_key) { (Continue(acc), create) => (acc, create), (Break(acc), create) => return Some((acc, create)), }; @@ -422,7 +517,7 @@ pub fn autobatch( index_exist |= must_create_index; for (id, kind) in enqueued { - acc = match acc.accumulate(id, kind.into(), index_exist) { + acc = match acc.accumulate(id, kind.into(), index_exist, primary_key) { Continue(acc) => acc, Break(acc) => return Some((acc, must_create_index)), }; @@ -446,6 +541,7 @@ mod tests { autobatch( input.into_iter().enumerate().map(|(id, kind)| (id as TaskId, kind)).collect(), index_already_exists, + None, ) } @@ -502,29 +598,29 @@ mod tests { fn autobatch_simple_operation_together() { // we can autobatch one or multiple `ReplaceDocuments` together. // if the index exists. - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp( ReplaceDocuments, true ), doc_imp(ReplaceDocuments, true )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), doc_imp( ReplaceDocuments, false ), doc_imp(ReplaceDocuments, false )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp( ReplaceDocuments, true ), doc_imp(ReplaceDocuments, true )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), doc_imp( ReplaceDocuments, false ), doc_imp(ReplaceDocuments, false )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1, 2] }, false))"); // if it doesn't exists. - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true), doc_imp( ReplaceDocuments, true ), doc_imp(ReplaceDocuments, true )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), doc_imp( ReplaceDocuments, true ), doc_imp(ReplaceDocuments, true )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true), doc_imp( ReplaceDocuments, true ), doc_imp(ReplaceDocuments, true )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), doc_imp( ReplaceDocuments, true ), doc_imp(ReplaceDocuments, true )]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); // we can autobatch one or multiple `UpdateDocuments` together. // if the index exists. - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, import_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1, 2] }, false))"); // if it doesn't exists. - debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, import_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false), doc_imp(UpdateDocuments, false)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1, 2] }, false))"); // we can autobatch one or multiple DocumentDeletion together debug_snapshot!(autobatch_from(true, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); @@ -547,51 +643,51 @@ mod tests { #[test] fn simple_document_operation_dont_autobatch_with_other() { // addition, updates and deletion can't batch together - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), doc_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), doc_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, [doc_del(), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, [doc_del(), doc_imp(UpdateDocuments, true)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), idx_create()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), idx_create()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), idx_create()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), idx_create()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), idx_update()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), idx_update()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), idx_update()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), idx_update()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), idx_swap()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), idx_swap()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), idx_swap()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), idx_swap()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); } #[test] fn document_addition_batch_with_settings() { // simple case - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); // multiple settings and doc addition - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), settings(true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), settings(true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), settings(true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), settings(true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); // addition and setting unordered - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1, 3], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 2] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), doc_imp(UpdateDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1, 3], method: UpdateDocuments, allow_index_creation: true, import_ids: [0, 2] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 2] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), doc_imp(UpdateDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1, 3], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 2] }, true))"); // We ensure this kind of batch doesn't batch with forbidden operations - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), doc_imp(UpdateDocuments, true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), doc_imp(ReplaceDocuments, true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), doc_del()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), doc_del()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), idx_create()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), idx_create()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), idx_update()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), idx_update()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), idx_swap()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), idx_swap()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), doc_imp(UpdateDocuments, true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), doc_imp(ReplaceDocuments, true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), doc_del()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), doc_del()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), idx_create()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), idx_create()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), idx_update()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), idx_update()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true), idx_swap()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(UpdateDocuments, true), settings(true), idx_swap()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); } #[test] @@ -703,25 +799,25 @@ mod tests { debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false), settings(false), doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [1, 3, 0, 2] }, false))"); // The third and final case is when the first task doesn't create an index but is directly followed by a task creating an index. In this case we can't batch whith what // follows because we first need to process the erronous batch. - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments,false), settings(true), idx_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false), settings(true), idx_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments,false), settings(true), doc_clr(), idx_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false), settings(true), doc_clr(), idx_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments,false), settings(true), idx_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false), settings(true), idx_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments,false), settings(true), doc_clr(), idx_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(UpdateDocuments, false), settings(true), doc_clr(), idx_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); } #[test] fn allowed_and_disallowed_index_creation() { // `DocumentImport` can't be mixed with those disallowed to do so except if the index already exists. - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, [doc_imp(ReplaceDocuments, false), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), settings(true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, false)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, [doc_imp(ReplaceDocuments, false), settings(true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); } } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index d364081a8..b52d3d495 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -207,7 +207,7 @@ impl IndexScheduler { must_create_index, })), BatchKind::DocumentImport { method, import_ids, .. } => { - let mut tasks = self.get_existing_tasks(rtxn, import_ids)?; + let tasks = self.get_existing_tasks(rtxn, import_ids)?; let primary_key = match &tasks[0].kind { KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => { primary_key.clone() @@ -217,18 +217,9 @@ impl IndexScheduler { let mut documents_counts = Vec::new(); let mut content_files = Vec::new(); - let mut drain_after = tasks.len(); - for (i, task) in tasks.iter().enumerate() { + for task in tasks.iter() { match task.kind { - KindWithContent::DocumentAdditionOrUpdate { - primary_key: ref pk, .. - } if pk != &primary_key => { - // we can't autobatch document additions that don't share the same - // primary key because that would make the whole batch fails. - drain_after = i; - break; - } KindWithContent::DocumentAdditionOrUpdate { content_file, documents_count, @@ -241,8 +232,6 @@ impl IndexScheduler { } } - tasks.drain(drain_after..); - Ok(Some(Batch::IndexOperation { op: IndexOperation::DocumentImport { index_uid, @@ -337,6 +326,7 @@ impl IndexScheduler { settings_ids, method, allow_index_creation, + primary_key, import_ids, } => { let settings = self.create_next_batch_index( @@ -349,7 +339,12 @@ impl IndexScheduler { let document_import = self.create_next_batch_index( rtxn, index_uid.clone(), - BatchKind::DocumentImport { method, allow_index_creation, import_ids }, + BatchKind::DocumentImport { + method, + allow_index_creation, + primary_key, + import_ids, + }, must_create_index, )?; @@ -479,6 +474,12 @@ impl IndexScheduler { }; let index_already_exists = self.index_mapper.exists(rtxn, index_name)?; + let mut primary_key = None; + if index_already_exists { + let index = self.index_mapper.index(rtxn, index_name)?; + let rtxn = index.read_txn()?; + primary_key = index.primary_key(&rtxn)?.map(|pk| pk.to_string()); + } let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued; @@ -496,7 +497,7 @@ impl IndexScheduler { .collect::>>()?; if let Some((batchkind, create_index)) = - autobatcher::autobatch(enqueued, index_already_exists) + autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref()) { return self.create_next_batch_index( rtxn, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 6e0cea644..895b97813 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -3281,13 +3281,9 @@ mod tests { snapshot!(primary_key, @"paw"); // We should be able to batch together the next two tasks that don't specify any primary key - // and it should succeed. + // + the last task that matches the current primary-key. Everything should succeed. handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fourth_and_fifth_tasks_succeeds"); - - // Finally the last task should succeed since its primary key is the same as the valid one. - handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "sixth_task_succeeds"); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_other_tasks_succeeds"); // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); @@ -3360,13 +3356,9 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_succeeds"); // We should be able to batch together the next two tasks that don't specify any primary key - // and it should succeed. + // + the last task that matches the current primary-key. Everything should succeed. handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fourth_and_fifth_tasks_succeeds"); - - // Finally the last task should succeed. - handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "sixth_task_succeeds"); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_other_tasks_succeeds"); // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_multiple_primary_key_batch_wrong_key/second_and_third_tasks_fails.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_multiple_primary_key_batch_wrong_key/second_and_third_tasks_fails.snap index e27e95b51..26b0f6584 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_multiple_primary_key_batch_wrong_key/second_and_third_tasks_fails.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_multiple_primary_key_batch_wrong_key/second_and_third_tasks_fails.snap @@ -8,12 +8,12 @@ source: index-scheduler/src/lib.rs ### All Tasks: 0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} 1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index already has a primary key: `id`.", error_code: "index_primary_key_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_primary_key_already_exists" }, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("bork"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} -2 {uid: 2, status: failed, error: ResponseError { code: 200, message: "Index already has a primary key: `id`.", error_code: "index_primary_key_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_primary_key_already_exists" }, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("bork"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }} +2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("bork"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }} ---------------------------------------------------------------------- ### Status: -enqueued [] +enqueued [2,] succeeded [0,] -failed [1,2,] +failed [1,] ---------------------------------------------------------------------- ### Kind: "documentAdditionOrUpdate" [0,1,2,] @@ -34,13 +34,14 @@ doggos [0,1,2,] ---------------------------------------------------------------------- ### Started At: [timestamp] [0,] -[timestamp] [1,2,] +[timestamp] [1,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] -[timestamp] [1,2,] +[timestamp] [1,] ---------------------------------------------------------------------- ### File Store: +00000000-0000-0000-0000-000000000002 ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/sixth_task_succeeds.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/all_other_tasks_succeeds.snap similarity index 98% rename from index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/sixth_task_succeeds.snap rename to index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/all_other_tasks_succeeds.snap index efbabec0d..69cbd3def 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/sixth_task_succeeds.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/all_other_tasks_succeeds.snap @@ -42,15 +42,13 @@ doggos [0,1,2,3,4,5,] [timestamp] [0,] [timestamp] [1,] [timestamp] [2,] -[timestamp] [3,4,] -[timestamp] [5,] +[timestamp] [3,4,5,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] [timestamp] [1,] [timestamp] [2,] -[timestamp] [3,4,] -[timestamp] [5,] +[timestamp] [3,4,5,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/fourth_and_fifth_tasks_succeeds.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/fourth_and_fifth_tasks_succeeds.snap deleted file mode 100644 index af131b74a..000000000 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key/fourth_and_fifth_tasks_succeeds.snap +++ /dev/null @@ -1,57 +0,0 @@ ---- -source: index-scheduler/src/lib.rs ---- -### Autobatching Enabled = true -### Processing Tasks: -[] ----------------------------------------------------------------------- -### All Tasks: -0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "The primary key inference failed as the engine did not find any field ending with `id` in its name. Please specify the primary key manually using the `primaryKey` query parameter.", error_code: "index_primary_key_no_candidate_found", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_primary_key_no_candidate_found" }, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} -1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Document doesn't have a `bork` attribute: `{\"paw\":1,\"doggo\":\"jean bob\"}`.", error_code: "missing_document_id", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#missing_document_id" }, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("bork"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} -2 {uid: 2, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("paw"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }} -3 {uid: 3, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000003, documents_count: 1, allow_index_creation: true }} -4 {uid: 4, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000004, documents_count: 1, allow_index_creation: true }} -5 {uid: 5, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("paw"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000005, documents_count: 1, allow_index_creation: true }} ----------------------------------------------------------------------- -### Status: -enqueued [5,] -succeeded [2,3,4,] -failed [0,1,] ----------------------------------------------------------------------- -### Kind: -"documentAdditionOrUpdate" [0,1,2,3,4,5,] ----------------------------------------------------------------------- -### Index Tasks: -doggos [0,1,2,3,4,5,] ----------------------------------------------------------------------- -### Index Mapper: -["doggos"] ----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,] -[timestamp] [4,] -[timestamp] [5,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,4,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,4,] ----------------------------------------------------------------------- -### File Store: -00000000-0000-0000-0000-000000000005 - ----------------------------------------------------------------------- - diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/sixth_task_succeeds.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/all_other_tasks_succeeds.snap similarity index 97% rename from index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/sixth_task_succeeds.snap rename to index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/all_other_tasks_succeeds.snap index e6b003712..437c6375e 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/sixth_task_succeeds.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/all_other_tasks_succeeds.snap @@ -42,15 +42,13 @@ doggos [0,1,2,3,4,5,] [timestamp] [0,] [timestamp] [1,] [timestamp] [2,] -[timestamp] [3,4,] -[timestamp] [5,] +[timestamp] [3,4,5,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] [timestamp] [1,] [timestamp] [2,] -[timestamp] [3,4,] -[timestamp] [5,] +[timestamp] [3,4,5,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/fourth_and_fifth_tasks_succeeds.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/fourth_and_fifth_tasks_succeeds.snap deleted file mode 100644 index 6d61b01df..000000000 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/fourth_and_fifth_tasks_succeeds.snap +++ /dev/null @@ -1,57 +0,0 @@ ---- -source: index-scheduler/src/lib.rs ---- -### Autobatching Enabled = true -### Processing Tasks: -[] ----------------------------------------------------------------------- -### All Tasks: -0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} -1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index already has a primary key: `doggoid`.", error_code: "index_primary_key_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_primary_key_already_exists" }, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("bork"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} -2 {uid: 2, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("doggoid"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }} -3 {uid: 3, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000003, documents_count: 1, allow_index_creation: true }} -4 {uid: 4, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000004, documents_count: 1, allow_index_creation: true }} -5 {uid: 5, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("doggoid"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000005, documents_count: 1, allow_index_creation: true }} ----------------------------------------------------------------------- -### Status: -enqueued [5,] -succeeded [0,2,3,4,] -failed [1,] ----------------------------------------------------------------------- -### Kind: -"documentAdditionOrUpdate" [0,1,2,3,4,5,] ----------------------------------------------------------------------- -### Index Tasks: -doggos [0,1,2,3,4,5,] ----------------------------------------------------------------------- -### Index Mapper: -["doggos"] ----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,] -[timestamp] [4,] -[timestamp] [5,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,4,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,4,] ----------------------------------------------------------------------- -### File Store: -00000000-0000-0000-0000-000000000005 - ----------------------------------------------------------------------- -