fix the upgrade test

This commit is contained in:
Tamo 2025-01-21 16:11:53 +01:00
parent 87a0367524
commit 1af895bdfb
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
6 changed files with 223 additions and 56 deletions

View File

@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::ops::{Bound, RangeBounds};
use meilisearch_types::batches::{Batch, BatchId};
@ -10,7 +11,10 @@ use time::OffsetDateTime;
use super::{Query, Queue};
use crate::processing::ProcessingTasks;
use crate::utils::{insert_task_datetime, keep_ids_within_datetimes, map_bound, ProcessingBatch};
use crate::utils::{
insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime,
ProcessingBatch,
};
use crate::{Error, Result, BEI128};
/// Database const names for the `IndexScheduler`.
@ -159,6 +163,8 @@ impl BatchQueue {
}
pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: ProcessingBatch) -> Result<()> {
let old_batch = self.all_batches.get(wtxn, &batch.uid)?;
self.all_batches.put(
wtxn,
&batch.uid,
@ -172,30 +178,91 @@ impl BatchQueue {
},
)?;
// Update the statuses
if let Some(ref old_batch) = old_batch {
for status in old_batch.stats.status.keys() {
self.update_status(wtxn, *status, |bitmap| {
bitmap.remove(batch.uid);
})?;
}
}
for status in batch.statuses {
self.update_status(wtxn, status, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
// Update the kinds / types
if let Some(ref old_batch) = old_batch {
let kinds: HashSet<_> = old_batch.stats.types.keys().cloned().collect();
for kind in kinds.difference(&batch.kinds) {
self.update_kind(wtxn, *kind, |bitmap| {
bitmap.remove(batch.uid);
})?;
}
}
for kind in batch.kinds {
self.update_kind(wtxn, kind, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
// Update the indexes
if let Some(ref old_batch) = old_batch {
let indexes: HashSet<_> = old_batch.stats.index_uids.keys().cloned().collect();
for index in indexes.difference(&batch.indexes) {
self.update_index(wtxn, index, |bitmap| {
bitmap.remove(batch.uid);
})?;
}
}
for index in batch.indexes {
self.update_index(wtxn, &index, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
// Update the enqueued_at, we cannot retrieve the previous enqueued at from the previous batch and
// must instead go through the db looking for it. We cannot look at the task contained in this batch either
// because they may have been removed.
// What we know though is that the task date from before the enqueued_at and only two timestamp have been written
// to the DB per batches.
if let Some(ref old_batch) = old_batch {
let started_at = old_batch.started_at.unix_timestamp_nanos();
let mut exit = false;
let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?;
while let Some(entry) = iterator.next() {
let (key, mut value) = entry?;
if key > started_at {
continue;
}
if value.remove(old_batch.uid) {
// Safe because the key and value are owned
unsafe {
iterator.put_current(&key, &value)?;
}
if exit {
break;
} else {
exit = true;
}
}
}
}
if let Some(enqueued_at) = batch.oldest_enqueued_at {
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?;
}
if let Some(enqueued_at) = batch.earliest_enqueued_at {
insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?;
}
// Update the started at and finished at
if let Some(ref old_batch) = old_batch {
remove_task_datetime(wtxn, self.started_at, old_batch.started_at, old_batch.uid)?;
if let Some(finished_at) = old_batch.finished_at {
remove_task_datetime(wtxn, self.finished_at, finished_at, old_batch.uid)?;
}
}
insert_task_datetime(wtxn, self.started_at, batch.started_at, batch.uid)?;
insert_task_datetime(wtxn, self.finished_at, batch.finished_at.unwrap(), batch.uid)?;

View File

@ -436,10 +436,15 @@ impl IndexScheduler {
let failed = &self.queue.tasks.get_status(rtxn, Status::Failed)?;
// 0. The priority over everything is to upgrade the instance
let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & (enqueued | failed);
// There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time
let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & (enqueued | failed);
if !upgrade.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, upgrade)?;
// In the case of an upgrade database batch, we want to find back the original batch that tried processing it
// and re-use its id
if let Some(batch_uid) = tasks.last().unwrap().batch_uid {
current_batch.uid = batch_uid;
}
current_batch.processing(&mut tasks);
return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch)));
}

View File

@ -7,11 +7,11 @@ snapshot_kind: text
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 3, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, batch_uid: 2, status: succeeded, kind: UpgradeDatabase { from: (1, 12, 0) }}
2 {uid: 2, batch_uid: 4, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 5, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
4 {uid: 4, batch_uid: 6, status: succeeded, details: { primary_key: Some("leaves") }, kind: IndexCreation { index_uid: "girafo", primary_key: Some("leaves") }}
0 {uid: 0, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, batch_uid: 0, status: succeeded, kind: UpgradeDatabase { from: (1, 12, 0) }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
4 {uid: 4, batch_uid: 4, status: succeeded, details: { primary_key: Some("leaves") }, kind: IndexCreation { index_uid: "girafo", primary_key: Some("leaves") }}
----------------------------------------------------------------------
### Status:
enqueued []
@ -58,42 +58,38 @@ girafo: { number_of_documents: 0, field_distribution: {} }
[timestamp] [4,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
1 {uid: 1, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
2 {uid: 2, details: {}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
3 {uid: 3, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, }
4 {uid: 4, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
5 {uid: 5, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
6 {uid: 6, details: {"primaryKey":"leaves"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"girafo":1}}, }
0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, }
2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
4 {uid: 4, details: {"primaryKey":"leaves"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"girafo":1}}, }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [1,]
1 [1,]
2 [1,]
3 [0,]
4 [2,]
5 [3,]
6 [4,]
1 [0,]
2 [2,]
3 [3,]
4 [4,]
----------------------------------------------------------------------
### Batches Status:
succeeded [2,3,4,6,]
failed [0,1,5,]
succeeded [0,1,2,4,]
failed [3,]
----------------------------------------------------------------------
### Batches Kind:
"indexCreation" [3,4,5,6,]
"upgradeDatabase" [0,1,2,]
"indexCreation" [1,2,3,4,]
"upgradeDatabase" [0,]
----------------------------------------------------------------------
### Batches Index Tasks:
catto [3,]
doggo [4,5,]
girafo [6,]
catto [1,]
doggo [2,3,]
girafo [4,]
----------------------------------------------------------------------
### Batches Enqueued At:
[timestamp] [1,]
[timestamp] [0,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [0,1,2,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### Batches Started At:
[timestamp] [0,]
@ -101,8 +97,6 @@ girafo [6,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### Batches Finished At:
[timestamp] [0,]
@ -110,8 +104,6 @@ girafo [6,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### File Store:

View File

@ -0,0 +1,115 @@
---
source: crates/index-scheduler/src/scheduler/test_failure.rs
snapshot_kind: text
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
4 {uid: 4, batch_uid: 4, status: succeeded, details: { primary_key: Some("leaves") }, kind: IndexCreation { index_uid: "girafo", primary_key: Some("leaves") }}
5 {uid: 5, batch_uid: 5, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test" }, kind: TaskDeletion { query: "test", tasks: RoaringBitmap<[1]> }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,2,4,5,]
failed [3,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,2,3,4,]
"taskDeletion" [5,]
"upgradeDatabase" []
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
doggo [2,3,]
girafo [4,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
girafo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### All Batches:
1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, }
2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
4 {uid: 4, details: {"primaryKey":"leaves"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"girafo":1}}, }
5 {uid: 5, details: {"matchedTasks":1,"deletedTasks":1,"originalFilter":"test"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"taskDeletion":1},"indexUids":{}}, }
----------------------------------------------------------------------
### Batch to tasks mapping:
1 [0,]
2 [2,]
3 [3,]
4 [4,]
5 [5,]
----------------------------------------------------------------------
### Batches Status:
succeeded [1,2,4,5,]
failed [3,]
----------------------------------------------------------------------
### Batches Kind:
"indexCreation" [1,2,3,4,]
"taskDeletion" [5,]
"upgradeDatabase" []
----------------------------------------------------------------------
### Batches Index Tasks:
catto [1,]
doggo [2,3,]
girafo [4,]
----------------------------------------------------------------------
### Batches Enqueued At:
[timestamp] [1,]
[timestamp] [0,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Batches Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Batches Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -8,7 +8,7 @@ snapshot_kind: text
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, batch_uid: 1, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, kind: UpgradeDatabase { from: (1, 12, 0) }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
### Status:
@ -42,30 +42,26 @@ doggo [2,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
1 {uid: 1, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [1,]
1 [1,]
----------------------------------------------------------------------
### Batches Status:
failed [0,1,]
failed [0,]
----------------------------------------------------------------------
### Batches Kind:
"upgradeDatabase" [0,1,]
"upgradeDatabase" [0,]
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Enqueued At:
[timestamp] [0,1,]
[timestamp] [0,]
----------------------------------------------------------------------
### Batches Started At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Batches Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:

View File

@ -8,7 +8,7 @@ snapshot_kind: text
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, batch_uid: 2, status: succeeded, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, batch_uid: 0, status: succeeded, kind: UpgradeDatabase { from: (1, 12, 0) }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
@ -44,36 +44,28 @@ doggo [2,3,]
[timestamp] [1,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
1 {uid: 1, details: {}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
2 {uid: 2, details: {}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
0 {uid: 0, details: {}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [1,]
1 [1,]
2 [1,]
----------------------------------------------------------------------
### Batches Status:
succeeded [2,]
failed [0,1,]
succeeded [0,]
failed []
----------------------------------------------------------------------
### Batches Kind:
"upgradeDatabase" [0,1,2,]
"upgradeDatabase" [0,]
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Enqueued At:
[timestamp] [0,1,2,]
[timestamp] [0,]
----------------------------------------------------------------------
### Batches Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Batches Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### File Store: