implement the auto-deletion of tasks

This commit is contained in:
Tamo 2023-04-24 20:04:50 +02:00 committed by Louis Dureuil
parent 1afde4fea5
commit f9ddd32545
No known key found for this signature in database
2 changed files with 230 additions and 124 deletions

View File

@ -940,14 +940,15 @@ impl IndexScheduler {
/// Perform one iteration of the run loop. /// Perform one iteration of the run loop.
/// ///
/// 1. Find the next batch of tasks to be processed. /// 1. See if we need to cleanup the task queue
/// 2. Update the information of these tasks following the start of their processing. /// 2. Find the next batch of tasks to be processed.
/// 3. Update the in-memory list of processed tasks accordingly. /// 3. Update the information of these tasks following the start of their processing.
/// 4. Process the batch: /// 4. Update the in-memory list of processed tasks accordingly.
/// 5. Process the batch:
/// - perform the actions of each batched task /// - perform the actions of each batched task
/// - update the information of each batched task following the end /// - update the information of each batched task following the end
/// of their processing. /// of their processing.
/// 5. Reset the in-memory list of processed tasks. /// 6. Reset the in-memory list of processed tasks.
/// ///
/// Returns the number of processed tasks. /// Returns the number of processed tasks.
fn tick(&self) -> Result<TickOutcome> { fn tick(&self) -> Result<TickOutcome> {
@ -957,6 +958,8 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start); self.breakpoint(Breakpoint::Start);
} }
self.cleanup_task_queue()?;
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch = let batch =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
@ -1093,6 +1096,41 @@ impl IndexScheduler {
Ok(TickOutcome::TickAgain(processed_tasks)) Ok(TickOutcome::TickAgain(processed_tasks))
} }
/// Register a task to cleanup the task queue if needed
fn cleanup_task_queue(&self) -> Result<()> {
// if less than 42% (~9GiB) of the task queue are being used we don't need to do anything
if ((self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64) < 42 {
return Ok(());
}
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default()
| self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default()
| self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default();
drop(rtxn);
let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(1_000_000));
// /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete
// the deletion tasks we enqueued ourselves.
if to_delete.len() < 2 {
// the only thing we can do is hope that the user tasks are going to finish
return Ok(());
}
self.register(KindWithContent::TaskDeletion {
query: format!(
"?from={},limit={},status=succeeded,failed,canceled",
to_delete.iter().last().unwrap_or(u32::MAX),
to_delete.len(),
),
tasks: to_delete,
})?;
Ok(())
}
pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> { pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
let is_indexing = self.is_index_processing(index_uid)?; let is_indexing = self.is_index_processing(index_uid)?;
let rtxn = self.read_txn()?; let rtxn = self.read_txn()?;
@ -1350,9 +1388,10 @@ mod tests {
use big_s::S; use big_s::S;
use crossbeam::channel::RecvTimeoutError; use crossbeam::channel::RecvTimeoutError;
use file_store::File; use file_store::File;
use meili_snap::snapshot; use meili_snap::{json_string, snapshot};
use meilisearch_auth::AuthFilter; use meilisearch_auth::AuthFilter;
use meilisearch_types::document_formats::DocumentFormatError; use meilisearch_types::document_formats::DocumentFormatError;
use meilisearch_types::error::ErrorCode;
use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::milli::obkv_to_json; use meilisearch_types::milli::obkv_to_json;
use meilisearch_types::milli::update::IndexDocumentsMethod::{ use meilisearch_types::milli::update::IndexDocumentsMethod::{
@ -3718,4 +3757,188 @@ mod tests {
// No matter what happens in process_batch, the index_scheduler should be internally consistent // No matter what happens in process_batch, the index_scheduler should be internally consistent
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
} }
#[test]
fn test_task_queue_is_full_and_auto_deletion_of_tasks() {
let (mut index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
// on average this task takes ~500+ bytes, and since our task queue have 1MiB of
// storage we can enqueue ~2000 tasks before reaching the limit.
let mut dump = index_scheduler.register_dumped_task().unwrap();
let now = OffsetDateTime::now_utc();
for i in 0..2000 {
dump.register_dumped_task(
TaskDump {
uid: i,
index_uid: Some(S("doggo")),
status: Status::Enqueued,
kind: KindDump::IndexCreation { primary_key: None },
canceled_by: None,
details: None,
error: None,
enqueued_at: now,
started_at: None,
finished_at: None,
},
None,
)
.unwrap();
}
dump.finish().unwrap();
index_scheduler.assert_internally_consistent();
// at this point the task queue should be full and any new task should be refused
let result = index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// after advancing one batch, the engine should not being able to push a taskDeletion task because everything is finished
handle.advance_one_successful_batch();
index_scheduler.assert_internally_consistent();
let rtxn = index_scheduler.env.read_txn().unwrap();
let ids = index_scheduler
.get_task_ids(
&rtxn,
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
..Query::default()
},
)
.unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###"
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]
"###);
// The next batch should try to process another task
handle.advance_one_failed_batch();
index_scheduler.assert_internally_consistent();
let rtxn = index_scheduler.env.read_txn().unwrap();
let ids = index_scheduler
.get_task_ids(
&rtxn,
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
..Query::default()
},
)
.unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###"
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": null,
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]
"###);
// The next batch should create a task deletion tasks that delete the succeeded and failed tasks
handle.advance_one_successful_batch();
index_scheduler.assert_internally_consistent();
let rtxn = index_scheduler.env.read_txn().unwrap();
let ids = index_scheduler
.get_task_ids(
&rtxn,
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
..Query::default()
},
)
.unwrap();
let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap();
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", "[].kind" => "[kind]" }), @r###"
[
{
"uid": 2000,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"TaskDeletion": {
"matched_tasks": 2,
"deleted_tasks": 2,
"original_filter": "?from=1,limit=2,status=succeeded,failed,canceled"
}
},
"status": "succeeded",
"kind": "[kind]"
}
]
"###);
let to_delete = match tasks[0].kind {
KindWithContent::TaskDeletion { ref tasks, .. } => tasks,
_ => unreachable!("the snapshot above should prevent us from running in this case"),
};
snapshot!(format!("{:?}", to_delete), @"RoaringBitmap<[0, 1]>");
}
} }

