Implement task date filters

before/after enqueued/started/finished at
This commit is contained in:
Loïc Lecrenier 2022-10-19 12:59:12 +02:00 committed by Clément Renault
parent 5765883600
commit 22cf0559fe
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
23 changed files with 619 additions and 84 deletions

View file

@ -2,6 +2,7 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
use crate::utils;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata;
@ -1015,6 +1016,13 @@ impl IndexScheduler {
// we can only delete succeeded, failed, and canceled tasks.
// In each of those cases, the persisted data is supposed to
// have been deleted already.
utils::remove_task_datetime(wtxn, self.enqueued_at, task.enqueued_at, task.uid)?;
if let Some(started_at) = task.started_at {
utils::remove_task_datetime(wtxn, self.started_at, started_at, task.uid)?;
}
if let Some(finished_at) = task.finished_at {
utils::remove_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?;
}
}
for index in affected_indexes {

View file

@ -11,6 +11,10 @@ pub type TaskId = u32;
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use serde::Serialize;
use utils::keep_tasks_within_datetimes;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
@ -20,21 +24,20 @@ use file_store::FileStore;
use meilisearch_types::error::ResponseError;
use meilisearch_types::milli;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use uuid::Uuid;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use crate::index_mapper::IndexMapper;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
type BEI128 = meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub limit: Option<u32>,
@ -44,19 +47,19 @@ pub struct Query {
pub kind: Option<Vec<Kind>>,
pub index_uid: Option<Vec<String>>,
pub uid: Option<Vec<TaskId>>,
}
impl Default for Query {
fn default() -> Self {
Self {
limit: None,
from: None,
status: None,
kind: None,
index_uid: None,
uid: None,
}
}
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub before_enqueued_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub after_enqueued_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub before_started_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub after_started_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub before_finished_at: Option<OffsetDateTime>,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub after_finished_at: Option<OffsetDateTime>,
}
impl Query {
@ -71,7 +74,13 @@ impl Query {
status: None,
kind: None,
index_uid: None,
uid: None
uid: None,
before_enqueued_at: None,
after_enqueued_at: None,
before_started_at: None,
after_started_at: None,
before_finished_at: None,
after_finished_at: None,
}
)
}
@ -177,6 +186,9 @@ mod db_name {
pub const STATUS: &str = "status";
pub const KIND: &str = "kind";
pub const INDEX_TASKS: &str = "index-tasks";
pub const ENQUEUED_AT: &str = "enqueued-at";
pub const STARTED_AT: &str = "started-at";
pub const FINISHED_AT: &str = "finished-at";
}
/// This module is responsible for two things;
@ -202,6 +214,20 @@ pub struct IndexScheduler {
/// Store the tasks associated to an index.
pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,
/// Store the task ids of tasks which were enqueued at a specific date
///
/// Note that since we store the date with nanosecond-level precision, it would be
/// reasonable to assume that there is only one task per key. However, it is not a
/// theoretical certainty, and we might want to make it possible to enqueue multiple
/// tasks at a time in the future.
pub(crate) enqueued_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// Store the task ids of finished tasks which started being processed at a specific date
pub(crate) started_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// Store the task ids of tasks which finished at a specific date
pub(crate) finished_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// In charge of creating, opening, storing and returning indexes.
pub(crate) index_mapper: IndexMapper,
@ -247,7 +273,7 @@ impl IndexScheduler {
std::fs::create_dir_all(&dumps_path)?;
let mut options = heed::EnvOpenOptions::new();
options.max_dbs(6);
options.max_dbs(9);
let env = options.open(tasks_path)?;
let file_store = FileStore::new(&update_file_path)?;
@ -261,6 +287,9 @@ impl IndexScheduler {
status: env.create_database(Some(db_name::STATUS))?,
kind: env.create_database(Some(db_name::KIND))?,
index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?,
enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?,
started_at: env.create_database(Some(db_name::STARTED_AT))?,
finished_at: env.create_database(Some(db_name::FINISHED_AT))?,
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?,
env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
@ -287,6 +316,9 @@ impl IndexScheduler {
status: self.status,
kind: self.kind,
index_tasks: self.index_tasks,
enqueued_at: self.enqueued_at,
started_at: self.started_at,
finished_at: self.finished_at,
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
@ -359,6 +391,30 @@ impl IndexScheduler {
}
tasks &= index_tasks;
}
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.enqueued_at,
query.after_enqueued_at,
query.before_enqueued_at,
)?;
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.started_at,
query.after_started_at,
query.before_started_at,
)?;
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.finished_at,
query.after_finished_at,
query.before_finished_at,
)?;
rtxn.commit().unwrap();
Ok(tasks)
}
@ -438,6 +494,8 @@ impl IndexScheduler {
(bitmap.insert(task.uid));
})?;
utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)?;
if let Err(e) = wtxn.commit() {
self.delete_persisted_task_data(&task)?;
return Err(e.into());

View file

@ -1,4 +1,4 @@
use meilisearch_types::milli::{RoaringBitmapCodec, BEU32};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::Details;
use meilisearch_types::{
heed::{
@ -9,12 +9,13 @@ use meilisearch_types::{
};
use roaring::RoaringBitmap;
use crate::BEI128;
use crate::{index_mapper::IndexMapper, IndexScheduler, Kind, Status};
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler {
autobatching_enabled,
must_stop_processing,
must_stop_processing: _,
processing_tasks,
file_store,
env,
@ -22,6 +23,9 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
status,
kind,
index_tasks,
enqueued_at,
started_at,
finished_at,
index_mapper,
wake_up: _,
dumps_path: _,
@ -60,6 +64,18 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snap.push_str(&snapshot_index_mapper(&rtxn, index_mapper));
snap.push_str("\n----------------------------------------------------------------------\n");
snap.push_str("### Enqueued At:\n");
snap.push_str(&snapshot_date_db(&rtxn, *enqueued_at));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### Started At:\n");
snap.push_str(&snapshot_date_db(&rtxn, *started_at));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### Finished At:\n");
snap.push_str(&snapshot_date_db(&rtxn, *finished_at));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### File Store:\n");
snap.push_str(&snapshot_file_store(file_store));
snap.push_str("\n----------------------------------------------------------------------\n");
@ -97,6 +113,19 @@ fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Tas
snap
}
/// Renders one of the date databases as text for snapshot tests.
///
/// Each entry of `db` produces one line made of the literal `[timestamp]`
/// placeholder followed by the snapshot of the task ids stored under that key.
/// The actual timestamp value is deliberately not printed (presumably so the
/// snapshots stay stable from one run to the next).
///
/// # Panics
///
/// Panics if the database cannot be iterated or if an entry fails to decode.
fn snapshot_date_db(
    rtxn: &RoTxn,
    db: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
) -> String {
    db.iter(rtxn)
        .unwrap()
        .map(|entry| {
            let (_timestamp, task_ids) = entry.unwrap();
            format!("[timestamp] {}\n", snapshot_bitmap(&task_ids))
        })
        .collect()
}
fn snapshot_task(task: &Task) -> String {
let mut snap = String::new();
let Task {

View file

@ -20,6 +20,13 @@ doggos [0,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000

View file

@ -20,6 +20,13 @@ doggos [0,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000

View file

@ -21,6 +21,15 @@ doggos [0,]
### Index Mapper:
["doggos"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -23,6 +23,15 @@ doggos [2,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -27,6 +27,17 @@ doggo [4,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -22,6 +22,14 @@ doggo [1,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
00000000-0000-0000-0000-000000000001

View file

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [1,]
@ -23,6 +23,16 @@ doggo [1,]
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000001

View file

@ -25,6 +25,19 @@ doggo [1,]
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000001

View file

@ -22,6 +22,14 @@ doggo [1,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
00000000-0000-0000-0000-000000000001

View file

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [1,]
@ -23,6 +23,16 @@ doggo [1,]
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000001

View file

@ -24,6 +24,16 @@ doggo [1,]
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
[timestamp] [2,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [2,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000001

View file

@ -24,6 +24,15 @@ doggo [2,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -27,6 +27,18 @@ doggo [2,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [3,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [3,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -26,6 +26,16 @@ doggo [2,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -26,6 +26,16 @@ doggo [2,]
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View file

@ -1,10 +1,15 @@
//! Utility functions on the DBs. Mainly getter and setters.
use meilisearch_types::heed::{types::DecodeIgnore, RoTxn, RwTxn};
use meilisearch_types::milli::BEU32;
use roaring::{MultiOps, RoaringBitmap};
use std::ops::Bound;
use crate::{Error, IndexScheduler, Result, Task, TaskId};
use meilisearch_types::heed::types::OwnedType;
use meilisearch_types::heed::Database;
use meilisearch_types::heed::{types::DecodeIgnore, RoTxn, RwTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
use meilisearch_types::tasks::{Kind, Status};
impl IndexScheduler {
@ -75,6 +80,26 @@ impl IndexScheduler {
})?;
}
if old_task.enqueued_at != task.enqueued_at {
unreachable!("Cannot update a task's enqueued_at time");
}
if old_task.started_at != task.started_at {
if old_task.started_at.is_some() {
unreachable!("Cannot update a task's started_at time");
}
if let Some(started_at) = task.started_at {
insert_task_datetime(wtxn, self.started_at, started_at, task.uid)?;
}
}
if old_task.finished_at != task.finished_at {
if old_task.finished_at.is_some() {
unreachable!("Cannot update a task's finished_at time");
}
if let Some(finished_at) = task.finished_at {
insert_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?;
}
}
self.all_tasks.put(wtxn, &BEU32::new(task.uid), task)?;
Ok(())
}
@ -158,3 +183,73 @@ impl IndexScheduler {
Ok(())
}
}
/// Registers `task_id` under the timestamp of `time` in the given date database.
///
/// The key is the unix timestamp of `time` in nanoseconds, encoded as a
/// big-endian `i128`. The value is the bitmap of every task id sharing that
/// exact timestamp: if the key already exists, `task_id` is added to the
/// stored bitmap, otherwise a new bitmap containing only `task_id` is created.
///
/// # Errors
///
/// Returns an error if reading from or writing to the database fails.
pub(crate) fn insert_task_datetime(
    wtxn: &mut RwTxn,
    database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
    time: OffsetDateTime,
    task_id: TaskId,
) -> Result<()> {
    let timestamp = BEI128::new(time.unix_timestamp_nanos());
    let mut task_ids = database.get(&wtxn, &timestamp)?.unwrap_or_default();
    task_ids.insert(task_id);
    // Write back the merged bitmap, not a fresh single-element one, so that the
    // other tasks already registered at this timestamp are preserved.
    database.put(wtxn, &timestamp, &task_ids)?;
    Ok(())
}
/// Unregisters `task_id` from the timestamp of `time` in the given date database.
///
/// The key is the unix timestamp of `time` in nanoseconds, encoded as a
/// big-endian `i128`. If no entry exists for that key, nothing happens. When
/// removing `task_id` leaves the bitmap empty the whole entry is deleted,
/// otherwise the remaining task ids are written back.
///
/// # Errors
///
/// Returns an error if reading from or writing to the database fails.
pub(crate) fn remove_task_datetime(
    wtxn: &mut RwTxn,
    database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
    time: OffsetDateTime,
    task_id: TaskId,
) -> Result<()> {
    let timestamp = BEI128::new(time.unix_timestamp_nanos());
    if let Some(mut existing) = database.get(&wtxn, &timestamp)? {
        existing.remove(task_id);
        if existing.is_empty() {
            database.delete(wtxn, &timestamp)?;
        } else {
            // Store the bitmap that remains after the removal. Writing a bitmap
            // built from `task_id` would re-insert the id we just removed and
            // drop every other task sharing this timestamp.
            database.put(wtxn, &timestamp, &existing)?;
        }
    }
    Ok(())
}
/// Restricts `tasks` to the task ids registered strictly between `after` and `before`.
///
/// Both limits are exclusive, and a missing limit leaves that side of the range
/// open. When neither limit is given, `tasks` is left untouched. Otherwise every
/// bitmap stored in `database` under a timestamp inside the range is merged, and
/// `tasks` is intersected with the result.
///
/// # Errors
///
/// Returns an error if the database range cannot be opened or an entry cannot
/// be decoded.
pub(crate) fn keep_tasks_within_datetimes(
    rtxn: &RoTxn,
    tasks: &mut RoaringBitmap,
    database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
    after: Option<OffsetDateTime>,
    before: Option<OffsetDateTime>,
) -> Result<()> {
    // No date filter at all: keep every task.
    if after.is_none() && before.is_none() {
        return Ok(());
    }

    let exclusive = |limit: Option<OffsetDateTime>| limit.map_or(Bound::Unbounded, Bound::Excluded);
    let to_key = |datetime: OffsetDateTime| BEI128::new(datetime.unix_timestamp_nanos());
    let range = (map_bound(exclusive(after), to_key), map_bound(exclusive(before), to_key));

    let mut matching = RoaringBitmap::new();
    for entry in database.range(rtxn, &range)? {
        let (_timestamp, task_ids) = entry?;
        matching |= task_ids;
    }
    *tasks &= matching;
    Ok(())
}
/// Applies `map` to the value carried by `bound`, preserving the kind of bound.
///
/// `Included` and `Excluded` keep their variant with the transformed value;
/// `Unbounded` is returned as is and `map` is never called.
// TODO: remove when Bound::map ( https://github.com/rust-lang/rust/issues/86026 ) is available on stable
fn map_bound<T, U>(bound: Bound<T>, map: impl FnOnce(T) -> U) -> Bound<U> {
    use std::ops::Bound::{Excluded, Included, Unbounded};

    match bound {
        Unbounded => Unbounded,
        Excluded(limit) => Excluded(map(limit)),
        Included(limit) => Included(map(limit)),
    }
}