From a94e78ffb051193ece752a9dd19858a05922f706 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 12 Apr 2023 10:53:00 +0200 Subject: [PATCH 01/10] Disable autobatching of additions and deletions --- index-scheduler/src/autobatcher.rs | 103 +++++++++-------------------- index-scheduler/src/lib.rs | 99 --------------------------- 2 files changed, 33 insertions(+), 169 deletions(-) diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 31634237f..24625a7fb 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -311,18 +311,9 @@ impl BatchKind { }) } ( - BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids }, + this @ BatchKind::DocumentOperation { .. }, K::DocumentDeletion, - ) => { - operation_ids.push(id); - - Continue(BatchKind::DocumentOperation { - method, - allow_index_creation, - primary_key, - operation_ids, - }) - } + ) => Break(this), // but we can't autobatch documents if it's not the same kind // this match branch MUST be AFTER the previous one ( @@ -345,35 +336,7 @@ impl BatchKind { deletion_ids.push(id); Continue(BatchKind::DocumentClear { ids: deletion_ids }) } - // we can autobatch the deletion and import if the index already exists - ( - BatchKind::DocumentDeletion { mut deletion_ids }, - K::DocumentImport { method, allow_index_creation, primary_key } - ) if index_already_exists => { - deletion_ids.push(id); - - Continue(BatchKind::DocumentOperation { - method, - allow_index_creation, - primary_key, - operation_ids: deletion_ids, - }) - } - // we can autobatch the deletion and import if both can't create an index - ( - BatchKind::DocumentDeletion { mut deletion_ids }, - K::DocumentImport { method, allow_index_creation, primary_key } - ) if !allow_index_creation => { - deletion_ids.push(id); - - Continue(BatchKind::DocumentOperation { - method, - allow_index_creation, - primary_key, - operation_ids: deletion_ids, - }) - } - // we can't autobatch a deletion and an import if the index does not exists but would be created by an addition + // we can't autobatch a deletion and an import ( this @ BatchKind::DocumentDeletion { .. }, K::DocumentImport { .. } @@ -674,36 +637,36 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [settings(false), settings(false), settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0, 1, 2] }, false))"); - // We can autobatch document addition with document deletion - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - // And the other way around - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + // We can't autobatch document addition with document deletion + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + // we also can't do the only way around + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); } #[test] diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index b402985e3..0f82fb47d 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1869,105 +1869,6 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded"); } - #[test] - fn document_addition_and_document_deletion() { - let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - - let content = r#"[ - { "id": 1, "doggo": "jean bob" }, - { "id": 2, "catto": "jorts" }, - { "id": 3, "doggo": "bork" } - ]"#; - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); - index_scheduler - .register(KindWithContent::DocumentDeletion { - index_uid: S("doggos"), - documents_ids: vec![S("1"), S("2")], - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); - - handle.advance_one_successful_batch(); // The addition AND deletion should've been batched together - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch"); - - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let field_ids = field_ids_map.ids().collect::>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); - } - - #[test] - fn document_deletion_and_document_addition() { - let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - index_scheduler - .register(KindWithContent::DocumentDeletion { - index_uid: S("doggos"), - documents_ids: vec![S("1"), S("2")], - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); - - let content = r#"[ - { "id": 1, "doggo": "jean bob" }, - { "id": 2, "catto": "jorts" }, - { "id": 3, "doggo": "bork" } - ]"#; - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: true, - }) - .unwrap(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); - - // The deletion should have failed because it can't create an index - handle.advance_one_failed_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_failing_the_deletion"); - - // The addition should works - handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_last_successful_addition"); - - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let field_ids = field_ids_map.ids().collect::>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); - } - #[test] fn do_not_batch_task_of_different_indexes() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); From be69ab320dbf13a859ef07a00433089a809d08c4 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 6 Apr 2023 18:26:27 +0200 Subject: [PATCH 02/10] stops receiving tasks once the task queue is full --- index-scheduler/src/error.rs | 4 ++ index-scheduler/src/lib.rs | 7 +++ meilisearch/src/option.rs | 2 +- meilisearch/tests/tasks/mod.rs | 91 +++++++++++++++++++++++++++++++++- 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 3264bda7a..14623871b 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -61,6 +61,8 @@ pub enum Error { SwapDuplicateIndexesFound(Vec), #[error("Index `{0}` not found.")] SwapIndexNotFound(String), + #[error("No space left in database. Free some space by deleting tasks.")] + NoSpaceLeftInTaskQueue, #[error( "Indexes {} not found.", .0.iter().map(|s| format!("`{}`", s)).collect::>().join(", ") @@ -152,6 +154,8 @@ impl ErrorCode for Error { Error::TaskNotFound(_) => Code::TaskNotFound, Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, + // TODO: not sure of the Code to use + Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice, Error::Dump(e) => e.error_code(), Error::Milli(e) => e.error_code(), Error::ProcessBatchPanicked => Code::Internal, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index b402985e3..692888404 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -820,6 +820,13 @@ impl IndexScheduler { pub fn register(&self, kind: KindWithContent) -> Result { let mut wtxn = self.env.write_txn()?; + // if the task doesn't delete anything and 90% of the task queue is full, we must refuse to enqueue the incomming task + if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) + && (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 90 + { + return Err(Error::NoSpaceLeftInTaskQueue); + } + let mut task = Task { uid: self.next_task_id(&wtxn)?, enqueued_at: OffsetDateTime::now_utc(), diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 0c6457e7a..d419e0875 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -68,7 +68,7 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000; // The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be // opened simultaneously. pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB -pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB +pub const TASK_DB_SIZE: u64 = 11 * 1024 * 1024 * 1024; // 11 GiB #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index e9b5a2325..6124de4b7 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1,11 +1,14 @@ mod errors; +use byte_unit::{Byte, ByteUnit}; use meili_snap::insta::assert_json_snapshot; +use meili_snap::{json_string, snapshot}; use serde_json::json; +use tempfile::TempDir; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use crate::common::Server; +use crate::common::{default_settings, Server}; #[actix_rt::test] async fn error_get_unexisting_task_status() { @@ -1000,3 +1003,89 @@ async fn test_summarized_dump_creation() { } "###); } + +#[actix_web::test] +async fn test_task_queue_is_full() { + let dir = TempDir::new().unwrap(); + let mut options = default_settings(dir.path()); + options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap(); + + let server = Server::new_with_options(options).await.unwrap(); + + // the first task should be enqueued without issue + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###" + { + "taskUid": 0, + "indexUid": "doggo", + "status": "enqueued", + "type": "indexCreation", + "enqueuedAt": "[date]" + } + "###); + + loop { + let (res, _code) = server.create_index(json!({ "uid": "doggo" })).await; + if res["taskUid"] == json!(null) { + break; + } + } + + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"422 Unprocessable Entity"); + snapshot!(json_string!(result), @r###" + { + "message": "No space left in database. Free some space by deleting tasks.", + "code": "no_space_left_on_device", + "type": "system", + "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" + } + "###); + + // But we should still be able to register tasks deletion IF they delete something + let (result, code) = server.delete_tasks("uids=0").await; + snapshot!(code, @"200 OK"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": null, + "status": "enqueued", + "type": "taskDeletion", + "enqueuedAt": "[date]" + } + "###); + + // we're going to fill up the queue once again + loop { + let (res, _code) = server.create_index(json!({ "uid": "doggo" })).await; + if res["taskUid"] == json!(null) { + break; + } + } + + // But we should NOT be able to register this task because it doesn't match any tasks + let (result, code) = server.delete_tasks("uids=0").await; + snapshot!(code, @"422 Unprocessable Entity"); + snapshot!(json_string!(result), @r###" + { + "message": "No space left in database. Free some space by deleting tasks.", + "code": "no_space_left_on_device", + "type": "system", + "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" + } + "###); + + // The deletion still works + let (result, code) = server.delete_tasks("uids=*").await; + snapshot!(code, @"200 OK"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": null, + "status": "enqueued", + "type": "taskDeletion", + "enqueuedAt": "[date]" + } + "###); +} From 9350a7b01739bdb10ba3e017705a35f449c48b8c Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 11 Apr 2023 16:30:56 +0200 Subject: [PATCH 03/10] improve the test and try to understand the issue happening on windows --- meilisearch/tests/tasks/mod.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index 6124de4b7..a9ed5c00f 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1026,10 +1026,16 @@ async fn test_task_queue_is_full() { "###); loop { - let (res, _code) = server.create_index(json!({ "uid": "doggo" })).await; - if res["taskUid"] == json!(null) { + let (res, code) = server.create_index(json!({ "uid": "doggo" })).await; + if code == 422 { break; } + if res["taskUid"] == json!(null) { + panic!( + "Encountered the strange case:\n{}", + serde_json::to_string_pretty(&res).unwrap() + ); + } } let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; @@ -1058,10 +1064,16 @@ async fn test_task_queue_is_full() { // we're going to fill up the queue once again loop { - let (res, _code) = server.create_index(json!({ "uid": "doggo" })).await; - if res["taskUid"] == json!(null) { + let (res, code) = server.create_index(json!({ "uid": "doggo" })).await; + if code == 422 { break; } + if res["taskUid"] == json!(null) { + panic!( + "Encountered the strange case:\n{}", + serde_json::to_string_pretty(&res).unwrap() + ); + } } // But we should NOT be able to register this task because it doesn't match any tasks From b4fabce36dc4e918474b4cfd7314713d51b988fb Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 12 Apr 2023 18:46:24 +0200 Subject: [PATCH 04/10] update the error message + update the task db size to 20GiB with a limit at 50% --- index-scheduler/src/error.rs | 2 +- index-scheduler/src/lib.rs | 4 ++-- meilisearch/src/option.rs | 2 +- meilisearch/tests/tasks/mod.rs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 14623871b..7b884e0a4 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -61,7 +61,7 @@ pub enum Error { SwapDuplicateIndexesFound(Vec), #[error("Index `{0}` not found.")] SwapIndexNotFound(String), - #[error("No space left in database. Free some space by deleting tasks.")] + #[error("Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.")] NoSpaceLeftInTaskQueue, #[error( "Indexes {} not found.", diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 692888404..9c52b008d 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -820,9 +820,9 @@ impl IndexScheduler { pub fn register(&self, kind: KindWithContent) -> Result { let mut wtxn = self.env.write_txn()?; - // if the task doesn't delete anything and 90% of the task queue is full, we must refuse to enqueue the incomming task + // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) - && (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 90 + && (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 50 { return Err(Error::NoSpaceLeftInTaskQueue); } diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index d419e0875..563bc3496 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -68,7 +68,7 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000; // The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be // opened simultaneously. pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB -pub const TASK_DB_SIZE: u64 = 11 * 1024 * 1024 * 1024; // 11 GiB +pub const TASK_DB_SIZE: u64 = 20 * 1024 * 1024 * 1024; // 20 GiB #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index a9ed5c00f..27e212f39 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1042,7 +1042,7 @@ async fn test_task_queue_is_full() { snapshot!(code, @"422 Unprocessable Entity"); snapshot!(json_string!(result), @r###" { - "message": "No space left in database. Free some space by deleting tasks.", + "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", "code": "no_space_left_on_device", "type": "system", "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" @@ -1081,7 +1081,7 @@ async fn test_task_queue_is_full() { snapshot!(code, @"422 Unprocessable Entity"); snapshot!(json_string!(result), @r###" { - "message": "No space left in database. Free some space by deleting tasks.", + "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", "code": "no_space_left_on_device", "type": "system", "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" From b3f60ee8057f837a5de9107db42faa1cfd4fde17 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 13 Apr 2023 10:18:58 +0200 Subject: [PATCH 05/10] try to fix the ci --- meilisearch/tests/tasks/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index 27e212f39..88f83bb70 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1064,7 +1064,7 @@ async fn test_task_queue_is_full() { // we're going to fill up the queue once again loop { - let (res, code) = server.create_index(json!({ "uid": "doggo" })).await; + let (res, code) = server.delete_tasks("uids=0").await; if code == 422 { break; } From cd45d21d6e92df4f0768c3a338447ceb57d30d50 Mon Sep 17 00:00:00 2001 From: dureuill Date: Thu, 13 Apr 2023 13:25:10 +0000 Subject: [PATCH 06/10] Update version for the next release (v1.1.1) in Cargo.toml --- Cargo.lock | 26 +++++++++++++------------- Cargo.toml | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 853d1a896..f1ff389c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,7 +410,7 @@ checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" [[package]] name = "benchmarks" -version = "1.1.0" +version = "1.1.1" dependencies = [ "anyhow", "bytes", @@ -1150,7 +1150,7 @@ dependencies = [ [[package]] name = "dump" -version = "1.1.0" +version = "1.1.1" dependencies = [ "anyhow", "big_s", @@ -1371,7 +1371,7 @@ dependencies = [ [[package]] name = "file-store" -version = "1.1.0" +version = "1.1.1" dependencies = [ "faux", "tempfile", @@ -1393,7 +1393,7 @@ dependencies = [ [[package]] name = "filter-parser" -version = "1.1.0" +version = "1.1.1" dependencies = [ "insta", "nom", @@ -1413,7 +1413,7 @@ dependencies = [ [[package]] name = "flatten-serde-json" -version = "1.1.0" +version = "1.1.1" dependencies = [ "criterion", "serde_json", @@ -1890,7 +1890,7 @@ dependencies = [ [[package]] name = "index-scheduler" -version = "1.1.0" +version = "1.1.1" dependencies = [ "anyhow", "big_s", @@ -2049,7 +2049,7 @@ dependencies = [ [[package]] name = "json-depth-checker" -version = "1.1.0" +version = "1.1.1" dependencies = [ "criterion", "serde_json", @@ -2445,7 +2445,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "meili-snap" -version = "1.1.0" +version = "1.1.1" dependencies = [ "insta", "md5", @@ -2454,7 +2454,7 @@ dependencies = [ [[package]] name = "meilisearch" -version = "1.1.0" +version = "1.1.1" dependencies = [ "actix-cors", "actix-http", @@ -2542,7 +2542,7 @@ dependencies = [ [[package]] name = "meilisearch-auth" -version = "1.1.0" +version = "1.1.1" dependencies = [ "base64 0.13.1", "enum-iterator", @@ -2561,7 +2561,7 @@ dependencies = [ [[package]] name = "meilisearch-types" -version = "1.1.0" +version = "1.1.1" dependencies = [ "actix-web", "anyhow", @@ -2615,7 +2615,7 @@ dependencies = [ [[package]] name = "milli" -version = "1.1.0" +version = "1.1.1" dependencies = [ "big_s", "bimap", @@ -2969,7 +2969,7 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "permissive-json-pointer" -version = "1.1.0" +version = "1.1.1" dependencies = [ "big_s", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index b1f475410..c8d4dd1ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ members = [ ] [workspace.package] -version = "1.1.0" +version = "1.1.1" authors = ["Quentin de Quelen ", "Clément Renault "] description = "Meilisearch HTTP server" homepage = "https://meilisearch.com" From fd583501d7ad34258a81f35aa7b8f0c827293188 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 13 Apr 2023 17:07:44 +0200 Subject: [PATCH 07/10] Use non_free_pages_size instead of real_disk_size to check task db space taken --- index-scheduler/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 3bde39040..b0ecef0f3 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -822,7 +822,7 @@ impl IndexScheduler { // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) - && (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 50 + && (self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64 > 50 { return Err(Error::NoSpaceLeftInTaskQueue); } From 066c6bd87582c2eb89cade3e6f9487acfe975d6b Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 13 Apr 2023 17:20:06 +0200 Subject: [PATCH 08/10] test task db full now checks that a task can be successfully added after deleting tasks --- meilisearch/tests/tasks/mod.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index 88f83bb70..723c628bb 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1050,7 +1050,7 @@ async fn test_task_queue_is_full() { "###); // But we should still be able to register tasks deletion IF they delete something - let (result, code) = server.delete_tasks("uids=0").await; + let (result, code) = server.delete_tasks("uids=*").await; snapshot!(code, @"200 OK"); snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" { @@ -1062,6 +1062,19 @@ async fn test_task_queue_is_full() { } "###); + // But we should still be able to register tasks deletion IF they delete something + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": "doggo", + "status": "enqueued", + "type": "indexCreation", + "enqueuedAt": "[date]" + } + "###); + // we're going to fill up the queue once again loop { let (res, code) = server.delete_tasks("uids=0").await; From 1e6cbcaf12a4f39be46b6b65f98c994574044bba Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 13 Apr 2023 17:27:12 +0200 Subject: [PATCH 09/10] Update test comment Co-authored-by: Tamo --- meilisearch/tests/tasks/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index 723c628bb..b608040da 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1062,7 +1062,7 @@ async fn test_task_queue_is_full() { } "###); - // But we should still be able to register tasks deletion IF they delete something + // Now we should be able to register tasks again let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; snapshot!(code, @"202 Accepted"); snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" From c2f4b6ced09530ec969fec256a864c635a93f760 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 13 Apr 2023 18:22:42 +0200 Subject: [PATCH 10/10] Test: await for the deletion task to complete before trying to add another task --- meilisearch/tests/tasks/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index b608040da..40093dc41 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1062,6 +1062,9 @@ async fn test_task_queue_is_full() { } "###); + let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await; + snapshot!(json_string!(result["status"]), @r###""succeeded""###); + // Now we should be able to register tasks again let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; snapshot!(code, @"202 Accepted");