diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs
index 4bc2beb05..4a649f1cb 100644
--- a/crates/index-scheduler/src/insta_snapshot.rs
+++ b/crates/index-scheduler/src/insta_snapshot.rs
@@ -1,7 +1,7 @@
 use std::collections::BTreeSet;
 use std::fmt::Write;
 
-use meilisearch_types::batches::Batch;
+use meilisearch_types::batches::{Batch, BatchEnqueuedAt};
 use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, RoTxn};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@@ -341,10 +341,14 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database<BEU32, RoaringBitmapCodec>) -> String {
 
 pub fn snapshot_batch(batch: &Batch) -> String {
     let mut snap = String::new();
-    let Batch { uid, details, stats, started_at, finished_at, progress: _ } = batch;
+    let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch;
     if let Some(finished_at) = finished_at {
         assert!(finished_at > started_at);
     }
+    if let Some(BatchEnqueuedAt { earliest, oldest }) = enqueued_at {
+        assert!(started_at > earliest);
+        assert!(earliest >= oldest);
+    }
     snap.push('{');
     snap.push_str(&format!("uid: {uid}, "));
     snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap()));
diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs
index 5c8a573ab..e50b790cf 100644
--- a/crates/index-scheduler/src/queue/batches.rs
+++ b/crates/index-scheduler/src/queue/batches.rs
@@ -181,6 +181,7 @@ impl BatchQueue {
                 stats: batch.stats,
                 started_at: batch.started_at,
                 finished_at: batch.finished_at,
+                enqueued_at: batch.enqueued_at,
             },
         )?;
 
