mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 12:54:26 +01:00
Merge #3693
3693: Implement the auto deletion of tasks r=dureuill a=irevoire Fixes https://github.com/meilisearch/meilisearch/issues/3622 This PR should be the definite fix for #3622. It adds a limit (1M) to the maximum number of tasks the task queue can hold. Once the task queue reaches this limit (1M of tasks are in the task queue, whatever their status is), meilisearch will schedule a task deletion that tries to delete the oldest 100k tasks. If meilisearch can't delete 100k tasks because some of them are not yet finished, it will delete as many tasks as possible. Once the limit is reached, you're still able to register new tasks. The engine will only stop you from adding new tasks once [the other hard limit](https://github.com/meilisearch/meilisearch/pull/3659) of 10GiB of tasks is reached (that's between 5M and 15M of tasks depending on your workflow). ------- Technically; - We only try to schedule our task deletion when calling the tick function but before creating a new batch. This means we never enqueue a task we're not going to process ~right away. - If our task deletion doesn't delete anything, we don't enqueue it and log a warn the user that the engine is not working properly Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
78e611f282
@ -28,6 +28,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||
started_at,
|
||||
finished_at,
|
||||
index_mapper,
|
||||
max_number_of_tasks: _,
|
||||
wake_up: _,
|
||||
dumps_path: _,
|
||||
snapshots_path: _,
|
||||
|
@ -51,6 +51,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
|
||||
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
||||
use roaring::RoaringBitmap;
|
||||
use synchronoise::SignalEvent;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use time::OffsetDateTime;
|
||||
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
|
||||
use uuid::Uuid;
|
||||
@ -241,6 +242,9 @@ pub struct IndexSchedulerOptions {
|
||||
/// Set to `true` iff the index scheduler is allowed to automatically
|
||||
/// batch tasks together, to process multiple tasks at once.
|
||||
pub autobatching_enabled: bool,
|
||||
/// The maximum number of tasks stored in the task queue before starting
|
||||
/// to auto schedule task deletions.
|
||||
pub max_number_of_tasks: usize,
|
||||
}
|
||||
|
||||
/// Structure which holds meilisearch's indexes and schedules the tasks
|
||||
@ -290,6 +294,10 @@ pub struct IndexScheduler {
|
||||
/// Whether auto-batching is enabled or not.
|
||||
pub(crate) autobatching_enabled: bool,
|
||||
|
||||
/// The max number of tasks allowed before the scheduler starts to delete
|
||||
/// the finished tasks automatically.
|
||||
pub(crate) max_number_of_tasks: usize,
|
||||
|
||||
/// The path used to create the dumps.
|
||||
pub(crate) dumps_path: PathBuf,
|
||||
|
||||
@ -339,6 +347,7 @@ impl IndexScheduler {
|
||||
index_mapper: self.index_mapper.clone(),
|
||||
wake_up: self.wake_up.clone(),
|
||||
autobatching_enabled: self.autobatching_enabled,
|
||||
max_number_of_tasks: self.max_number_of_tasks,
|
||||
snapshots_path: self.snapshots_path.clone(),
|
||||
dumps_path: self.dumps_path.clone(),
|
||||
auth_path: self.auth_path.clone(),
|
||||
@ -412,6 +421,7 @@ impl IndexScheduler {
|
||||
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
|
||||
wake_up: Arc::new(SignalEvent::auto(true)),
|
||||
autobatching_enabled: options.autobatching_enabled,
|
||||
max_number_of_tasks: options.max_number_of_tasks,
|
||||
dumps_path: options.dumps_path,
|
||||
snapshots_path: options.snapshots_path,
|
||||
auth_path: options.auth_path,
|
||||
@ -940,14 +950,15 @@ impl IndexScheduler {
|
||||
|
||||
/// Perform one iteration of the run loop.
|
||||
///
|
||||
/// 1. Find the next batch of tasks to be processed.
|
||||
/// 2. Update the information of these tasks following the start of their processing.
|
||||
/// 3. Update the in-memory list of processed tasks accordingly.
|
||||
/// 4. Process the batch:
|
||||
/// 1. See if we need to cleanup the task queue
|
||||
/// 2. Find the next batch of tasks to be processed.
|
||||
/// 3. Update the information of these tasks following the start of their processing.
|
||||
/// 4. Update the in-memory list of processed tasks accordingly.
|
||||
/// 5. Process the batch:
|
||||
/// - perform the actions of each batched task
|
||||
/// - update the information of each batched task following the end
|
||||
/// of their processing.
|
||||
/// 5. Reset the in-memory list of processed tasks.
|
||||
/// 6. Reset the in-memory list of processed tasks.
|
||||
///
|
||||
/// Returns the number of processed tasks.
|
||||
fn tick(&self) -> Result<TickOutcome> {
|
||||
@ -957,6 +968,8 @@ impl IndexScheduler {
|
||||
self.breakpoint(Breakpoint::Start);
|
||||
}
|
||||
|
||||
self.cleanup_task_queue()?;
|
||||
|
||||
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
||||
let batch =
|
||||
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
|
||||
@ -1093,6 +1106,55 @@ impl IndexScheduler {
|
||||
Ok(TickOutcome::TickAgain(processed_tasks))
|
||||
}
|
||||
|
||||
/// Register a task to cleanup the task queue if needed
|
||||
fn cleanup_task_queue(&self) -> Result<()> {
|
||||
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
||||
|
||||
let nb_tasks = self.all_task_ids(&rtxn)?.len();
|
||||
// if we have less than 1M tasks everything is fine
|
||||
if nb_tasks < self.max_number_of_tasks as u64 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default()
|
||||
| self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default()
|
||||
| self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default();
|
||||
|
||||
let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000));
|
||||
|
||||
// /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete
|
||||
// the deletion tasks we enqueued ourselves.
|
||||
if to_delete.len() < 2 {
|
||||
log::warn!("The task queue is almost full, but no task can be deleted yet.");
|
||||
// the only thing we can do is hope that the user tasks are going to finish
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
log::info!(
|
||||
"The task queue is almost full. Deleting the oldest {} finished tasks.",
|
||||
to_delete.len()
|
||||
);
|
||||
|
||||
// it's safe to unwrap here because we checked the len above
|
||||
let newest_task_id = to_delete.iter().last().unwrap();
|
||||
let last_task_to_delete =
|
||||
self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
drop(rtxn);
|
||||
|
||||
// increase time by one nanosecond so that the enqueuedAt of the last task to delete is also lower than that date.
|
||||
let delete_before = last_task_to_delete.enqueued_at + Duration::from_nanos(1);
|
||||
|
||||
self.register(KindWithContent::TaskDeletion {
|
||||
query: format!(
|
||||
"?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled",
|
||||
delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?,
|
||||
),
|
||||
tasks: to_delete,
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
|
||||
let is_indexing = self.is_index_processing(index_uid)?;
|
||||
let rtxn = self.read_txn()?;
|
||||
@ -1350,9 +1412,10 @@ mod tests {
|
||||
use big_s::S;
|
||||
use crossbeam::channel::RecvTimeoutError;
|
||||
use file_store::File;
|
||||
use meili_snap::snapshot;
|
||||
use meili_snap::{json_string, snapshot};
|
||||
use meilisearch_auth::AuthFilter;
|
||||
use meilisearch_types::document_formats::DocumentFormatError;
|
||||
use meilisearch_types::error::ErrorCode;
|
||||
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||
use meilisearch_types::milli::obkv_to_json;
|
||||
use meilisearch_types::milli::update::IndexDocumentsMethod::{
|
||||
@ -1383,13 +1446,22 @@ mod tests {
|
||||
pub fn test(
|
||||
autobatching_enabled: bool,
|
||||
planned_failures: Vec<(usize, FailureLocation)>,
|
||||
) -> (Self, IndexSchedulerHandle) {
|
||||
Self::test_with_custom_config(planned_failures, |config| {
|
||||
config.autobatching_enabled = autobatching_enabled;
|
||||
})
|
||||
}
|
||||
|
||||
pub fn test_with_custom_config(
|
||||
planned_failures: Vec<(usize, FailureLocation)>,
|
||||
configuration: impl Fn(&mut IndexSchedulerOptions),
|
||||
) -> (Self, IndexSchedulerHandle) {
|
||||
let tempdir = TempDir::new().unwrap();
|
||||
let (sender, receiver) = crossbeam::channel::bounded(0);
|
||||
|
||||
let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() };
|
||||
|
||||
let options = IndexSchedulerOptions {
|
||||
let mut options = IndexSchedulerOptions {
|
||||
version_file_path: tempdir.path().join(VERSION_FILE_NAME),
|
||||
auth_path: tempdir.path().join("auth"),
|
||||
tasks_path: tempdir.path().join("db_path"),
|
||||
@ -1402,8 +1474,10 @@ mod tests {
|
||||
index_growth_amount: 1000 * 1000, // 1 MB
|
||||
index_count: 5,
|
||||
indexer_config,
|
||||
autobatching_enabled,
|
||||
autobatching_enabled: true,
|
||||
max_number_of_tasks: 1_000_000,
|
||||
};
|
||||
configuration(&mut options);
|
||||
|
||||
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
|
||||
|
||||
@ -3718,4 +3792,127 @@ mod tests {
|
||||
// No matter what happens in process_batch, the index_scheduler should be internally consistent
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_task_queue_is_full() {
|
||||
let (index_scheduler, mut handle) =
|
||||
IndexScheduler::test_with_custom_config(vec![], |config| {
|
||||
// that's the minimum map size possible
|
||||
config.task_db_size = 1048576;
|
||||
});
|
||||
|
||||
index_scheduler
|
||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||
.unwrap();
|
||||
handle.advance_one_successful_batch();
|
||||
// on average this task takes ~600 bytes
|
||||
loop {
|
||||
let result = index_scheduler.register(KindWithContent::IndexCreation {
|
||||
index_uid: S("doggo"),
|
||||
primary_key: None,
|
||||
});
|
||||
if result.is_err() {
|
||||
break;
|
||||
}
|
||||
handle.advance_one_failed_batch();
|
||||
}
|
||||
index_scheduler.assert_internally_consistent();
|
||||
|
||||
// at this point the task DB shoud have reached its limit and we should not be able to register new tasks
|
||||
let result = index_scheduler
|
||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||
.unwrap_err();
|
||||
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
|
||||
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
|
||||
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
|
||||
|
||||
// Even the task deletion that doesn't delete anything shouldn't be accepted
|
||||
let result = index_scheduler
|
||||
.register(KindWithContent::TaskDeletion {
|
||||
query: S("test"),
|
||||
tasks: RoaringBitmap::new(),
|
||||
})
|
||||
.unwrap_err();
|
||||
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
|
||||
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
|
||||
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
|
||||
|
||||
// But a task deletion that delete something should works
|
||||
index_scheduler
|
||||
.register(KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() })
|
||||
.unwrap();
|
||||
handle.advance_one_successful_batch();
|
||||
|
||||
// Now we should be able to enqueue a few tasks again
|
||||
index_scheduler
|
||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||
.unwrap();
|
||||
handle.advance_one_failed_batch();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_auto_deletion_of_tasks() {
|
||||
let (index_scheduler, mut handle) =
|
||||
IndexScheduler::test_with_custom_config(vec![], |config| {
|
||||
config.max_number_of_tasks = 2;
|
||||
});
|
||||
|
||||
index_scheduler
|
||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||
.unwrap();
|
||||
handle.advance_one_successful_batch();
|
||||
|
||||
index_scheduler
|
||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||
.unwrap();
|
||||
handle.advance_one_failed_batch();
|
||||
|
||||
// at this point the max number of tasks is reached
|
||||
// we can still enqueue multiple tasks
|
||||
index_scheduler
|
||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||
.unwrap();
|
||||
index_scheduler
|
||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||
.unwrap();
|
||||
|
||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
|
||||
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
|
||||
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), name: "task_queue_is_full");
|
||||
drop(rtxn);
|
||||
|
||||
// now we're above the max number of tasks
|
||||
// and if we try to advance in the tick function a new task deletion should be enqueued
|
||||
handle.advance_till([Start, BatchCreated]);
|
||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
|
||||
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
|
||||
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_enqueued");
|
||||
drop(rtxn);
|
||||
|
||||
handle.advance_till([InsideProcessBatch, ProcessBatchSucceeded, AfterProcessing]);
|
||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
|
||||
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
|
||||
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_processed");
|
||||
drop(rtxn);
|
||||
|
||||
handle.advance_one_failed_batch();
|
||||
// a new task deletion has been enqueued
|
||||
handle.advance_one_successful_batch();
|
||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
|
||||
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
|
||||
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "after_the_second_task_deletion");
|
||||
drop(rtxn);
|
||||
|
||||
handle.advance_one_failed_batch();
|
||||
handle.advance_one_successful_batch();
|
||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
|
||||
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
|
||||
snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "everything_has_been_processed");
|
||||
drop(rtxn);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,68 @@
|
||||
---
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
[
|
||||
{
|
||||
"uid": 3,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 5,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"TaskDeletion": {
|
||||
"matched_tasks": 2,
|
||||
"deleted_tasks": 2,
|
||||
"original_filter": "[filter]"
|
||||
}
|
||||
},
|
||||
"status": "succeeded",
|
||||
"kind": {
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
16,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
2,
|
||||
0,
|
||||
4,
|
||||
0
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
@ -0,0 +1,48 @@
|
||||
---
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
[
|
||||
{
|
||||
"uid": 6,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"TaskDeletion": {
|
||||
"matched_tasks": 2,
|
||||
"deleted_tasks": 2,
|
||||
"original_filter": "[filter]"
|
||||
}
|
||||
},
|
||||
"status": "succeeded",
|
||||
"kind": {
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
16,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
3,
|
||||
0,
|
||||
5,
|
||||
0
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
@ -0,0 +1,133 @@
|
||||
---
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
[
|
||||
{
|
||||
"uid": 0,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "succeeded",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 1,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": {
|
||||
"message": "Index `doggo` already exists.",
|
||||
"code": "index_already_exists",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#index_already_exists"
|
||||
},
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "failed",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 2,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 3,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 4,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"TaskDeletion": {
|
||||
"matched_tasks": 2,
|
||||
"deleted_tasks": null,
|
||||
"original_filter": "[filter]"
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
16,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
@ -0,0 +1,88 @@
|
||||
---
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
[
|
||||
{
|
||||
"uid": 2,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 3,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 4,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"TaskDeletion": {
|
||||
"matched_tasks": 2,
|
||||
"deleted_tasks": 2,
|
||||
"original_filter": "[filter]"
|
||||
}
|
||||
},
|
||||
"status": "succeeded",
|
||||
"kind": {
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
16,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
@ -0,0 +1,90 @@
|
||||
---
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
[
|
||||
{
|
||||
"uid": 0,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "succeeded",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 1,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": {
|
||||
"message": "Index `doggo` already exists.",
|
||||
"code": "index_already_exists",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#index_already_exists"
|
||||
},
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "failed",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 2,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"uid": 3,
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"error": null,
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"IndexInfo": {
|
||||
"primary_key": null
|
||||
}
|
||||
},
|
||||
"status": "enqueued",
|
||||
"kind": {
|
||||
"indexCreation": {
|
||||
"index_uid": "doggo",
|
||||
"primary_key": null
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
@ -234,6 +234,7 @@ fn open_or_create_database_unchecked(
|
||||
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
||||
indexer_config: (&opt.indexer_options).try_into()?,
|
||||
autobatching_enabled: true,
|
||||
max_number_of_tasks: 1_000_000,
|
||||
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
||||
index_count: DEFAULT_INDEX_COUNT,
|
||||
})?)
|
||||
|
@ -1,14 +1,11 @@
|
||||
mod errors;
|
||||
|
||||
use byte_unit::{Byte, ByteUnit};
|
||||
use meili_snap::insta::assert_json_snapshot;
|
||||
use meili_snap::{json_string, snapshot};
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::common::{default_settings, Server};
|
||||
use crate::common::Server;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn error_get_unexisting_task_status() {
|
||||
@ -1003,117 +1000,3 @@ async fn test_summarized_dump_creation() {
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_task_queue_is_full() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let mut options = default_settings(dir.path());
|
||||
options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap();
|
||||
|
||||
let server = Server::new_with_options(options).await.unwrap();
|
||||
|
||||
// the first task should be enqueued without issue
|
||||
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
|
||||
snapshot!(code, @"202 Accepted");
|
||||
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###"
|
||||
{
|
||||
"taskUid": 0,
|
||||
"indexUid": "doggo",
|
||||
"status": "enqueued",
|
||||
"type": "indexCreation",
|
||||
"enqueuedAt": "[date]"
|
||||
}
|
||||
"###);
|
||||
|
||||
loop {
|
||||
let (res, code) = server.create_index(json!({ "uid": "doggo" })).await;
|
||||
if code == 422 {
|
||||
break;
|
||||
}
|
||||
if res["taskUid"] == json!(null) {
|
||||
panic!(
|
||||
"Encountered the strange case:\n{}",
|
||||
serde_json::to_string_pretty(&res).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
|
||||
snapshot!(code, @"422 Unprocessable Entity");
|
||||
snapshot!(json_string!(result), @r###"
|
||||
{
|
||||
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
|
||||
"code": "no_space_left_on_device",
|
||||
"type": "system",
|
||||
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
|
||||
}
|
||||
"###);
|
||||
|
||||
// But we should still be able to register tasks deletion IF they delete something
|
||||
let (result, code) = server.delete_tasks("uids=*").await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
|
||||
{
|
||||
"taskUid": "uid",
|
||||
"indexUid": null,
|
||||
"status": "enqueued",
|
||||
"type": "taskDeletion",
|
||||
"enqueuedAt": "[date]"
|
||||
}
|
||||
"###);
|
||||
|
||||
let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await;
|
||||
snapshot!(json_string!(result["status"]), @r###""succeeded""###);
|
||||
|
||||
// Now we should be able to register tasks again
|
||||
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
|
||||
snapshot!(code, @"202 Accepted");
|
||||
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
|
||||
{
|
||||
"taskUid": "uid",
|
||||
"indexUid": "doggo",
|
||||
"status": "enqueued",
|
||||
"type": "indexCreation",
|
||||
"enqueuedAt": "[date]"
|
||||
}
|
||||
"###);
|
||||
|
||||
// we're going to fill up the queue once again
|
||||
loop {
|
||||
let (res, code) = server.delete_tasks("uids=0").await;
|
||||
if code == 422 {
|
||||
break;
|
||||
}
|
||||
if res["taskUid"] == json!(null) {
|
||||
panic!(
|
||||
"Encountered the strange case:\n{}",
|
||||
serde_json::to_string_pretty(&res).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// But we should NOT be able to register this task because it doesn't match any tasks
|
||||
let (result, code) = server.delete_tasks("uids=0").await;
|
||||
snapshot!(code, @"422 Unprocessable Entity");
|
||||
snapshot!(json_string!(result), @r###"
|
||||
{
|
||||
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
|
||||
"code": "no_space_left_on_device",
|
||||
"type": "system",
|
||||
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
|
||||
}
|
||||
"###);
|
||||
|
||||
// The deletion still works
|
||||
let (result, code) = server.delete_tasks("uids=*").await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
|
||||
{
|
||||
"taskUid": "uid",
|
||||
"indexUid": null,
|
||||
"status": "enqueued",
|
||||
"type": "taskDeletion",
|
||||
"enqueuedAt": "[date]"
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user