diff --git a/Cargo.lock b/Cargo.lock index a2f70ff4b..a6cc41fe2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,7 +410,7 @@ checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" [[package]] name = "benchmarks" -version = "1.1.0" +version = "1.1.1" dependencies = [ "anyhow", "bytes", @@ -1150,7 +1150,7 @@ dependencies = [ [[package]] name = "dump" -version = "1.1.0" +version = "1.1.1" dependencies = [ "anyhow", "big_s", @@ -1371,7 +1371,7 @@ dependencies = [ [[package]] name = "file-store" -version = "1.1.0" +version = "1.1.1" dependencies = [ "faux", "tempfile", @@ -1393,7 +1393,7 @@ dependencies = [ [[package]] name = "filter-parser" -version = "1.1.0" +version = "1.1.1" dependencies = [ "insta", "nom", @@ -1413,7 +1413,7 @@ dependencies = [ [[package]] name = "flatten-serde-json" -version = "1.1.0" +version = "1.1.1" dependencies = [ "criterion", "serde_json", @@ -1890,7 +1890,7 @@ dependencies = [ [[package]] name = "index-scheduler" -version = "1.1.0" +version = "1.1.1" dependencies = [ "anyhow", "big_s", @@ -2049,7 +2049,7 @@ dependencies = [ [[package]] name = "json-depth-checker" -version = "1.1.0" +version = "1.1.1" dependencies = [ "criterion", "serde_json", @@ -2445,7 +2445,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "meili-snap" -version = "1.1.0" +version = "1.1.1" dependencies = [ "insta", "md5", @@ -2454,7 +2454,7 @@ dependencies = [ [[package]] name = "meilisearch" -version = "1.1.0" +version = "1.1.1" dependencies = [ "actix-cors", "actix-http", @@ -2542,7 +2542,7 @@ dependencies = [ [[package]] name = "meilisearch-auth" -version = "1.1.0" +version = "1.1.1" dependencies = [ "base64 0.13.1", "enum-iterator", @@ -2561,7 +2561,7 @@ dependencies = [ [[package]] name = "meilisearch-types" -version = "1.1.0" +version = "1.1.1" dependencies = [ "actix-web", "anyhow", @@ -2615,7 +2615,7 @@ dependencies = [ [[package]] name = "milli" -version = "1.1.0" +version = "1.1.1" dependencies = [ "big_s", "bimap", @@ -2969,7 +2969,7 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "permissive-json-pointer" -version = "1.1.0" +version = "1.1.1" dependencies = [ "big_s", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index b1f475410..c8d4dd1ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ members = [ ] [workspace.package] -version = "1.1.0" +version = "1.1.1" authors = ["Quentin de Quelen ", "Clément Renault "] description = "Meilisearch HTTP server" homepage = "https://meilisearch.com" diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 31634237f..24625a7fb 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -311,18 +311,9 @@ impl BatchKind { }) } ( - BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids }, + this @ BatchKind::DocumentOperation { .. 
}, K::DocumentDeletion, - ) => { - operation_ids.push(id); - - Continue(BatchKind::DocumentOperation { - method, - allow_index_creation, - primary_key, - operation_ids, - }) - } + ) => Break(this), // but we can't autobatch documents if it's not the same kind // this match branch MUST be AFTER the previous one ( @@ -345,35 +336,7 @@ impl BatchKind { deletion_ids.push(id); Continue(BatchKind::DocumentClear { ids: deletion_ids }) } - // we can autobatch the deletion and import if the index already exists - ( - BatchKind::DocumentDeletion { mut deletion_ids }, - K::DocumentImport { method, allow_index_creation, primary_key } - ) if index_already_exists => { - deletion_ids.push(id); - - Continue(BatchKind::DocumentOperation { - method, - allow_index_creation, - primary_key, - operation_ids: deletion_ids, - }) - } - // we can autobatch the deletion and import if both can't create an index - ( - BatchKind::DocumentDeletion { mut deletion_ids }, - K::DocumentImport { method, allow_index_creation, primary_key } - ) if !allow_index_creation => { - deletion_ids.push(id); - - Continue(BatchKind::DocumentOperation { - method, - allow_index_creation, - primary_key, - operation_ids: deletion_ids, - }) - } - // we can't autobatch a deletion and an import if the index does not exists but would be created by an addition + // we can't autobatch a deletion and an import ( this @ BatchKind::DocumentDeletion { .. }, K::DocumentImport { .. } @@ -674,36 +637,36 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [settings(false), settings(false), settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0, 1, 2] }, false))"); - // We can autobatch document addition with document deletion - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), 
operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - // And the other way around - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, 
None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + // We can't autobatch document addition with document deletion + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(true, None, 
[doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + // we also can't do it the other way around + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None,
[doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); } #[test] diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 3264bda7a..7b884e0a4 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -61,6 +61,8 @@ pub enum Error { SwapDuplicateIndexesFound(Vec<String>), #[error("Index `{0}` not found.")] SwapIndexNotFound(String), + #[error("Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.")] + NoSpaceLeftInTaskQueue, #[error( "Indexes {} not found.", .0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ") @@ -152,6 +154,8 @@ impl ErrorCode for Error { Error::TaskNotFound(_) => Code::TaskNotFound, Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, + // TODO: not sure of the Code to use + Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice, Error::Dump(e) => e.error_code(), Error::Milli(e) => e.error_code(), Error::ProcessBatchPanicked => Code::Internal, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 13c3c6bfe..d713fca17 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -828,6 +828,13 @@ impl IndexScheduler { pub fn register(&self, kind: KindWithContent) -> Result<Task> { let mut wtxn = self.env.write_txn()?; + // if the task doesn't delete anything and the task queue is more than 50% full, we must refuse to enqueue the incoming task + if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) + && (self.env.non_free_pages_size()? * 100) / self.env.map_size()?
as u64 > 50 + { + return Err(Error::NoSpaceLeftInTaskQueue); + } + let mut task = Task { uid: self.next_task_id(&wtxn)?, enqueued_at: OffsetDateTime::now_utc(), @@ -1936,105 +1943,6 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded"); } - #[test] - fn document_addition_and_document_deletion() { - let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - - let content = r#"[ - { "id": 1, "doggo": "jean bob" }, - { "id": 2, "catto": "jorts" }, - { "id": 3, "doggo": "bork" } - ]"#; - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); - index_scheduler - .register(KindWithContent::DocumentDeletion { - index_uid: S("doggos"), - documents_ids: vec![S("1"), S("2")], - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); - - handle.advance_one_successful_batch(); // The addition AND deletion should've been batched together - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch"); - - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let field_ids = field_ids_map.ids().collect::>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); - } - - #[test] - fn document_deletion_and_document_addition() { - let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - index_scheduler - .register(KindWithContent::DocumentDeletion { - index_uid: S("doggos"), - documents_ids: vec![S("1"), S("2")], - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); - - let content = r#"[ - { "id": 1, "doggo": "jean bob" }, - { "id": 2, "catto": "jorts" }, - { "id": 3, "doggo": "bork" } - ]"#; - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); - - // The deletion should have failed because it can't create an index - handle.advance_one_failed_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_failing_the_deletion"); - - // The addition should works - handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_last_successful_addition"); - - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let 
field_ids = field_ids_map.ids().collect::>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); - } - #[test] fn do_not_batch_task_of_different_indexes() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 0c6457e7a..563bc3496 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -68,7 +68,7 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000; // The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be // opened simultaneously. pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB -pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB +pub const TASK_DB_SIZE: u64 = 20 * 1024 * 1024 * 1024; // 20 GiB #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index e9b5a2325..40093dc41 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1,11 +1,14 @@ mod errors; +use byte_unit::{Byte, ByteUnit}; use meili_snap::insta::assert_json_snapshot; +use meili_snap::{json_string, snapshot}; use serde_json::json; +use tempfile::TempDir; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use crate::common::Server; +use crate::common::{default_settings, Server}; #[actix_rt::test] async fn error_get_unexisting_task_status() { @@ -1000,3 +1003,117 @@ async fn test_summarized_dump_creation() { } "###); } + +#[actix_web::test] +async fn test_task_queue_is_full() { + let dir = TempDir::new().unwrap(); + let mut options = default_settings(dir.path()); + options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap(); + + let server = Server::new_with_options(options).await.unwrap(); + + // the first task should be enqueued without issue + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###" + { + "taskUid": 0, + "indexUid": "doggo", + "status": "enqueued", + "type": "indexCreation", + "enqueuedAt": "[date]" + } + "###); + + loop { + let (res, code) = server.create_index(json!({ "uid": "doggo" })).await; + if code == 422 { + break; + } + if res["taskUid"] == json!(null) { + panic!( + "Encountered the strange case:\n{}", + serde_json::to_string_pretty(&res).unwrap() + ); + } + } + + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"422 Unprocessable Entity"); + snapshot!(json_string!(result), @r###" + { + "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. 
Please delete tasks to continue performing write operations.", + "code": "no_space_left_on_device", + "type": "system", + "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" + } + "###); + + // But we should still be able to register tasks deletion IF they delete something + let (result, code) = server.delete_tasks("uids=*").await; + snapshot!(code, @"200 OK"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": null, + "status": "enqueued", + "type": "taskDeletion", + "enqueuedAt": "[date]" + } + "###); + + let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await; + snapshot!(json_string!(result["status"]), @r###""succeeded""###); + + // Now we should be able to register tasks again + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": "doggo", + "status": "enqueued", + "type": "indexCreation", + "enqueuedAt": "[date]" + } + "###); + + // we're going to fill up the queue once again + loop { + let (res, code) = server.delete_tasks("uids=0").await; + if code == 422 { + break; + } + if res["taskUid"] == json!(null) { + panic!( + "Encountered the strange case:\n{}", + serde_json::to_string_pretty(&res).unwrap() + ); + } + } + + // But we should NOT be able to register this task because it doesn't match any tasks + let (result, code) = server.delete_tasks("uids=0").await; + snapshot!(code, @"422 Unprocessable Entity"); + snapshot!(json_string!(result), @r###" + { + "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", + "code": "no_space_left_on_device", + "type": "system", + "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" + } + "###); + + // The deletion still works + let (result, code) = server.delete_tasks("uids=*").await; + snapshot!(code, @"200 OK"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": null, + "status": "enqueued", + "type": "taskDeletion", + "enqueuedAt": "[date]" + } + "###); +}
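Editor's note on the admission rule: the core behavioral change in index-scheduler/src/lib.rs above is the check at the top of register(): any task that does not delete existing tasks is refused once more than 50% of the task database (env.map_size(), now 20 GiB by default, so roughly a 10 GiB effective write budget) is occupied. The standalone sketch below restates that rule outside the scheduler. Only the arithmetic and the task-deletion exemption are taken from the patch; the type and function names are illustrative, not Meilisearch's API.

// Standalone restatement of the admission rule added to `register()` above.
// Names here are illustrative stand-ins, not Meilisearch types.

/// Simplified stand-in for the task kinds the rule distinguishes.
enum IncomingTask {
    /// A task deletion targeting at least one existing task.
    TaskDeletion { targeted_tasks: usize },
    /// Any other write operation (document addition, index creation, ...).
    Other,
}

/// Returns `true` when the task must be refused with `no_space_left_on_device`.
fn must_refuse(task: &IncomingTask, used_bytes: u64, map_size_bytes: u64) -> bool {
    // A deletion that actually targets tasks is always accepted; otherwise the
    // queue could never be drained once the threshold is crossed.
    let frees_space =
        matches!(task, IncomingTask::TaskDeletion { targeted_tasks } if *targeted_tasks > 0);
    // Everything else is refused once more than half of the task database is used.
    !frees_space && used_bytes * 100 / map_size_bytes > 50
}

fn main() {
    // With the new 20 GiB default for TASK_DB_SIZE, the guard trips once the
    // queue occupies more than ~10 GiB.
    let map_size = 20u64 * 1024 * 1024 * 1024;
    let used = 11u64 * 1024 * 1024 * 1024; // > 50% of the map size

    assert!(must_refuse(&IncomingTask::Other, used, map_size));
    // An "empty" task deletion that matches nothing is refused as well...
    assert!(must_refuse(&IncomingTask::TaskDeletion { targeted_tasks: 0 }, used, map_size));
    // ...but a deletion that targets tasks still goes through.
    assert!(!must_refuse(&IncomingTask::TaskDeletion { targeted_tasks: 1 }, used, map_size));
    println!("admission rule behaves as expected");
}

The exemption for non-empty task deletions is what keeps the queue drainable: once the threshold is crossed, deleting tasks is the only write the scheduler still accepts.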
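From a client's point of view, test_task_queue_is_full above shows the whole recovery loop: a write is answered with 422 and the no_space_left_on_device code, task deletion (DELETE /tasks?uids=*) is still accepted, and writes succeed again afterwards. The sketch below mirrors that loop from the caller's side; it assumes a local instance at http://localhost:7700 and uses reqwest, tokio, and serde_json purely as illustrative dependencies. None of this is part of the patch.

use serde_json::{json, Value};

// Hypothetical client-side recovery loop; crate choices and the localhost URL
// are assumptions for the sake of the example, not part of this patch.
async fn create_index_with_retry(client: &reqwest::Client) -> reqwest::Result<Value> {
    let base = "http://localhost:7700";
    loop {
        // Any write would do; index creation matches what the test above exercises.
        let resp = client
            .post(format!("{base}/indexes"))
            .json(&json!({ "uid": "doggo" }))
            .send()
            .await?;

        if resp.status() != reqwest::StatusCode::UNPROCESSABLE_ENTITY {
            // 202 Accepted: the task was enqueued (other errors are not handled here).
            return resp.json().await;
        }

        let body: Value = resp.json().await?;
        if body["code"] == "no_space_left_on_device" {
            // Free room in the task database before retrying. A narrower filter
            // (e.g. statuses=succeeded,failed,canceled) is usually wiser than uids=*.
            client.delete(format!("{base}/tasks?uids=*")).send().await?;
        } else {
            panic!("unexpected 422 response: {body}");
        }
    }
}

#[tokio::main]
async fn main() -> reqwest::Result<()> {
    let task = create_index_with_retry(&reqwest::Client::new()).await?;
    println!("enqueued: {task}");
    Ok(())
}

In production, a narrower deletion filter is usually preferable to uids=*, which discards the history of every finished task rather than only the ones the caller no longer needs.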