mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 12:54:26 +01:00
Merge #3659
3659: stops receiving tasks once the task queue is full r=Kerollmops a=irevoire Give 20GiB to the task queue + once 50% of the task queue is used, it blocks itself and only receives task deletion requests to ensure we never get in a state where we can’t do anything. Also, create a new error message when we reach this case: ``` Meilisearch cannot receive write operations because the size limit of the tasks database has been reached. Please delete tasks to continue performing write operations. ``` Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
f9960be115
@ -61,6 +61,8 @@ pub enum Error {
|
|||||||
SwapDuplicateIndexesFound(Vec<String>),
|
SwapDuplicateIndexesFound(Vec<String>),
|
||||||
#[error("Index `{0}` not found.")]
|
#[error("Index `{0}` not found.")]
|
||||||
SwapIndexNotFound(String),
|
SwapIndexNotFound(String),
|
||||||
|
#[error("Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.")]
|
||||||
|
NoSpaceLeftInTaskQueue,
|
||||||
#[error(
|
#[error(
|
||||||
"Indexes {} not found.",
|
"Indexes {} not found.",
|
||||||
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
|
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
|
||||||
@ -152,6 +154,8 @@ impl ErrorCode for Error {
|
|||||||
Error::TaskNotFound(_) => Code::TaskNotFound,
|
Error::TaskNotFound(_) => Code::TaskNotFound,
|
||||||
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
|
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
|
||||||
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
|
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
|
||||||
|
// TODO: not sure of the Code to use
|
||||||
|
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
|
||||||
Error::Dump(e) => e.error_code(),
|
Error::Dump(e) => e.error_code(),
|
||||||
Error::Milli(e) => e.error_code(),
|
Error::Milli(e) => e.error_code(),
|
||||||
Error::ProcessBatchPanicked => Code::Internal,
|
Error::ProcessBatchPanicked => Code::Internal,
|
||||||
|
@ -820,6 +820,13 @@ impl IndexScheduler {
|
|||||||
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
|
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
|
||||||
let mut wtxn = self.env.write_txn()?;
|
let mut wtxn = self.env.write_txn()?;
|
||||||
|
|
||||||
|
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
|
||||||
|
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
|
||||||
|
&& (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 50
|
||||||
|
{
|
||||||
|
return Err(Error::NoSpaceLeftInTaskQueue);
|
||||||
|
}
|
||||||
|
|
||||||
let mut task = Task {
|
let mut task = Task {
|
||||||
uid: self.next_task_id(&wtxn)?,
|
uid: self.next_task_id(&wtxn)?,
|
||||||
enqueued_at: OffsetDateTime::now_utc(),
|
enqueued_at: OffsetDateTime::now_utc(),
|
||||||
|
@ -68,7 +68,7 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000;
|
|||||||
// The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be
|
// The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be
|
||||||
// opened simultaneously.
|
// opened simultaneously.
|
||||||
pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB
|
pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB
|
||||||
pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
|
pub const TASK_DB_SIZE: u64 = 20 * 1024 * 1024 * 1024; // 20 GiB
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
|
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "UPPERCASE")]
|
#[serde(rename_all = "UPPERCASE")]
|
||||||
|
@ -1,11 +1,14 @@
|
|||||||
mod errors;
|
mod errors;
|
||||||
|
|
||||||
|
use byte_unit::{Byte, ByteUnit};
|
||||||
use meili_snap::insta::assert_json_snapshot;
|
use meili_snap::insta::assert_json_snapshot;
|
||||||
|
use meili_snap::{json_string, snapshot};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
use tempfile::TempDir;
|
||||||
use time::format_description::well_known::Rfc3339;
|
use time::format_description::well_known::Rfc3339;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
use crate::common::Server;
|
use crate::common::{default_settings, Server};
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn error_get_unexisting_task_status() {
|
async fn error_get_unexisting_task_status() {
|
||||||
@ -1000,3 +1003,101 @@ async fn test_summarized_dump_creation() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_web::test]
|
||||||
|
async fn test_task_queue_is_full() {
|
||||||
|
let dir = TempDir::new().unwrap();
|
||||||
|
let mut options = default_settings(dir.path());
|
||||||
|
options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap();
|
||||||
|
|
||||||
|
let server = Server::new_with_options(options).await.unwrap();
|
||||||
|
|
||||||
|
// the first task should be enqueued without issue
|
||||||
|
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
|
||||||
|
snapshot!(code, @"202 Accepted");
|
||||||
|
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###"
|
||||||
|
{
|
||||||
|
"taskUid": 0,
|
||||||
|
"indexUid": "doggo",
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "indexCreation",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (res, code) = server.create_index(json!({ "uid": "doggo" })).await;
|
||||||
|
if code == 422 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if res["taskUid"] == json!(null) {
|
||||||
|
panic!(
|
||||||
|
"Encountered the strange case:\n{}",
|
||||||
|
serde_json::to_string_pretty(&res).unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
|
||||||
|
snapshot!(code, @"422 Unprocessable Entity");
|
||||||
|
snapshot!(json_string!(result), @r###"
|
||||||
|
{
|
||||||
|
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
|
||||||
|
"code": "no_space_left_on_device",
|
||||||
|
"type": "system",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
// But we should still be able to register tasks deletion IF they delete something
|
||||||
|
let (result, code) = server.delete_tasks("uids=0").await;
|
||||||
|
snapshot!(code, @"200 OK");
|
||||||
|
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
|
||||||
|
{
|
||||||
|
"taskUid": "uid",
|
||||||
|
"indexUid": null,
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "taskDeletion",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
// we're going to fill up the queue once again
|
||||||
|
loop {
|
||||||
|
let (res, code) = server.delete_tasks("uids=0").await;
|
||||||
|
if code == 422 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if res["taskUid"] == json!(null) {
|
||||||
|
panic!(
|
||||||
|
"Encountered the strange case:\n{}",
|
||||||
|
serde_json::to_string_pretty(&res).unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// But we should NOT be able to register this task because it doesn't match any tasks
|
||||||
|
let (result, code) = server.delete_tasks("uids=0").await;
|
||||||
|
snapshot!(code, @"422 Unprocessable Entity");
|
||||||
|
snapshot!(json_string!(result), @r###"
|
||||||
|
{
|
||||||
|
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
|
||||||
|
"code": "no_space_left_on_device",
|
||||||
|
"type": "system",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
// The deletion still works
|
||||||
|
let (result, code) = server.delete_tasks("uids=*").await;
|
||||||
|
snapshot!(code, @"200 OK");
|
||||||
|
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
|
||||||
|
{
|
||||||
|
"taskUid": "uid",
|
||||||
|
"indexUid": null,
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "taskDeletion",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user