diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index d713fca17..524c8f32b 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -940,14 +940,15 @@ impl IndexScheduler {
     /// Perform one iteration of the run loop.
     ///
-    /// 1. Find the next batch of tasks to be processed.
-    /// 2. Update the information of these tasks following the start of their processing.
-    /// 3. Update the in-memory list of processed tasks accordingly.
-    /// 4. Process the batch:
+    /// 1. See if we need to clean up the task queue.
+    /// 2. Find the next batch of tasks to be processed.
+    /// 3. Update the information of these tasks following the start of their processing.
+    /// 4. Update the in-memory list of processed tasks accordingly.
+    /// 5. Process the batch:
     ///    - perform the actions of each batched task
     ///    - update the information of each batched task following the end
     ///      of their processing.
-    /// 5. Reset the in-memory list of processed tasks.
+    /// 6. Reset the in-memory list of processed tasks.
     ///
     /// Returns the number of processed tasks.
     fn tick(&self) -> Result<TickOutcome> {
@@ -957,6 +958,8 @@ impl IndexScheduler {
             self.breakpoint(Breakpoint::Start);
         }
 
+        self.cleanup_task_queue()?;
+
         let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
 
         let batch =
             match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
@@ -1093,6 +1096,41 @@ impl IndexScheduler {
         Ok(TickOutcome::TickAgain(processed_tasks))
     }
 
+    /// Register a task to clean up the task queue if needed
+    fn cleanup_task_queue(&self) -> Result<()> {
+        // if less than 42% (~9GiB) of the task queue is being used, we don't need to do anything
+        if ((self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64) < 42 {
+            return Ok(());
+        }
+
+        let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
+
+        let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default()
+            | self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default()
+            | self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default();
+        drop(rtxn);
+
+        let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(1_000_000));
+
+        // /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete
+        // the deletion tasks we enqueued ourselves.
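+        // (the taskDeletion task registered below ends up in the queue itself: if it were the
+        //  only deletable task, every cleanup pass would do nothing but delete the previous
+        //  pass's deletion task, without ever reclaiming the space used by user tasks)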
+        if to_delete.len() < 2 {
+            // the only thing we can do is hope that the user tasks are going to finish
+            return Ok(());
+        }
+
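+        // the query string below is informational: it is what appears in the deletion task's
+        // `original_filter` detail, while the deletion itself operates on the `tasks` bitmap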
+        self.register(KindWithContent::TaskDeletion {
+            query: format!(
+                "?from={},limit={},status=succeeded,failed,canceled",
+                to_delete.iter().last().unwrap_or(u32::MAX),
+                to_delete.len(),
+            ),
+            tasks: to_delete,
+        })?;
+
+        Ok(())
+    }
+
     pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
         let is_indexing = self.is_index_processing(index_uid)?;
         let rtxn = self.read_txn()?;
@@ -1350,9 +1388,10 @@ mod tests {
     use big_s::S;
     use crossbeam::channel::RecvTimeoutError;
     use file_store::File;
-    use meili_snap::snapshot;
+    use meili_snap::{json_string, snapshot};
     use meilisearch_auth::AuthFilter;
     use meilisearch_types::document_formats::DocumentFormatError;
+    use meilisearch_types::error::ErrorCode;
     use meilisearch_types::index_uid_pattern::IndexUidPattern;
     use meilisearch_types::milli::obkv_to_json;
     use meilisearch_types::milli::update::IndexDocumentsMethod::{
@@ -3718,4 +3757,188 @@ mod tests {
         // No matter what happens in process_batch, the index_scheduler should be internally consistent
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
     }
+
+    #[test]
+    fn test_task_queue_is_full_and_auto_deletion_of_tasks() {
+        let (mut index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
+
+        // on average each of these tasks takes ~500 bytes, and since our task queue has 1MiB of
+        // storage, we can enqueue ~2000 tasks before reaching the limit.
+
+        let mut dump = index_scheduler.register_dumped_task().unwrap();
+        let now = OffsetDateTime::now_utc();
+        for i in 0..2000 {
+            dump.register_dumped_task(
+                TaskDump {
+                    uid: i,
+                    index_uid: Some(S("doggo")),
+                    status: Status::Enqueued,
+                    kind: KindDump::IndexCreation { primary_key: None },
+                    canceled_by: None,
+                    details: None,
+                    error: None,
+                    enqueued_at: now,
+                    started_at: None,
+                    finished_at: None,
+                },
+                None,
+            )
+            .unwrap();
+        }
+        dump.finish().unwrap();
+
+        index_scheduler.assert_internally_consistent();
+
+        // at this point the task queue should be full, and any new task should be refused
+
+        let result = index_scheduler
+            .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
+            .unwrap_err();
+
+        snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
+        // we won't be able to check this error in an integration test, so as a best effort I
+        // still ensure here that the error returns the expected error code
+        snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
+
+        // after advancing one batch, the engine should not be able to push a taskDeletion task
+        // yet, because only a single task has finished so far and the cleanup needs at least two
+        handle.advance_one_successful_batch();
+        index_scheduler.assert_internally_consistent();
+        let rtxn = index_scheduler.env.read_txn().unwrap();
+        let ids = index_scheduler
+            .get_task_ids(
+                &rtxn,
+                &Query {
+                    statuses: Some(vec![Status::Succeeded, Status::Failed]),
+                    ..Query::default()
+                },
+            )
+            .unwrap();
+        let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
+        snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###"
+        [
+          {
+            "uid": 0,
+            "enqueuedAt": "[date]",
+            "startedAt": "[date]",
+            "finishedAt": "[date]",
+            "error": null,
+            "canceledBy": null,
+            "details": {
+              "IndexInfo": {
+                "primary_key": null
+              }
+            },
+            "status": "succeeded",
+            "kind": {
+              "indexCreation": {
+                "index_uid": "doggo",
+                "primary_key": null
+              }
+            }
+          }
+        ]
+        "###);
+
+        // The next batch should try to process another task
+        handle.advance_one_failed_batch();
+        index_scheduler.assert_internally_consistent();
+        let rtxn = index_scheduler.env.read_txn().unwrap();
+        let ids = index_scheduler
+            .get_task_ids(
+                &rtxn,
+                &Query {
+                    statuses: Some(vec![Status::Succeeded, Status::Failed]),
+                    ..Query::default()
+                },
+            )
+            .unwrap();
+        let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
+        snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###"
+        [
+          {
+            "uid": 0,
+            "enqueuedAt": "[date]",
+            "startedAt": "[date]",
+            "finishedAt": "[date]",
+            "error": null,
+            "canceledBy": null,
+            "details": {
+              "IndexInfo": {
+                "primary_key": null
+              }
+            },
+            "status": "succeeded",
+            "kind": {
+              "indexCreation": {
+                "index_uid": "doggo",
+                "primary_key": null
+              }
+            }
+          },
+          {
+            "uid": 1,
+            "enqueuedAt": "[date]",
+            "startedAt": "[date]",
+            "finishedAt": "[date]",
+            "error": {
+              "message": "Index `doggo` already exists.",
+              "code": "index_already_exists",
+              "type": "invalid_request",
+              "link": "https://docs.meilisearch.com/errors#index_already_exists"
+            },
+            "canceledBy": null,
+            "details": null,
+            "status": "failed",
+            "kind": {
+              "indexCreation": {
+                "index_uid": "doggo",
+                "primary_key": null
+              }
+            }
+          }
+        ]
+        "###);
+
+        // The next batch should create a taskDeletion task that deletes the succeeded and failed tasks
+        handle.advance_one_successful_batch();
+        index_scheduler.assert_internally_consistent();
+        let rtxn = index_scheduler.env.read_txn().unwrap();
+        let ids = index_scheduler
+            .get_task_ids(
+                &rtxn,
+                &Query {
+                    statuses: Some(vec![Status::Succeeded, Status::Failed]),
+                    ..Query::default()
+                },
+            )
+            .unwrap();
+        let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
+        snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", "[].kind" => "[kind]" }), @r###"
+        [
+          {
+            "uid": 2000,
+            "enqueuedAt": "[date]",
+            "startedAt": "[date]",
+            "finishedAt": "[date]",
+            "error": null,
+            "canceledBy": null,
+            "details": {
+              "TaskDeletion": {
+                "matched_tasks": 2,
+                "deleted_tasks": 2,
+                "original_filter": "?from=1,limit=2,status=succeeded,failed,canceled"
+              }
+            },
+            "status": "succeeded",
+            "kind": "[kind]"
+          }
+        ]
+        "###);
+
+        let to_delete = match tasks[0].kind {
+            KindWithContent::TaskDeletion { ref tasks, .. } => tasks,
+            _ => unreachable!("the snapshot above should prevent us from reaching this case"),
+        };
+
+        snapshot!(format!("{:?}", to_delete), @"RoaringBitmap<[0, 1]>");
+    }
 }
diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs
index 40093dc41..e9b5a2325 100644
--- a/meilisearch/tests/tasks/mod.rs
+++ b/meilisearch/tests/tasks/mod.rs
@@ -1,14 +1,11 @@
 mod errors;
 
-use byte_unit::{Byte, ByteUnit};
 use meili_snap::insta::assert_json_snapshot;
-use meili_snap::{json_string, snapshot};
 use serde_json::json;
-use tempfile::TempDir;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 
-use crate::common::{default_settings, Server};
+use crate::common::Server;
 
 #[actix_rt::test]
 async fn error_get_unexisting_task_status() {
@@ -1003,117 +1000,3 @@ async fn test_summarized_dump_creation() {
     }
     "###);
 }
-
-#[actix_web::test]
-async fn test_task_queue_is_full() {
-    let dir = TempDir::new().unwrap();
-    let mut options = default_settings(dir.path());
-    options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap();
-
-    let server = Server::new_with_options(options).await.unwrap();
-
-    // the first task should be enqueued without issue
-    let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
-    snapshot!(code, @"202 Accepted");
-    snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###"
-    {
-      "taskUid": 0,
-      "indexUid": "doggo",
-      "status": "enqueued",
-      "type": "indexCreation",
-      "enqueuedAt": "[date]"
-    }
-    "###);
-
-    loop {
-        let (res, code) = server.create_index(json!({ "uid": "doggo" })).await;
-        if code == 422 {
-            break;
-        }
-        if res["taskUid"] == json!(null) {
-            panic!(
-                "Encountered the strange case:\n{}",
-                serde_json::to_string_pretty(&res).unwrap()
-            );
-        }
-    }
-
-    let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
-    snapshot!(code, @"422 Unprocessable Entity");
-    snapshot!(json_string!(result), @r###"
-    {
-      "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
-      "code": "no_space_left_on_device",
-      "type": "system",
-      "link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
-    }
-    "###);
-
-    // But we should still be able to register tasks deletion IF they delete something
-    let (result, code) = server.delete_tasks("uids=*").await;
-    snapshot!(code, @"200 OK");
-    snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
-    {
-      "taskUid": "uid",
-      "indexUid": null,
-      "status": "enqueued",
-      "type": "taskDeletion",
-      "enqueuedAt": "[date]"
-    }
-    "###);
-
-    let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await;
-    snapshot!(json_string!(result["status"]), @r###""succeeded""###);
-
-    // Now we should be able to register tasks again
-    let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
-    snapshot!(code, @"202 Accepted");
-    snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
-    {
-      "taskUid": "uid",
-      "indexUid": "doggo",
-      "status": "enqueued",
-      "type": "indexCreation",
-      "enqueuedAt": "[date]"
-    }
-    "###);
-
-    // we're going to fill up the queue once again
-    loop {
-        let (res, code) = server.delete_tasks("uids=0").await;
-        if code == 422 {
-            break;
-        }
-        if res["taskUid"] == json!(null) {
-            panic!(
-                "Encountered the strange case:\n{}",
-                serde_json::to_string_pretty(&res).unwrap()
-            );
-        }
-    }
-
-    // But we should NOT be able to register this task because it doesn't match any tasks
-    let (result, code) = server.delete_tasks("uids=0").await;
-    snapshot!(code, @"422 Unprocessable Entity");
-    snapshot!(json_string!(result), @r###"
-    {
-      "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
-      "code": "no_space_left_on_device",
-      "type": "system",
-      "link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
-    }
-    "###);
-
-    // The deletion still works
-    let (result, code) = server.delete_tasks("uids=*").await;
-    snapshot!(code, @"200 OK");
-    snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
-    {
-      "taskUid": "uid",
-      "indexUid": null,
-      "status": "enqueued",
-      "type": "taskDeletion",
-      "enqueuedAt": "[date]"
-    }
-    "###);
-}