fix the upgrade test

This commit is contained in:
Tamo 2025-01-21 16:11:53 +01:00 committed by Louis Dureuil
parent 5458850d21
commit bac7a1623a
No known key found for this signature in database
6 changed files with 223 additions and 56 deletions

View file

@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::ops::{Bound, RangeBounds};
use meilisearch_types::batches::{Batch, BatchId};
@ -10,7 +11,10 @@ use time::OffsetDateTime;
use super::{Query, Queue};
use crate::processing::ProcessingTasks;
use crate::utils::{insert_task_datetime, keep_ids_within_datetimes, map_bound, ProcessingBatch};
use crate::utils::{
insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime,
ProcessingBatch,
};
use crate::{Error, Result, BEI128};
/// Database const names for the `IndexScheduler`.
@ -159,6 +163,8 @@ impl BatchQueue {
}
pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: ProcessingBatch) -> Result<()> {
let old_batch = self.all_batches.get(wtxn, &batch.uid)?;
self.all_batches.put(
wtxn,
&batch.uid,
@ -172,30 +178,91 @@ impl BatchQueue {
},
)?;
// Update the statuses
if let Some(ref old_batch) = old_batch {
for status in old_batch.stats.status.keys() {
self.update_status(wtxn, *status, |bitmap| {
bitmap.remove(batch.uid);
})?;
}
}
for status in batch.statuses {
self.update_status(wtxn, status, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
// Update the kinds / types
if let Some(ref old_batch) = old_batch {
let kinds: HashSet<_> = old_batch.stats.types.keys().cloned().collect();
for kind in kinds.difference(&batch.kinds) {
self.update_kind(wtxn, *kind, |bitmap| {
bitmap.remove(batch.uid);
})?;
}
}
for kind in batch.kinds {
self.update_kind(wtxn, kind, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
// Update the indexes
if let Some(ref old_batch) = old_batch {
let indexes: HashSet<_> = old_batch.stats.index_uids.keys().cloned().collect();
for index in indexes.difference(&batch.indexes) {
self.update_index(wtxn, index, |bitmap| {
bitmap.remove(batch.uid);
})?;
}
}
for index in batch.indexes {
self.update_index(wtxn, &index, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
// Update the enqueued_at, we cannot retrieve the previous enqueued at from the previous batch and
// must instead go through the db looking for it. We cannot look at the task contained in this batch either
// because they may have been removed.
// What we know though is that the task date from before the enqueued_at and only two timestamp have been written
// to the DB per batches.
if let Some(ref old_batch) = old_batch {
let started_at = old_batch.started_at.unix_timestamp_nanos();
let mut exit = false;
let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?;
while let Some(entry) = iterator.next() {
let (key, mut value) = entry?;
if key > started_at {
continue;
}
if value.remove(old_batch.uid) {
// Safe because the key and value are owned
unsafe {
iterator.put_current(&key, &value)?;
}
if exit {
break;
} else {
exit = true;
}
}
}
}
if let Some(enqueued_at) = batch.oldest_enqueued_at {
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?;
}
if let Some(enqueued_at) = batch.earliest_enqueued_at {
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?;
}
// Update the started at and finished at
if let Some(ref old_batch) = old_batch {
remove_task_datetime(wtxn, self.started_at, old_batch.started_at, old_batch.uid)?;
if let Some(finished_at) = old_batch.finished_at {
remove_task_datetime(wtxn, self.finished_at, finished_at, old_batch.uid)?;
}
}
insert_task_datetime(wtxn, self.started_at, batch.started_at, batch.uid)?;
insert_task_datetime(wtxn, self.finished_at, batch.finished_at.unwrap(), batch.uid)?;