From 6b9689a1c00280930d2e3952486e3cfed4b94c1e Mon Sep 17 00:00:00 2001 From: Tamo Date: Fri, 9 Sep 2022 01:09:50 +0200 Subject: [PATCH] fix the whole batchKind thingy --- index-scheduler/src/batch.rs | 285 ++++++++++++++++++++++++++++++++++- index-scheduler/src/lib.rs | 7 + index-scheduler/src/task.rs | 3 + index-scheduler/src/utils.rs | 2 + 4 files changed, 292 insertions(+), 5 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 7ef056362..ff1281f8b 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -2,7 +2,8 @@ use crate::{ task::{KindWithContent, Status}, Error, IndexScheduler, Result, TaskId, }; -use milli::heed::RoTxn; +use milli::{heed::RoTxn, update::IndexDocumentsMethod}; +use uuid::Uuid; use crate::{task::Kind, Task}; @@ -10,9 +11,53 @@ pub(crate) enum Batch { Cancel(Task), Snapshot(Vec), Dump(Vec), - Contiguous { tasks: Vec, kind: Kind }, - One(Task), - Empty, + DocumentAddition { + index_uid: String, + tasks: Vec, + primary_key: Option, + content_files: Vec, + }, +} + +impl IndexScheduler { + /* + pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result> { + match batch { + Batch::DocumentAddition { + tasks, + primary_key, + content_files, + index_uid, + } => { + let index = self.create_index(wtxn, &index_uid)?; + let ret = index.update_documents( + IndexDocumentsMethod::UpdateDocuments, + primary_key, + self.file_store, + content_files, + )?; + + assert_eq!(ret.len(), tasks.len(), "Update documents must return the same number of `Result` than the number of tasks."); + + Ok(tasks + .into_iter() + .zip(ret) + .map(|(mut task, res)| match res { + Ok(info) => { + task.status = Status::Succeeded; + task.info = Some(info.to_string()); + } + Err(error) => { + task.status = Status::Failed; + task.error = Some(error.to_string()); + } + }) + .collect()) + } + _ => unreachable!(), + } + } + */ } impl IndexScheduler { @@ -21,7 +66,7 @@ impl IndexScheduler { /// 2. We get the *next* snapshot to process. /// 3. We get the *next* dump to process. /// 4. We get the *next* tasks to process for a specific index. - pub(crate) fn get_next_batch(&self, rtxn: &RoTxn) -> Result { + pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result { let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued; @@ -122,3 +167,233 @@ impl Batch { } } } + +pub(crate) enum BatchKind { + ClearAll { + ids: Vec, + }, + DocumentAddition { + addition_ids: Vec, + }, + DocumentDeletion { + deletion_ids: Vec, + }, + ClearAllAndSettings { + other: Vec, + settings_ids: Vec, + }, + SettingsAndDocumentAddition { + settings_ids: Vec, + addition_ids: Vec, + }, + Settings { + settings_ids: Vec, + }, + DeleteIndex { + ids: Vec, + }, + CreateIndex { + id: TaskId, + }, + SwapIndex { + id: TaskId, + }, + RenameIndex { + id: TaskId, + }, +} + +impl BatchKind { + /// return true if you must stop right there. + pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) { + match kind { + Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true), + Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true), + Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true), + Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true), + Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false), + Kind::DocumentAddition => ( + BatchKind::DocumentAddition { + addition_ids: vec![task_id], + }, + false, + ), + Kind::DocumentDeletion => ( + BatchKind::DocumentDeletion { + deletion_ids: vec![task_id], + }, + false, + ), + Kind::Settings => ( + BatchKind::Settings { + settings_ids: vec![task_id], + }, + false, + ), + + Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(), + } + } + + /// Return true if you must stop. + fn accumulate(&mut self, id: TaskId, kind: Kind) -> bool { + match (self, kind) { + // must handle the deleteIndex + (_, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => true, + + (BatchKind::ClearAll { ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => { + ids.push(id); + false + } + (BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => true, + (BatchKind::DocumentAddition { addition_ids }, Kind::ClearAllDocuments) => { + addition_ids.push(id); + *self = BatchKind::ClearAll { + ids: addition_ids.clone(), + }; + false + } + + (BatchKind::DocumentAddition { addition_ids }, Kind::DocumentAddition) => { + addition_ids.push(id); + false + } + (BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => true, + (BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => { + *self = BatchKind::SettingsAndDocumentAddition { + settings_ids: vec![id], + addition_ids: addition_ids.clone(), + }; + false + } + + (BatchKind::DocumentDeletion { deletion_ids }, Kind::ClearAllDocuments) => { + deletion_ids.push(id); + *self = BatchKind::ClearAll { + ids: deletion_ids.clone(), + }; + false + } + (BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => true, + (BatchKind::DocumentDeletion { deletion_ids }, Kind::DocumentDeletion) => { + deletion_ids.push(id); + false + } + (BatchKind::DocumentDeletion { .. }, Kind::Settings) => true, + + (BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => { + *self = BatchKind::ClearAllAndSettings { + settings_ids: settings_ids.clone(), + other: vec![id], + }; + false + } + (BatchKind::Settings { .. }, Kind::DocumentAddition) => true, + (BatchKind::Settings { .. }, Kind::DocumentDeletion) => true, + (BatchKind::Settings { settings_ids }, Kind::Settings) => { + settings_ids.push(id); + false + } + + ( + BatchKind::ClearAllAndSettings { + other, + settings_ids, + }, + Kind::ClearAllDocuments, + ) => { + other.push(id); + false + } + (BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => true, + ( + BatchKind::ClearAllAndSettings { + other, + settings_ids, + }, + Kind::DocumentDeletion, + ) => { + other.push(id); + false + } + ( + BatchKind::ClearAllAndSettings { + settings_ids, + other, + }, + Kind::Settings, + ) => { + settings_ids.push(id); + false + } + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::ClearAllDocuments, + ) => { + addition_ids.push(id); + *self = BatchKind::ClearAllAndSettings { + settings_ids: settings_ids.clone(), + other: addition_ids.clone(), + }; + false + } + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::DocumentAddition, + ) => { + addition_ids.push(id); + false + } + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::DocumentDeletion, + ) => true, + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::Settings, + ) => { + settings_ids.push(id); + false + } + (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), + ( + BatchKind::CreateIndex { .. } + | BatchKind::DeleteIndex { .. } + | BatchKind::SwapIndex { .. } + | BatchKind::RenameIndex { .. }, + _, + ) => { + unreachable!() + } + } + } +} + +pub fn autobatcher(enqueued: Vec<(TaskId, Kind)>) -> Option { + let mut enqueued = enqueued.into_iter(); + let (id, kind) = enqueued.next()?; + let (mut acc, is_finished) = BatchKind::new(id, kind); + if is_finished { + return Some(acc); + } + + for (id, kind) in enqueued { + if acc.accumulate(id, kind) { + break; + } + } + + Some(acc) +} diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 35caea8ac..5da767416 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -80,6 +80,10 @@ pub struct IndexScheduler { } impl IndexScheduler { + pub fn create_index(&self, rwtxn: &mut RwTxn, name: &str) -> Result { + todo!() + } + /// Return the index corresponding to the name. If it wasn't opened before /// it'll be opened. But if it doesn't exist on disk it'll throw an /// `IndexNotFound` error. @@ -226,6 +230,9 @@ impl IndexScheduler { let res = self.process_batch(&mut wtxn, &mut batch); // TODO: TAMO: do this later + // must delete the file on disk + // in case of error, must update the tasks with the error + // in case of « success » we must update all the task on disk // self.handle_batch_result(res); match wtxn.commit() { diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 3fdc1c2c3..c7bead937 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -27,6 +27,9 @@ pub struct Task { #[serde(with = "time::serde::rfc3339::option")] pub finished_at: Option, + pub error: Option, + pub info: Option, + pub status: Status, pub kind: KindWithContent, } diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 99190e086..ca52de038 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -67,6 +67,8 @@ impl IndexScheduler { })?; } + // TODO: TAMO: update the task in `all_tasks` + Ok(()) }