disable the auto deletion of tasks when the ha mode is enabled

This commit is contained in:
Tamo 2024-02-20 12:16:50 +01:00
parent 507739bd98
commit 1eb1c043b5
5 changed files with 249 additions and 1 deletions

View File

@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler { let IndexScheduler {
autobatching_enabled, autobatching_enabled,
cleanup_enabled: _,
must_stop_processing: _, must_stop_processing: _,
processing_tasks, processing_tasks,
file_store, file_store,

View File

@ -264,6 +264,9 @@ pub struct IndexSchedulerOptions {
/// Set to `true` iff the index scheduler is allowed to automatically /// Set to `true` iff the index scheduler is allowed to automatically
/// batch tasks together, to process multiple tasks at once. /// batch tasks together, to process multiple tasks at once.
pub autobatching_enabled: bool, pub autobatching_enabled: bool,
/// Set to `true` iff the index scheduler is allowed to automatically
/// delete the finished tasks when there are too many tasks.
pub cleanup_enabled: bool,
/// The maximum number of tasks stored in the task queue before starting /// The maximum number of tasks stored in the task queue before starting
/// to auto schedule task deletions. /// to auto schedule task deletions.
pub max_number_of_tasks: usize, pub max_number_of_tasks: usize,
@ -324,6 +327,9 @@ pub struct IndexScheduler {
/// Whether auto-batching is enabled or not. /// Whether auto-batching is enabled or not.
pub(crate) autobatching_enabled: bool, pub(crate) autobatching_enabled: bool,
/// Whether we should automatically cleanup the task queue or not.
pub(crate) cleanup_enabled: bool,
/// The max number of tasks allowed before the scheduler starts to delete /// The max number of tasks allowed before the scheduler starts to delete
/// the finished tasks automatically. /// the finished tasks automatically.
pub(crate) max_number_of_tasks: usize, pub(crate) max_number_of_tasks: usize,
@ -390,6 +396,7 @@ impl IndexScheduler {
index_mapper: self.index_mapper.clone(), index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(), wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled, autobatching_enabled: self.autobatching_enabled,
cleanup_enabled: self.cleanup_enabled,
max_number_of_tasks: self.max_number_of_tasks, max_number_of_tasks: self.max_number_of_tasks,
max_number_of_batched_tasks: self.max_number_of_batched_tasks, max_number_of_batched_tasks: self.max_number_of_batched_tasks,
puffin_frame: self.puffin_frame.clone(), puffin_frame: self.puffin_frame.clone(),
@ -491,6 +498,7 @@ impl IndexScheduler {
wake_up: Arc::new(SignalEvent::auto(true)), wake_up: Arc::new(SignalEvent::auto(true)),
puffin_frame: Arc::new(puffin::GlobalFrameView::default()), puffin_frame: Arc::new(puffin::GlobalFrameView::default()),
autobatching_enabled: options.autobatching_enabled, autobatching_enabled: options.autobatching_enabled,
cleanup_enabled: options.cleanup_enabled,
max_number_of_tasks: options.max_number_of_tasks, max_number_of_tasks: options.max_number_of_tasks,
max_number_of_batched_tasks: options.max_number_of_batched_tasks, max_number_of_batched_tasks: options.max_number_of_batched_tasks,
dumps_path: options.dumps_path, dumps_path: options.dumps_path,
@ -1134,7 +1142,9 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start); self.breakpoint(Breakpoint::Start);
} }
if self.cleanup_enabled {
self.cleanup_task_queue()?; self.cleanup_task_queue()?;
}
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch = let batch =
@ -1781,6 +1791,7 @@ mod tests {
index_count: 5, index_count: 5,
indexer_config, indexer_config,
autobatching_enabled: true, autobatching_enabled: true,
cleanup_enabled: true,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: usize::MAX, max_number_of_batched_tasks: usize::MAX,
instance_features: Default::default(), instance_features: Default::default(),
@ -4484,6 +4495,61 @@ mod tests {
drop(rtxn); drop(rtxn);
} }
#[test]
fn test_disable_auto_deletion_of_tasks() {
let (index_scheduler, mut handle) =
IndexScheduler::test_with_custom_config(vec![], |config| {
config.cleanup_enabled = false;
config.max_number_of_tasks = 2;
});
index_scheduler
.register(
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
None,
)
.unwrap();
handle.advance_one_successful_batch();
index_scheduler
.register(
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
None,
)
.unwrap();
handle.advance_one_failed_batch();
// at this point the max number of tasks is reached
// we can still enqueue multiple tasks
index_scheduler
.register(
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
None,
)
.unwrap();
index_scheduler
.register(
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
None,
)
.unwrap();
let rtxn = index_scheduler.env.read_txn().unwrap();
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), name: "task_queue_is_full");
drop(rtxn);
// now we're above the max number of tasks
// and if we try to advance in the tick function no new task deletion should be enqueued
handle.advance_till([Start, BatchCreated]);
let rtxn = index_scheduler.env.read_txn().unwrap();
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_not_been_enqueued");
drop(rtxn);
}
#[test] #[test]
fn basic_get_stats() { fn basic_get_stats() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);

View File

@ -0,0 +1,90 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 2,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 3,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]

View File

@ -0,0 +1,90 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 2,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 3,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]

View File

@ -300,6 +300,7 @@ fn open_or_create_database_unchecked(
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: (&opt.indexer_options).try_into()?, indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true, autobatching_enabled: true,
cleanup_enabled: !opt.experimental_ha_parameters,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,