stops receiving tasks once the task queue is full

This commit is contained in:
Tamo 2023-04-06 18:26:27 +02:00
parent 950f73b8bb
commit be69ab320d
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
4 changed files with 102 additions and 2 deletions

View File

@ -61,6 +61,8 @@ pub enum Error {
SwapDuplicateIndexesFound(Vec<String>), SwapDuplicateIndexesFound(Vec<String>),
#[error("Index `{0}` not found.")] #[error("Index `{0}` not found.")]
SwapIndexNotFound(String), SwapIndexNotFound(String),
#[error("No space left in database. Free some space by deleting tasks.")]
NoSpaceLeftInTaskQueue,
#[error( #[error(
"Indexes {} not found.", "Indexes {} not found.",
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ") .0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
@ -152,6 +154,8 @@ impl ErrorCode for Error {
Error::TaskNotFound(_) => Code::TaskNotFound, Error::TaskNotFound(_) => Code::TaskNotFound,
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
// TODO: not sure of the Code to use
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
Error::Dump(e) => e.error_code(), Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(), Error::Milli(e) => e.error_code(),
Error::ProcessBatchPanicked => Code::Internal, Error::ProcessBatchPanicked => Code::Internal,

View File

@ -820,6 +820,13 @@ impl IndexScheduler {
pub fn register(&self, kind: KindWithContent) -> Result<Task> { pub fn register(&self, kind: KindWithContent) -> Result<Task> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
// if the task doesn't delete anything and more than 90% of the task queue is full, we must refuse to enqueue the incoming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
&& (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 90
{
return Err(Error::NoSpaceLeftInTaskQueue);
}
let mut task = Task { let mut task = Task {
uid: self.next_task_id(&wtxn)?, uid: self.next_task_id(&wtxn)?,
enqueued_at: OffsetDateTime::now_utc(), enqueued_at: OffsetDateTime::now_utc(),

View File

@ -68,7 +68,7 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000;
// The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be // The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be
// opened simultaneously. // opened simultaneously.
pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB
pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB pub const TASK_DB_SIZE: u64 = 11 * 1024 * 1024 * 1024; // 11 GiB
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")] #[serde(rename_all = "UPPERCASE")]

View File

@ -1,11 +1,14 @@
mod errors; mod errors;
use byte_unit::{Byte, ByteUnit};
use meili_snap::insta::assert_json_snapshot; use meili_snap::insta::assert_json_snapshot;
use meili_snap::{json_string, snapshot};
use serde_json::json; use serde_json::json;
use tempfile::TempDir;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::common::Server; use crate::common::{default_settings, Server};
#[actix_rt::test] #[actix_rt::test]
async fn error_get_unexisting_task_status() { async fn error_get_unexisting_task_status() {
@ -1000,3 +1003,89 @@ async fn test_summarized_dump_creation() {
} }
"###); "###);
} }
/// Checks that new tasks are refused with a `no_space_left_on_device` error once the task
/// queue is full, while task deletions are still accepted in the cases asserted below.
///
/// The server is started with `max_task_db_size` set to 500 bytes, presumably so that the
/// limit is reached after only a few enqueued tasks.
#[actix_web::test]
async fn test_task_queue_is_full() {
let dir = TempDir::new().unwrap();
let mut options = default_settings(dir.path());
// Shrink the task database to 500 bytes for this test.
options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap();
let server = Server::new_with_options(options).await.unwrap();
// the first task should be enqueued without issue
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 0,
"indexUid": "doggo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
// Keep enqueuing index creations until a response no longer carries a `taskUid`,
// which is taken as the sign that the queue refused the task.
// NOTE(review): this loop has no upper bound; if the server never refuses a task the
// test will spin forever instead of failing.
loop {
let (res, _code) = server.create_index(json!({ "uid": "doggo" })).await;
if res["taskUid"] == json!(null) {
break;
}
}
// Once the queue is full, a regular task must be rejected with the dedicated error.
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"422 Unprocessable Entity");
snapshot!(json_string!(result), @r###"
{
"message": "No space left in database. Free some space by deleting tasks.",
"code": "no_space_left_on_device",
"type": "system",
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
}
"###);
// But we should still be able to register tasks deletion IF they delete something
let (result, code) = server.delete_tasks("uids=0").await;
snapshot!(code, @"200 OK");
// `taskUid` is redacted because its value depends on how many tasks were enqueued above.
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": null,
"status": "enqueued",
"type": "taskDeletion",
"enqueuedAt": "[date]"
}
"###);
// we're going to fill up the queue once again
// NOTE(review): same unbounded loop as above.
loop {
let (res, _code) = server.create_index(json!({ "uid": "doggo" })).await;
if res["taskUid"] == json!(null) {
break;
}
}
// But we should NOT be able to register this task because it doesn't match any tasks
// NOTE(review): this presumably relies on the earlier `uids=0` deletion having already been
// processed so that task 0 no longer exists; nothing here waits for it explicitly — confirm.
let (result, code) = server.delete_tasks("uids=0").await;
snapshot!(code, @"422 Unprocessable Entity");
snapshot!(json_string!(result), @r###"
{
"message": "No space left in database. Free some space by deleting tasks.",
"code": "no_space_left_on_device",
"type": "system",
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
}
"###);
// A deletion using `uids=*` is still accepted while the queue is full.
let (result, code) = server.delete_tasks("uids=*").await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": null,
"status": "enqueued",
"type": "taskDeletion",
"enqueuedAt": "[date]"
}
"###);
}