Merge branch 'main' into index-swap-error-handling

This commit is contained in:
Tamo 2022-10-27 18:06:45 +02:00 committed by GitHub
commit c9f89d38e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 475 additions and 169 deletions

View file

@ -41,7 +41,7 @@ pub use error::Error;
use file_store::FileStore;
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::heed::{self, Database, Env, RoTxn};
use meilisearch_types::milli;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig;
@ -101,7 +101,7 @@ pub struct Query {
}
impl Query {
/// Return `true` iff every field of the query is set to `None`, such that the query
/// Return `true` if every field of the query is set to `None`, such that the query
/// matches all tasks.
pub fn is_empty(&self) -> bool {
matches!(
@ -394,6 +394,10 @@ impl IndexScheduler {
Ok(this)
}
pub fn read_txn(&self) -> Result<RoTxn> {
self.env.read_txn().map_err(|e| e.into())
}
/// Start the run loop for the given index scheduler.
///
/// This function will execute in a different thread and must be called
@ -443,14 +447,12 @@ impl IndexScheduler {
self.index_mapper.indexes(&rtxn)
}
/// Return the task ids matched by the given query.
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
let rtxn = self.env.read_txn()?;
/// Return the task ids matched by the given query from the index scheduler's point of view.
pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> {
let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } =
self.processing_tasks.read().unwrap().clone();
let mut tasks = self.all_task_ids(&rtxn)?;
let mut tasks = self.all_task_ids(rtxn)?;
if let Some(from) = &query.from {
tasks.remove_range(from.saturating_add(1)..);
@ -464,7 +466,7 @@ impl IndexScheduler {
Status::Processing => {
status_tasks |= &processing_tasks;
}
status => status_tasks |= &self.get_status(&rtxn, *status)?,
status => status_tasks |= &self.get_status(rtxn, *status)?,
};
}
if !status.contains(&Status::Processing) {
@ -481,7 +483,7 @@ impl IndexScheduler {
if let Some(kind) = &query.kind {
let mut kind_tasks = RoaringBitmap::new();
for kind in kind {
kind_tasks |= self.get_kind(&rtxn, *kind)?;
kind_tasks |= self.get_kind(rtxn, *kind)?;
}
tasks &= &kind_tasks;
}
@ -489,7 +491,7 @@ impl IndexScheduler {
if let Some(index) = &query.index_uid {
let mut index_tasks = RoaringBitmap::new();
for index in index {
index_tasks |= self.index_tasks(&rtxn, index)?;
index_tasks |= self.index_tasks(rtxn, index)?;
}
tasks &= &index_tasks;
}
@ -530,7 +532,7 @@ impl IndexScheduler {
};
keep_tasks_within_datetimes(
&rtxn,
rtxn,
&mut filtered_non_processing_tasks,
self.started_at,
query.after_started_at,
@ -540,7 +542,7 @@ impl IndexScheduler {
};
keep_tasks_within_datetimes(
&rtxn,
rtxn,
&mut tasks,
self.enqueued_at,
query.after_enqueued_at,
@ -548,7 +550,7 @@ impl IndexScheduler {
)?;
keep_tasks_within_datetimes(
&rtxn,
rtxn,
&mut tasks,
self.finished_at,
query.after_finished_at,
@ -562,10 +564,70 @@ impl IndexScheduler {
Ok(tasks)
}
/// Returns the tasks matched by the given query.
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
let tasks = self.get_task_ids(&query)?;
/// Return true iff there is at least one task associated with this index
/// that is processing.
pub fn is_index_processing(&self, index: &str) -> Result<bool> {
let rtxn = self.env.read_txn()?;
let processing_tasks = self.processing_tasks.read().unwrap().processing.clone();
let index_tasks = self.index_tasks(&rtxn, index)?;
let nbr_index_processing_tasks = processing_tasks.intersection_len(&index_tasks);
Ok(nbr_index_processing_tasks > 0)
}
/// Return the task ids matching the query from the user's point of view.
///
/// There are two differences between an internal query and a query executed by
/// the user.
///
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
/// with many indexes internally.
/// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
pub fn get_task_ids_from_authorized_indexes(
&self,
rtxn: &RoTxn,
query: &Query,
authorized_indexes: &Option<Vec<String>>,
) -> Result<RoaringBitmap> {
let mut tasks = self.get_task_ids(rtxn, query)?;
// If the query contains a list of index_uid, then we must exclude IndexSwap tasks
// from the result (because it is not publicly associated with any index)
if query.index_uid.is_some() {
tasks -= self.get_kind(rtxn, Kind::IndexSwap)?
}
// Any task that is internally associated with a non-authorized index
// must be discarded.
if let Some(authorized_indexes) = authorized_indexes {
let all_indexes_iter = self.index_tasks.iter(rtxn)?;
for result in all_indexes_iter {
let (index, index_tasks) = result?;
if !authorized_indexes.contains(&index.to_owned()) {
tasks -= index_tasks;
}
}
}
Ok(tasks)
}
/// Return the tasks matching the query from the user's point of view.
///
/// There are two differences between an internal query and a query executed by
/// the user.
///
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
/// with many indexes internally.
/// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
pub fn get_tasks_from_authorized_indexes(
&self,
query: Query,
authorized_indexes: Option<Vec<String>>,
) -> Result<Vec<Task>> {
let rtxn = self.env.read_txn()?;
let tasks =
self.get_task_ids_from_authorized_indexes(&rtxn, &query, &authorized_indexes)?;
let tasks = self.get_existing_tasks(
&rtxn,
@ -1192,12 +1254,7 @@ mod tests {
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
tasks.reverse();
assert_eq!(tasks.len(), 3);
assert_eq!(tasks[0].status, Status::Succeeded);
assert_eq!(tasks[1].status, Status::Succeeded);
assert_eq!(tasks[2].status, Status::Succeeded);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
}
#[test]
@ -1236,13 +1293,7 @@ mod tests {
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
tasks.reverse();
assert_eq!(tasks.len(), 4);
assert_eq!(tasks[0].status, Status::Succeeded);
assert_eq!(tasks[1].status, Status::Succeeded);
assert_eq!(tasks[2].status, Status::Succeeded);
assert_eq!(tasks[3].status, Status::Succeeded);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
}
#[test]
@ -1498,15 +1549,7 @@ mod tests {
index_scheduler.assert_internally_consistent();
}
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
tasks.reverse();
assert_eq!(tasks.len(), 6);
assert_eq!(tasks[0].status, Status::Succeeded);
assert_eq!(tasks[1].status, Status::Succeeded);
assert_eq!(tasks[2].status, Status::Succeeded);
assert_eq!(tasks[3].status, Status::Succeeded);
assert_eq!(tasks[4].status, Status::Succeeded);
assert_eq!(tasks[5].status, Status::Succeeded);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
}
#[test]
@ -2110,37 +2153,45 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "finished");
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { limit: Some(0), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[]");
let query = Query { limit: Some(1), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
let query = Query { limit: Some(2), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]");
let query = Query { from: Some(1), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
let query = Query { from: Some(2), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
}
#[test]
fn query_processing_tasks() {
fn query_tasks_simple() {
let start_time = OffsetDateTime::now_utc();
let (index_scheduler, handle) =
@ -2157,19 +2208,24 @@ mod tests {
handle.wait_till(Breakpoint::BatchCreated);
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { status: Some(vec![Status::Processing]), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick
let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick
let query = Query {
status: Some(vec![Status::Enqueued, Status::Processing]),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
let query = Query {
@ -2177,7 +2233,8 @@ mod tests {
after_started_at: Some(start_time),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test, which should excludes the enqueued tasks
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
@ -2187,7 +2244,8 @@ mod tests {
before_started_at: Some(start_time),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes before the start of the test, which should excludes all of them
snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2198,7 +2256,8 @@ mod tests {
before_started_at: Some(start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test and before one minute after the start of the test,
// which should exclude the enqueued tasks and include the only processing task
@ -2206,6 +2265,8 @@ mod tests {
handle.wait_till(Breakpoint::BatchCreated);
let rtxn = index_scheduler.env.read_txn().unwrap();
let second_start_time = OffsetDateTime::now_utc();
let query = Query {
@ -2214,7 +2275,8 @@ mod tests {
before_started_at: Some(start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test and before one minute after the start of the test,
// which should include all tasks
@ -2225,7 +2287,8 @@ mod tests {
before_started_at: Some(start_time),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes before the start of the test, which should exclude all tasks
snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2236,7 +2299,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the second part of the test and before one minute after the
// second start of the test, which should exclude all tasks
@ -2244,7 +2308,11 @@ mod tests {
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
handle.wait_till(Breakpoint::BatchCreated);
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let rtxn = index_scheduler.env.read_txn().unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// we run the same query to verify that, and indeed find that the last task is matched
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
@ -2254,15 +2322,19 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// enqueued, succeeded, or processing tasks started after the second part of the test, should
// again only return the last task
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
handle.wait_till(Breakpoint::AfterProcessing);
let rtxn = index_scheduler.read_txn().unwrap();
// now the last task should have failed
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// so running the last query should return nothing
snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2272,7 +2344,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// but the same query on failed tasks should return the last task
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
@ -2282,7 +2355,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// but the same query on failed tasks should return the last task
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
@ -2293,7 +2367,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// same query but with an invalid uid
snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2304,11 +2379,77 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default()
};
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// same query but with a valid uid
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
}
#[test]
fn query_tasks_special_rules() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
let kind = index_creation_task("catto", "mouse");
let _task = index_scheduler.register(kind).unwrap();
let kind = index_creation_task("doggo", "sheep");
let _task = index_scheduler.register(kind).unwrap();
let kind = KindWithContent::IndexSwap {
swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }],
};
let _task = index_scheduler.register(kind).unwrap();
let kind = KindWithContent::IndexSwap {
swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "whalo".to_owned()) }],
};
let _task = index_scheduler.register(kind).unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
handle.wait_till(Breakpoint::BatchCreated);
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() };
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// only the first task associated with catto is returned, the indexSwap tasks are excluded!
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() };
let tasks = index_scheduler
.get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()]))
.unwrap();
// we have asked for only the tasks associated with catto, but are only authorized to retrieve the tasks
// associated with doggo -> empty result
snapshot!(snapshot_bitmap(&tasks), @"[]");
let query = Query::default();
let tasks = index_scheduler
.get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()]))
.unwrap();
// we asked for all the tasks, but we are only authorized to retrieve the doggo tasks
// -> only the index creation of doggo should be returned
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
let query = Query::default();
let tasks = index_scheduler
.get_task_ids_from_authorized_indexes(
&rtxn,
&query,
&Some(vec!["catto".to_owned(), "doggo".to_owned()]),
)
.unwrap();
// we asked for all the tasks, but we are only authorized to retrieve the doggo and catto tasks
// -> all tasks except the swap of catto with whalo are returned
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
let query = Query::default();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// we asked for all the tasks with all index authorized -> all tasks returned
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,3,]");
}
#[test]
fn fail_in_create_batch_for_index_creation() {
let (index_scheduler, handle) =