fix a deadlock

This commit is contained in:
Tamo 2022-09-26 22:26:30 +02:00 committed by Clément Renault
parent 22bfb5a7a0
commit 0ba1c46e19
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 52 additions and 13 deletions

View File

@ -427,7 +427,7 @@ impl IndexScheduler {
Ok(None)
}
pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result<Vec<Task>> {
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
match batch {
Batch::Cancel(_) => todo!(),
Batch::Snapshot(_) => todo!(),
@ -439,7 +439,12 @@ impl IndexScheduler {
content_files,
mut tasks,
} => {
let index = self.index_mapper.create_index(wtxn, &index_uid)?;
// we NEED a write transaction for the index creation.
// To avoid blocking the whole process we're going to commit asap.
let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
let ret = index.update_documents(
IndexDocumentsMethod::ReplaceDocuments,
primary_key,
@ -474,7 +479,9 @@ impl IndexScheduler {
settings: _,
settings_tasks: _,
} => {
let index = self.index_mapper.create_index(wtxn, &index_uid)?;
let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
let mut updated_tasks = Vec::new();
/*

View File

@ -133,7 +133,8 @@ pub struct IndexScheduler {
pub enum Breakpoint {
Start,
BatchCreated,
BatchProcessed,
BeforeProcessing,
AfterProcessing,
}
impl IndexScheduler {
@ -346,11 +347,13 @@ impl IndexScheduler {
#[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap();
let mut wtxn = self.env.write_txn()?;
let batch = match self.create_next_batch(&wtxn)? {
let rtxn = self.env.read_txn()?;
let batch = match self.create_next_batch(&rtxn)? {
Some(batch) => batch,
None => return Ok(()),
};
// we don't need this transaction any longer.
drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids();
@ -360,12 +363,19 @@ impl IndexScheduler {
*self.processing_tasks.write().unwrap() = (started_at, processing_tasks);
#[cfg(test)]
{
self.test_breakpoint_sdr
.send(Breakpoint::BatchCreated)
.unwrap();
self.test_breakpoint_sdr
.send(Breakpoint::BeforeProcessing)
.unwrap();
}
// 2. process the tasks
let res = self.process_batch(&mut wtxn, batch);
// 2. Process the tasks
let res = self.process_batch(batch);
let mut wtxn = self.env.write_txn()?;
let finished_at = OffsetDateTime::now_utc();
match res {
@ -403,7 +413,7 @@ impl IndexScheduler {
#[cfg(test)]
self.test_breakpoint_sdr
.send(Breakpoint::BatchProcessed)
.send(Breakpoint::AfterProcessing)
.unwrap();
Ok(())
@ -551,6 +561,28 @@ mod tests {
assert_smol_debug_snapshot!(index_tasks, @r###"[("catto", RoaringBitmap<[0, 1, 3]>), ("doggo", RoaringBitmap<[4]>)]"###);
}
#[test]
fn insert_task_while_another_task_is_processing() {
let (index_scheduler, handle) = IndexScheduler::test();
index_scheduler.register(KindWithContent::Snapshot).unwrap();
handle.wait_till(Breakpoint::BatchCreated);
// while the task is processing can we register another task?
index_scheduler.register(KindWithContent::Snapshot).unwrap();
index_scheduler
.register(KindWithContent::IndexDeletion {
index_uid: S("doggos"),
})
.unwrap();
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
tasks.reverse();
assert_eq!(tasks.len(), 3);
assert_eq!(tasks[0].status, Status::Processing);
assert_eq!(tasks[1].status, Status::Enqueued);
assert_eq!(tasks[2].status, Status::Enqueued);
}
#[test]
fn document_addition() {
let (index_scheduler, handle) = IndexScheduler::test();
@ -609,7 +641,7 @@ mod tests {
}
]
"###);
assert_eq!(handle.next_breakpoint(), Breakpoint::BatchProcessed);
handle.wait_till(Breakpoint::AfterProcessing);
let task = index_scheduler.get_tasks(Query::default()).unwrap();
assert_json_snapshot!(task,