mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-30 23:27:36 +01:00
Remove the batch id from the date time databases
This commit is contained in:
parent
483c52f07b
commit
f21ae1f5d1
@ -16,7 +16,10 @@ use crate::processing::{
|
||||
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
|
||||
UpdateIndexProgress,
|
||||
};
|
||||
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
|
||||
use crate::utils::{
|
||||
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
|
||||
ProcessingBatch,
|
||||
};
|
||||
use crate::{Error, IndexScheduler, Result, TaskId};
|
||||
|
||||
impl IndexScheduler {
|
||||
@ -418,7 +421,6 @@ impl IndexScheduler {
|
||||
to_delete_tasks -= &enqueued_tasks;
|
||||
|
||||
// 2. We now have a list of tasks to delete, delete them
|
||||
|
||||
let mut affected_indexes = HashSet::new();
|
||||
let mut affected_statuses = HashSet::new();
|
||||
let mut affected_kinds = HashSet::new();
|
||||
@ -515,9 +517,34 @@ impl IndexScheduler {
|
||||
tasks -= &to_delete_tasks;
|
||||
// We must remove the batch entirely
|
||||
if tasks.is_empty() {
|
||||
self.queue.batches.all_batches.delete(wtxn, &batch_id)?;
|
||||
self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?;
|
||||
if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? {
|
||||
remove_n_tasks_datetime_earlier_than(
|
||||
wtxn,
|
||||
self.queue.batches.started_at,
|
||||
batch.started_at,
|
||||
if batch.stats.total_nb_tasks >= 2 { 2 } else { 1 },
|
||||
batch_id,
|
||||
)?;
|
||||
remove_task_datetime(
|
||||
wtxn,
|
||||
self.queue.batches.started_at,
|
||||
batch.started_at,
|
||||
batch_id,
|
||||
)?;
|
||||
if let Some(finished_at) = batch.finished_at {
|
||||
remove_task_datetime(
|
||||
wtxn,
|
||||
self.queue.batches.finished_at,
|
||||
finished_at,
|
||||
batch_id,
|
||||
)?;
|
||||
}
|
||||
|
||||
self.queue.batches.all_batches.delete(wtxn, &batch_id)?;
|
||||
self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Anyway, we must remove the batch from all its reverse indexes.
|
||||
// The only way to do that is to check
|
||||
|
||||
|
@ -174,6 +174,33 @@ pub(crate) fn remove_task_datetime(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn remove_n_tasks_datetime_earlier_than(
|
||||
wtxn: &mut RwTxn,
|
||||
database: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
earlier_than: OffsetDateTime,
|
||||
mut count: usize,
|
||||
task_id: TaskId,
|
||||
) -> Result<()> {
|
||||
let earlier_than = earlier_than.unix_timestamp_nanos();
|
||||
let mut iter = database.rev_range_mut(wtxn, &(..earlier_than))?;
|
||||
while let Some((current, mut existing)) = iter.next().transpose()? {
|
||||
count -= existing.remove(task_id) as usize;
|
||||
|
||||
if existing.is_empty() {
|
||||
// safety: We don't keep references to the database
|
||||
unsafe { iter.del_current()? };
|
||||
} else {
|
||||
// safety: We don't keep references to the database
|
||||
unsafe { iter.put_current(¤t, &existing)? };
|
||||
}
|
||||
if count == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn keep_ids_within_datetimes(
|
||||
rtxn: &RoTxn,
|
||||
ids: &mut RoaringBitmap,
|
||||
|
Loading…
x
Reference in New Issue
Block a user