diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index 4bc2beb05..bb8827fdc 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -1,7 +1,7 @@ use std::collections::BTreeSet; use std::fmt::Write; -use meilisearch_types::batches::Batch; +use meilisearch_types::batches::{Batch, BatchEnqueuedAt}; use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{Database, RoTxn}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; @@ -341,10 +341,14 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database String { let mut snap = String::new(); - let Batch { uid, details, stats, started_at, finished_at, progress: _ } = batch; + let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch; if let Some(finished_at) = finished_at { assert!(finished_at > started_at); } + let BatchEnqueuedAt { earliest, oldest } = enqueued_at.unwrap(); + assert!(*started_at > earliest); + assert!(earliest >= oldest); + snap.push('{'); snap.push_str(&format!("uid: {uid}, ")); snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap())); diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index 5c8a573ab..970e41110 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -12,8 +12,8 @@ use time::OffsetDateTime; use super::{Query, Queue}; use crate::processing::ProcessingTasks; use crate::utils::{ - insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime, - ProcessingBatch, + insert_task_datetime, keep_ids_within_datetimes, map_bound, + remove_n_tasks_datetime_earlier_than, remove_task_datetime, ProcessingBatch, }; use crate::{Error, Result, BEI128}; @@ -181,6 +181,7 @@ impl BatchQueue { stats: batch.stats, started_at: batch.started_at, finished_at: batch.finished_at, + enqueued_at: batch.enqueued_at, }, )?; @@ -234,34 +235,25 @@ impl BatchQueue { // What we know, though, is that the task date is from before the enqueued_at, and max two timestamps have been written // to the DB per batches. if let Some(ref old_batch) = old_batch { - let started_at = old_batch.started_at.unix_timestamp_nanos(); - - // We have either one or two enqueued at to remove - let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2); - let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?; - while let Some(entry) = iterator.next() { - let (key, mut value) = entry?; - if key > started_at { - continue; - } - if value.remove(old_batch.uid) { - exit = exit.saturating_sub(1); - // Safe because the key and value are owned - unsafe { - iterator.put_current(&key, &value)?; - } - if exit == 0 { - break; - } - } + if let Some(enqueued_at) = old_batch.enqueued_at { + remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, old_batch.uid)?; + remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, old_batch.uid)?; + } else { + // If we don't have the enqueued at in the batch it means the database comes from the v1.12 + // and we still need to find the date by scrolling the database + remove_n_tasks_datetime_earlier_than( + wtxn, + self.enqueued_at, + old_batch.started_at, + old_batch.stats.total_nb_tasks.clamp(1, 2) as usize, + old_batch.uid, + )?; } } - if let Some(enqueued_at) = batch.oldest_enqueued_at { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?; - } - if let Some(enqueued_at) = batch.earliest_enqueued_at { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?; - } + // A finished batch MUST contains at least one task and have an enqueued_at + let enqueued_at = batch.enqueued_at.as_ref().unwrap(); + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?; + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?; // Update the started at and finished at if let Some(ref old_batch) = old_batch { diff --git a/crates/index-scheduler/src/queue/batches_test.rs b/crates/index-scheduler/src/queue/batches_test.rs index aa84cdaf0..38e7ad800 100644 --- a/crates/index-scheduler/src/queue/batches_test.rs +++ b/crates/index-scheduler/src/queue/batches_test.rs @@ -102,30 +102,33 @@ fn query_batches_simple() { .unwrap(); assert_eq!(batches.len(), 1); batches[0].started_at = OffsetDateTime::UNIX_EPOCH; + assert!(batches[0].enqueued_at.is_some()); + batches[0].enqueued_at = None; // Insta cannot snapshot our batches because the batch stats contains an enum as key: https://github.com/mitsuhiko/insta/issues/689 let batch = serde_json::to_string_pretty(&batches[0]).unwrap(); snapshot!(batch, @r#" - { - "uid": 0, - "details": { - "primaryKey": "mouse" - }, - "stats": { - "totalNbTasks": 1, - "status": { - "processing": 1 - }, - "types": { - "indexCreation": 1 - }, - "indexUids": { - "catto": 1 - } - }, - "startedAt": "1970-01-01T00:00:00Z", - "finishedAt": null + { + "uid": 0, + "details": { + "primaryKey": "mouse" + }, + "stats": { + "totalNbTasks": 1, + "status": { + "processing": 1 + }, + "types": { + "indexCreation": 1 + }, + "indexUids": { + "catto": 1 } - "#); + }, + "startedAt": "1970-01-01T00:00:00Z", + "finishedAt": null, + "enqueuedAt": null + } + "#); let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() }; let (batches, _) = index_scheduler diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 7eda1d56f..623bdeb53 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; -use meilisearch_types::batches::BatchId; +use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self}; @@ -16,7 +16,10 @@ use crate::processing::{ InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, UpdateIndexProgress, }; -use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; +use crate::utils::{ + self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task, + ProcessingBatch, +}; use crate::{Error, IndexScheduler, Result, TaskId}; impl IndexScheduler { @@ -418,7 +421,6 @@ impl IndexScheduler { to_delete_tasks -= &enqueued_tasks; // 2. We now have a list of tasks to delete, delete them - let mut affected_indexes = HashSet::new(); let mut affected_statuses = HashSet::new(); let mut affected_kinds = HashSet::new(); @@ -515,9 +517,51 @@ impl IndexScheduler { tasks -= &to_delete_tasks; // We must remove the batch entirely if tasks.is_empty() { - self.queue.batches.all_batches.delete(wtxn, &batch_id)?; - self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? { + if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { + remove_task_datetime( + wtxn, + self.queue.batches.enqueued_at, + earliest, + batch_id, + )?; + remove_task_datetime( + wtxn, + self.queue.batches.enqueued_at, + oldest, + batch_id, + )?; + } else { + // If we don't have the enqueued at in the batch it means the database comes from the v1.12 + // and we still need to find the date by scrolling the database + remove_n_tasks_datetime_earlier_than( + wtxn, + self.queue.batches.enqueued_at, + batch.started_at, + batch.stats.total_nb_tasks.clamp(1, 2) as usize, + batch_id, + )?; + } + remove_task_datetime( + wtxn, + self.queue.batches.started_at, + batch.started_at, + batch_id, + )?; + if let Some(finished_at) = batch.finished_at { + remove_task_datetime( + wtxn, + self.queue.batches.finished_at, + finished_at, + batch_id, + )?; + } + + self.queue.batches.all_batches.delete(wtxn, &batch_id)?; + self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + } } + // Anyway, we must remove the batch from all its reverse indexes. // The only way to do that is to check diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap index dd3ed4c8a..89f87e29a 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap @@ -56,16 +56,13 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Started At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Finished At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### File Store: diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap index 9512a8d8d..135b272cd 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap @@ -54,15 +54,12 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Started At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Finished At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### File Store: diff --git a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap index 9e490843e..4c828b71d 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap @@ -87,7 +87,6 @@ doggo [2,3,] girafo [4,] ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] @@ -95,7 +94,6 @@ girafo [4,] [timestamp] [5,] ---------------------------------------------------------------------- ### Batches Started At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] @@ -103,7 +101,6 @@ girafo [4,] [timestamp] [5,] ---------------------------------------------------------------------- ### Batches Finished At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 80a0bb5ff..42bf253ad 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeSet, HashSet}; use std::ops::Bound; -use meilisearch_types::batches::{Batch, BatchId, BatchStats}; +use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats}; use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::milli::CboRoaringBitmapCodec; use meilisearch_types::task_view::DetailsView; @@ -30,8 +30,7 @@ pub struct ProcessingBatch { pub kinds: HashSet, pub indexes: HashSet, pub canceled_by: HashSet, - pub oldest_enqueued_at: Option, - pub earliest_enqueued_at: Option, + pub enqueued_at: Option, pub started_at: OffsetDateTime, pub finished_at: Option, } @@ -51,8 +50,7 @@ impl ProcessingBatch { kinds: HashSet::default(), indexes: HashSet::default(), canceled_by: HashSet::default(), - oldest_enqueued_at: None, - earliest_enqueued_at: None, + enqueued_at: None, started_at: OffsetDateTime::now_utc(), finished_at: None, } @@ -80,14 +78,18 @@ impl ProcessingBatch { if let Some(canceled_by) = task.canceled_by { self.canceled_by.insert(canceled_by); } - self.oldest_enqueued_at = - Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| { - task.enqueued_at.min(oldest_enqueued_at) - })); - self.earliest_enqueued_at = - Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| { - task.enqueued_at.max(earliest_enqueued_at) - })); + match self.enqueued_at.as_mut() { + Some(BatchEnqueuedAt { earliest, oldest }) => { + *oldest = task.enqueued_at.min(*oldest); + *earliest = task.enqueued_at.max(*earliest); + } + None => { + self.enqueued_at = Some(BatchEnqueuedAt { + earliest: task.enqueued_at, + oldest: task.enqueued_at, + }); + } + } } } @@ -138,6 +140,7 @@ impl ProcessingBatch { stats: self.stats.clone(), started_at: self.started_at, finished_at: self.finished_at, + enqueued_at: self.enqueued_at, } } } @@ -174,6 +177,33 @@ pub(crate) fn remove_task_datetime( Ok(()) } +pub(crate) fn remove_n_tasks_datetime_earlier_than( + wtxn: &mut RwTxn, + database: Database, + earlier_than: OffsetDateTime, + mut count: usize, + task_id: TaskId, +) -> Result<()> { + let earlier_than = earlier_than.unix_timestamp_nanos(); + let mut iter = database.rev_range_mut(wtxn, &(..earlier_than))?; + while let Some((current, mut existing)) = iter.next().transpose()? { + count -= existing.remove(task_id) as usize; + + if existing.is_empty() { + // safety: We don't keep references to the database + unsafe { iter.del_current()? }; + } else { + // safety: We don't keep references to the database + unsafe { iter.put_current(¤t, &existing)? }; + } + if count == 0 { + break; + } + } + + Ok(()) +} + pub(crate) fn keep_ids_within_datetimes( rtxn: &RoTxn, ids: &mut RoaringBitmap, @@ -329,14 +359,27 @@ impl crate::IndexScheduler { kind, } = task; assert_eq!(uid, task.uid); - if let Some(ref batch) = batch_uid { + if task.status != Status::Enqueued { + let batch_uid = batch_uid.expect("All non enqueued tasks must be part of a batch"); assert!(self .queue .batch_to_tasks_mapping - .get(&rtxn, batch) + .get(&rtxn, &batch_uid) .unwrap() .unwrap() .contains(uid)); + let batch = self.queue.batches.get_batch(&rtxn, batch_uid).unwrap().unwrap(); + assert_eq!(batch.uid, batch_uid); + if task.status == Status::Processing { + assert!(batch.progress.is_some()); + } else { + assert!(batch.progress.is_none()); + } + assert_eq!(batch.started_at, task.started_at.unwrap()); + assert_eq!(batch.finished_at, task.finished_at); + let enqueued_at = batch.enqueued_at.unwrap(); + assert!(task.enqueued_at >= enqueued_at.oldest); + assert!(task.enqueued_at <= enqueued_at.earliest); } if let Some(task_index_uid) = &task_index_uid { assert!(self diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index 7910a5af4..462d314db 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -24,6 +24,18 @@ pub struct Batch { pub started_at: OffsetDateTime, #[serde(with = "time::serde::rfc3339::option")] pub finished_at: Option, + + // Enqueued at is never displayed and is only required when removing a batch. + // It's always some except when upgrading from a database pre v1.12 + pub enqueued_at: Option, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct BatchEnqueuedAt { + #[serde(with = "time::serde::rfc3339")] + pub earliest: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub oldest: OffsetDateTime, } #[derive(Default, Debug, Clone, Serialize, Deserialize, ToSchema)] diff --git a/crates/meilisearch/tests/batches/mod.rs b/crates/meilisearch/tests/batches/mod.rs index 70307ac25..6ef40be8e 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -41,9 +41,8 @@ async fn list_batches() { let index = server.index("test"); let (task, _status_code) = index.create(None).await; index.wait_task(task.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; + let (task, _status_code) = index.create(None).await; + index.wait_task(task.uid()).await.failed(); let (response, code) = index.list_batches().await; assert_eq!(code, 200); assert_eq!( @@ -96,11 +95,12 @@ async fn list_batches_pagination_and_reverse() { async fn list_batches_with_star_filters() { let server = Server::new().await; let index = server.index("test"); - let (batch, _code) = index.create(None).await; - index.wait_task(batch.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; + let (task, _code) = index.create(None).await; + index.wait_task(task.uid()).await.succeeded(); + let index = server.index("test"); + let (task, _code) = index.create(None).await; + index.wait_task(task.uid()).await.failed(); + let (response, code) = index.service.get("/batches?indexUids=test").await; assert_eq!(code, 200); assert_eq!(response["results"].as_array().unwrap().len(), 2); @@ -187,9 +187,6 @@ async fn list_batches_invalid_canceled_by_filter() { let index = server.index("test"); let (task, _status_code) = index.create(None).await; index.wait_task(task.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; let (response, code) = index.filtered_batches(&[], &[], &["0"]).await; assert_eq!(code, 200, "{}", response); @@ -202,9 +199,8 @@ async fn list_batches_status_and_type_filtered() { let index = server.index("test"); let (task, _status_code) = index.create(None).await; index.wait_task(task.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; + let (task, _status_code) = index.update(Some("id")).await; + index.wait_task(task.uid()).await.succeeded(); let (response, code) = index.filtered_batches(&["indexCreation"], &["failed"], &[]).await; assert_eq!(code, 200, "{}", response); @@ -212,7 +208,7 @@ async fn list_batches_status_and_type_filtered() { let (response, code) = index .filtered_batches( - &["indexCreation", "documentAdditionOrUpdate"], + &["indexCreation", "IndexUpdate"], &["succeeded", "processing", "enqueued"], &[], )