From f21ae1f5d1ce71067650e828236bad34f5ef9f67 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 22 Jan 2025 17:58:58 +0100 Subject: [PATCH 1/9] Remove the batch id from the date time databases --- .../src/scheduler/process_batch.rs | 35 ++++++++++++++++--- crates/index-scheduler/src/utils.rs | 27 ++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 7eda1d56f..5531ced9f 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -16,7 +16,10 @@ use crate::processing::{ InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, UpdateIndexProgress, }; -use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; +use crate::utils::{ + self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task, + ProcessingBatch, +}; use crate::{Error, IndexScheduler, Result, TaskId}; impl IndexScheduler { @@ -418,7 +421,6 @@ impl IndexScheduler { to_delete_tasks -= &enqueued_tasks; // 2. We now have a list of tasks to delete, delete them - let mut affected_indexes = HashSet::new(); let mut affected_statuses = HashSet::new(); let mut affected_kinds = HashSet::new(); @@ -515,9 +517,34 @@ impl IndexScheduler { tasks -= &to_delete_tasks; // We must remove the batch entirely if tasks.is_empty() { - self.queue.batches.all_batches.delete(wtxn, &batch_id)?; - self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? { + remove_n_tasks_datetime_earlier_than( + wtxn, + self.queue.batches.started_at, + batch.started_at, + if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + batch_id, + )?; + remove_task_datetime( + wtxn, + self.queue.batches.started_at, + batch.started_at, + batch_id, + )?; + if let Some(finished_at) = batch.finished_at { + remove_task_datetime( + wtxn, + self.queue.batches.finished_at, + finished_at, + batch_id, + )?; + } + + self.queue.batches.all_batches.delete(wtxn, &batch_id)?; + self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + } } + // Anyway, we must remove the batch from all its reverse indexes. // The only way to do that is to check diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 80a0bb5ff..028d193e9 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -174,6 +174,33 @@ pub(crate) fn remove_task_datetime( Ok(()) } +pub(crate) fn remove_n_tasks_datetime_earlier_than( + wtxn: &mut RwTxn, + database: Database, + earlier_than: OffsetDateTime, + mut count: usize, + task_id: TaskId, +) -> Result<()> { + let earlier_than = earlier_than.unix_timestamp_nanos(); + let mut iter = database.rev_range_mut(wtxn, &(..earlier_than))?; + while let Some((current, mut existing)) = iter.next().transpose()? { + count -= existing.remove(task_id) as usize; + + if existing.is_empty() { + // safety: We don't keep references to the database + unsafe { iter.del_current()? }; + } else { + // safety: We don't keep references to the database + unsafe { iter.put_current(¤t, &existing)? 
}; + } + if count == 0 { + break; + } + } + + Ok(()) +} + pub(crate) fn keep_ids_within_datetimes( rtxn: &RoTxn, ids: &mut RoaringBitmap, From 6ff37c6fc4b928ade7707197ac1a1369969bdc1c Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 22 Jan 2025 17:59:09 +0100 Subject: [PATCH 2/9] Fix the insta snapshots --- .../task_deletion_deleteable/task_deletion_processed.snap | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap index 9512a8d8d..135b272cd 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap @@ -54,15 +54,12 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Started At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Finished At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### File Store: From 508db9020d5cc8acdaaadd8e8c571a0c647c2728 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 27 Jan 2025 18:32:07 +0100 Subject: [PATCH 3/9] update the snapshots --- .../task_deletion_processed.snap | 2 -- .../task_deletion_deleteable/task_deletion_processed.snap | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap index dd3ed4c8a..f8e1d4ac3 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap @@ -61,11 +61,9 @@ succeeded [1,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Started At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Finished At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### File Store: diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap index 135b272cd..241c9b24b 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap @@ -54,6 +54,7 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: +[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Started At: From 58f90b70c73e2616951f3717aa3164dd8c6991b8 Mon Sep 17 00:00:00 
2001 From: Tamo Date: Tue, 28 Jan 2025 11:04:35 +0100 Subject: [PATCH 4/9] store the enqueued at to eases the batch deletion --- crates/index-scheduler/src/insta_snapshot.rs | 8 +++- crates/index-scheduler/src/queue/batches.rs | 46 ++++++++++--------- .../index-scheduler/src/queue/batches_test.rs | 43 +++++++++-------- .../src/scheduler/process_batch.rs | 33 +++++++++---- .../task_deletion_processed.snap | 1 - .../task_deletion_processed.snap | 1 - .../after_removing_the_upgrade_tasks.snap | 3 -- crates/index-scheduler/src/utils.rs | 29 ++++++------ crates/meilisearch-types/src/batches.rs | 12 +++++ 9 files changed, 107 insertions(+), 69 deletions(-) diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index 4bc2beb05..4a649f1cb 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -1,7 +1,7 @@ use std::collections::BTreeSet; use std::fmt::Write; -use meilisearch_types::batches::Batch; +use meilisearch_types::batches::{Batch, BatchEnqueuedAt}; use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{Database, RoTxn}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; @@ -341,10 +341,14 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database String { let mut snap = String::new(); - let Batch { uid, details, stats, started_at, finished_at, progress: _ } = batch; + let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch; if let Some(finished_at) = finished_at { assert!(finished_at > started_at); } + if let Some(BatchEnqueuedAt { earliest, oldest }) = enqueued_at { + assert!(started_at > earliest); + assert!(earliest >= oldest); + } snap.push('{'); snap.push_str(&format!("uid: {uid}, ")); snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap())); diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index 5c8a573ab..e50b790cf 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -181,6 +181,7 @@ impl BatchQueue { stats: batch.stats, started_at: batch.started_at, finished_at: batch.finished_at, + enqueued_at: batch.enqueued_at, }, )?; @@ -234,33 +235,36 @@ impl BatchQueue { // What we know, though, is that the task date is from before the enqueued_at, and max two timestamps have been written // to the DB per batches. 
if let Some(ref old_batch) = old_batch { - let started_at = old_batch.started_at.unix_timestamp_nanos(); + if let Some(enqueued_at) = old_batch.enqueued_at { + remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, old_batch.uid)?; + remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, old_batch.uid)?; + } else { + let started_at = old_batch.started_at.unix_timestamp_nanos(); - // We have either one or two enqueued at to remove - let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2); - let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?; - while let Some(entry) = iterator.next() { - let (key, mut value) = entry?; - if key > started_at { - continue; - } - if value.remove(old_batch.uid) { - exit = exit.saturating_sub(1); - // Safe because the key and value are owned - unsafe { - iterator.put_current(&key, &value)?; + // We have either one or two enqueued at to remove + let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2); + let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?; + while let Some(entry) = iterator.next() { + let (key, mut value) = entry?; + if key > started_at { + continue; } - if exit == 0 { - break; + if value.remove(old_batch.uid) { + exit = exit.saturating_sub(1); + // Safe because the key and value are owned + unsafe { + iterator.put_current(&key, &value)?; + } + if exit == 0 { + break; + } } } } } - if let Some(enqueued_at) = batch.oldest_enqueued_at { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?; - } - if let Some(enqueued_at) = batch.earliest_enqueued_at { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?; + if let Some(enqueued_at) = batch.enqueued_at.as_ref() { + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?; + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?; } // Update the started at and finished at diff --git a/crates/index-scheduler/src/queue/batches_test.rs b/crates/index-scheduler/src/queue/batches_test.rs index aa84cdaf0..38e7ad800 100644 --- a/crates/index-scheduler/src/queue/batches_test.rs +++ b/crates/index-scheduler/src/queue/batches_test.rs @@ -102,30 +102,33 @@ fn query_batches_simple() { .unwrap(); assert_eq!(batches.len(), 1); batches[0].started_at = OffsetDateTime::UNIX_EPOCH; + assert!(batches[0].enqueued_at.is_some()); + batches[0].enqueued_at = None; // Insta cannot snapshot our batches because the batch stats contains an enum as key: https://github.com/mitsuhiko/insta/issues/689 let batch = serde_json::to_string_pretty(&batches[0]).unwrap(); snapshot!(batch, @r#" - { - "uid": 0, - "details": { - "primaryKey": "mouse" - }, - "stats": { - "totalNbTasks": 1, - "status": { - "processing": 1 - }, - "types": { - "indexCreation": 1 - }, - "indexUids": { - "catto": 1 - } - }, - "startedAt": "1970-01-01T00:00:00Z", - "finishedAt": null + { + "uid": 0, + "details": { + "primaryKey": "mouse" + }, + "stats": { + "totalNbTasks": 1, + "status": { + "processing": 1 + }, + "types": { + "indexCreation": 1 + }, + "indexUids": { + "catto": 1 } - "#); + }, + "startedAt": "1970-01-01T00:00:00Z", + "finishedAt": null, + "enqueuedAt": null + } + "#); let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() }; let (batches, _) = index_scheduler diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 5531ced9f..c374044f5 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ 
b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; -use meilisearch_types::batches::BatchId; +use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self}; @@ -518,13 +518,30 @@ impl IndexScheduler { // We must remove the batch entirely if tasks.is_empty() { if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? { - remove_n_tasks_datetime_earlier_than( - wtxn, - self.queue.batches.started_at, - batch.started_at, - if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, - batch_id, - )?; + if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { + remove_task_datetime( + wtxn, + self.queue.batches.enqueued_at, + earliest, + batch_id, + )?; + remove_task_datetime( + wtxn, + self.queue.batches.enqueued_at, + oldest, + batch_id, + )?; + } else { + // If we don't have the enqueued at in the batch it means the database comes from the v1.12 + // and we still need to find the date by scrolling the database + remove_n_tasks_datetime_earlier_than( + wtxn, + self.queue.batches.enqueued_at, + batch.started_at, + if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + batch_id, + )?; + } remove_task_datetime( wtxn, self.queue.batches.started_at, diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap index f8e1d4ac3..89f87e29a 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap @@ -56,7 +56,6 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [1,] ---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap index 241c9b24b..135b272cd 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap +++ b/crates/index-scheduler/src/scheduler/snapshots/test.rs/task_deletion_deleteable/task_deletion_processed.snap @@ -54,7 +54,6 @@ succeeded [1,] ### Batches Index Tasks: ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Batches Started At: diff --git a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap index 9e490843e..4c828b71d 100644 --- a/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap +++ 
b/crates/index-scheduler/src/scheduler/snapshots/test_failure.rs/upgrade_failure/after_removing_the_upgrade_tasks.snap @@ -87,7 +87,6 @@ doggo [2,3,] girafo [4,] ---------------------------------------------------------------------- ### Batches Enqueued At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] @@ -95,7 +94,6 @@ girafo [4,] [timestamp] [5,] ---------------------------------------------------------------------- ### Batches Started At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] @@ -103,7 +101,6 @@ girafo [4,] [timestamp] [5,] ---------------------------------------------------------------------- ### Batches Finished At: -[timestamp] [0,] [timestamp] [1,] [timestamp] [2,] [timestamp] [3,] diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 028d193e9..2a0c47626 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeSet, HashSet}; use std::ops::Bound; -use meilisearch_types::batches::{Batch, BatchId, BatchStats}; +use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats}; use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::milli::CboRoaringBitmapCodec; use meilisearch_types::task_view::DetailsView; @@ -30,8 +30,7 @@ pub struct ProcessingBatch { pub kinds: HashSet, pub indexes: HashSet, pub canceled_by: HashSet, - pub oldest_enqueued_at: Option, - pub earliest_enqueued_at: Option, + pub enqueued_at: Option, pub started_at: OffsetDateTime, pub finished_at: Option, } @@ -51,8 +50,7 @@ impl ProcessingBatch { kinds: HashSet::default(), indexes: HashSet::default(), canceled_by: HashSet::default(), - oldest_enqueued_at: None, - earliest_enqueued_at: None, + enqueued_at: None, started_at: OffsetDateTime::now_utc(), finished_at: None, } @@ -80,14 +78,18 @@ impl ProcessingBatch { if let Some(canceled_by) = task.canceled_by { self.canceled_by.insert(canceled_by); } - self.oldest_enqueued_at = - Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| { - task.enqueued_at.min(oldest_enqueued_at) - })); - self.earliest_enqueued_at = - Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| { - task.enqueued_at.max(earliest_enqueued_at) - })); + match self.enqueued_at.as_mut() { + Some(BatchEnqueuedAt { earliest, oldest }) => { + *oldest = task.enqueued_at.min(*oldest); + *earliest = task.enqueued_at.max(*earliest); + } + None => { + self.enqueued_at = Some(BatchEnqueuedAt { + earliest: task.enqueued_at, + oldest: task.enqueued_at, + }); + } + } } } @@ -138,6 +140,7 @@ impl ProcessingBatch { stats: self.stats.clone(), started_at: self.started_at, finished_at: self.finished_at, + enqueued_at: self.enqueued_at, } } } diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index 7910a5af4..462d314db 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -24,6 +24,18 @@ pub struct Batch { pub started_at: OffsetDateTime, #[serde(with = "time::serde::rfc3339::option")] pub finished_at: Option, + + // Enqueued at is never displayed and is only required when removing a batch. 
+ // It's always some except when upgrading from a database pre v1.12 + pub enqueued_at: Option, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct BatchEnqueuedAt { + #[serde(with = "time::serde::rfc3339")] + pub earliest: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub oldest: OffsetDateTime, } #[derive(Default, Debug, Clone, Serialize, Deserialize, ToSchema)] From 485e3127c723508184c621681e05645776104a53 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 28 Jan 2025 11:29:05 +0100 Subject: [PATCH 5/9] use the remove_n_tasks_datetime_earlier_than function when updating batches --- crates/index-scheduler/src/queue/batches.rs | 42 ++++++++------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index e50b790cf..67c3f71fc 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -12,8 +12,8 @@ use time::OffsetDateTime; use super::{Query, Queue}; use crate::processing::ProcessingTasks; use crate::utils::{ - insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime, - ProcessingBatch, + insert_task_datetime, keep_ids_within_datetimes, map_bound, + remove_n_tasks_datetime_earlier_than, remove_task_datetime, ProcessingBatch, }; use crate::{Error, Result, BEI128}; @@ -239,33 +239,21 @@ impl BatchQueue { remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, old_batch.uid)?; remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, old_batch.uid)?; } else { - let started_at = old_batch.started_at.unix_timestamp_nanos(); - - // We have either one or two enqueued at to remove - let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2); - let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?; - while let Some(entry) = iterator.next() { - let (key, mut value) = entry?; - if key > started_at { - continue; - } - if value.remove(old_batch.uid) { - exit = exit.saturating_sub(1); - // Safe because the key and value are owned - unsafe { - iterator.put_current(&key, &value)?; - } - if exit == 0 { - break; - } - } - } + // If we don't have the enqueued at in the batch it means the database comes from the v1.12 + // and we still need to find the date by scrolling the database + remove_n_tasks_datetime_earlier_than( + wtxn, + self.enqueued_at, + old_batch.started_at, + if old_batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + old_batch.uid, + )?; } } - if let Some(enqueued_at) = batch.enqueued_at.as_ref() { - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?; - insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?; - } + // A finished batch MUST contains at least one task and have an enqueued_at + let enqueued_at = batch.enqueued_at.as_ref().unwrap(); + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?; + insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?; // Update the started at and finished at if let Some(ref old_batch) = old_batch { From e0f0da57e2b05d61b95302d6fe9c97519a531c81 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 28 Jan 2025 11:51:07 +0100 Subject: [PATCH 6/9] make sure the batches we snapshots actually all contains an enqueued_at --- crates/index-scheduler/src/insta_snapshot.rs | 8 ++++---- crates/index-scheduler/src/utils.rs | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/crates/index-scheduler/src/insta_snapshot.rs 
b/crates/index-scheduler/src/insta_snapshot.rs index 4a649f1cb..bb8827fdc 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -345,10 +345,10 @@ pub fn snapshot_batch(batch: &Batch) -> String { if let Some(finished_at) = finished_at { assert!(finished_at > started_at); } - if let Some(BatchEnqueuedAt { earliest, oldest }) = enqueued_at { - assert!(started_at > earliest); - assert!(earliest >= oldest); - } + let BatchEnqueuedAt { earliest, oldest } = enqueued_at.unwrap(); + assert!(*started_at > earliest); + assert!(earliest >= oldest); + snap.push('{'); snap.push_str(&format!("uid: {uid}, ")); snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap())); diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 2a0c47626..42bf253ad 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -359,14 +359,27 @@ impl crate::IndexScheduler { kind, } = task; assert_eq!(uid, task.uid); - if let Some(ref batch) = batch_uid { + if task.status != Status::Enqueued { + let batch_uid = batch_uid.expect("All non enqueued tasks must be part of a batch"); assert!(self .queue .batch_to_tasks_mapping - .get(&rtxn, batch) + .get(&rtxn, &batch_uid) .unwrap() .unwrap() .contains(uid)); + let batch = self.queue.batches.get_batch(&rtxn, batch_uid).unwrap().unwrap(); + assert_eq!(batch.uid, batch_uid); + if task.status == Status::Processing { + assert!(batch.progress.is_some()); + } else { + assert!(batch.progress.is_none()); + } + assert_eq!(batch.started_at, task.started_at.unwrap()); + assert_eq!(batch.finished_at, task.finished_at); + let enqueued_at = batch.enqueued_at.unwrap(); + assert!(task.enqueued_at >= enqueued_at.oldest); + assert!(task.enqueued_at <= enqueued_at.earliest); } if let Some(task_index_uid) = &task_index_uid { assert!(self From ef47a0d820610e0dc306cf8a8d2e6b93e70c9798 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 28 Jan 2025 12:07:02 +0100 Subject: [PATCH 7/9] apply review comment --- crates/index-scheduler/src/queue/batches.rs | 2 +- crates/index-scheduler/src/scheduler/process_batch.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index 67c3f71fc..970e41110 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -245,7 +245,7 @@ impl BatchQueue { wtxn, self.enqueued_at, old_batch.started_at, - if old_batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + old_batch.stats.total_nb_tasks.clamp(1, 2) as usize, old_batch.uid, )?; } diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index c374044f5..623bdeb53 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -538,7 +538,7 @@ impl IndexScheduler { wtxn, self.queue.batches.enqueued_at, batch.started_at, - if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 }, + batch.stats.total_nb_tasks.clamp(1, 2) as usize, batch_id, )?; } From 8676e94f5c07e1a1bbf27694cd2c4fe037869ee9 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 28 Jan 2025 14:57:42 +0100 Subject: [PATCH 8/9] fix the flaky tests --- crates/meilisearch/tests/batches/mod.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/crates/meilisearch/tests/batches/mod.rs 
b/crates/meilisearch/tests/batches/mod.rs index 70307ac25..327e2cd64 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -96,11 +96,12 @@ async fn list_batches_pagination_and_reverse() { async fn list_batches_with_star_filters() { let server = Server::new().await; let index = server.index("test"); - let (batch, _code) = index.create(None).await; - index.wait_task(batch.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; + let (task, _code) = index.create(None).await; + index.wait_task(task.uid()).await.succeeded(); + let index = server.index("test"); + let (task, _code) = index.create(None).await; + index.wait_task(task.uid()).await.failed(); + let (response, code) = index.service.get("/batches?indexUids=test").await; assert_eq!(code, 200); assert_eq!(response["results"].as_array().unwrap().len(), 2); @@ -187,9 +188,6 @@ async fn list_batches_invalid_canceled_by_filter() { let index = server.index("test"); let (task, _status_code) = index.create(None).await; index.wait_task(task.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; let (response, code) = index.filtered_batches(&[], &[], &["0"]).await; assert_eq!(code, 200, "{}", response); @@ -202,9 +200,8 @@ async fn list_batches_status_and_type_filtered() { let index = server.index("test"); let (task, _status_code) = index.create(None).await; index.wait_task(task.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; + let (task, _status_code) = index.update(Some("id")).await; + index.wait_task(task.uid()).await.succeeded(); let (response, code) = index.filtered_batches(&["indexCreation"], &["failed"], &[]).await; assert_eq!(code, 200, "{}", response); @@ -212,7 +209,7 @@ async fn list_batches_status_and_type_filtered() { let (response, code) = index .filtered_batches( - &["indexCreation", "documentAdditionOrUpdate"], + &["indexCreation", "IndexUpdate"], &["succeeded", "processing", "enqueued"], &[], ) From 1beda3b9afabe16c15b7b98c9b902e52cc94ac58 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 28 Jan 2025 16:02:14 +0100 Subject: [PATCH 9/9] fix another flaky test --- crates/meilisearch/tests/batches/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/meilisearch/tests/batches/mod.rs b/crates/meilisearch/tests/batches/mod.rs index 327e2cd64..6ef40be8e 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -41,9 +41,8 @@ async fn list_batches() { let index = server.index("test"); let (task, _status_code) = index.create(None).await; index.wait_task(task.uid()).await.succeeded(); - index - .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) - .await; + let (task, _status_code) = index.create(None).await; + index.wait_task(task.uid()).await.failed(); let (response, code) = index.list_batches().await; assert_eq!(code, 200); assert_eq!(
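
The core helper introduced in PATCH 1/9, remove_n_tasks_datetime_earlier_than, walks a BEI128-keyed datetime database backwards and removes an id from the stored bitmaps until a given number of removals has been performed; PATCHES 4, 5 and 7 then keep it only as a fallback, preferring the enqueued_at window stored on the batch itself. The sketch below models that behaviour in memory: BTreeMap and BTreeSet stand in for the LMDB Database<BEI128, CboRoaringBitmapCodec> and its roaring bitmaps, timestamps are plain unix nanoseconds, and the StoredBatch struct and the function names are illustrative assumptions rather than the scheduler's real API.

use std::collections::{BTreeMap, BTreeSet};

/// In-memory stand-in for one of the scheduler's datetime databases:
/// key = unix timestamp in nanoseconds, value = set of ids stored there.
type DatetimeIndex = BTreeMap<i128, BTreeSet<u32>>;

/// Mirrors `remove_task_datetime`: drop `id` from the entry at `ts`,
/// deleting the entry once its set becomes empty.
fn remove_datetime(index: &mut DatetimeIndex, ts: i128, id: u32) {
    if let Some(ids) = index.get_mut(&ts) {
        ids.remove(&id);
        if ids.is_empty() {
            index.remove(&ts);
        }
    }
}

/// Mirrors `remove_n_tasks_datetime_earlier_than`: visit the keys strictly
/// earlier than `earlier_than` in reverse order and remove `id` from them
/// until `count` removals have been performed.
fn remove_n_earlier_than(index: &mut DatetimeIndex, earlier_than: i128, mut count: usize, id: u32) {
    // Collect the candidate keys first so the map can be mutated afterwards.
    let keys: Vec<i128> = index.range(..earlier_than).rev().map(|(k, _)| *k).collect();
    for key in keys {
        let ids = index.get_mut(&key).expect("key came from the index");
        count -= ids.remove(&id) as usize; // callers pass `count >= 1`, as in the patch
        if ids.is_empty() {
            index.remove(&key);
        }
        if count == 0 {
            break;
        }
    }
}

/// A stored batch, reduced to the fields the deletion path looks at.
struct StoredBatch {
    uid: u32,
    started_at: i128,
    total_nb_tasks: u32,
    /// `(earliest, oldest)` enqueued-at bounds; `None` for batches written
    /// by a database older than v1.12.
    enqueued_at: Option<(i128, i128)>,
}

/// Mirrors the branch added to `process_batch.rs` and `queue/batches.rs`.
fn clear_batch_from_enqueued_index(index: &mut DatetimeIndex, batch: &StoredBatch) {
    if let Some((earliest, oldest)) = batch.enqueued_at {
        // Fast path: the batch carries its own window, remove both bounds.
        remove_datetime(index, earliest, batch.uid);
        remove_datetime(index, oldest, batch.uid);
    } else {
        // Pre-v1.12 batch: the window was never stored, so scan backwards
        // from `started_at`; at most two timestamps were written per batch.
        let count = batch.total_nb_tasks.clamp(1, 2) as usize;
        remove_n_earlier_than(index, batch.started_at, count, batch.uid);
    }
}

fn main() {
    let mut enqueued_at = DatetimeIndex::new();
    enqueued_at.entry(1_000).or_default().insert(0); // oldest task of batch 0
    enqueued_at.entry(2_000).or_default().insert(0); // most recent task of batch 0

    // A legacy batch without a stored window takes the fallback path.
    let legacy = StoredBatch { uid: 0, started_at: 3_000, total_nb_tasks: 2, enqueued_at: None };
    clear_batch_from_enqueued_index(&mut enqueued_at, &legacy);
    assert!(enqueued_at.is_empty());
}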
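
PATCH 4/9 replaces the oldest_enqueued_at and earliest_enqueued_at fields of ProcessingBatch with a single optional BatchEnqueuedAt value that is widened as tasks join the batch; note that in the patch, oldest tracks the minimum enqueued_at while earliest tracks the maximum. A minimal sketch of that bookkeeping, assuming plain i128 nanosecond timestamps in place of time::OffsetDateTime and simplified type names:

/// Simplified counterpart of `meilisearch_types::batches::BatchEnqueuedAt`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct EnqueuedWindow {
    /// Largest `enqueued_at` seen so far (named `earliest` in the patch).
    earliest: i128,
    /// Smallest `enqueued_at` seen so far (named `oldest` in the patch).
    oldest: i128,
}

#[derive(Default)]
struct ProcessingBatchSketch {
    enqueued_at: Option<EnqueuedWindow>,
}

impl ProcessingBatchSketch {
    /// Mirrors the `match self.enqueued_at.as_mut()` block added to
    /// `ProcessingBatch::processing`: widen the window for each task.
    fn observe_task(&mut self, task_enqueued_at: i128) {
        match self.enqueued_at.as_mut() {
            Some(window) => {
                window.oldest = window.oldest.min(task_enqueued_at);
                window.earliest = window.earliest.max(task_enqueued_at);
            }
            None => {
                self.enqueued_at =
                    Some(EnqueuedWindow { earliest: task_enqueued_at, oldest: task_enqueued_at });
            }
        }
    }
}

fn main() {
    let mut batch = ProcessingBatchSketch::default();
    for ts in [40, 10, 25] {
        batch.observe_task(ts);
    }
    // Deleting the batch later only needs these two timestamps, which is why
    // persisting the window removes the need to scan the enqueued_at database.
    assert_eq!(batch.enqueued_at, Some(EnqueuedWindow { earliest: 40, oldest: 10 }));
}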
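
PATCH 6/9 tightens the test-only consistency checks: every task that is no longer enqueued must point at an existing batch, and that batch must mirror the task's dates, with the task's enqueued_at falling inside the stored window and started_at strictly after it. The function below restates those assertions over simplified, hypothetical structs (timestamps again as unix nanoseconds); it is a sketch of the invariants, not the actual assert_internally_consistent code.

/// Simplified views of a task and its batch.
struct TaskView {
    enqueued_at: i128,
    started_at: Option<i128>,
    finished_at: Option<i128>,
}

struct BatchView {
    started_at: i128,
    finished_at: Option<i128>,
    /// `(earliest, oldest)` enqueued-at bounds, `earliest` being the maximum.
    enqueued_at: Option<(i128, i128)>,
}

/// Restates the invariants asserted in `utils.rs` and `insta_snapshot.rs`
/// for a task that has already been batched.
fn check_batched_task(task: &TaskView, batch: &BatchView) {
    let (earliest, oldest) = batch.enqueued_at.expect("a written batch always has a window");
    // From `snapshot_batch`: the window is ordered and precedes the start.
    assert!(earliest >= oldest);
    assert!(batch.started_at > earliest);
    // From `assert_internally_consistent`: the batch mirrors its tasks.
    assert!(task.enqueued_at >= oldest);
    assert!(task.enqueued_at <= earliest);
    assert_eq!(batch.started_at, task.started_at.unwrap());
    assert_eq!(batch.finished_at, task.finished_at);
    if let Some(finished_at) = batch.finished_at {
        assert!(finished_at > batch.started_at);
    }
}

fn main() {
    let task = TaskView { enqueued_at: 15, started_at: Some(30), finished_at: Some(40) };
    let batch = BatchView { started_at: 30, finished_at: Some(40), enqueued_at: Some((20, 10)) };
    check_batched_task(&task, &batch);
}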
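
The review comment applied in PATCH 7/9 swaps `if total_nb_tasks >= 2 { 2 } else { 1 }` for `total_nb_tasks.clamp(1, 2) as usize`. For an unsigned counter the two forms agree on every value, including zero, which both map to 1, so the change is purely cosmetic; a quick standalone check:

fn main() {
    for total_nb_tasks in [0u32, 1, 2, 3, 1_000] {
        let verbose = if total_nb_tasks >= 2 { 2 } else { 1 };
        let clamped = total_nb_tasks.clamp(1, 2) as usize;
        assert_eq!(verbose, clamped, "mismatch for {total_nb_tasks}");
    }
}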