diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 42f041578..988e75b81 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { let IndexScheduler { autobatching_enabled, + cleanup_enabled: _, must_stop_processing: _, processing_tasks, file_store, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index b1edaabe5..9a1799469 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -264,6 +264,9 @@ pub struct IndexSchedulerOptions { /// Set to `true` iff the index scheduler is allowed to automatically /// batch tasks together, to process multiple tasks at once. pub autobatching_enabled: bool, + /// Set to `true` iff the index scheduler is allowed to automatically + /// delete the finished tasks when there are too many tasks. + pub cleanup_enabled: bool, /// The maximum number of tasks stored in the task queue before starting /// to auto schedule task deletions. pub max_number_of_tasks: usize, @@ -324,6 +327,9 @@ pub struct IndexScheduler { /// Whether auto-batching is enabled or not. pub(crate) autobatching_enabled: bool, + /// Whether we should automatically cleanup the task queue or not. + pub(crate) cleanup_enabled: bool, + /// The max number of tasks allowed before the scheduler starts to delete /// the finished tasks automatically. pub(crate) max_number_of_tasks: usize, @@ -390,6 +396,7 @@ impl IndexScheduler { index_mapper: self.index_mapper.clone(), wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, + cleanup_enabled: self.cleanup_enabled, max_number_of_tasks: self.max_number_of_tasks, max_number_of_batched_tasks: self.max_number_of_batched_tasks, puffin_frame: self.puffin_frame.clone(), @@ -491,6 +498,7 @@ impl IndexScheduler { wake_up: Arc::new(SignalEvent::auto(true)), puffin_frame: Arc::new(puffin::GlobalFrameView::default()), autobatching_enabled: options.autobatching_enabled, + cleanup_enabled: options.cleanup_enabled, max_number_of_tasks: options.max_number_of_tasks, max_number_of_batched_tasks: options.max_number_of_batched_tasks, dumps_path: options.dumps_path, @@ -1134,7 +1142,9 @@ impl IndexScheduler { self.breakpoint(Breakpoint::Start); } - self.cleanup_task_queue()?; + if self.cleanup_enabled { + self.cleanup_task_queue()?; + } let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let batch = @@ -1781,6 +1791,7 @@ mod tests { index_count: 5, indexer_config, autobatching_enabled: true, + cleanup_enabled: true, max_number_of_tasks: 1_000_000, max_number_of_batched_tasks: usize::MAX, instance_features: Default::default(), @@ -4484,6 +4495,61 @@ mod tests { drop(rtxn); } + #[test] + fn test_disable_auto_deletion_of_tasks() { + let (index_scheduler, mut handle) = + IndexScheduler::test_with_custom_config(vec![], |config| { + config.cleanup_enabled = false; + config.max_number_of_tasks = 2; + }); + + index_scheduler + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) + .unwrap(); + handle.advance_one_successful_batch(); + + index_scheduler + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) + .unwrap(); + handle.advance_one_failed_batch(); + + // at this point the max number of tasks is reached + // we can still enqueue multiple tasks + index_scheduler + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) + .unwrap(); + index_scheduler + .register( + KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, + None, + ) + .unwrap(); + + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), name: "task_queue_is_full"); + drop(rtxn); + + // now we're above the max number of tasks + // and if we try to advance in the tick function no new task deletion should be enqueued + handle.advance_till([Start, BatchCreated]); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_not_been_enqueued"); + drop(rtxn); + } + #[test] fn basic_get_stats() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); diff --git a/index-scheduler/src/snapshots/lib.rs/test_disable_auto_deletion_of_tasks/task_deletion_have_not_been_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/test_disable_auto_deletion_of_tasks/task_deletion_have_not_been_enqueued.snap new file mode 100644 index 000000000..988df76ec --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_disable_auto_deletion_of_tasks/task_deletion_have_not_been_enqueued.snap @@ -0,0 +1,90 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 1, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": { + "message": "Index `doggo` already exists.", + "code": "index_already_exists", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#index_already_exists" + }, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "failed", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_disable_auto_deletion_of_tasks/task_queue_is_full.snap b/index-scheduler/src/snapshots/lib.rs/test_disable_auto_deletion_of_tasks/task_queue_is_full.snap new file mode 100644 index 000000000..988df76ec --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_disable_auto_deletion_of_tasks/task_queue_is_full.snap @@ -0,0 +1,90 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 1, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": { + "message": "Index `doggo` already exists.", + "code": "index_already_exists", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#index_already_exists" + }, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "failed", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + } +] diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index a6a0f0d77..292a87259 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -300,6 +300,7 @@ fn open_or_create_database_unchecked( enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, indexer_config: (&opt.indexer_options).try_into()?, autobatching_enabled: true, + cleanup_enabled: !opt.experimental_ha_parameters, max_number_of_tasks: 1_000_000, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,