improve the way we access the mutex

This commit is contained in:
Tamo 2024-11-19 19:40:31 +01:00
parent 4abcd9c04e
commit b906e3ed70
No known key found for this signature in database
GPG key ID: 20CD8020AFA88D69
34 changed files with 155 additions and 155 deletions

View file

@ -163,7 +163,7 @@ impl Query {
}
#[derive(Debug, Clone)]
struct ProcessingTasks {
pub struct ProcessingTasks {
batch: Option<ProcessingBatch>,
/// The list of tasks ids that are currently running.
processing: RoaringBitmap,
@ -948,6 +948,7 @@ impl IndexScheduler {
processing: &ProcessingTasks,
query: &Query,
) -> Result<RoaringBitmap> {
dbg!();
let mut batches = self.all_batch_ids(rtxn)?;
if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) {
batches.insert(batch_id);
@ -1235,22 +1236,21 @@ impl IndexScheduler {
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
/// with many indexes internally.
/// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
pub fn get_batch_ids_from_authorized_indexes(
fn get_batch_ids_from_authorized_indexes(
&self,
rtxn: &RoTxn,
processing: &ProcessingTasks,
query: &Query,
filters: &meilisearch_auth::AuthFilter,
) -> Result<(RoaringBitmap, u64)> {
let processing = self.processing_tasks.read().unwrap().clone();
// compute all batches matching the filter by ignoring the limits, to find the number of batches matching
// the filter.
// As this causes us to compute the filter twice it is slightly inefficient, but doing it this way spares
// us from modifying the underlying implementation, and the performance remains sufficient.
// Should this change, we would modify `get_batch_ids` to directly return the number of matching batches.
let total_batches =
self.get_batch_ids(rtxn, &processing, &query.clone().without_limits())?;
let mut batches = self.get_batch_ids(rtxn, &processing, query)?;
self.get_batch_ids(rtxn, processing, &query.clone().without_limits())?;
let mut batches = self.get_batch_ids(rtxn, processing, query)?;
// If the query contains a list of index uid or there is a finite list of authorized indexes,
// then we must exclude all the batches that only contains tasks associated to multiple indexes.
@ -1369,11 +1369,14 @@ impl IndexScheduler {
filters: &meilisearch_auth::AuthFilter,
) -> Result<(Vec<Batch>, u64)> {
let rtxn = self.env.read_txn()?;
let processing = self.processing_tasks.read().unwrap().clone();
let (batches, total) =
self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?;
self.get_batch_ids_from_authorized_indexes(&rtxn, &processing, &query, filters)?;
let batches = self.get_existing_batches(
&rtxn,
&processing,
batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
)?;
@ -4202,46 +4205,47 @@ mod tests {
handle.advance_n_successful_batches(3);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processed_all_tasks");
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { limit: Some(0), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[]");
let query = Query { limit: Some(1), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[2,]");
let query = Query { limit: Some(2), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[1,2,]");
let query = Query { from: Some(1), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[0,1,]");
let query = Query { from: Some(2), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[0,1,2,]");
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[1,]");
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[0,1,]");
}
@ -4265,16 +4269,17 @@ mod tests {
handle.advance_till([Start, BatchCreated]);
let rtxn = index_scheduler.env.read_txn().unwrap();
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let query = Query { statuses: Some(vec![Status::Processing]), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[0,]"); // only the processing batch in the first tick
let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[]"); // The batches don't contains any enqueued tasks
@ -4283,7 +4288,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[0,]"); // both enqueued and processing tasks in the first tick
@ -4293,7 +4298,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test, which should excludes the enqueued tasks
@ -4305,7 +4310,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes before the start of the test, which should excludes all of them
@ -4318,7 +4323,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test and before one minute after the start of the test,
@ -4333,8 +4338,10 @@ mod tests {
Start,
BatchCreated,
]);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after-advancing-a-bit");
let rtxn = index_scheduler.env.read_txn().unwrap();
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let second_start_time = OffsetDateTime::now_utc();
@ -4345,7 +4352,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test and before one minute after the start of the test,
@ -4358,7 +4365,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes before the start of the test, which should exclude all tasks
@ -4371,7 +4378,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the second part of the test and before one minute after the
@ -4389,9 +4396,10 @@ mod tests {
]);
let rtxn = index_scheduler.env.read_txn().unwrap();
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// we run the same query to verify that, and indeed find that the last task is matched
snapshot!(snapshot_bitmap(&batches), @"[2,]");
@ -4403,7 +4411,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// enqueued, succeeded, or processing tasks started after the second part of the test, should
// again only return the last task
@ -4411,11 +4419,12 @@ mod tests {
handle.advance_till([ProcessBatchFailed, AfterProcessing]);
let rtxn = index_scheduler.read_txn().unwrap();
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
// now the last task should have failed
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// so running the last query should return nothing
snapshot!(snapshot_bitmap(&batches), @"[]");
@ -4427,7 +4436,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// but the same query on failed tasks should return the last task
snapshot!(snapshot_bitmap(&batches), @"[2,]");
@ -4439,7 +4448,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// but the same query on failed tasks should return the last task
snapshot!(snapshot_bitmap(&batches), @"[2,]");
@ -4452,7 +4461,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// same query but with an invalid uid
snapshot!(snapshot_bitmap(&batches), @"[]");
@ -4465,7 +4474,7 @@ mod tests {
..Default::default()
};
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// same query but with a valid uid
snapshot!(snapshot_bitmap(&batches), @"[2,]");
@ -4494,10 +4503,11 @@ mod tests {
handle.advance_till([Start, BatchCreated]);
let rtxn = index_scheduler.env.read_txn().unwrap();
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let query = Query { index_uids: Some(vec!["catto".to_owned()]), ..Default::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// only the first task associated with catto is returned, the indexSwap tasks are excluded!
snapshot!(snapshot_bitmap(&batches), @"[0,]");
@ -4506,6 +4516,7 @@ mod tests {
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(
&rtxn,
&proc,
&query,
&AuthFilter::with_allowed_indexes(
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
@ -4533,6 +4544,7 @@ mod tests {
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(
&rtxn,
&proc,
&query,
&AuthFilter::with_allowed_indexes(
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
@ -4547,6 +4559,7 @@ mod tests {
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(
&rtxn,
&proc,
&query,
&AuthFilter::with_allowed_indexes(
vec![
@ -4564,7 +4577,7 @@ mod tests {
let query = Query::default();
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// we asked for all the tasks with all index authorized -> all tasks returned
snapshot!(snapshot_bitmap(&batches), @"[0,1,2,3,]");
@ -4595,9 +4608,10 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
let rtxn = index_scheduler.read_txn().unwrap();
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
// The batch zero was the index creation task, the 1 is the task cancellation
snapshot!(snapshot_bitmap(&batches), @"[1,]");
@ -4606,6 +4620,7 @@ mod tests {
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(
&rtxn,
&proc,
&query,
&AuthFilter::with_allowed_indexes(
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),