Add assert method to verify validity of index scheduler state

Loïc Lecrenier 2022-10-20 13:11:50 +02:00 committed by Clément Renault
parent ecf4e43b3d
commit e3848b5f28
GPG key ID: 92ADA4E935E71FA4
23 changed files with 282 additions and 695 deletions


@@ -266,3 +266,183 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
}
}
}
#[cfg(test)]
use meilisearch_types::tasks::Details;
#[cfg(test)]
impl IndexScheduler {
/// Asserts that the index scheduler's content is internally consistent.
pub fn assert_internally_consistent(&self) {
let rtxn = self.env.read_txn().unwrap();
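        // Walk every task in `all_tasks` and check that each auxiliary database
        // (per-index, status, kind, and date databases) agrees with its content.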
for task in self.all_tasks.iter(&rtxn).unwrap() {
let (task_id, task) = task.unwrap();
let task_id = task_id.get();
let task_index_uid = task.index_uid().map(ToOwned::to_owned);
let Task {
uid,
enqueued_at,
started_at,
finished_at,
error: _,
canceled_by,
details,
status,
kind,
} = task;
            assert_eq!(uid, task_id);
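            // A task that targets an index must be registered under that index in
            // the `index_tasks` database.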
if let Some(task_index_uid) = &task_index_uid {
assert!(self
.index_tasks
.get(&rtxn, task_index_uid.as_str())
.unwrap()
.unwrap()
.contains(task.uid));
}
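            // The task must be indexed under its enqueued, started, and finished
            // timestamps in the corresponding date databases.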
let db_enqueued_at = self
.enqueued_at
.get(&rtxn, &BEI128::new(enqueued_at.unix_timestamp_nanos()))
.unwrap()
.unwrap();
assert!(db_enqueued_at.contains(task_id));
if let Some(started_at) = started_at {
let db_started_at = self
.started_at
.get(&rtxn, &BEI128::new(started_at.unix_timestamp_nanos()))
.unwrap()
.unwrap();
assert!(db_started_at.contains(task_id));
}
if let Some(finished_at) = finished_at {
let db_finished_at = self
.finished_at
.get(&rtxn, &BEI128::new(finished_at.unix_timestamp_nanos()))
.unwrap()
.unwrap();
assert!(db_finished_at.contains(task_id));
}
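            // A canceled task must belong to the `canceled` status set and point
            // back to a `TaskCancelation` task that succeeded and targeted it.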
if let Some(canceled_by) = canceled_by {
let db_canceled_tasks = self.get_status(&rtxn, Status::Canceled).unwrap();
                assert!(db_canceled_tasks.contains(uid));
let db_canceling_task = self.get_task(&rtxn, canceled_by).unwrap().unwrap();
assert_eq!(db_canceling_task.status, Status::Succeeded);
match db_canceling_task.kind {
KindWithContent::TaskCancelation { query: _, tasks } => {
assert!(tasks.contains(uid));
}
_ => panic!(),
}
}
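            // The `details`, when present, must be consistent with the task's kind
            // and status.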
match details {
Some(details) => match details {
Details::IndexSwap { swaps } => {
todo!()
}
Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => {
assert_eq!(kind.as_kind(), Kind::DocumentAdditionOrUpdate);
if let Some(indexed_documents) = indexed_documents {
assert_eq!(status, Status::Succeeded);
assert!(indexed_documents <= received_documents);
} else {
assert_ne!(status, Status::Succeeded);
}
}
Details::SettingsUpdate { settings: _ } => {
assert_eq!(kind.as_kind(), Kind::SettingsUpdate);
}
Details::IndexInfo { primary_key: pk1 } => match &kind {
KindWithContent::IndexCreation { index_uid, primary_key: pk2 }
| KindWithContent::IndexUpdate { index_uid, primary_key: pk2 } => {
                            assert!(self
                                .index_tasks
                                .get(&rtxn, index_uid.as_str())
                                .unwrap()
                                .unwrap()
                                .contains(uid));
assert_eq!(&pk1, pk2);
}
_ => panic!(),
},
Details::DocumentDeletion { received_document_ids, deleted_documents } => {
if let Some(deleted_documents) = deleted_documents {
assert_eq!(status, Status::Succeeded);
assert!(deleted_documents <= received_document_ids as u64);
assert_eq!(kind.as_kind(), Kind::DocumentDeletion);
match &kind {
KindWithContent::DocumentDeletion { index_uid, documents_ids } => {
assert_eq!(&task_index_uid.unwrap(), index_uid);
assert!(documents_ids.len() >= received_document_ids);
}
_ => panic!(),
}
} else {
assert_ne!(status, Status::Succeeded);
}
}
Details::ClearAll { deleted_documents } => {
assert!(matches!(
kind.as_kind(),
Kind::DocumentDeletion | Kind::IndexDeletion
));
if deleted_documents.is_some() {
assert_eq!(status, Status::Succeeded);
} else {
assert_ne!(status, Status::Succeeded);
}
}
Details::TaskCancelation { matched_tasks, canceled_tasks, original_query } => {
if let Some(canceled_tasks) = canceled_tasks {
assert_eq!(status, Status::Succeeded);
assert!(canceled_tasks <= matched_tasks);
match &kind {
KindWithContent::TaskCancelation { query, tasks } => {
assert_eq!(query, &original_query);
assert_eq!(tasks.len(), matched_tasks);
}
_ => panic!(),
}
} else {
assert_ne!(status, Status::Succeeded);
}
}
Details::TaskDeletion { matched_tasks, deleted_tasks, original_query } => {
if let Some(deleted_tasks) = deleted_tasks {
assert_eq!(status, Status::Succeeded);
assert!(deleted_tasks <= matched_tasks);
match &kind {
KindWithContent::TaskDeletion { query, tasks } => {
assert_eq!(query, &original_query);
assert_eq!(tasks.len(), matched_tasks);
}
_ => panic!(),
}
} else {
assert_ne!(status, Status::Succeeded);
}
}
Details::Dump { dump_uid: d1 } => {
assert!(
matches!(&kind, KindWithContent::DumpExport { dump_uid: d2, keys: _, instance_uid: _ } if &d1 == d2 )
);
}
},
None => (),
}
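            // The task must be registered under its status and its kind.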
assert!(self.get_status(&rtxn, status).unwrap().contains(uid));
assert!(self.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
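            // The content file of a document addition or update must exist while the
            // task is pending and must have been deleted once the task is finished.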
match kind {
KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => match status {
Status::Enqueued | Status::Processing => {
assert!(self.file_store.__all_uuids().contains(&content_file));
}
Status::Succeeded | Status::Failed | Status::Canceled => {
assert!(!self.file_store.__all_uuids().contains(&content_file));
}
},
_ => (),
}
}
}
}
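
For context, this helper is meant to be called from the crate's tests after each operation on the scheduler, so that every state change is validated against the invariants above. A minimal sketch of such a call site follows; the `IndexScheduler::test` constructor and the "doggos" index name are illustrative assumptions, not part of this commit:

#[cfg(test)]
mod sketch {
    use meilisearch_types::tasks::KindWithContent;

    use crate::IndexScheduler;

    #[test]
    fn scheduler_stays_consistent_after_registering_a_task() {
        // `IndexScheduler::test` is an assumed test constructor; the real helper
        // and its arguments may differ.
        let (index_scheduler, _handle) = IndexScheduler::test();
        index_scheduler
            .register(KindWithContent::IndexCreation {
                index_uid: "doggos".to_owned(), // hypothetical index name
                primary_key: None,
            })
            .unwrap();
        // Every auxiliary database must now agree about the newly registered task.
        index_scheduler.assert_internally_consistent();
    }
}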