fix the task cancelation

This commit is contained in:
Tamo 2024-11-19 01:47:42 +01:00
parent 15eefa4fcc
commit 1fcb9526f5
No known key found for this signature in database
GPG key ID: 20CD8020AFA88D69
180 changed files with 524 additions and 644 deletions

View file

@ -228,7 +228,6 @@ mod db_name {
pub const BATCH_STATUS: &str = "batch-status";
pub const BATCH_KIND: &str = "batch-kind";
pub const BATCH_INDEX_TASKS: &str = "batch-index-tasks";
pub const BATCH_CANCELED_BY: &str = "batch-canceled_by";
pub const BATCH_ENQUEUED_AT: &str = "batch-enqueued-at";
pub const BATCH_STARTED_AT: &str = "batch-started-at";
pub const BATCH_FINISHED_AT: &str = "batch-finished-at";
@ -344,8 +343,6 @@ pub struct IndexScheduler {
pub(crate) batch_kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
/// Store the batches associated to an index.
pub(crate) batch_index_tasks: Database<Str, RoaringBitmapCodec>,
/// Store the batches containing a task canceled by a task uid
pub(crate) batch_canceled_by: Database<BEU32, RoaringBitmapCodec>,
/// Store the batches containing tasks which were enqueued at a specific date
pub(crate) batch_enqueued_at: Database<BEI128, CboRoaringBitmapCodec>,
/// Store the batches containing finished tasks started at a specific date
@ -437,7 +434,6 @@ impl IndexScheduler {
batch_status: self.batch_status,
batch_kind: self.batch_kind,
batch_index_tasks: self.batch_index_tasks,
batch_canceled_by: self.batch_canceled_by,
batch_enqueued_at: self.batch_enqueued_at,
batch_started_at: self.batch_started_at,
batch_finished_at: self.batch_finished_at,
@ -501,7 +497,7 @@ impl IndexScheduler {
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(20)
.max_dbs(19)
.map_size(budget.task_db_size)
.open(options.tasks_path)
}?;
@ -527,7 +523,6 @@ impl IndexScheduler {
let batch_status = env.create_database(&mut wtxn, Some(db_name::BATCH_STATUS))?;
let batch_kind = env.create_database(&mut wtxn, Some(db_name::BATCH_KIND))?;
let batch_index_tasks = env.create_database(&mut wtxn, Some(db_name::BATCH_INDEX_TASKS))?;
let batch_canceled_by = env.create_database(&mut wtxn, Some(db_name::BATCH_CANCELED_BY))?;
let batch_enqueued_at = env.create_database(&mut wtxn, Some(db_name::BATCH_ENQUEUED_AT))?;
let batch_started_at = env.create_database(&mut wtxn, Some(db_name::BATCH_STARTED_AT))?;
let batch_finished_at = env.create_database(&mut wtxn, Some(db_name::BATCH_FINISHED_AT))?;
@ -554,7 +549,6 @@ impl IndexScheduler {
batch_status,
batch_kind,
batch_index_tasks,
batch_canceled_by,
batch_enqueued_at,
batch_started_at,
batch_finished_at,
@ -1003,16 +997,23 @@ impl IndexScheduler {
batches &= batches_by_task_uids;
}
// There is no database for this query, we must retrieve the task queried by the client and ensure it's valid
if let Some(canceled_by) = &query.canceled_by {
let mut all_canceled_batches = RoaringBitmap::new();
for cancel_uid in canceled_by {
if let Some(canceled_by_uid) = self.batch_canceled_by.get(rtxn, cancel_uid)? {
all_canceled_batches |= canceled_by_uid;
if let Some(task) = self.get_task(rtxn, *cancel_uid)? {
if task.kind.as_kind() == Kind::TaskCancelation
&& task.status == Status::Succeeded
{
if let Some(batch_uid) = task.batch_uid {
all_canceled_batches.insert(batch_uid);
}
}
}
}
// if the canceled_by has been specified but no batch
// matches then we prefer matching zero than all tasks.
// matches then we prefer matching zero than all batches.
if all_canceled_batches.is_empty() {
return Ok(RoaringBitmap::new());
} else {
@ -1266,24 +1267,35 @@ impl IndexScheduler {
}
}
// Any task that is internally associated with a non-authorized index
// must be discarded.
// This works because currently batches cannot contains tasks from multiple indexes at the same time.
// Any batch that is internally associated with at least one authorized index
// must be returned.
if !filters.all_indexes_authorized() {
let mut valid_indexes = RoaringBitmap::new();
let mut forbidden_indexes = RoaringBitmap::new();
let all_indexes_iter = self.batch_index_tasks.iter(rtxn)?;
for result in all_indexes_iter {
let (index, index_tasks) = result?;
if !filters.is_index_authorized(index) {
batches -= index_tasks;
if filters.is_index_authorized(index) {
valid_indexes |= index_tasks;
} else {
forbidden_indexes |= index_tasks;
}
}
if let Some(batch) = processing.batch.as_ref() {
for index in &batch.indexes {
if !filters.is_index_authorized(index) {
batches.remove(batch.uid);
if filters.is_index_authorized(index) {
valid_indexes.insert(batch.uid);
} else {
forbidden_indexes.insert(batch.uid);
}
}
}
// If a batch had ONE valid task then it should be returned
let invalid_batches = forbidden_indexes - valid_indexes;
batches -= invalid_batches;
}
Ok((batches, total_batches.len()))
@ -1559,11 +1571,16 @@ impl IndexScheduler {
// 2. Process the tasks
let res = {
let cloned_index_scheduler = self.private_clone();
let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn(move || cloned_index_scheduler.process_batch(batch))
.unwrap();
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
let processing_batch = &mut processing_batch;
std::thread::scope(|s| {
let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn_scoped(s, move || {
cloned_index_scheduler.process_batch(batch, processing_batch)
})
.unwrap();
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
})
};
// Reset the currently updating index to relinquish the index handle
@ -1582,11 +1599,20 @@ impl IndexScheduler {
let mut success = 0;
let mut failure = 0;
let mut canceled_by = None;
let mut canceled = RoaringBitmap::new();
dbg!(&tasks);
#[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() {
task.started_at = Some(processing_batch.started_at);
task.finished_at = Some(finished_at);
if task.status != Status::Canceled {
task.started_at = Some(processing_batch.started_at);
task.finished_at = Some(finished_at);
} else {
canceled.insert(task.uid);
canceled_by = task.canceled_by;
}
#[cfg(test)]
self.maybe_fail(
@ -1604,6 +1630,10 @@ impl IndexScheduler {
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
processing_batch.update(&task);
}
if let Some(canceled_by) = canceled_by {
println!("inserting the canceled by {canceled_by}: {canceled:?}");
self.canceled_by.put(&mut wtxn, &canceled_by, &canceled)?;
}
tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
}
// If we have an abortion error we must stop the tick here and re-schedule tasks.
@ -4136,6 +4166,7 @@ mod tests {
tasks: [0, 1, 2, 3].into_iter().collect(),
};
let task_cancelation = index_scheduler.register(kind, None, false).unwrap();
println!("HEEERE");
handle.advance_n_successful_batches(1);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
@ -4577,13 +4608,12 @@ mod tests {
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
.unwrap();
// 0 is not returned because it was not canceled, 3 is not returned because it is the uid of the
// taskCancelation itself
snapshot!(snapshot_bitmap(&batches), @"[1,2,]");
// The batch zero was the index creation task, the 1 is the task cancellation
snapshot!(snapshot_bitmap(&batches), @"[1,]");
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
let (batches, _) = index_scheduler
.get_task_ids_from_authorized_indexes(
.get_batch_ids_from_authorized_indexes(
&rtxn,
&query,
&AuthFilter::with_allowed_indexes(
@ -5885,7 +5915,7 @@ mod tests {
let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None };
let task = index_scheduler.register(kind, None, true).unwrap();
snapshot!(task.uid, @"0");
snapshot!(snapshot_index_scheduler(&index_scheduler), @r###"
snapshot!(snapshot_index_scheduler(&index_scheduler), @r"
### Autobatching Enabled = true
### Processing batch None:
[]
@ -5919,9 +5949,6 @@ mod tests {
### Batches Kind:
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
----------------------------------------------------------------------
@ -5932,12 +5959,12 @@ mod tests {
### File Store:
----------------------------------------------------------------------
"###);
");
let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None };
let task = index_scheduler.register(kind, Some(12), true).unwrap();
snapshot!(task.uid, @"12");
snapshot!(snapshot_index_scheduler(&index_scheduler), @r###"
snapshot!(snapshot_index_scheduler(&index_scheduler), @r"
### Autobatching Enabled = true
### Processing batch None:
[]
@ -5971,9 +5998,6 @@ mod tests {
### Batches Kind:
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
----------------------------------------------------------------------
@ -5984,7 +6008,7 @@ mod tests {
### File Store:
----------------------------------------------------------------------
"###);
");
}
#[test]