make the autodeletion work with a fixed number of tasks and update the tests

This commit is contained in:
Tamo 2023-04-25 17:26:34 +02:00 committed by Louis Dureuil
parent 972bb2831c
commit aa7537a11e
No known key found for this signature in database
9 changed files with 390 additions and 170 deletions

View File

@ -28,6 +28,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
started_at,
finished_at,
index_mapper,
max_number_of_tasks: _,
wake_up: _,
dumps_path: _,
snapshots_path: _,

View File

@ -241,6 +241,9 @@ pub struct IndexSchedulerOptions {
/// Set to `true` iff the index scheduler is allowed to automatically
/// batch tasks together, to process multiple tasks at once.
pub autobatching_enabled: bool,
/// The maximum number of tasks stored in the task queue before starting
/// to auto schedule task deletions.
pub max_number_of_tasks: usize,
}
/// Structure which holds meilisearch's indexes and schedules the tasks
@ -290,6 +293,10 @@ pub struct IndexScheduler {
/// Whether auto-batching is enabled or not.
pub(crate) autobatching_enabled: bool,
/// The max number of tasks allowed before the scheduler starts to delete
/// the finished tasks automatically.
pub(crate) max_number_of_tasks: usize,
/// The path used to create the dumps.
pub(crate) dumps_path: PathBuf,
@ -339,6 +346,7 @@ impl IndexScheduler {
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
max_number_of_tasks: self.max_number_of_tasks,
snapshots_path: self.snapshots_path.clone(),
dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(),
@ -412,6 +420,7 @@ impl IndexScheduler {
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled: options.autobatching_enabled,
max_number_of_tasks: options.max_number_of_tasks,
dumps_path: options.dumps_path,
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
@ -1098,19 +1107,20 @@ impl IndexScheduler {
/// Register a task to cleanup the task queue if needed
fn cleanup_task_queue(&self) -> Result<()> {
// if less than 42% (~9GiB) of the task queue are being used we don't need to do anything
if ((self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64) < 42 {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let nb_tasks = self.all_task_ids(&rtxn)?.len();
// if we have less than 1M tasks everything is fine
if nb_tasks < self.max_number_of_tasks as u64 {
return Ok(());
}
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default()
| self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default()
| self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default();
drop(rtxn);
let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(1_000_000));
let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000));
// /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete
// the deletion tasks we enqueued ourselves.
@ -1394,7 +1404,7 @@ mod tests {
use big_s::S;
use crossbeam::channel::RecvTimeoutError;
use file_store::File;
use meili_snap::{json_string, snapshot};
use meili_snap::snapshot;
use meilisearch_auth::AuthFilter;
use meilisearch_types::document_formats::DocumentFormatError;
use meilisearch_types::error::ErrorCode;
@ -1428,13 +1438,22 @@ mod tests {
pub fn test(
autobatching_enabled: bool,
planned_failures: Vec<(usize, FailureLocation)>,
) -> (Self, IndexSchedulerHandle) {
Self::test_with_custom_config(planned_failures, |config| {
config.autobatching_enabled = autobatching_enabled;
})
}
pub fn test_with_custom_config(
planned_failures: Vec<(usize, FailureLocation)>,
configuration: impl Fn(&mut IndexSchedulerOptions),
) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() };
let options = IndexSchedulerOptions {
let mut options = IndexSchedulerOptions {
version_file_path: tempdir.path().join(VERSION_FILE_NAME),
auth_path: tempdir.path().join("auth"),
tasks_path: tempdir.path().join("db_path"),
@ -1447,8 +1466,10 @@ mod tests {
index_growth_amount: 1000 * 1000, // 1 MB
index_count: 5,
indexer_config,
autobatching_enabled,
autobatching_enabled: true,
max_number_of_tasks: 1_000_000,
};
configuration(&mut options);
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
@ -3765,186 +3786,106 @@ mod tests {
}
#[test]
fn test_task_queue_is_full_and_auto_deletion_of_tasks() {
let (mut index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
fn test_task_queue_is_full() {
let (index_scheduler, mut handle) =
IndexScheduler::test_with_custom_config(vec![], |config| {
// that's the minimum map size possible
config.task_db_size = 1048576;
});
// on average this task takes ~500+ bytes, and since our task queue have 1MiB of
// storage we can enqueue ~2000 tasks before reaching the limit.
let mut dump = index_scheduler.register_dumped_task().unwrap();
let now = OffsetDateTime::now_utc();
for i in 0..2000 {
dump.register_dumped_task(
TaskDump {
uid: i,
index_uid: Some(S("doggo")),
status: Status::Enqueued,
kind: KindDump::IndexCreation { primary_key: None },
canceled_by: None,
details: None,
error: None,
enqueued_at: now,
started_at: None,
finished_at: None,
},
None,
)
index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap();
handle.advance_one_successful_batch();
// on average this task takes ~600 bytes
loop {
let result = index_scheduler.register(KindWithContent::IndexCreation {
index_uid: S("doggo"),
primary_key: None,
});
if result.is_err() {
break;
}
handle.advance_one_failed_batch();
}
dump.finish().unwrap();
index_scheduler.assert_internally_consistent();
// at this point the task queue should be full and any new task should be refused
// at this point the task DB shoud have reached its limit and we should not be able to register new tasks
let result = index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// after advancing one batch, the engine should not being able to push a taskDeletion task because everything is finished
handle.advance_one_successful_batch();
index_scheduler.assert_internally_consistent();
let rtxn = index_scheduler.env.read_txn().unwrap();
let ids = index_scheduler
.get_task_ids(
&rtxn,
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
..Query::default()
},
)
.unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###"
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]
"###);
// Even the task deletion that doesn't delete anything shouldn't be accepted
let result = index_scheduler
.register(KindWithContent::TaskDeletion {
query: S("test"),
tasks: RoaringBitmap::new(),
})
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// The next batch should try to process another task
// But a task deletion that delete something should works
index_scheduler
.register(KindWithContent::TaskDeletion { query: S("test"), tasks: (0..50).collect() })
.unwrap();
handle.advance_one_successful_batch();
// Now we should be able to enqueue a few tasks again
index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap();
handle.advance_one_failed_batch();
index_scheduler.assert_internally_consistent();
let rtxn = index_scheduler.env.read_txn().unwrap();
let ids = index_scheduler
.get_task_ids(
&rtxn,
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
..Query::default()
},
)
.unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###"
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": null,
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]
"###);
}
// The next batch should create a task deletion tasks that delete the succeeded and failed tasks
#[test]
fn test_auto_deletion_of_tasks() {
let (index_scheduler, mut handle) =
IndexScheduler::test_with_custom_config(vec![], |config| {
config.max_number_of_tasks = 2;
});
index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap();
handle.advance_one_successful_batch();
index_scheduler.assert_internally_consistent();
let rtxn = index_scheduler.env.read_txn().unwrap();
let ids = index_scheduler
.get_task_ids(
&rtxn,
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
..Query::default()
},
)
index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", "[].kind" => "[kind]" }), @r###"
[
{
"uid": 2000,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"TaskDeletion": {
"matched_tasks": 2,
"deleted_tasks": 2,
"original_filter": "?from=1,limit=2,status=succeeded,failed,canceled"
}
},
"status": "succeeded",
"kind": "[kind]"
}
]
"###);
handle.advance_one_failed_batch();
let to_delete = match tasks[0].kind {
KindWithContent::TaskDeletion { ref tasks, .. } => tasks,
_ => unreachable!("the snapshot above should prevent us from running in this case"),
};
// at this point the max number of tasks is reached
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "max_number_of_tasks");
snapshot!(format!("{:?}", to_delete), @"RoaringBitmap<[0, 1]>");
// we can still enqueue multiple tasks
index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap();
index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap();
// at this point the max number of tasks is reached
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "above_the_max_number_of_tasks");
// and if we try to advance in the tick function a new task deletion should be enqueued
handle.advance_till([Start, BatchCreated]);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_enqueued");
handle.advance_till([InsideProcessBatch, ProcessBatchSucceeded, AfterProcessing]);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_processed");
handle.advance_one_failed_batch();
// a new task deletion has been enqueued
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_the_second_task_deletion");
handle.advance_one_failed_batch();
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "everything_has_been_processed");
}
}