View File

@ -1,14 +1,11 @@
mod errors; mod errors;
use byte_unit::{Byte, ByteUnit};
use meili_snap::insta::assert_json_snapshot; use meili_snap::insta::assert_json_snapshot;
use meili_snap::{json_string, snapshot};
use serde_json::json; use serde_json::json;
use tempfile::TempDir;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::common::{default_settings, Server}; use crate::common::Server;
#[actix_rt::test] #[actix_rt::test]
async fn error_get_unexisting_task_status() { async fn error_get_unexisting_task_status() {
@ -1003,117 +1000,3 @@ async fn test_summarized_dump_creation() {
} }
"###); "###);
} }
#[actix_web::test]
async fn test_task_queue_is_full() {
let dir = TempDir::new().unwrap();
let mut options = default_settings(dir.path());
options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap();
let server = Server::new_with_options(options).await.unwrap();
// the first task should be enqueued without issue
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 0,
"indexUid": "doggo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
loop {
let (res, code) = server.create_index(json!({ "uid": "doggo" })).await;
if code == 422 {
break;
}
if res["taskUid"] == json!(null) {
panic!(
"Encountered the strange case:\n{}",
serde_json::to_string_pretty(&res).unwrap()
);
}
}
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"422 Unprocessable Entity");
snapshot!(json_string!(result), @r###"
{
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
"code": "no_space_left_on_device",
"type": "system",
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
}
"###);
// But we should still be able to register tasks deletion IF they delete something
let (result, code) = server.delete_tasks("uids=*").await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": null,
"status": "enqueued",
"type": "taskDeletion",
"enqueuedAt": "[date]"
}
"###);
let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await;
snapshot!(json_string!(result["status"]), @r###""succeeded""###);
// Now we should be able to register tasks again
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": "doggo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
// we're going to fill up the queue once again
loop {
let (res, code) = server.delete_tasks("uids=0").await;
if code == 422 {
break;
}
if res["taskUid"] == json!(null) {
panic!(
"Encountered the strange case:\n{}",
serde_json::to_string_pretty(&res).unwrap()
);
}
}
// But we should NOT be able to register this task because it doesn't match any tasks
let (result, code) = server.delete_tasks("uids=0").await;
snapshot!(code, @"422 Unprocessable Entity");
snapshot!(json_string!(result), @r###"
{
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
"code": "no_space_left_on_device",
"type": "system",
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
}
"###);
// The deletion still works
let (result, code) = server.delete_tasks("uids=*").await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": null,
"status": "enqueued",
"type": "taskDeletion",
"enqueuedAt": "[date]"
}
"###);
}