stop dumping the current dumping task as enqueued so it's not looping for ever

This commit is contained in:
Tamo 2022-10-16 02:01:45 +02:00 committed by Clément Renault
parent 208c785697
commit dd506e5d87
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -22,6 +22,7 @@ use meilisearch_types::{
Index, Index,
}; };
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
@ -486,6 +487,7 @@ impl IndexScheduler {
} }
Batch::Snapshot(_) => todo!(), Batch::Snapshot(_) => todo!(),
Batch::Dump(mut task) => { Batch::Dump(mut task) => {
let started_at = OffsetDateTime::now_utc();
let KindWithContent::DumpExport { keys, instance_uid, dump_uid } = &task.kind else { let KindWithContent::DumpExport { keys, instance_uid, dump_uid } = &task.kind else {
unreachable!(); unreachable!();
}; };
@ -502,12 +504,27 @@ impl IndexScheduler {
// 2. dump the tasks // 2. dump the tasks
let mut tasks = dump.create_tasks_queue()?; let mut tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? { for ret in self.all_tasks.iter(&rtxn)? {
let (_, task) = ret?; let (_, mut t) = ret?;
let content_file = task.content_uuid().map(|uuid| uuid.clone()); let status = t.status;
let mut dump_content_file = tasks.push_task(&task.into())?; let content_file = t.content_uuid().map(|uuid| uuid.clone());
// 2.1. Dump the `content_file` associated with the task if there is one. // In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fail everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file { if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?; let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file) let reader = DocumentsBatchReader::from_reader(content_file)
@ -516,9 +533,14 @@ impl IndexScheduler {
let (mut cursor, documents_batch_index) = let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index(); reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? { while let Some(doc) =
dump_content_file cursor.next_document().map_err(milli::Error::from)?
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?; {
dump_content_file.push_document(&obkv_to_object(
&doc,
&documents_batch_index,
)?)?;
}
} }
} }
} }