diff --git a/Cargo.lock b/Cargo.lock index dd67520ea..cb413bc53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -471,7 +471,7 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "benchmarks" -version = "1.10.0" +version = "1.10.1" dependencies = [ "anyhow", "bytes", @@ -652,7 +652,7 @@ dependencies = [ [[package]] name = "build-info" -version = "1.10.0" +version = "1.10.1" dependencies = [ "anyhow", "time", @@ -1622,7 +1622,7 @@ dependencies = [ [[package]] name = "dump" -version = "1.10.0" +version = "1.10.1" dependencies = [ "anyhow", "big_s", @@ -1834,7 +1834,7 @@ checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "file-store" -version = "1.10.0" +version = "1.10.1" dependencies = [ "tempfile", "thiserror", @@ -1856,7 +1856,7 @@ dependencies = [ [[package]] name = "filter-parser" -version = "1.10.0" +version = "1.10.1" dependencies = [ "insta", "nom", @@ -1876,7 +1876,7 @@ dependencies = [ [[package]] name = "flatten-serde-json" -version = "1.10.0" +version = "1.10.1" dependencies = [ "criterion", "serde_json", @@ -2000,7 +2000,7 @@ dependencies = [ [[package]] name = "fuzzers" -version = "1.10.0" +version = "1.10.1" dependencies = [ "arbitrary", "clap", @@ -2552,7 +2552,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d" [[package]] name = "index-scheduler" -version = "1.10.0" +version = "1.10.1" dependencies = [ "anyhow", "arroy", @@ -2746,7 +2746,7 @@ dependencies = [ [[package]] name = "json-depth-checker" -version = "1.10.0" +version = "1.10.1" dependencies = [ "criterion", "serde_json", @@ -3365,7 +3365,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "meili-snap" -version = "1.10.0" +version = "1.10.1" dependencies = [ "insta", "md5", @@ -3374,7 +3374,7 @@ dependencies = [ [[package]] name = "meilisearch" -version = "1.10.0" +version = "1.10.1" dependencies = [ "actix-cors", "actix-http", @@ -3463,7 +3463,7 @@ dependencies = [ [[package]] name = "meilisearch-auth" -version = "1.10.0" +version = "1.10.1" dependencies = [ "base64 0.22.1", "enum-iterator", @@ -3482,7 +3482,7 @@ dependencies = [ [[package]] name = "meilisearch-types" -version = "1.10.0" +version = "1.10.1" dependencies = [ "actix-web", "anyhow", @@ -3512,7 +3512,7 @@ dependencies = [ [[package]] name = "meilitool" -version = "1.10.0" +version = "1.10.1" dependencies = [ "anyhow", "clap", @@ -3542,7 +3542,7 @@ dependencies = [ [[package]] name = "milli" -version = "1.10.0" +version = "1.10.1" dependencies = [ "arroy", "big_s", @@ -3976,7 +3976,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "permissive-json-pointer" -version = "1.10.0" +version = "1.10.1" dependencies = [ "big_s", "serde_json", @@ -6361,7 +6361,7 @@ dependencies = [ [[package]] name = "xtask" -version = "1.10.0" +version = "1.10.1" dependencies = [ "anyhow", "build-info", diff --git a/Cargo.toml b/Cargo.toml index 0fbfa9b12..817da26e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ members = [ ] [workspace.package] -version = "1.10.0" +version = "1.10.1" authors = [ "Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>", diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 96201bebb..0f6aa8a3a 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -25,8 +25,9 @@ enum AutobatchKind { primary_key: Option<String>, }, DocumentEdition, - DocumentDeletion, -
DocumentDeletionByFilter, + DocumentDeletion { + by_filter: bool, + }, DocumentClear, Settings { allow_index_creation: bool, @@ -65,10 +66,12 @@ impl From<KindWithContent> for AutobatchKind { .. } => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key }, KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition, - KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion, + KindWithContent::DocumentDeletion { .. } => { + AutobatchKind::DocumentDeletion { by_filter: false } + } KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear, KindWithContent::DocumentDeletionByFilter { .. } => { - AutobatchKind::DocumentDeletionByFilter + AutobatchKind::DocumentDeletion { by_filter: true } } KindWithContent::SettingsUpdate { allow_index_creation, is_deletion, .. } => { AutobatchKind::Settings { @@ -105,9 +108,7 @@ pub enum BatchKind { }, DocumentDeletion { deletion_ids: Vec<TaskId>, - }, - DocumentDeletionByFilter { - id: TaskId, + includes_by_filter: bool, }, ClearAndSettings { other: Vec<TaskId>, @@ -205,12 +206,13 @@ impl BatchKind { allow_index_creation, ), K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false), - K::DocumentDeletion => { - (Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false) - } - K::DocumentDeletionByFilter => { - (Break(BatchKind::DocumentDeletionByFilter { id: task_id }), false) - } + K::DocumentDeletion { by_filter: includes_by_filter } => ( + Continue(BatchKind::DocumentDeletion { + deletion_ids: vec![task_id], + includes_by_filter, + }), + false, + ), K::Settings { allow_index_creation } => ( Continue(BatchKind::Settings { allow_index_creation, settings_ids: vec![task_id] }), allow_index_creation, @@ -228,7 +230,7 @@ impl BatchKind { match (self, kind) { // We don't batch any of these operations - (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::DocumentDeletionByFilter) => Break(this), + (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break(this), // We must not batch tasks that don't have the same index creation rights if the index doesn't already exist. (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { Break(this) @@ -264,7 +266,7 @@ impl BatchKind { // The index deletion can batch with everything but must stop after ( BatchKind::DocumentClear { mut ids } - | BatchKind::DocumentDeletion { deletion_ids: mut ids } + | BatchKind::DocumentDeletion { deletion_ids: mut ids, includes_by_filter: _ } | BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, operation_ids: mut ids } | BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids }, K::IndexDeletion, @@ -284,7 +286,7 @@ impl BatchKind { ( BatchKind::DocumentClear { mut ids }, - K::DocumentClear | K::DocumentDeletion, + K::DocumentClear | K::DocumentDeletion { by_filter: _ }, ) => { ids.push(id); Continue(BatchKind::DocumentClear { ids }) @@ -328,7 +330,7 @@ impl BatchKind { } ( BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids }, - K::DocumentDeletion, + K::DocumentDeletion { by_filter: false }, ) => { operation_ids.push(id); @@ -339,6 +341,13 @@ impl BatchKind { operation_ids, }) } + // We can't batch a document operation with a delete by filter + ( + this @ BatchKind::DocumentOperation { ..
}, + K::DocumentDeletion { by_filter: true }, + ) => { + Break(this) + } // but we can't autobatch documents if it's not the same kind // this match branch MUST be AFTER the previous one ( @@ -357,13 +366,18 @@ impl BatchKind { operation_ids, }), - (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentClear) => { + (BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: _ }, K::DocumentClear) => { deletion_ids.push(id); Continue(BatchKind::DocumentClear { ids: deletion_ids }) } + // we can't autobatch the deletion and import if the document deletion contained a filter + ( + this @ BatchKind::DocumentDeletion { deletion_ids: _, includes_by_filter: true }, + K::DocumentImport { .. } + ) => Break(this), // we can autobatch the deletion and import if the index already exists ( - BatchKind::DocumentDeletion { mut deletion_ids }, + BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false }, K::DocumentImport { method, allow_index_creation, primary_key } ) if index_already_exists => { deletion_ids.push(id); @@ -377,7 +391,7 @@ impl BatchKind { } // we can autobatch the deletion and import if both can't create an index ( - BatchKind::DocumentDeletion { mut deletion_ids }, + BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false }, K::DocumentImport { method, allow_index_creation, primary_key } ) if !allow_index_creation => { deletion_ids.push(id); @@ -396,9 +410,9 @@ impl BatchKind { ) => { Break(this) } - (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentDeletion) => { + (BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter }, K::DocumentDeletion { by_filter }) => { deletion_ids.push(id); - Continue(BatchKind::DocumentDeletion { deletion_ids }) + Continue(BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: includes_by_filter | by_filter }) } (this @ BatchKind::DocumentDeletion { .. }, K::Settings { .. }) => Break(this), @@ -412,7 +426,7 @@ impl BatchKind { }), ( this @ BatchKind::Settings { .. }, - K::DocumentImport { .. } | K::DocumentDeletion, + K::DocumentImport { .. } | K::DocumentDeletion { .. }, ) => Break(this), ( BatchKind::Settings { mut settings_ids, allow_index_creation }, @@ -443,7 +457,7 @@ impl BatchKind { settings_ids, allow_index_creation, }, - K::DocumentDeletion, + K::DocumentDeletion { .. }, ) => { other.push(id); Continue(BatchKind::ClearAndSettings { @@ -505,7 +519,7 @@ impl BatchKind { // this MUST be AFTER the two previous branch ( this @ BatchKind::SettingsAndDocumentOperation { .. }, - K::DocumentDeletion | K::DocumentImport { .. }, + K::DocumentDeletion { .. } | K::DocumentImport { .. }, ) => Break(this), ( BatchKind::SettingsAndDocumentOperation { mut settings_ids, method, allow_index_creation,primary_key, operation_ids }, @@ -525,8 +539,7 @@ impl BatchKind { | BatchKind::IndexDeletion { .. } | BatchKind::IndexUpdate { .. } | BatchKind::IndexSwap { .. } - | BatchKind::DocumentEdition { .. } - | BatchKind::DocumentDeletionByFilter { .. }, + | BatchKind::DocumentEdition { .. 
}, _, ) => { unreachable!() @@ -616,6 +629,13 @@ mod tests { } } + fn doc_del_fil() -> KindWithContent { + KindWithContent::DocumentDeletionByFilter { + index_uid: String::from("doggo"), + filter_expr: serde_json::json!("cuteness > 100"), + } + } + fn doc_clr() -> KindWithContent { KindWithContent::DocumentClear { index_uid: String::from("doggo") } } @@ -676,10 +696,16 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1, 2] }, false))"); // we can autobatch one or multiple DocumentDeletion together - debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))"); + + // we can autobatch one or multiple DocumentDeletionByFilter together + debug_snapshot!(autobatch_from(true, None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))"); // we can autobatch one or multiple Settings together debug_snapshot!(autobatch_from(true, None, [settings(true)]), @"Some((Settings { allow_index_creation: true, settings_ids: [0] }, true))"); @@ -722,25 +748,63 @@ mod tests { debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, 
allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + + // But we can't autobatch document addition with document deletion by filter + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, 
allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + // And the other way around + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); } #[test] fn simple_document_operation_dont_autobatch_with_other() { - // addition, updates and deletion can't batch together + // addition, updates and deletion by filter can't batch together debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { 
method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + 
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); } #[test] @@ -807,6 +871,7 @@ mod tests { debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))"); @@ -816,6 +881,7 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))"); @@ -827,6 +893,7 @@ mod tests { debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(true, None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); @@ -836,6 +903,7 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [doc_clr(), 
idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(false,None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); @@ -901,10 +969,10 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); // batch deletion and addition - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); } #[test] diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e6e78614..903ec1217 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -110,9 +110,9 @@ pub(crate) enum IndexOperation { index_uid: String, task: Task, }, - IndexDocumentDeletionByFilter { + DocumentDeletion { index_uid: String, - task: Task, + tasks: Vec, }, DocumentClear { index_uid: String, @@ -165,11 +165,11 @@ impl Batch { Batch::IndexOperation { op, .. } => match op { IndexOperation::DocumentOperation { tasks, .. } | IndexOperation::Settings { tasks, .. } + | IndexOperation::DocumentDeletion { tasks, .. } | IndexOperation::DocumentClear { tasks, .. } => { RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid)) } - IndexOperation::DocumentEdition { task, .. } - | IndexOperation::IndexDocumentDeletionByFilter { task, .. } => { + IndexOperation::DocumentEdition { task, .. } => { RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap() } IndexOperation::SettingsAndDocumentOperation { @@ -234,7 +234,7 @@ impl IndexOperation { match self { IndexOperation::DocumentOperation { index_uid, .. } | IndexOperation::DocumentEdition { index_uid, .. } - | IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. } + | IndexOperation::DocumentDeletion { index_uid, .. } | IndexOperation::DocumentClear { index_uid, .. } | IndexOperation::Settings { index_uid, .. } | IndexOperation::DocumentClearAndSetting { index_uid, .. 
} @@ -252,8 +252,8 @@ impl fmt::Display for IndexOperation { IndexOperation::DocumentEdition { .. } => { f.write_str("IndexOperation::DocumentEdition") } - IndexOperation::IndexDocumentDeletionByFilter { .. } => { - f.write_str("IndexOperation::IndexDocumentDeletionByFilter") + IndexOperation::DocumentDeletion { .. } => { + f.write_str("IndexOperation::DocumentDeletion") } IndexOperation::DocumentClear { .. } => f.write_str("IndexOperation::DocumentClear"), IndexOperation::Settings { .. } => f.write_str("IndexOperation::Settings"), @@ -289,21 +289,6 @@ impl IndexScheduler { }, must_create_index, })), - BatchKind::DocumentDeletionByFilter { id } => { - let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; - match &task.kind { - KindWithContent::DocumentDeletionByFilter { index_uid, .. } => { - Ok(Some(Batch::IndexOperation { - op: IndexOperation::IndexDocumentDeletionByFilter { - index_uid: index_uid.clone(), - task, - }, - must_create_index: false, - })) - } - _ => unreachable!(), - } - } BatchKind::DocumentEdition { id } => { let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; match &task.kind { @@ -366,30 +351,11 @@ impl IndexScheduler { must_create_index, })) } - BatchKind::DocumentDeletion { deletion_ids } => { + BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => { let tasks = self.get_existing_tasks(rtxn, deletion_ids)?; - let mut operations = Vec::with_capacity(tasks.len()); - let mut documents_counts = Vec::with_capacity(tasks.len()); - for task in &tasks { - match task.kind { - KindWithContent::DocumentDeletion { ref documents_ids, .. } => { - operations.push(DocumentOperation::Delete(documents_ids.clone())); - documents_counts.push(documents_ids.len() as u64); - } - _ => unreachable!(), - } - } - Ok(Some(Batch::IndexOperation { - op: IndexOperation::DocumentOperation { - index_uid, - primary_key: None, - method: IndexDocumentsMethod::ReplaceDocuments, - documents_counts, - operations, - tasks, - }, + op: IndexOperation::DocumentDeletion { index_uid, tasks }, must_create_index, })) } @@ -1439,7 +1405,7 @@ impl IndexScheduler { { (original_filter, context, function) } else { - // In the case of a `documentDeleteByFilter` the details MUST be set + // In the case of a `documentEdition` the details MUST be set unreachable!(); }; @@ -1469,52 +1435,102 @@ impl IndexScheduler { Ok(vec![task]) } - IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { - let filter = - if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. 
} = - &task.kind - { - filter_expr - } else { - unreachable!() - }; - let deleted_documents = delete_document_by_filter( - index_wtxn, - filter, - self.index_mapper.indexer_config(), - self.must_stop_processing.clone(), - index, - ); - let original_filter = if let Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: _, - }) = task.details - { - original_filter - } else { - // In the case of a `documentDeleteByFilter` the details MUST be set - unreachable!(); - }; + IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => { + let mut to_delete = RoaringBitmap::new(); + let external_documents_ids = index.external_documents_ids(); - match deleted_documents { - Ok(deleted_documents) => { - task.status = Status::Succeeded; - task.details = Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: Some(deleted_documents), - }); - } - Err(e) => { - task.status = Status::Failed; - task.details = Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: Some(0), - }); - task.error = Some(e.into()); + for task in tasks.iter_mut() { + let before = to_delete.len(); + task.status = Status::Succeeded; + + match &task.kind { + KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => { + for id in documents_ids { + if let Some(id) = external_documents_ids.get(index_wtxn, id)? { + to_delete.insert(id); + } + } + let will_be_removed = to_delete.len() - before; + task.details = Some(Details::DocumentDeletion { + provided_ids: documents_ids.len(), + deleted_documents: Some(will_be_removed), + }); + } + KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => { + let before = to_delete.len(); + let filter = match Filter::from_json(filter_expr) { + Ok(filter) => filter, + Err(err) => { + // theoretically, this should be caught by deserr before reaching the index-scheduler and cannot happen + task.status = Status::Failed; + task.error = match err { + milli::Error::UserError( + milli::UserError::InvalidFilterExpression { ..
}, + ) => Some( + Error::from(err) + .with_custom_error_code(Code::InvalidDocumentFilter) + .into(), + ), + e => Some(e.into()), + }; + None + } + }; + if let Some(filter) = filter { + let candidates = + filter.evaluate(index_wtxn, index).map_err(|err| match err { + milli::Error::UserError( + milli::UserError::InvalidFilter(_), + ) => Error::from(err) + .with_custom_error_code(Code::InvalidDocumentFilter), + e => e.into(), + }); + match candidates { + Ok(candidates) => to_delete |= candidates, + Err(err) => { + task.status = Status::Failed; + task.error = Some(err.into()); + } + }; + } + let will_be_removed = to_delete.len() - before; + if let Some(Details::DocumentDeletionByFilter { + original_filter: _, + deleted_documents, + }) = &mut task.details + { + *deleted_documents = Some(will_be_removed); + } else { + // In the case of a `documentDeleteByFilter` the details MUST be set + unreachable!() + } + } + _ => unreachable!(), } } - Ok(vec![task]) + let config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::ReplaceDocuments, + ..Default::default() + }; + + let must_stop_processing = self.must_stop_processing.clone(); + let mut builder = milli::update::IndexDocuments::new( + index_wtxn, + index, + self.index_mapper.indexer_config(), + config, + |indexing_step| tracing::debug!(update = ?indexing_step), + || must_stop_processing.get(), + )?; + + let (new_builder, _count) = + builder.remove_documents_from_db_no_batch(&to_delete)?; + builder = new_builder; + + let _ = builder.execute()?; + + Ok(tasks) } IndexOperation::Settings { index_uid: _, settings, mut tasks } => { let indexer_config = self.index_mapper.indexer_config(); @@ -1718,46 +1734,6 @@ impl IndexScheduler { } } -fn delete_document_by_filter<'a>( - wtxn: &mut RwTxn<'a>, - filter: &serde_json::Value, - indexer_config: &IndexerConfig, - must_stop_processing: MustStopProcessing, - index: &'a Index, -) -> Result<u64> { - let filter = Filter::from_json(filter)?; - Ok(if let Some(filter) = filter { - let candidates = filter.evaluate(wtxn, index).map_err(|err| match err { - milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { - Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) - } - e => e.into(), - })?; - - let config = IndexDocumentsConfig { - update_method: IndexDocumentsMethod::ReplaceDocuments, - ..Default::default() - }; - - let mut builder = milli::update::IndexDocuments::new( - wtxn, - index, - indexer_config, - config, - |indexing_step| tracing::debug!(update = ?indexing_step), - || must_stop_processing.get(), - )?; - - let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?; - builder = new_builder; - - let _ = builder.execute()?; - count - } else { - 0 - }) -} - fn edit_documents_by_function<'a>( wtxn: &mut RwTxn<'a>, filter: &Option<Value>, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 9bcd70d98..753e8c179 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1764,6 +1764,7 @@ mod tests { use crossbeam::channel::RecvTimeoutError; use file_store::File; use insta::assert_json_snapshot; + use maplit::btreeset; use meili_snap::{json_string, snapshot}; use meilisearch_auth::AuthFilter; use meilisearch_types::document_formats::DocumentFormatError; @@ -2553,6 +2554,117 @@ mod tests { snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } + #[test] + fn fail_in_process_batch_for_document_deletion() { + let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + + use
meilisearch_types::settings::{Settings, Unchecked}; + let mut new_settings: Box<Settings<Unchecked>> = Box::default(); + new_settings.filterable_attributes = Setting::Set(btreeset!(S("catto"))); + + index_scheduler + .register( + KindWithContent::SettingsUpdate { + index_uid: S("doggos"), + new_settings, + is_deletion: false, + allow_index_creation: true, + }, + None, + false, + ) + .unwrap(); + + let content = r#"[ + { "id": 1, "doggo": "jean bob" }, + { "id": 2, "catto": "jorts" }, + { "id": 3, "doggo": "bork" } + ]"#; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); + file.persist().unwrap(); + index_scheduler + .register( + KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }, + None, + false, + ) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_setting_and_document_addition"); + + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_adding_the_settings"); + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_adding_the_documents"); + + index_scheduler + .register( + KindWithContent::DocumentDeletion { + index_uid: S("doggos"), + documents_ids: vec![S("1")], + }, + None, + false, + ) + .unwrap(); + // This one should not be caught by Meilisearch, but it's still nice to handle it because if one day we break the filters it could happen + index_scheduler + .register( + KindWithContent::DocumentDeletionByFilter { + index_uid: S("doggos"), + filter_expr: serde_json::json!(true), + }, + None, + false, + ) + .unwrap(); + // Should fail because `id` is not filterable + index_scheduler + .register( + KindWithContent::DocumentDeletionByFilter { + index_uid: S("doggos"), + filter_expr: serde_json::json!("id = 2"), + }, + None, + false, + ) + .unwrap(); + index_scheduler + .register( + KindWithContent::DocumentDeletionByFilter { + index_uid: S("doggos"), + filter_expr: serde_json::json!("catto EXISTS"), + }, + None, + false, + ) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_document_deletions"); + + // Everything should be batched together + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_removing_the_documents"); + + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::<Vec<_>>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::<Vec<_>>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents_remaining_should_only_be_bork"); + } + #[test] fn do_not_batch_task_of_different_indexes() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_documents.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_documents.snap new file mode 100644 index 000000000..62e634bc5 --- /dev/null +++
b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_documents.snap @@ -0,0 +1,44 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }} +1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"settingsUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +doggos: { number_of_documents: 3, field_distribution: {"catto": 1, "doggo": 2, "id": 3} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_settings.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_settings.snap new file mode 100644 index 000000000..45065d8b1 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_settings.snap @@ -0,0 +1,43 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing 
Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [1,] +succeeded [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"settingsUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +doggos: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_settings_and_documents.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_settings_and_documents.snap new file mode 100644 index 000000000..45065d8b1 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_adding_the_settings_and_documents.snap @@ -0,0 +1,43 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), 
searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [1,] +succeeded [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"settingsUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +doggos: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_removing_the_documents.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_removing_the_documents.snap new file mode 100644 index 000000000..82748751e --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_removing_the_documents.snap @@ -0,0 +1,56 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, 
distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }} +1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +2 {uid: 2, status: succeeded, details: { received_document_ids: 1, deleted_documents: Some(1) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1"] }} +3 {uid: 3, status: failed, error: ResponseError { code: 200, message: "Invalid type for filter subexpression: expected: String, Array, found: true.", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: true, deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: Bool(true) }} +4 {uid: 4, status: failed, error: ResponseError { code: 200, message: "Attribute `id` is not filterable. 
Available filterable attributes are: `catto`.\n1:3 id = 2", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: "id = 2", deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("id = 2") }} +5 {uid: 5, status: succeeded, details: { original_filter: "catto EXISTS", deleted_documents: Some(1) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("catto EXISTS") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,5,] +failed [3,4,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"documentDeletion" [2,3,4,5,] +"settingsUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,2,3,4,5,] +---------------------------------------------------------------------- +### Index Mapper: +doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,3,4,5,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,3,4,5,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/documents_remaining_should_only_be_bork.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/documents_remaining_should_only_be_bork.snap new file mode 100644 index 000000000..2b56b71d1 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/documents_remaining_should_only_be_bork.snap @@ -0,0 +1,9 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "id": 3, + "doggo": "bork" + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/registered_the_document_deletions.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/registered_the_document_deletions.snap new file mode 100644 index 000000000..502ff0806 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/registered_the_document_deletions.snap @@ -0,0 +1,53 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, 
typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }} +1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +2 {uid: 2, status: enqueued, details: { received_document_ids: 1, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1"] }} +3 {uid: 3, status: enqueued, details: { original_filter: true, deleted_documents: None }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: Bool(true) }} +4 {uid: 4, status: enqueued, details: { original_filter: "id = 2", deleted_documents: None }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("id = 2") }} +5 {uid: 5, status: enqueued, details: { original_filter: "catto EXISTS", deleted_documents: None }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("catto EXISTS") }} +---------------------------------------------------------------------- +### Status: +enqueued [2,3,4,5,] +succeeded [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"documentDeletion" [2,3,4,5,] +"settingsUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,2,3,4,5,] +---------------------------------------------------------------------- +### Index Mapper: +doggos: { number_of_documents: 3, field_distribution: {"catto": 1, "doggo": 2, "id": 3} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/registered_the_setting_and_document_addition.snap b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/registered_the_setting_and_document_addition.snap new file mode 100644 index 000000000..f7e5c35d3 --- /dev/null +++ 
b/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/registered_the_setting_and_document_addition.snap @@ -0,0 +1,39 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"settingsUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index b33826141..6f29ba10c 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -13,11 +13,10 @@ pub mod search_queue; use std::fs::File; use std::io::{BufReader, BufWriter}; -use std::num::NonZeroUsize; use std::path::Path; use std::str::FromStr; use std::sync::Arc; -use std::thread::{self, available_parallelism}; +use std::thread; use std::time::Duration; use actix_cors::Cors; @@ -118,6 +117,7 @@ pub type LogStderrType = tracing_subscriber::filter::Filtered< pub fn create_app( index_scheduler: Data, auth_controller: Data, + search_queue: Data, opt: Opt, logs: (LogRouteHandle, 
LogStderrHandle), analytics: Arc, @@ -137,6 +137,7 @@ pub fn create_app( s, index_scheduler.clone(), auth_controller.clone(), + search_queue.clone(), &opt, logs, analytics.clone(), @@ -469,19 +470,16 @@ pub fn configure_data( config: &mut web::ServiceConfig, index_scheduler: Data, auth: Data, + search_queue: Data, opt: &Opt, (logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle), analytics: Arc, ) { - let search_queue = SearchQueue::new( - opt.experimental_search_queue_size, - available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()), - ); let http_payload_size_limit = opt.http_payload_size_limit.as_u64() as usize; config .app_data(index_scheduler) .app_data(auth) - .app_data(web::Data::new(search_queue)) + .app_data(search_queue) .app_data(web::Data::from(analytics)) .app_data(web::Data::new(logs_route)) .app_data(web::Data::new(logs_stderr)) diff --git a/meilisearch/src/main.rs b/meilisearch/src/main.rs index 2e70b4eb7..b66bfc5b8 100644 --- a/meilisearch/src/main.rs +++ b/meilisearch/src/main.rs @@ -1,8 +1,10 @@ use std::env; use std::io::{stderr, LineWriter, Write}; +use std::num::NonZeroUsize; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; +use std::thread::available_parallelism; use actix_web::http::KeepAlive; use actix_web::web::Data; @@ -11,6 +13,7 @@ use index_scheduler::IndexScheduler; use is_terminal::IsTerminal; use meilisearch::analytics::Analytics; use meilisearch::option::LogMode; +use meilisearch::search_queue::SearchQueue; use meilisearch::{ analytics, create_app, setup_meilisearch, LogRouteHandle, LogRouteType, LogStderrHandle, LogStderrType, Opt, SubscriberForSecondLayer, @@ -148,11 +151,17 @@ async fn run_http( let opt_clone = opt.clone(); let index_scheduler = Data::from(index_scheduler); let auth_controller = Data::from(auth_controller); + let search_queue = SearchQueue::new( + opt.experimental_search_queue_size, + available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()), + ); + let search_queue = Data::new(search_queue); let http_server = HttpServer::new(move || { create_app( index_scheduler.clone(), auth_controller.clone(), + search_queue.clone(), opt.clone(), logs.clone(), analytics.clone(), diff --git a/meilisearch/src/routes/indexes/facet_search.rs b/meilisearch/src/routes/indexes/facet_search.rs index a648987ca..1df80711d 100644 --- a/meilisearch/src/routes/indexes/facet_search.rs +++ b/meilisearch/src/routes/indexes/facet_search.rs @@ -81,7 +81,7 @@ pub async fn search( let index = index_scheduler.index(&index_uid)?; let features = index_scheduler.features(); let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?; - let _permit = search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_facet_search( &index, @@ -93,7 +93,9 @@ pub async fn search( locales, ) }) - .await?; + .await; + permit.drop().await; + let search_result = search_result?; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); diff --git a/meilisearch/src/routes/indexes/search.rs b/meilisearch/src/routes/indexes/search.rs index e60f95948..362bc9937 100644 --- a/meilisearch/src/routes/indexes/search.rs +++ b/meilisearch/src/routes/indexes/search.rs @@ -233,11 +233,13 @@ pub async fn search_with_url_query( let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?; - let _permit = 
search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) }) - .await?; + .await; + permit.drop().await; + let search_result = search_result?; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); } @@ -276,11 +278,13 @@ pub async fn search_with_post( let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?; - let _permit = search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) }) - .await?; + .await; + permit.drop().await; + let search_result = search_result?; if let Ok(ref search_result) = search_result { aggregate.succeed(search_result); if search_result.degraded { diff --git a/meilisearch/src/routes/multi_search.rs b/meilisearch/src/routes/multi_search.rs index b8822488f..5fcb868c6 100644 --- a/meilisearch/src/routes/multi_search.rs +++ b/meilisearch/src/routes/multi_search.rs @@ -39,7 +39,7 @@ pub async fn multi_search_with_post( ) -> Result { // Since we don't want to process half of the search requests and then get a permit refused // we're going to get one permit for the whole duration of the multi-search request. - let _permit = search_queue.try_get_search_permit().await?; + let permit = search_queue.try_get_search_permit().await?; let federated_search = params.into_inner(); @@ -81,6 +81,7 @@ pub async fn multi_search_with_post( perform_federated_search(&index_scheduler, queries, federation, features) }) .await; + permit.drop().await; if let Ok(Ok(_)) = search_result { multi_aggregate.succeed(); @@ -143,6 +144,7 @@ pub async fn multi_search_with_post( Ok(search_results) } .await; + permit.drop().await; if search_results.is_ok() { multi_aggregate.succeed(); diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 4f6dccc42..195fa1b6f 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -18,6 +18,7 @@ //! And should drop the Permit only once you have freed all the RAM consumed by the method. use std::num::NonZeroUsize; +use std::time::Duration; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -29,16 +30,31 @@ use crate::error::MeilisearchHttpError; pub struct SearchQueue { sender: mpsc::Sender>, capacity: usize, + /// If we have waited longer than this to get a permit, we should abort the search request entirely. + /// The client probably already closed the connection, but we have no way to find out. + time_to_abort: Duration, } /// You should only run search requests while holding this permit. /// Once it's dropped, a new search request will be able to process. +/// You should always try to drop the permit yourself by calling the `drop` async method on it. #[derive(Debug)] pub struct Permit { sender: mpsc::Sender<()>, } +impl Permit { + /// Drop the permit, giving back one permit to the search queue.
+ pub async fn drop(self) { + // if the channel is closed then the whole instance is down + let _ = self.sender.send(()).await; + } +} + impl Drop for Permit { + /// The implicit drop implementation can still be called in multiple cases: + /// - We forgot to call the explicit one somewhere => this should be fixed on our side asap + /// - The future is cancelled while running and the permit dropped with it fn drop(&mut self) { let sender = self.sender.clone(); // if the channel is closed then the whole instance is down @@ -53,7 +69,11 @@ impl SearchQueue { let (sender, receiver) = mpsc::channel(1); tokio::task::spawn(Self::run(capacity, paralellism, receiver)); - Self { sender, capacity } + Self { sender, capacity, time_to_abort: Duration::from_secs(60) } + } + + pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self { + Self { time_to_abort, ..self } } /// This function is the main loop, it's in charge of scheduling which search request should execute first and @@ -119,9 +139,23 @@ impl SearchQueue { /// Returns a search `Permit`. /// It should be dropped as soon as you've freed all the RAM associated with the search request being processed. pub async fn try_get_search_permit(&self) -> Result { + let now = std::time::Instant::now(); let (sender, receiver) = oneshot::channel(); self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; + let permit = receiver + .await + .map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?; + + // If we've been waiting for more than one minute to get a search permit, it's better to simply + // abort the search request than to spend time processing something where the client + // most certainly exited or got a timeout a long time ago. + // We may find a better solution in https://github.com/actix/actix-web/issues/3462. + if now.elapsed() > self.time_to_abort { + permit.drop().await; + Err(MeilisearchHttpError::TooManySearchRequests(self.capacity)) + } else { + Ok(permit) + } } /// Returns `Ok(())` if everything seems normal.
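
The route handlers above all apply the same ordering: take the permit, run the blocking search, await the `JoinHandle` *without* `?`, release the permit explicitly, and only then propagate any join error, so a panic inside `spawn_blocking` can no longer hold a queue slot until the implicit `Drop` fires. Below is a minimal self-contained sketch of that ordering; `Permit` here is a simplified stand-in (without the fallback `Drop` impl the real type keeps), and `handle_search` is a hypothetical handler with a constant standing in for `perform_search`:

```rust
use tokio::sync::mpsc;

/// Simplified stand-in for the real `Permit` (same explicit-drop idea).
struct Permit {
    sender: mpsc::Sender<()>,
}

impl Permit {
    /// Give the slot back to the queue; called explicitly so the handler
    /// controls exactly when the next search may start.
    async fn drop(self) {
        // if the receiver is gone the whole queue is down; nothing to do
        let _ = self.sender.send(()).await;
    }
}

/// Hypothetical handler showing the ordering used by the routes above.
async fn handle_search(permit: Permit) -> Result<u32, tokio::task::JoinError> {
    let search_result = tokio::task::spawn_blocking(|| {
        42u32 // stand-in for `perform_search`
    })
    .await; // no `?` yet: an early return here would keep the slot occupied

    permit.drop().await; // always hand the slot back first
    let search_result = search_result?; // now it is safe to bubble the error up
    Ok(search_result)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    let permit = Permit { sender: tx };
    assert_eq!(handle_search(permit).await.unwrap(), 42);
    assert!(rx.recv().await.is_some()); // the slot came back to the queue
}
```
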
diff --git a/meilisearch/tests/common/server.rs b/meilisearch/tests/common/server.rs index ab3717e22..6d331ebbc 100644 --- a/meilisearch/tests/common/server.rs +++ b/meilisearch/tests/common/server.rs @@ -11,13 +11,11 @@ use actix_web::http::StatusCode; use byte_unit::{Byte, Unit}; use clap::Parser; use meilisearch::option::{IndexerOpts, MaxMemory, MaxThreads, Opt}; -use meilisearch::{analytics, create_app, setup_meilisearch, SubscriberForSecondLayer}; +use meilisearch::setup_meilisearch; use once_cell::sync::Lazy; use tempfile::TempDir; use tokio::sync::OnceCell; use tokio::time::sleep; -use tracing::level_filters::LevelFilter; -use tracing_subscriber::Layer; use uuid::Uuid; use super::index::Index; @@ -183,7 +181,7 @@ impl Server { let options = default_settings(dir.path()); let (index_scheduler, auth) = setup_meilisearch(&options).unwrap(); - let service = Service { index_scheduler, auth, options, api_key: None }; + let service = Service { index_scheduler, auth, api_key: None, options }; Server { service, _dir: Some(dir), _marker: PhantomData } } @@ -263,28 +261,7 @@ impl Server { Response = ServiceResponse, Error = actix_web::Error, > { - let (_route_layer, route_layer_handle) = - tracing_subscriber::reload::Layer::new(None.with_filter( - tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF), - )); - let (_stderr_layer, stderr_layer_handle) = tracing_subscriber::reload::Layer::new( - (Box::new( - tracing_subscriber::fmt::layer() - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE), - ) - as Box + Send + Sync>) - .with_filter(tracing_subscriber::filter::Targets::new()), - ); - - actix_web::test::init_service(create_app( - self.service.index_scheduler.clone().into(), - self.service.auth.clone().into(), - self.service.options.clone(), - (route_layer_handle, stderr_layer_handle), - analytics::MockAnalytics::new(&self.service.options), - true, - )) - .await + self.service.init_web_app().await } pub async fn list_api_keys(&self, params: &str) -> (Value, StatusCode) { diff --git a/meilisearch/tests/common/service.rs b/meilisearch/tests/common/service.rs index cd78253aa..8addbacf8 100644 --- a/meilisearch/tests/common/service.rs +++ b/meilisearch/tests/common/service.rs @@ -1,10 +1,15 @@ +use std::num::NonZeroUsize; use std::sync::Arc; +use actix_web::body::MessageBody; +use actix_web::dev::ServiceResponse; use actix_web::http::header::ContentType; use actix_web::http::StatusCode; use actix_web::test; use actix_web::test::TestRequest; +use actix_web::web::Data; use index_scheduler::IndexScheduler; +use meilisearch::search_queue::SearchQueue; use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer}; use meilisearch_auth::AuthController; use tracing::level_filters::LevelFilter; @@ -106,7 +111,13 @@ impl Service { self.request(req).await } - pub async fn request(&self, mut req: test::TestRequest) -> (Value, StatusCode) { + pub async fn init_web_app( + &self, + ) -> impl actix_web::dev::Service< + actix_http::Request, + Response = ServiceResponse, + Error = actix_web::Error, + > { let (_route_layer, route_layer_handle) = tracing_subscriber::reload::Layer::new(None.with_filter( tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF), @@ -119,16 +130,25 @@ impl Service { as Box + Send + Sync>) .with_filter(tracing_subscriber::filter::Targets::new()), ); + let search_queue = SearchQueue::new( + self.options.experimental_search_queue_size, + NonZeroUsize::new(1).unwrap(), + ); - let app = test::init_service(create_app( + 
actix_web::test::init_service(create_app( self.index_scheduler.clone().into(), self.auth.clone().into(), + Data::new(search_queue), self.options.clone(), (route_layer_handle, stderr_layer_handle), analytics::MockAnalytics::new(&self.options), true, )) - .await; + .await + } + + pub async fn request(&self, mut req: test::TestRequest) -> (Value, StatusCode) { + let app = self.init_web_app().await; if let Some(api_key) = &self.api_key { req = req.insert_header(("Authorization", ["Bearer ", api_key].concat())); diff --git a/meilisearch/tests/content_type.rs b/meilisearch/tests/content_type.rs index 0fc5b26ac..5ef8a4dce 100644 --- a/meilisearch/tests/content_type.rs +++ b/meilisearch/tests/content_type.rs @@ -6,6 +6,7 @@ use actix_web::test; use crate::common::{Server, Value}; +#[derive(Debug)] enum HttpVerb { Put, Patch, @@ -80,7 +81,7 @@ async fn error_json_bad_content_type() { let status_code = res.status(); let body = test::read_body(res).await; let response: Value = serde_json::from_slice(&body).unwrap_or_default(); - assert_eq!(status_code, 415, "calling the route `{}` without content-type is supposed to throw a bad media type error", route); + assert_eq!(status_code, 415, "calling the route `{verb:?} {route}` without content-type is supposed to throw a bad media type error:\n{}", String::from_utf8_lossy(&body)); assert_eq!( response, json!({ diff --git a/meilisearch/tests/logs/mod.rs b/meilisearch/tests/logs/mod.rs index 3b36d78f8..9f4649dca 100644 --- a/meilisearch/tests/logs/mod.rs +++ b/meilisearch/tests/logs/mod.rs @@ -1,10 +1,13 @@ mod error; +use std::num::NonZeroUsize; use std::rc::Rc; use std::str::FromStr; use actix_web::http::header::ContentType; +use actix_web::web::Data; use meili_snap::snapshot; +use meilisearch::search_queue::SearchQueue; use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer}; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; @@ -40,10 +43,15 @@ async fn basic_test_log_stream_route() { .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE) .with_filter(tracing_subscriber::filter::LevelFilter::from_str("OFF").unwrap()), ); + let search_queue = SearchQueue::new( + server.service.options.experimental_search_queue_size, + NonZeroUsize::new(1).unwrap(), + ); let app = actix_web::test::init_service(create_app( server.service.index_scheduler.clone().into(), server.service.auth.clone().into(), + Data::new(search_queue), server.service.options.clone(), (route_layer_handle, stderr_layer_handle), analytics::MockAnalytics::new(&server.service.options), diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 3b4fbf252..498b741e5 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -37,6 +37,43 @@ async fn search_queue_register() { .unwrap(); } +#[actix_rt::test] +async fn search_queue_register_with_explicit_drop() { + let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap()); + + // First, use all the cores + let permit1 = queue.try_get_search_permit().await.unwrap(); + let _permit2 = queue.try_get_search_permit().await.unwrap(); + + // If we free one spot we should be able to register one new search + permit1.drop().await; + + let permit3 = queue.try_get_search_permit().await.unwrap(); + + // And again + permit3.drop().await; + + let _permit4 = queue.try_get_search_permit().await.unwrap(); +} + +#[actix_rt::test] +async fn search_queue_register_with_time_to_abort() { + let queue = Arc::new( + 
SearchQueue::new(1, NonZeroUsize::new(1).unwrap()) + .with_time_to_abort(Duration::from_secs(1)), + ); + + // First, use all the cores + let permit1 = queue.try_get_search_permit().await.unwrap(); + let q = queue.clone(); + let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await }); + tokio::time::sleep(Duration::from_secs(1)).await; + permit1.drop().await; + let ret = permit2.await.unwrap(); + + snapshot!(ret.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s."); +} + #[actix_rt::test] async fn wait_till_cores_are_available() { let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));
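
The `search_queue_register_with_time_to_abort` test above exercises the new elapsed-time check end to end. In isolation the check is just measure-then-compare; here is a minimal sketch under assumed names (`acquire_or_abort` and `TooManySearchRequests` are illustrative stand-ins, and a `sleep` replaces the real oneshot round-trip through the queue):

```rust
use std::time::{Duration, Instant};

/// Hypothetical error mirroring the `TooManySearchRequests` case above.
#[derive(Debug)]
struct TooManySearchRequests(usize);

/// Sketch of the logic `try_get_search_permit` gains: start the clock
/// before queuing up, and if the wait exceeded `time_to_abort`, give the
/// slot straight back and fail rather than serve a long-gone client.
async fn acquire_or_abort(
    wait: Duration, // stand-in for the real channel round-trip
    time_to_abort: Duration,
    capacity: usize,
) -> Result<(), TooManySearchRequests> {
    let now = Instant::now();
    tokio::time::sleep(wait).await; // simulate waiting in the queue

    if now.elapsed() > time_to_abort {
        // the real code calls `permit.drop().await` here before erroring
        Err(TooManySearchRequests(capacity))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // a short wait within the budget keeps the permit...
    assert!(acquire_or_abort(Duration::from_millis(5), Duration::from_secs(1), 1).await.is_ok());
    // ...while a wait past the budget aborts the request, as in the test above
    assert!(acquire_or_abort(Duration::from_millis(20), Duration::from_millis(1), 1).await.is_err());
}
```
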