@@ -234,33 +235,36 @@ impl BatchQueue {
        // What we know, though, is that the task date is from before the enqueued_at, and max two timestamps have been written
        // to the DB per batches.
if let Some(ref old_batch) = old_batch { - let started_at = old_batch.started_at.unix_timestamp_nanos(); + if let Some(enqueued_at) = old_batch.enqueued_at { + remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, old_batch.uid)?; + remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, old_batch.uid)?; + } else { + let started_at = old_batch.started_at.unix_timestamp_nanos(); - // We have either one or two enqueued at to remove - let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2); - let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?; - while let Some(entry) = iterator.next() { - let (key, mut value) = entry?; - if key > started_at { - continue; - } - if value.remove(old_batch.uid) { - exit = exit.saturating_sub(1); - // Safe because the key and value are owned - unsafe { - iterator.put_current(&key, &value)?; + // We have either one or two enqueued at to remove + let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2); + let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?; + while let Some(entry) = iterator.next() { + let (key, mut value) = entry?; + if key > started_at { + continue; } - if exit == 0 { - break; + if value.remove(old_batch.uid) { + exit = exit.saturating_sub(1); + // Safe because the key and value are owned + unsafe { + iterator.put_current(&key, &value)?; + } + if exit == 0 { + break; + } } } } } - if let Some(enqueued_at) = batch.oldest_enqueued_at { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?; - } - if let Some(enqueued_at) = batch.earliest_enqueued_at { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?; + if let Some(enqueued_at) = batch.enqueued_at.as_ref() { + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?; + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?; } // Update the started at and finished at diff --git a/crates/index-scheduler/src/queue/batches_test.rs 
b/crates/index-scheduler/src/queue/batches_test.rs index aa84cdaf0..38e7ad800 100644 --- a/crates/index-scheduler/src/queue/batches_test.rs +++ b/crates/index-scheduler/src/queue/batches_test.rs @@ -102,30 +102,33 @@ fn query_batches_simple() { .unwrap(); assert_eq!(batches.len(), 1); batches[0].started_at = OffsetDateTime::UNIX_EPOCH; + assert!(batches[0].enqueued_at.is_some()); + batches[0].enqueued_at = None; // Insta cannot snapshot our batches because the batch stats contains an enum as key: https://github.com/mitsuhiko/insta/issues/689 let batch = serde_json::to_string_pretty(&batches[0]).unwrap(); snapshot!(batch, @r#" - { - "uid": 0, - "details": { - "primaryKey": "mouse" - }, - "stats": { - "totalNbTasks": 1, - "status": { - "processing": 1 - }, - "types": { - "indexCreation": 1 - }, - "indexUids": { - "catto": 1 - } - }, - "startedAt": "1970-01-01T00:00:00Z", - "finishedAt": null + { + "uid": 0, + "details": { + "primaryKey": "mouse" + }, + "stats": { + "totalNbTasks": 1, + "status": { + "processing": 1 + }, + "types": { + "indexCreation": 1 + }, + "indexUids": { + "catto": 1 } - "#); + }, + "startedAt": "1970-01-01T00:00:00Z", + "finishedAt": null, + "enqueuedAt": null + } + "#); let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() }; let (batches, _) = index_scheduler diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 5531ced9f..c374044f5 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; -use meilisearch_types::batches::BatchId; +use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use 
meilisearch_types::milli::{self}; @@ -518,13 +518,30 @@ impl IndexScheduler { // We must remove the batch entirely if tasks.is_empty() { if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? { - remove_n_tasks_datetime_earlier_than( - wtxn, - self.queue.batches.started_at, - batch.started_at, - if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, - batch_id, - )?; + if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { + remove_task_datetime( + wtxn, + self.queue.batches.enqueued_at, + earliest, + batch_id, + )?; + remove_task_datetime( + wtxn, + self.queue.batches.enqueued_at, + oldest, + batch_id, + )?; + } else { + // If we don't have the enqueued at in the batch it means the database comes from the v1.12 + // and we still need to find the date by scrolling the database + remove_n_tasks_datetime_earlier_than( + wtxn, + self.queue.batches.enqueued_at, + batch.started_at, + if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + batch_id, + )?; + } remove_task_datetime( wtxn, self.queue.batches.started_at, diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap index f8e1d4ac3..89f87e29a 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap @@ -56,7 +56,6 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [1,] ---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap 
b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap index 241c9b24b..135b272cd 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap @@ -54,7 +54,6 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Started At: diff --git a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap index 9e490843e..4c828b71d 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap @@ -87,7 +87,6 @@ doggo [2,3,] girafo [4,] ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] @@ -95,7 +94,6 @@ girafo [4,] [timestamp] [5,] ---------------------------------------------------------------------- ### Batches Started At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] @@ -103,7 +101,6 @@ girafo [4,] [timestamp] [5,] ---------------------------------------------------------------------- ### Batches Finished At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 028d193e9..2a0c47626 100644 --- a/crates/index-scheduler/src/utils.rs +++ 
b/crates/index-scheduler/src/utils.rs
@@ -3,7 +3,7 @@
 use std::collections::{BTreeSet, HashSet};
 use std::ops::Bound;
 
-use meilisearch_types::batches::{Batch, BatchId, BatchStats};
+use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::CboRoaringBitmapCodec;
 use meilisearch_types::task_view::DetailsView;
@@ -30,8 +30,7 @@ pub struct ProcessingBatch {
     pub kinds: HashSet<Kind>,
     pub indexes: HashSet<String>,
     pub canceled_by: HashSet<TaskId>,
-    pub oldest_enqueued_at: Option<OffsetDateTime>,
-    pub earliest_enqueued_at: Option<OffsetDateTime>,
+    pub enqueued_at: Option<BatchEnqueuedAt>,
     pub started_at: OffsetDateTime,
     pub finished_at: Option<OffsetDateTime>,
 }
@@ -51,8 +50,7 @@ impl ProcessingBatch {
             kinds: HashSet::default(),
             indexes: HashSet::default(),
             canceled_by: HashSet::default(),
-            oldest_enqueued_at: None,
-            earliest_enqueued_at: None,
+            enqueued_at: None,
             started_at: OffsetDateTime::now_utc(),
             finished_at: None,
         }
@@ -80,14 +78,18 @@ impl ProcessingBatch {
             if let Some(canceled_by) = task.canceled_by {
                 self.canceled_by.insert(canceled_by);
             }
-            self.oldest_enqueued_at =
-                Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| {
-                    task.enqueued_at.min(oldest_enqueued_at)
-                }));
-            self.earliest_enqueued_at =
-                Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| {
-                    task.enqueued_at.max(earliest_enqueued_at)
-                }));
+            match self.enqueued_at.as_mut() {
+                Some(BatchEnqueuedAt { earliest, oldest }) => {
+                    *oldest = task.enqueued_at.min(*oldest);
+                    *earliest = task.enqueued_at.max(*earliest);
+                }
+                None => {
+                    self.enqueued_at = Some(BatchEnqueuedAt {
+                        earliest: task.enqueued_at,
+                        oldest: task.enqueued_at,
+                    });
+                }
+            }
         }
     }
 
@@ -138,6 +140,7 @@ impl ProcessingBatch {
             stats: self.stats.clone(),
             started_at: self.started_at,
             finished_at: self.finished_at,
+            enqueued_at: self.enqueued_at,
         }
     }
 }
diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs
index
7910a5af4..462d314db 100644
--- a/crates/meilisearch-types/src/batches.rs
+++ b/crates/meilisearch-types/src/batches.rs
@@ -24,6 +24,18 @@ pub struct Batch {
     pub started_at: OffsetDateTime,
     #[serde(with = "time::serde::rfc3339::option")]
     pub finished_at: Option<OffsetDateTime>,
+
+    // Enqueued at is never displayed and is only required when removing a batch.
+    // It's always `Some` except when upgrading from a database pre v1.12
+    pub enqueued_at: Option<BatchEnqueuedAt>,
+}
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct BatchEnqueuedAt {
+    #[serde(with = "time::serde::rfc3339")]
+    pub earliest: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    pub oldest: OffsetDateTime,
 }
 
 #[derive(Default, Debug, Clone, Serialize, Deserialize, ToSchema)]