Update the tasks statuses

This commit is contained in:
Kerollmops 2022-10-04 18:50:18 +02:00 committed by Clément Renault
parent 2fbdd104b8
commit 36e5efde0d
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
5 changed files with 126 additions and 42 deletions

View File

@ -39,6 +39,7 @@ pub(crate) enum IndexOperation {
index_uid: String, index_uid: String,
primary_key: Option<String>, primary_key: Option<String>,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
content_files: Vec<Uuid>, content_files: Vec<Uuid>,
tasks: Vec<Task>, tasks: Vec<Task>,
}, },
@ -70,6 +71,7 @@ pub(crate) enum IndexOperation {
primary_key: Option<String>, primary_key: Option<String>,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
content_files: Vec<Uuid>, content_files: Vec<Uuid>,
document_import_tasks: Vec<Task>, document_import_tasks: Vec<Task>,
@ -130,19 +132,27 @@ impl IndexScheduler {
KindWithContent::DocumentImport { primary_key, .. } => primary_key.clone(), KindWithContent::DocumentImport { primary_key, .. } => primary_key.clone(),
_ => unreachable!(), _ => unreachable!(),
}; };
let content_files = tasks
.iter() let mut documents_counts = Vec::new();
.map(|task| match task.kind { let mut content_files = Vec::new();
KindWithContent::DocumentImport { content_file, .. } => content_file, for task in &tasks {
_ => unreachable!(), if let KindWithContent::DocumentImport {
}) content_file,
.collect(); documents_count,
..
} = task.kind
{
documents_counts.push(documents_count);
content_files.push(content_file);
}
}
Ok(Some(Batch::IndexOperation( Ok(Some(Batch::IndexOperation(
IndexOperation::DocumentImport { IndexOperation::DocumentImport {
index_uid, index_uid,
primary_key, primary_key,
method, method,
documents_counts,
content_files, content_files,
tasks, tasks,
}, },
@ -249,6 +259,7 @@ impl IndexScheduler {
( (
Some(Batch::IndexOperation(IndexOperation::DocumentImport { Some(Batch::IndexOperation(IndexOperation::DocumentImport {
primary_key, primary_key,
documents_counts,
content_files, content_files,
tasks: document_import_tasks, tasks: document_import_tasks,
.. ..
@ -263,6 +274,7 @@ impl IndexScheduler {
index_uid, index_uid,
primary_key, primary_key,
method, method,
documents_counts,
content_files, content_files,
document_import_tasks, document_import_tasks,
settings, settings,
@ -409,7 +421,7 @@ impl IndexScheduler {
task, task,
} => { } => {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?; wtxn.commit()?;
self.process_batch(Batch::IndexUpdate { self.process_batch(Batch::IndexUpdate {
@ -421,12 +433,12 @@ impl IndexScheduler {
Batch::IndexUpdate { Batch::IndexUpdate {
index_uid, index_uid,
primary_key, primary_key,
task, mut task,
} => { } => {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?; let index = self.index_mapper.index(&rtxn, &index_uid)?;
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key.clone() {
let mut index_wtxn = index.write_txn()?; let mut index_wtxn = index.write_txn()?;
let mut builder = milli::update::Settings::new( let mut builder = milli::update::Settings::new(
&mut index_wtxn, &mut index_wtxn,
@ -438,14 +450,28 @@ impl IndexScheduler {
index_wtxn.commit()?; index_wtxn.commit()?;
} }
task.status = Status::Succeeded;
task.details = Some(Details::IndexInfo { primary_key });
Ok(vec![task]) Ok(vec![task])
} }
Batch::IndexDeletion { index_uid, tasks } => { Batch::IndexDeletion {
index_uid,
mut tasks,
} => {
let wtxn = self.env.write_txn()?; let wtxn = self.env.write_txn()?;
// The write transaction is directly owned and commited here. // The write transaction is directly owned and commited inside.
let index = self.index_mapper.delete_index(wtxn, &index_uid)?; self.index_mapper.delete_index(wtxn, &index_uid)?;
todo!("update the tasks and mark them as succeeded"); // We set all the tasks details to the default value.
for task in &mut tasks {
task.status = Status::Succeeded;
// TODO should we put a details = None, here?
// TODO we are putting Details::IndexInfo with a primary_key = None, this is not cool bro'
task.details = task.kind.default_details();
}
Ok(tasks)
} }
} }
} }
@ -457,28 +483,30 @@ impl IndexScheduler {
operation: IndexOperation, operation: IndexOperation,
) -> Result<Vec<Task>> { ) -> Result<Vec<Task>> {
match operation { match operation {
IndexOperation::DocumentClear { IndexOperation::DocumentClear { mut tasks, .. } => {
index_uid,
mut tasks,
} => {
let result = milli::update::ClearDocuments::new(index_wtxn, index).execute(); let result = milli::update::ClearDocuments::new(index_wtxn, index).execute();
for task in &mut tasks { for task in &mut tasks {
match result { match result {
Ok(deleted_documents) => { Ok(deleted_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::ClearAll { task.details = Some(Details::ClearAll {
deleted_documents: Some(deleted_documents), deleted_documents: Some(deleted_documents),
}) });
}
Err(ref error) => {
task.status = Status::Failed;
task.error = Some(MilliError(error).into())
} }
Err(ref error) => task.error = Some(MilliError(error).into()),
} }
} }
Ok(tasks) Ok(tasks)
} }
IndexOperation::DocumentImport { IndexOperation::DocumentImport {
index_uid, index_uid: _,
primary_key, primary_key,
method, method,
documents_counts,
content_files, content_files,
mut tasks, mut tasks,
} => { } => {
@ -515,13 +543,10 @@ impl IndexScheduler {
builder = new_builder; builder = new_builder;
let user_result = match user_result { let user_result = match user_result {
Ok(count) => { Ok(count) => Ok(DocumentAdditionResult {
let addition = DocumentAdditionResult { indexed_documents: count,
indexed_documents: count, number_of_documents: count,
number_of_documents: count, }),
};
Ok(addition)
}
Err(e) => Err(IndexError::from(e)), Err(e) => Err(IndexError::from(e)),
}; };
@ -533,25 +558,36 @@ impl IndexScheduler {
info!("document addition done: {:?}", addition); info!("document addition done: {:?}", addition);
} }
for (task, ret) in tasks.iter_mut().zip(results) { for (task, (ret, count)) in tasks
.iter_mut()
.zip(results.into_iter().zip(documents_counts))
{
match ret { match ret {
Ok(DocumentAdditionResult { Ok(DocumentAdditionResult {
indexed_documents, indexed_documents,
number_of_documents, number_of_documents,
}) => { }) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentAddition { task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents, received_documents: number_of_documents,
indexed_documents, indexed_documents,
}) });
}
Err(error) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentAddition {
received_documents: count,
indexed_documents: count,
});
task.error = Some(error.into())
} }
Err(error) => task.error = Some(error.into()),
} }
} }
Ok(tasks) Ok(tasks)
} }
IndexOperation::DocumentDeletion { IndexOperation::DocumentDeletion {
index_uid, index_uid: _,
documents, documents,
mut tasks, mut tasks,
} => { } => {
@ -559,27 +595,31 @@ impl IndexScheduler {
documents.iter().for_each(|id| { documents.iter().for_each(|id| {
builder.delete_external_id(id); builder.delete_external_id(id);
}); });
let result = builder.execute();
let result = builder.execute();
for (task, documents) in tasks.iter_mut().zip(documents) { for (task, documents) in tasks.iter_mut().zip(documents) {
match result { match result {
Ok(DocumentDeletionResult { Ok(DocumentDeletionResult {
deleted_documents, deleted_documents,
remaining_documents: _, remaining_documents: _,
}) => { }) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletion { task.details = Some(Details::DocumentDeletion {
received_document_ids: documents.len(), received_document_ids: documents.len(),
deleted_documents: Some(deleted_documents), deleted_documents: Some(deleted_documents),
}); });
} }
Err(ref error) => task.error = Some(MilliError(error).into()), Err(ref error) => {
task.status = Status::Failed;
task.error = Some(MilliError(error).into());
}
} }
} }
Ok(tasks) Ok(tasks)
} }
IndexOperation::Settings { IndexOperation::Settings {
index_uid, index_uid: _,
settings, settings,
mut tasks, mut tasks,
} => { } => {
@ -596,8 +636,12 @@ impl IndexScheduler {
debug!("update: {:?}", indexing_step); debug!("update: {:?}", indexing_step);
}); });
if let Err(ref error) = result { match result {
task.error = Some(MilliError(error).into()); Ok(_) => task.status = Status::Succeeded,
Err(ref error) => {
task.status = Status::Failed;
task.error = Some(MilliError(error).into());
}
} }
} }
@ -607,6 +651,7 @@ impl IndexScheduler {
index_uid, index_uid,
primary_key, primary_key,
method, method,
documents_counts,
content_files, content_files,
document_import_tasks, document_import_tasks,
settings, settings,
@ -629,6 +674,7 @@ impl IndexScheduler {
index_uid, index_uid,
primary_key, primary_key,
method, method,
documents_counts,
content_files, content_files,
tasks: document_import_tasks, tasks: document_import_tasks,
}, },

View File

@ -1,7 +1,7 @@
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, RwLock, RwLockWriteGuard}; use std::sync::{Arc, RwLock};
use std::{fs, thread}; use std::{fs, thread};
use log::error; use log::error;

View File

@ -400,7 +400,6 @@ impl IndexScheduler {
for mut task in tasks { for mut task in tasks {
task.started_at = Some(started_at); task.started_at = Some(started_at);
task.finished_at = Some(finished_at); task.finished_at = Some(finished_at);
task.status = Status::Succeeded;
// the info field should've been set by the process_batch function // the info field should've been set by the process_batch function
self.update_task(&mut wtxn, &task)?; self.update_task(&mut wtxn, &task)?;
@ -616,7 +615,7 @@ mod tests {
let (uuid, mut file) = index_scheduler.create_update_file().unwrap(); let (uuid, mut file) = index_scheduler.create_update_file().unwrap();
let documents_count = let documents_count =
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap(); document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
index_scheduler index_scheduler
.register(KindWithContent::DocumentImport { .register(KindWithContent::DocumentImport {
index_uid: S("doggos"), index_uid: S("doggos"),

View File

@ -131,7 +131,7 @@ pub enum KindWithContent {
primary_key: Option<String>, primary_key: Option<String>,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
content_file: Uuid, content_file: Uuid,
documents_count: usize, documents_count: u64,
allow_index_creation: bool, allow_index_creation: bool,
}, },
DocumentDeletion { DocumentDeletion {
@ -255,6 +255,45 @@ impl KindWithContent {
IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]),
} }
} }
/// Returns the default `Details` that correspond to this `KindWithContent`,
/// `None` if it cannot be generated.
pub fn default_details(&self) -> Option<Details> {
match self {
KindWithContent::DocumentImport {
documents_count, ..
} => Some(Details::DocumentAddition {
received_documents: *documents_count,
indexed_documents: 0,
}),
KindWithContent::DocumentDeletion {
index_uid: _,
documents_ids,
} => Some(Details::DocumentDeletion {
received_document_ids: documents_ids.len(),
deleted_documents: None,
}),
KindWithContent::DocumentClear { .. } => Some(Details::ClearAll {
deleted_documents: None,
}),
KindWithContent::Settings { new_settings, .. } => Some(Details::Settings {
settings: new_settings.clone(),
}),
KindWithContent::IndexDeletion { .. } => Some(Details::IndexInfo { primary_key: None }),
KindWithContent::IndexCreation { primary_key, .. }
| KindWithContent::IndexUpdate { primary_key, .. } => Some(Details::IndexInfo {
primary_key: primary_key.clone(),
}),
KindWithContent::IndexSwap { .. } => {
todo!()
}
KindWithContent::CancelTask { .. } => {
todo!()
}
KindWithContent::DumpExport { .. } => None,
KindWithContent::Snapshot => None,
}
}
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]

View File

@ -274,7 +274,7 @@ async fn document_addition(
.await; .await;
let documents_count = match documents_count { let documents_count = match documents_count {
Ok(Ok(documents_count)) => documents_count, Ok(Ok(documents_count)) => documents_count as u64,
Ok(Err(e)) => { Ok(Err(e)) => {
index_scheduler.delete_update_file(uuid)?; index_scheduler.delete_update_file(uuid)?;
return Err(e); return Err(e);