flush the dump-writer only once everything has been inserted

This commit is contained in:
Tamo 2022-10-17 17:04:52 +02:00 committed by Clément Renault
parent b87b071718
commit 8e469d8d1d
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
3 changed files with 32 additions and 7 deletions

View file

@ -499,17 +499,18 @@ impl IndexScheduler {
unreachable!();
};
let dump = dump::DumpWriter::new(instance_uid.clone())?;
let mut dump_keys = dump.create_keys()?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut tasks = dump.create_tasks_queue()?;
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
@ -527,7 +528,7 @@ impl IndexScheduler {
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = tasks.push_task(&t.into())?;
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
@ -548,9 +549,11 @@ impl IndexScheduler {
&documents_batch_index,
)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// TODO: maybe `self.indexes` could use this rtxn instead of creating its own
drop(rtxn);
@ -585,8 +588,8 @@ impl IndexScheduler {
let file = File::create(path)?;
dump.persist_to(BufWriter::new(file))?;
// if we reached this step we can tell the scheduler we succeeded to dump ourselves.
task.status = Status::Succeeded;
Ok(vec![task])
}
Batch::IndexOperation(operation) => {