Implement task date filters

Add before/after filters on the enqueuedAt, startedAt, and finishedAt dates of tasks.
Loïc Lecrenier 2022-10-19 12:59:12 +02:00 committed by Clément Renault
parent f3c2be1eb5
commit 1eeef1c1c8
23 changed files with 619 additions and 84 deletions
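
The commit adds six optional date bounds to the scheduler's `Query` type (serialized as RFC 3339) and three new databases (`enqueued-at`, `started-at`, `finished-at`) that map timestamps to task ids. As a quick orientation, a hypothetical caller-side sketch of the new filters; the field names come from the diff below, while the `index_scheduler::` path and the `time` macro import are assumptions:

```rust
use time::macros::datetime;

// Keep only tasks enqueued inside a one-day window; every other filter
// stays at its default (Query now derives Default, see the diff below).
let query = index_scheduler::Query {
    after_enqueued_at: Some(datetime!(2022-10-18 00:00 UTC)),
    before_enqueued_at: Some(datetime!(2022-10-19 00:00 UTC)),
    ..Default::default()
};
```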


@@ -11,6 +11,10 @@ pub type TaskId = u32;
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use serde::Serialize;
use utils::keep_tasks_within_datetimes;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
@@ -20,21 +24,20 @@ use file_store::FileStore;
use meilisearch_types::error::ResponseError;
use meilisearch_types::milli;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use uuid::Uuid;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use crate::index_mapper::IndexMapper;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
type BEI128 = meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub limit: Option<u32>,
@@ -44,19 +47,19 @@ pub struct Query {
pub kind: Option<Vec<Kind>>,
pub index_uid: Option<Vec<String>>,
pub uid: Option<Vec<TaskId>>,
}
impl Default for Query {
fn default() -> Self {
Self {
limit: None,
from: None,
status: None,
kind: None,
index_uid: None,
uid: None,
}
}
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub before_enqueued_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub after_enqueued_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub before_started_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub after_started_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub before_finished_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub after_finished_at: Option<OffsetDateTime>,
}
impl Query {
@@ -71,7 +74,13 @@ impl Query {
status: None,
kind: None,
index_uid: None,
uid: None
uid: None,
before_enqueued_at: None,
after_enqueued_at: None,
before_started_at: None,
after_started_at: None,
before_finished_at: None,
after_finished_at: None,
}
)
}
@@ -177,6 +186,9 @@ mod db_name {
pub const STATUS: &str = "status";
pub const KIND: &str = "kind";
pub const INDEX_TASKS: &str = "index-tasks";
pub const ENQUEUED_AT: &str = "enqueued-at";
pub const STARTED_AT: &str = "started-at";
pub const FINISHED_AT: &str = "finished-at";
}
/// This module is responsible for two things;
@@ -202,6 +214,20 @@ pub struct IndexScheduler {
/// Store the tasks associated to an index.
pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,
/// Store the task ids of tasks which were enqueued at a specific date
///
/// Note that since we store the date with nanosecond-level precision, it would be
/// reasonable to assume that there is only one task per key. However, it is not a
/// theoretical certainty, and we might want to make it possible to enqueue multiple
/// tasks at a time in the future.
pub(crate) enqueued_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// Store the task ids of finished tasks which started being processed at a specific date
pub(crate) started_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// Store the task ids of tasks which finished at a specific date
pub(crate) finished_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// In charge of creating, opening, storing and returning indexes.
pub(crate) index_mapper: IndexMapper,
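
The keys of these three databases are the `BEI128` alias declared earlier in the diff: the task's `OffsetDateTime` is flattened to nanoseconds since the Unix epoch and stored as a big-endian `i128`, so LMDB's byte-wise key order matches chronological order. A minimal sketch of that encoding (the `datetime_key` helper name is hypothetical; the re-exported `zerocopy`/`byteorder` paths are taken from the alias in the diff):

```rust
use meilisearch_types::heed::{byteorder::BE, zerocopy::I128};
use time::OffsetDateTime;

// Same alias as in the diff above.
type BEI128 = I128<BE>;

// Turn a timestamp into the big-endian i128 key used by the
// enqueued-at / started-at / finished-at databases.
fn datetime_key(datetime: OffsetDateTime) -> BEI128 {
    // unix_timestamp_nanos() yields an i128 with nanosecond precision,
    // which is why at most a handful of tasks ever share one key.
    BEI128::new(datetime.unix_timestamp_nanos())
}
```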
@@ -247,7 +273,7 @@ impl IndexScheduler {
std::fs::create_dir_all(&dumps_path)?;
let mut options = heed::EnvOpenOptions::new();
options.max_dbs(6);
options.max_dbs(9);
let env = options.open(tasks_path)?;
let file_store = FileStore::new(&update_file_path)?;
@@ -261,6 +287,9 @@ impl IndexScheduler {
status: env.create_database(Some(db_name::STATUS))?,
kind: env.create_database(Some(db_name::KIND))?,
index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?,
enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?,
started_at: env.create_database(Some(db_name::STARTED_AT))?,
finished_at: env.create_database(Some(db_name::FINISHED_AT))?,
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?,
env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
@@ -287,6 +316,9 @@ impl IndexScheduler {
status: self.status,
kind: self.kind,
index_tasks: self.index_tasks,
enqueued_at: self.enqueued_at,
started_at: self.started_at,
finished_at: self.finished_at,
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
@@ -359,6 +391,30 @@ impl IndexScheduler {
}
tasks &= index_tasks;
}
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.enqueued_at,
query.after_enqueued_at,
query.before_enqueued_at,
)?;
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.started_at,
query.after_started_at,
query.before_started_at,
)?;
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.finished_at,
query.after_finished_at,
query.before_finished_at,
)?;
rtxn.commit().unwrap();
Ok(tasks)
}
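
`keep_tasks_within_datetimes` itself lives in `utils.rs`, which is not part of this hunk. A minimal sketch of the idea, assuming heed's `range` API and exclusive bounds (the exact signature and bound semantics are assumptions): union every bitmap whose timestamp key falls inside the requested window, then intersect the result with the candidate tasks.

```rust
use std::ops::Bound;

use meilisearch_types::heed::types::OwnedType;
use meilisearch_types::heed::{self, byteorder::BE, zerocopy::I128, Database, RoTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
use roaring::RoaringBitmap;
use time::OffsetDateTime;

type BEI128 = I128<BE>;

fn keep_tasks_within_datetimes(
    rtxn: &RoTxn,
    tasks: &mut RoaringBitmap,
    database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
    after: Option<OffsetDateTime>,
    before: Option<OffsetDateTime>,
) -> heed::Result<()> {
    // No bounds at all: leave the candidate set untouched.
    if after.is_none() && before.is_none() {
        return Ok(());
    }
    let to_key = |d: OffsetDateTime| BEI128::new(d.unix_timestamp_nanos());
    let start = after.map(to_key).map_or(Bound::Unbounded, Bound::Excluded);
    let end = before.map(to_key).map_or(Bound::Unbounded, Bound::Excluded);

    // Union every bitmap stored under a key inside the (after, before) window...
    let mut within_window = RoaringBitmap::new();
    for entry in database.range(rtxn, &(start, end))? {
        let (_timestamp, task_ids) = entry?;
        within_window |= task_ids;
    }
    // ...and keep only the candidates that appear in that union.
    *tasks &= within_window;
    Ok(())
}
```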
@@ -438,6 +494,8 @@ impl IndexScheduler {
(bitmap.insert(task.uid));
})?;
utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)?;
if let Err(e) = wtxn.commit() {
self.delete_persisted_task_data(&task)?;
return Err(e.into());
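
Its counterpart `utils::insert_task_datetime`, called here when a task is registered (and presumably again for `started_at`/`finished_at` once processing completes), is also defined in `utils.rs` outside this hunk. A minimal sketch of the idea under the same assumptions: read the bitmap stored at the timestamp key, add the task id, and write it back.

```rust
use meilisearch_types::heed::types::OwnedType;
use meilisearch_types::heed::{self, byteorder::BE, zerocopy::I128, Database, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
use roaring::RoaringBitmap;
use time::OffsetDateTime;

type BEI128 = I128<BE>;
type TaskId = u32;

fn insert_task_datetime(
    wtxn: &mut RwTxn,
    database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
    time: OffsetDateTime,
    task_id: TaskId,
) -> heed::Result<()> {
    let timestamp = BEI128::new(time.unix_timestamp_nanos());
    // Fetch the task ids already stored under this exact nanosecond, if any.
    let mut task_ids: RoaringBitmap = database.get(wtxn, &timestamp)?.unwrap_or_default();
    task_ids.insert(task_id);
    database.put(wtxn, &timestamp, &task_ids)?;
    Ok(())
}
```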