diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 43509aa84..4adea97e3 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -28,6 +28,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { started_at, finished_at, index_mapper, + max_number_of_tasks: _, wake_up: _, dumps_path: _, snapshots_path: _, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index d713fca17..3fe0acf1a 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -51,6 +51,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::RoaringBitmap; use synchronoise::SignalEvent; +use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use uuid::Uuid; @@ -241,6 +242,9 @@ pub struct IndexSchedulerOptions { /// Set to `true` iff the index scheduler is allowed to automatically /// batch tasks together, to process multiple tasks at once. pub autobatching_enabled: bool, + /// The maximum number of tasks stored in the task queue before starting + /// to auto schedule task deletions. + pub max_number_of_tasks: usize, } /// Structure which holds meilisearch's indexes and schedules the tasks @@ -290,6 +294,10 @@ pub struct IndexScheduler { /// Whether auto-batching is enabled or not. pub(crate) autobatching_enabled: bool, + /// The max number of tasks allowed before the scheduler starts to delete + /// the finished tasks automatically. + pub(crate) max_number_of_tasks: usize, + /// The path used to create the dumps. pub(crate) dumps_path: PathBuf, @@ -339,6 +347,7 @@ impl IndexScheduler { index_mapper: self.index_mapper.clone(), wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, + max_number_of_tasks: self.max_number_of_tasks, snapshots_path: self.snapshots_path.clone(), dumps_path: self.dumps_path.clone(), auth_path: self.auth_path.clone(), @@ -412,6 +421,7 @@ impl IndexScheduler { // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things wake_up: Arc::new(SignalEvent::auto(true)), autobatching_enabled: options.autobatching_enabled, + max_number_of_tasks: options.max_number_of_tasks, dumps_path: options.dumps_path, snapshots_path: options.snapshots_path, auth_path: options.auth_path, @@ -940,14 +950,15 @@ impl IndexScheduler { /// Perform one iteration of the run loop. /// - /// 1. Find the next batch of tasks to be processed. - /// 2. Update the information of these tasks following the start of their processing. - /// 3. Update the in-memory list of processed tasks accordingly. - /// 4. Process the batch: + /// 1. See if we need to cleanup the task queue + /// 2. Find the next batch of tasks to be processed. + /// 3. Update the information of these tasks following the start of their processing. + /// 4. Update the in-memory list of processed tasks accordingly. + /// 5. Process the batch: /// - perform the actions of each batched task /// - update the information of each batched task following the end /// of their processing. - /// 5. Reset the in-memory list of processed tasks. + /// 6. Reset the in-memory list of processed tasks. /// /// Returns the number of processed tasks. fn tick(&self) -> Result { @@ -957,6 +968,8 @@ impl IndexScheduler { self.breakpoint(Breakpoint::Start); } + self.cleanup_task_queue()?; + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let batch = match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { @@ -1093,6 +1106,55 @@ impl IndexScheduler { Ok(TickOutcome::TickAgain(processed_tasks)) } + /// Register a task to cleanup the task queue if needed + fn cleanup_task_queue(&self) -> Result<()> { + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; + + let nb_tasks = self.all_task_ids(&rtxn)?.len(); + // if we have less than 1M tasks everything is fine + if nb_tasks < self.max_number_of_tasks as u64 { + return Ok(()); + } + + let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default() + | self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default() + | self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default(); + + let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000)); + + // /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete + // the deletion tasks we enqueued ourselves. + if to_delete.len() < 2 { + log::warn!("The task queue is almost full, but no task can be deleted yet."); + // the only thing we can do is hope that the user tasks are going to finish + return Ok(()); + } + + log::info!( + "The task queue is almost full. Deleting the oldest {} finished tasks.", + to_delete.len() + ); + + // it's safe to unwrap here because we checked the len above + let newest_task_id = to_delete.iter().last().unwrap(); + let last_task_to_delete = + self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?; + drop(rtxn); + + // increase time by one nanosecond so that the enqueuedAt of the last task to delete is also lower than that date. + let delete_before = last_task_to_delete.enqueued_at + Duration::from_nanos(1); + + self.register(KindWithContent::TaskDeletion { + query: format!( + "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled", + delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, + ), + tasks: to_delete, + })?; + + Ok(()) + } + pub fn index_stats(&self, index_uid: &str) -> Result { let is_indexing = self.is_index_processing(index_uid)?; let rtxn = self.read_txn()?; @@ -1350,9 +1412,10 @@ mod tests { use big_s::S; use crossbeam::channel::RecvTimeoutError; use file_store::File; - use meili_snap::snapshot; + use meili_snap::{json_string, snapshot}; use meilisearch_auth::AuthFilter; use meilisearch_types::document_formats::DocumentFormatError; + use meilisearch_types::error::ErrorCode; use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::milli::obkv_to_json; use meilisearch_types::milli::update::IndexDocumentsMethod::{ @@ -1383,13 +1446,22 @@ mod tests { pub fn test( autobatching_enabled: bool, planned_failures: Vec<(usize, FailureLocation)>, + ) -> (Self, IndexSchedulerHandle) { + Self::test_with_custom_config(planned_failures, |config| { + config.autobatching_enabled = autobatching_enabled; + }) + } + + pub fn test_with_custom_config( + planned_failures: Vec<(usize, FailureLocation)>, + configuration: impl Fn(&mut IndexSchedulerOptions), ) -> (Self, IndexSchedulerHandle) { let tempdir = TempDir::new().unwrap(); let (sender, receiver) = crossbeam::channel::bounded(0); let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() }; - let options = IndexSchedulerOptions { + let mut options = IndexSchedulerOptions { version_file_path: tempdir.path().join(VERSION_FILE_NAME), auth_path: tempdir.path().join("auth"), tasks_path: tempdir.path().join("db_path"), @@ -1402,8 +1474,10 @@ mod tests { index_growth_amount: 1000 * 1000, // 1 MB index_count: 5, indexer_config, - autobatching_enabled, + autobatching_enabled: true, + max_number_of_tasks: 1_000_000, }; + configuration(&mut options); let index_scheduler = Self::new(options, sender, planned_failures).unwrap(); @@ -3718,4 +3792,127 @@ mod tests { // No matter what happens in process_batch, the index_scheduler should be internally consistent snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); } + + #[test] + fn test_task_queue_is_full() { + let (index_scheduler, mut handle) = + IndexScheduler::test_with_custom_config(vec![], |config| { + // that's the minimum map size possible + config.task_db_size = 1048576; + }); + + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + handle.advance_one_successful_batch(); + // on average this task takes ~600 bytes + loop { + let result = index_scheduler.register(KindWithContent::IndexCreation { + index_uid: S("doggo"), + primary_key: None, + }); + if result.is_err() { + break; + } + handle.advance_one_failed_batch(); + } + index_scheduler.assert_internally_consistent(); + + // at this point the task DB shoud have reached its limit and we should not be able to register new tasks + let result = index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap_err(); + snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); + // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code + snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); + + // Even the task deletion that doesn't delete anything shouldn't be accepted + let result = index_scheduler + .register(KindWithContent::TaskDeletion { + query: S("test"), + tasks: RoaringBitmap::new(), + }) + .unwrap_err(); + snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); + // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code + snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); + + // But a task deletion that delete something should works + index_scheduler + .register(KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() }) + .unwrap(); + handle.advance_one_successful_batch(); + + // Now we should be able to enqueue a few tasks again + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + handle.advance_one_failed_batch(); + } + + #[test] + fn test_auto_deletion_of_tasks() { + let (index_scheduler, mut handle) = + IndexScheduler::test_with_custom_config(vec![], |config| { + config.max_number_of_tasks = 2; + }); + + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + handle.advance_one_successful_batch(); + + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + handle.advance_one_failed_batch(); + + // at this point the max number of tasks is reached + // we can still enqueue multiple tasks + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), name: "task_queue_is_full"); + drop(rtxn); + + // now we're above the max number of tasks + // and if we try to advance in the tick function a new task deletion should be enqueued + handle.advance_till([Start, BatchCreated]); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_enqueued"); + drop(rtxn); + + handle.advance_till([InsideProcessBatch, ProcessBatchSucceeded, AfterProcessing]); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_processed"); + drop(rtxn); + + handle.advance_one_failed_batch(); + // a new task deletion has been enqueued + handle.advance_one_successful_batch(); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "after_the_second_task_deletion"); + drop(rtxn); + + handle.advance_one_failed_batch(); + handle.advance_one_successful_batch(); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "everything_has_been_processed"); + drop(rtxn); + } } diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap new file mode 100644 index 000000000..59948c58c --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap @@ -0,0 +1,68 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 5, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": 2, + "original_filter": "[filter]" + } + }, + "status": "succeeded", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 2, + 0, + 4, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap new file mode 100644 index 000000000..0f2f366e9 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap @@ -0,0 +1,48 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 6, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": 2, + "original_filter": "[filter]" + } + }, + "status": "succeeded", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 3, + 0, + 5, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap new file mode 100644 index 000000000..dc6b03517 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap @@ -0,0 +1,133 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 1, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": { + "message": "Index `doggo` already exists.", + "code": "index_already_exists", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#index_already_exists" + }, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "failed", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 4, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": null, + "original_filter": "[filter]" + } + }, + "status": "enqueued", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 0, + 0, + 1, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap new file mode 100644 index 000000000..0200f7f4a --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap @@ -0,0 +1,88 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 4, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": 2, + "original_filter": "[filter]" + } + }, + "status": "succeeded", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 0, + 0, + 1, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_queue_is_full.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_queue_is_full.snap new file mode 100644 index 000000000..988df76ec --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_queue_is_full.snap @@ -0,0 +1,90 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 1, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": { + "message": "Index `doggo` already exists.", + "code": "index_already_exists", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#index_already_exists" + }, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "failed", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + } +] diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 9f85a4c5c..67d8bbd5c 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -234,6 +234,7 @@ fn open_or_create_database_unchecked( index_base_map_size: opt.max_index_size.get_bytes() as usize, indexer_config: (&opt.indexer_options).try_into()?, autobatching_enabled: true, + max_number_of_tasks: 1_000_000, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_count: DEFAULT_INDEX_COUNT, })?) diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index 40093dc41..e9b5a2325 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1,14 +1,11 @@ mod errors; -use byte_unit::{Byte, ByteUnit}; use meili_snap::insta::assert_json_snapshot; -use meili_snap::{json_string, snapshot}; use serde_json::json; -use tempfile::TempDir; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use crate::common::{default_settings, Server}; +use crate::common::Server; #[actix_rt::test] async fn error_get_unexisting_task_status() { @@ -1003,117 +1000,3 @@ async fn test_summarized_dump_creation() { } "###); } - -#[actix_web::test] -async fn test_task_queue_is_full() { - let dir = TempDir::new().unwrap(); - let mut options = default_settings(dir.path()); - options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap(); - - let server = Server::new_with_options(options).await.unwrap(); - - // the first task should be enqueued without issue - let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; - snapshot!(code, @"202 Accepted"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###" - { - "taskUid": 0, - "indexUid": "doggo", - "status": "enqueued", - "type": "indexCreation", - "enqueuedAt": "[date]" - } - "###); - - loop { - let (res, code) = server.create_index(json!({ "uid": "doggo" })).await; - if code == 422 { - break; - } - if res["taskUid"] == json!(null) { - panic!( - "Encountered the strange case:\n{}", - serde_json::to_string_pretty(&res).unwrap() - ); - } - } - - let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; - snapshot!(code, @"422 Unprocessable Entity"); - snapshot!(json_string!(result), @r###" - { - "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", - "code": "no_space_left_on_device", - "type": "system", - "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" - } - "###); - - // But we should still be able to register tasks deletion IF they delete something - let (result, code) = server.delete_tasks("uids=*").await; - snapshot!(code, @"200 OK"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" - { - "taskUid": "uid", - "indexUid": null, - "status": "enqueued", - "type": "taskDeletion", - "enqueuedAt": "[date]" - } - "###); - - let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await; - snapshot!(json_string!(result["status"]), @r###""succeeded""###); - - // Now we should be able to register tasks again - let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; - snapshot!(code, @"202 Accepted"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" - { - "taskUid": "uid", - "indexUid": "doggo", - "status": "enqueued", - "type": "indexCreation", - "enqueuedAt": "[date]" - } - "###); - - // we're going to fill up the queue once again - loop { - let (res, code) = server.delete_tasks("uids=0").await; - if code == 422 { - break; - } - if res["taskUid"] == json!(null) { - panic!( - "Encountered the strange case:\n{}", - serde_json::to_string_pretty(&res).unwrap() - ); - } - } - - // But we should NOT be able to register this task because it doesn't match any tasks - let (result, code) = server.delete_tasks("uids=0").await; - snapshot!(code, @"422 Unprocessable Entity"); - snapshot!(json_string!(result), @r###" - { - "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", - "code": "no_space_left_on_device", - "type": "system", - "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" - } - "###); - - // The deletion still works - let (result, code) = server.delete_tasks("uids=*").await; - snapshot!(code, @"200 OK"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" - { - "taskUid": "uid", - "indexUid": null, - "status": "enqueued", - "type": "taskDeletion", - "enqueuedAt": "[date]" - } - "###); -}