From b3c9b128d9b884417d3fc45e3ca3a32b57e9a7cb Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 13 Sep 2022 11:46:07 +0200 Subject: [PATCH] polish the global structure of the batch creation --- index-scheduler/src/batch.rs | 207 ++++++++++++++++++++++++++--------- index-scheduler/src/lib.rs | 8 +- index-scheduler/src/task.rs | 24 ++-- 3 files changed, 179 insertions(+), 60 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 8adba7534..1cf6e5bda 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,5 +1,12 @@ -use crate::{autobatcher::BatchKind, task::Status, Error, IndexScheduler, Result, TaskId}; -use milli::{heed::RoTxn, update::IndexDocumentsMethod}; +use crate::{ + autobatcher::BatchKind, + task::{KindWithContent, Status}, + Error, IndexScheduler, Result, TaskId, +}; +use milli::{ + heed::{RoTxn, RwTxn}, + update::IndexDocumentsMethod, +}; use uuid::Uuid; use crate::{task::Kind, Task}; @@ -8,10 +15,100 @@ pub(crate) enum Batch { Cancel(Task), Snapshot(Vec), Dump(Vec), - IndexSpecific { index_uid: String, kind: BatchKind }, + // IndexSpecific { index_uid: String, kind: BatchKind }, + DocumentAddition { + index_uid: String, + primary_key: Option, + content_files: Vec, + tasks: Vec, + }, + SettingsAndDocumentAddition { + index_uid: String, + + primary_key: Option, + content_files: Vec, + document_addition_tasks: Vec, + + settings: Vec, + settings_tasks: Vec, + }, } impl IndexScheduler { + pub(crate) fn create_next_batch_index( + &self, + rtxn: &RoTxn, + index_uid: String, + batch: BatchKind, + ) -> Result> { + match batch { + BatchKind::ClearAll { ids } => todo!(), + BatchKind::DocumentAddition { addition_ids } => todo!(), + BatchKind::DocumentDeletion { deletion_ids } => todo!(), + BatchKind::ClearAllAndSettings { + other, + settings_ids, + } => todo!(), + BatchKind::SettingsAndDocumentAddition { + addition_ids, + settings_ids, + } => { + // you're not supposed to create an empty BatchKind. + assert!(addition_ids.len() > 0); + assert!(settings_ids.len() > 0); + + let document_addition_tasks = addition_ids + .iter() + .map(|tid| { + self.get_task(rtxn, *tid) + .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) + }) + .collect::>>()?; + let settings_tasks = settings_ids + .iter() + .map(|tid| { + self.get_task(rtxn, *tid) + .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) + }) + .collect::>>()?; + + let primary_key = match document_addition_tasks[0].kind { + KindWithContent::DocumentAddition { primary_key, .. } => primary_key, + _ => unreachable!(), + }; + let content_files = document_addition_tasks + .iter() + .map(|task| match task.kind { + KindWithContent::DocumentAddition { content_file, .. } => content_file, + _ => unreachable!(), + }) + .collect(); + + let settings = settings_tasks + .iter() + .map(|task| match task.kind { + KindWithContent::Settings { new_settings, .. } => new_settings.to_string(), + _ => unreachable!(), + }) + .collect(); + + Ok(Some(Batch::SettingsAndDocumentAddition { + index_uid, + primary_key, + content_files, + document_addition_tasks, + settings, + settings_tasks, + })) + } + BatchKind::Settings { settings_ids } => todo!(), + BatchKind::DeleteIndex { ids } => todo!(), + BatchKind::CreateIndex { id } => todo!(), + BatchKind::SwapIndex { id } => todo!(), + BatchKind::RenameIndex { id } => todo!(), + } + } + /// Create the next batch to be processed; /// 1. We get the *last* task to cancel. /// 2. We get the *next* snapshot to process. @@ -65,12 +162,9 @@ impl IndexScheduler { }) .collect::>>()?; - return Ok(crate::autobatcher::autobatch(enqueued).map(|batch_kind| { - Batch::IndexSpecific { - index_uid: index_name.to_string(), - kind: batch_kind, - } - })); + if let Some(batchkind) = crate::autobatcher::autobatch(enqueued) { + return self.create_next_batch_index(rtxn, index_name.to_string(), batchkind); + } } // If we found no tasks then we were notified for something that got autobatched @@ -80,53 +174,62 @@ impl IndexScheduler { pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result> { match batch { - Batch::IndexSpecific { index_uid, kind } => { - let index = create_index(); - match kind { - BatchKind::ClearAll { ids } => todo!(), - BatchKind::DocumentAddition { addition_ids } => { - let index = self.create_index(wtxn, &index_uid)?; - let ret = index.update_documents( - IndexDocumentsMethod::UpdateDocuments, - None, // TODO primary key - self.file_store, - content_files, - )?; + Batch::Cancel(_) => todo!(), + Batch::Snapshot(_) => todo!(), + Batch::Dump(_) => todo!(), + Batch::DocumentAddition { + index_uid, + primary_key, + content_files, + tasks, + } => todo!(), + Batch::SettingsAndDocumentAddition { + index_uid, + primary_key, + content_files, + document_addition_tasks, + settings, + settings_tasks, + } => { + let index = self.create_index(wtxn, &index_uid)?; + let mut updated_tasks = Vec::new(); - assert_eq!(ret.len(), tasks.len(), "Update documents must return the same number of `Result` than the number of tasks."); - - Ok(tasks - .into_iter() - .zip(ret) - .map(|(mut task, res)| match res { - Ok(info) => { - task.status = Status::Succeeded; - task.info = Some(info.to_string()); - } - Err(error) => { - task.status = Status::Failed; - task.error = Some(error.to_string()); - } - }) - .collect()) + /* + let ret = index.update_settings(settings)?; + for (ret, task) in ret.iter().zip(settings_tasks) { + match ret { + Ok(ret) => task.status = Some(ret), + Err(err) => task.error = Some(err), } - BatchKind::DocumentDeletion { deletion_ids } => todo!(), - BatchKind::ClearAllAndSettings { - other, - settings_ids, - } => todo!(), - BatchKind::SettingsAndDocumentAddition { - settings_ids, - addition_ids, - } => todo!(), - BatchKind::Settings { settings_ids } => todo!(), - BatchKind::DeleteIndex { ids } => todo!(), - BatchKind::CreateIndex { id } => todo!(), - BatchKind::SwapIndex { id } => todo!(), - BatchKind::RenameIndex { id } => todo!(), } + */ + + /* + for (ret, task) in ret.iter().zip(settings_tasks) { + match ret { + Ok(ret) => task.status = Some(ret), + Err(err) => task.error = Some(err), + } + updated_tasks.push(task); + } + */ + + let ret = index.update_documents( + IndexDocumentsMethod::ReplaceDocuments, + primary_key, + self.file_store, + content_files.into_iter(), + )?; + + for (ret, task) in ret.iter().zip(document_addition_tasks) { + match ret { + Ok(ret) => task.info = Some(format!("{:?}", ret)), + Err(err) => task.error = Some(err.to_string()), + } + updated_tasks.push(task); + } + Ok(updated_tasks) } - _ => unreachable!(), } } } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index accd13efa..02bf085dd 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -221,15 +221,21 @@ impl IndexScheduler { } }; let mut batch = match self.create_next_batch(&wtxn) { - Ok(batch) => batch, + Ok(Some(batch)) => batch, + Ok(None) => continue, Err(e) => { log::error!("{}", e); continue; } }; + // 1. store the starting date with the bitmap of processing tasks + // 2. update the tasks with a starting date *but* do not write anything on disk + // 3. process the tasks let res = self.process_batch(&mut wtxn, batch); + // 4. store the updated tasks on disk + // TODO: TAMO: do this later // must delete the file on disk // in case of error, must update the tasks with the error diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index c7bead937..82c8ac46b 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -20,8 +20,8 @@ pub enum Status { pub struct Task { pub uid: TaskId, - #[serde(with = "time::serde::rfc3339::option")] - pub enqueued_at: Option, + #[serde(with = "time::serde::rfc3339")] + pub enqueued_at: OffsetDateTime, #[serde(with = "time::serde::rfc3339::option")] pub started_at: Option, #[serde(with = "time::serde::rfc3339::option")] @@ -60,6 +60,7 @@ pub enum KindWithContent { Snapshot, DocumentAddition { index_name: String, + primary_key: Option, content_file: Uuid, }, DocumentDeletion { @@ -69,11 +70,11 @@ pub enum KindWithContent { ClearAllDocuments { index_name: String, }, - // TODO: TAMO: uncomment the settings - // Settings { - // index_name: String, - // new_settings: Settings, - // }, + Settings { + index_name: String, + // TODO: TAMO: fix the type + new_settings: String, + }, RenameIndex { index_name: String, new_name: String, @@ -107,6 +108,10 @@ impl KindWithContent { KindWithContent::SwapIndex { .. } => Kind::SwapIndex, KindWithContent::CancelTask { .. } => Kind::CancelTask, KindWithContent::Snapshot => Kind::Snapshot, + KindWithContent::Settings { + index_name, + new_settings, + } => todo!(), } } @@ -117,6 +122,7 @@ impl KindWithContent { DocumentAddition { index_name: _, content_file: _, + primary_key, } => { // TODO: TAMO: persist the file // content_file.persist(); @@ -124,6 +130,7 @@ impl KindWithContent { } // There is nothing to persist for all these tasks DumpExport { .. } + | Settings { .. } | DocumentDeletion { .. } | ClearAllDocuments { .. } | RenameIndex { .. } @@ -142,6 +149,7 @@ impl KindWithContent { DocumentAddition { index_name: _, content_file: _, + primary_key, } => { // TODO: TAMO: delete the file // content_file.delete(); @@ -149,6 +157,7 @@ impl KindWithContent { } // There is no data associated with all these tasks DumpExport { .. } + | Settings { .. } | DocumentDeletion { .. } | ClearAllDocuments { .. } | RenameIndex { .. } @@ -175,6 +184,7 @@ impl KindWithContent { new_name: rhs, } | SwapIndex { lhs, rhs } => Some(vec![lhs, rhs]), + Settings { index_name, .. } => Some(vec![index_name]), } } }