View File

@ -0,0 +1,49 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
----------------------------------------------------------------------
### Status:
enqueued [2,3,]
succeeded [0,]
failed [1,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,3,]
----------------------------------------------------------------------
### Index Tasks:
doggo [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,44 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
5 {uid: 5, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=4,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=4,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[2, 4]> }}
----------------------------------------------------------------------
### Status:
enqueued [3,]
succeeded [5,]
failed []
----------------------------------------------------------------------
### Kind:
"indexCreation" [3,]
"taskDeletion" [5,]
----------------------------------------------------------------------
### Index Tasks:
doggo [3,]
----------------------------------------------------------------------
### Index Mapper:
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [3,]
[timestamp] [5,]
----------------------------------------------------------------------
### Started At:
[timestamp] [5,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [5,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,41 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
6 {uid: 6, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=5,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=5,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[3, 5]> }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [6,]
failed []
----------------------------------------------------------------------
### Kind:
"indexCreation" []
"taskDeletion" [6,]
----------------------------------------------------------------------
### Index Tasks:
----------------------------------------------------------------------
### Index Mapper:
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [6,]
----------------------------------------------------------------------
### Started At:
[timestamp] [6,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [6,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,45 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,]
failed [1,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,]
----------------------------------------------------------------------
### Index Tasks:
doggo [0,1,]
----------------------------------------------------------------------
### Index Mapper:
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,52 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[4,]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
4 {uid: 4, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "?from=1,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=1,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[0, 1]> }}
----------------------------------------------------------------------
### Status:
enqueued [2,3,4,]
succeeded [0,]
failed [1,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,3,]
"taskDeletion" [4,]
----------------------------------------------------------------------
### Index Tasks:
doggo [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,46 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }}
4 {uid: 4, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=1,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=1,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[0, 1]> }}
----------------------------------------------------------------------
### Status:
enqueued [2,3,]
succeeded [4,]
failed []
----------------------------------------------------------------------
### Kind:
"indexCreation" [2,3,]
"taskDeletion" [4,]
----------------------------------------------------------------------
### Index Tasks:
doggo [2,3,]
----------------------------------------------------------------------
### Index Mapper:
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
----------------------------------------------------------------------
### Started At:
[timestamp] [4,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [4,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -234,6 +234,7 @@ fn open_or_create_database_unchecked(
index_base_map_size: opt.max_index_size.get_bytes() as usize,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
max_number_of_tasks: 1_000_000,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,
})?)