make the generated filter valid

Tamo 2023-04-26 13:55:02 +02:00 committed by Louis Dureuil
parent 9ca6f59546
commit dcbfecf42c
8 changed files with 451 additions and 278 deletions
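
In short: the query string recorded on the TaskDeletion task that the scheduler enqueues automatically when the task queue is full now carries a date bound instead of a uid/limit pair. The scheduler fetches the newest task scheduled for deletion, formats its enqueued_at date as RFC 3339, and only then drops its read transaction. With illustrative values, the recorded query changes roughly like this:

    before: ?from=99999,limit=100000,status=succeeded,failed,canceled
    after:  ?beforeEnqueuedAt=2023-04-26T11:55:02Z,status=succeeded,failed,canceled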

@ -51,6 +51,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
use synchronoise::SignalEvent;
+use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
use uuid::Uuid;
@ -1118,7 +1119,6 @@ impl IndexScheduler {
let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default()
| self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default()
| self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default();
-drop(rtxn);
let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000));
@ -1135,11 +1135,15 @@ impl IndexScheduler {
to_delete.len()
);
+// it's safe to unwrap here because we checked the len above
+let newest_task_id = to_delete.iter().last().unwrap();
+let task = self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?;
+drop(rtxn);
self.register(KindWithContent::TaskDeletion {
query: format!(
"?from={},limit={},status=succeeded,failed,canceled",
to_delete.iter().last().unwrap_or(u32::MAX),
to_delete.len(),
"?beforeEnqueuedAt={},status=succeeded,failed,canceled",
task.enqueued_at.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?,
),
tasks: to_delete,
})?;
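
For reference, a minimal self-contained sketch of the date formatting the new query relies on (assumption: the time crate with its formatting feature enabled; task_deletion_query is a hypothetical helper, not scheduler API):

    // Sketch only: build a `beforeEnqueuedAt` filter from a task's `enqueued_at` date.
    use time::format_description::well_known::Rfc3339;
    use time::OffsetDateTime;

    fn task_deletion_query(enqueued_at: OffsetDateTime) -> Result<String, time::error::Format> {
        Ok(format!(
            "?beforeEnqueuedAt={},status=succeeded,failed,canceled",
            enqueued_at.format(&Rfc3339)?
        ))
    }

    fn main() {
        // Prints something like:
        // ?beforeEnqueuedAt=2023-04-26T11:55:02Z,status=succeeded,failed,canceled
        println!("{}", task_deletion_query(OffsetDateTime::now_utc()).unwrap());
    }

In the scheduler itself the formatting error is mapped to Error::CorruptedTaskQueue rather than surfaced as a time error.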
@ -1404,7 +1408,7 @@ mod tests {
use big_s::S;
use crossbeam::channel::RecvTimeoutError;
use file_store::File;
-use meili_snap::snapshot;
+use meili_snap::{json_string, snapshot};
use meilisearch_auth::AuthFilter;
use meilisearch_types::document_formats::DocumentFormatError;
use meilisearch_types::error::ErrorCode;
@ -3860,8 +3864,6 @@ mod tests {
handle.advance_one_failed_batch();
// at this point the max number of tasks is reached
-snapshot!(snapshot_index_scheduler(&index_scheduler), name: "max_number_of_tasks");
// we can still enqueue multiple tasks
index_scheduler
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
@ -3870,22 +3872,43 @@ mod tests {
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
.unwrap();
// at this point the max number of tasks is reached
-snapshot!(snapshot_index_scheduler(&index_scheduler), name: "above_the_max_number_of_tasks");
+let rtxn = index_scheduler.env.read_txn().unwrap();
+let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
+let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
+snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), name: "task_queue_is_full");
+drop(rtxn);
// now we're above the max number of tasks
// and if we try to advance in the tick function a new task deletion should be enqueued
handle.advance_till([Start, BatchCreated]);
-snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_enqueued");
+let rtxn = index_scheduler.env.read_txn().unwrap();
+let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
+let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
+snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_enqueued");
+drop(rtxn);
handle.advance_till([InsideProcessBatch, ProcessBatchSucceeded, AfterProcessing]);
-snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_processed");
+let rtxn = index_scheduler.env.read_txn().unwrap();
+let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
+let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
+snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_processed");
+drop(rtxn);
handle.advance_one_failed_batch();
// a new task deletion has been enqueued
handle.advance_one_successful_batch();
-snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_the_second_task_deletion");
+let rtxn = index_scheduler.env.read_txn().unwrap();
+let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
+let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
+snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "after_the_second_task_deletion");
+drop(rtxn);
handle.advance_one_failed_batch();
handle.advance_one_successful_batch();
-snapshot!(snapshot_index_scheduler(&index_scheduler), name: "everything_has_been_processed");
+let rtxn = index_scheduler.env.read_txn().unwrap();
+let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
+let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
+snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "everything_has_been_processed");
+drop(rtxn);
}
}
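
The reworked assertions snapshot the task list as JSON and hand json_string! a redaction map, so volatile values (the dates, the generated original_filter and query) compare as stable placeholders across runs. A conceptual sketch of that redaction idea using serde_json (redact_volatile is a hypothetical stand-in, not meili_snap's json_string!):

    use serde_json::{json, Value};

    // Replace volatile date fields with a fixed placeholder before snapshotting,
    // mirroring the `"[].enqueuedAt" => "[date]"` redactions used in the tests above.
    fn redact_volatile(tasks: &mut Value) {
        let Some(tasks) = tasks.as_array_mut() else { return };
        for task in tasks {
            for field in ["enqueuedAt", "startedAt", "finishedAt"] {
                if let Some(date) = task.get_mut(field) {
                    *date = json!("[date]");
                }
            }
        }
    }

    fn main() {
        let mut tasks = json!([
            { "uid": 0, "status": "succeeded", "enqueuedAt": "2023-04-26T11:55:02Z" }
        ]);
        redact_volatile(&mut tasks);
        assert_eq!(tasks[0]["enqueuedAt"], "[date]");
    }

The real macro additionally supports nested selectors such as .**.original_filter, which the tests above use to mask the generated deletion filter.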