From 22d24dba56b3aa3f06014d6a7411d5a3b3f69139 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Tue, 6 Sep 2022 23:49:19 +0200
Subject: [PATCH] implement the get_batch method

---
 index-scheduler/src/error.rs |  10 +++
 index-scheduler/src/lib.rs   | 168 ++++++++++++++++++++++++++++++++---
 index-scheduler/src/task.rs  |  98 ++++++++++++--------
 3 files changed, 226 insertions(+), 50 deletions(-)

diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs
index bc9a2e4c7..5b467456b 100644
--- a/index-scheduler/src/error.rs
+++ b/index-scheduler/src/error.rs
@@ -1,7 +1,17 @@
+use milli::heed;
 use thiserror::Error;
 
 #[derive(Error, Debug)]
 pub enum Error {
     #[error("Index not found")]
     IndexNotFound,
+    #[error("Corrupted task queue.")]
+    CorruptedTaskQueue,
+    #[error(transparent)]
+    Heed(#[from] heed::Error),
+    #[error(transparent)]
+    Milli(#[from] milli::Error),
+
+    #[error(transparent)]
+    Anyhow(#[from] anyhow::Error),
 }
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 7dbf14623..244d6e5b9 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -4,18 +4,18 @@
 pub mod task;
 
 use error::Error;
 use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
 pub use task::Task;
-use task::{Kind, Status};
+use task::{Kind, KindWithContent, Status};
 
 use std::collections::hash_map::Entry;
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::{collections::HashMap, sync::RwLock};
 
-use anyhow::Result;
 use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
 use milli::{Index, RoaringBitmapCodec, BEU32};
 use roaring::RoaringBitmap;
 
+pub type Result<T> = std::result::Result<T, Error>;
 pub type TaskId = u32;
 type IndexName = String;
 type IndexUuid = String;
@@ -42,12 +42,12 @@ pub struct IndexScheduler {
     // All the tasks ids grouped by their status.
     status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
     // All the tasks ids grouped by their kind.
-    kind: Database<OwnedType<BEU32>, RoaringBitmapCodec>,
+    kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
     // Map an index name with an indexuuid.
     index_name_mapper: Database,
 
     // Store the tasks associated to an index.
-    index_tasks: Database,
+    index_tasks: Database<Str, RoaringBitmapCodec>,
 
     // set to true when there is work to do.
     wake_up: Arc<AtomicBool>,
@@ -113,7 +113,7 @@ impl IndexScheduler {
             bitmap
         })?;
 
-        self.update_kind(&mut wtxn, &task.kind, |mut bitmap| {
+        self.update_kind(&mut wtxn, task.kind.as_kind(), |mut bitmap| {
             bitmap.insert(task_id);
             bitmap
         })?;
@@ -134,20 +134,158 @@ impl IndexScheduler {
         Ok(())
     }
 
+    /// Create the next batch to be processed:
+    /// 1. We get the *last* task to cancel.
+    /// 2. We get the *next* snapshot to process.
+    /// 3. We get the *next* dump to process.
+    /// 4. We get the *next* tasks to process for a specific index.
+    fn get_next_batch(&self, rtxn: &RoTxn) -> Result<Batch> {
+        let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
+        let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued;
+
+        // 1. we get the last task to cancel.
+        if let Some(task_id) = to_cancel.max() {
+            return Ok(Batch::Cancel(
+                self.get_task(rtxn, task_id)?
+                    .ok_or(Error::CorruptedTaskQueue)?,
+            ));
+        }
+
+        // 2. we batch the snapshot.
+        let to_snapshot = self.get_kind(rtxn, Kind::Snapshot)? & enqueued;
+        if !to_snapshot.is_empty() {
+            return Ok(Batch::Snapshot(self.get_existing_tasks(rtxn, to_snapshot)?));
+        }
+
+        // 3. we batch the dumps.
+        let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued;
+        if !to_dump.is_empty() {
+            return Ok(Batch::Dump(self.get_existing_tasks(rtxn, to_dump)?));
+        }
+
+        // 4. We take the next task and try to batch all the tasks associated with this index.
+        if let Some(task_id) = enqueued.min() {
+            let task = self
+                .get_task(rtxn, task_id)?
+                .ok_or(Error::CorruptedTaskQueue)?;
+            match task.kind {
+                // We can batch all the consecutive tasks coming next which
+                // have the kind `DocumentAddition`.
+                KindWithContent::DocumentAddition { index_name, .. } => {
+                    return self.batch_contiguous_kind(rtxn, &index_name, Kind::DocumentAddition)
+                }
+                // We can batch all the consecutive tasks coming next which
+                // have the kind `DocumentDeletion`.
+                KindWithContent::DocumentDeletion { index_name, .. } => {
+                    return self.batch_contiguous_kind(rtxn, &index_name, Kind::DocumentDeletion)
+                }
+                // The following tasks can't be batched
+                KindWithContent::ClearAllDocuments { .. }
+                | KindWithContent::RenameIndex { .. }
+                | KindWithContent::CreateIndex { .. }
+                | KindWithContent::DeleteIndex { .. }
+                | KindWithContent::SwapIndex { .. } => return Ok(Batch::One(task)),
+
+                // The following tasks have already been batched and thus can't appear here.
+                KindWithContent::CancelTask { .. }
+                | KindWithContent::DumpExport { .. }
+                | KindWithContent::Snapshot => {
+                    unreachable!()
+                }
+            }
+        }
+
+        // If we found no tasks then we were notified for something that got autobatched
+        // somehow and there is nothing to do.
+        Ok(Batch::Empty)
+    }
+
+    /// Batch all the consecutive tasks coming next that share the same `Kind`
+    /// for a specific index. There *MUST* be at least ONE task of this kind.
+    fn batch_contiguous_kind(&self, rtxn: &RoTxn, index: &str, kind: Kind) -> Result<Batch> {
+        let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
+
+        // [1, 2, 4, 5]
+        let index_tasks = self.get_index(rtxn, &index)? & enqueued;
+        // [1, 2, 5]
+        let tasks_kind = &index_tasks & self.get_kind(rtxn, kind)?;
+        // [4]
+        let not_kind = &index_tasks - &tasks_kind;
+
+        // [1, 2]
+        let mut to_process = tasks_kind.clone();
+        if let Some(min) = not_kind.min() {
+            // Stop right before the first task of another kind so that the
+            // batch only contains contiguous tasks of the requested kind.
+            to_process.remove_range(min..);
+        }
+
+        Ok(Batch::Contiguous {
+            tasks: self.get_existing_tasks(rtxn, to_process)?,
+            kind,
+        })
+    }
+
+    fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result<Option<Task>> {
+        Ok(self.all_tasks.get(rtxn, &BEU32::new(task_id))?)
+    }
+
     pub fn notify(&self) {
         self.wake_up
             .store(true, std::sync::atomic::Ordering::Relaxed);
     }
 
+    // =========== Utility functions on the DBs
+
+    /// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
+    /// `CorruptedTaskQueue` error will be thrown.
+    fn get_existing_tasks(
+        &self,
+        rtxn: &RoTxn,
+        tasks: impl IntoIterator<Item = TaskId>,
+    ) -> Result<Vec<Task>> {
+        tasks
+            .into_iter()
+            .map(|task_id| {
+                self.get_task(rtxn, task_id)
+                    .and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
+            })
+            .collect::<Result<_>>()
+    }
+
+    fn get_index(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
+        Ok(self.index_tasks.get(&rtxn, index)?.unwrap_or_default())
+    }
+
+    fn put_index(&self, wtxn: &mut RwTxn, index: &str, bitmap: &RoaringBitmap) -> Result<()> {
+        Ok(self.index_tasks.put(wtxn, index, bitmap)?)
+    }
+
+    fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
+        Ok(self.status.get(&rtxn, &status)?.unwrap_or_default())
+    }
+
+    fn put_status(&self, wtxn: &mut RwTxn, status: Status, bitmap: &RoaringBitmap) -> Result<()> {
+        Ok(self.status.put(wtxn, &status, bitmap)?)
+    }
+
+    fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
+        Ok(self.kind.get(&rtxn, &kind)?.unwrap_or_default())
+    }
+
+    fn put_kind(&self, wtxn: &mut RwTxn, kind: Kind, bitmap: &RoaringBitmap) -> Result<()> {
+        Ok(self.kind.put(wtxn, &kind, bitmap)?)
+    }
+
     fn update_status(
         &self,
         wtxn: &mut RwTxn,
         status: Status,
         f: impl Fn(RoaringBitmap) -> RoaringBitmap,
     ) -> Result<()> {
-        let tasks = self.status.get(&wtxn, &status)?.unwrap_or_default();
+        let tasks = self.get_status(&wtxn, status)?;
         let tasks = f(tasks);
-        self.status.put(wtxn, &status, &tasks)?;
+        self.put_status(wtxn, status, &tasks)?;
 
         Ok(())
     }
@@ -155,14 +293,22 @@ impl IndexScheduler {
     fn update_kind(
         &self,
         wtxn: &mut RwTxn,
-        kind: &Kind,
+        kind: Kind,
         f: impl Fn(RoaringBitmap) -> RoaringBitmap,
     ) -> Result<()> {
-        let kind = BEU32::new(kind.to_u32());
-        let tasks = self.kind.get(&wtxn, &kind)?.unwrap_or_default();
+        let tasks = self.get_kind(&wtxn, kind)?;
         let tasks = f(tasks);
-        self.kind.put(wtxn, &kind, &tasks)?;
+        self.put_kind(wtxn, kind, &tasks)?;
 
         Ok(())
     }
 }
+
+enum Batch {
+    Cancel(Task),
+    Snapshot(Vec<Task>),
+    Dump(Vec<Task>),
+    Contiguous { tasks: Vec<Task>, kind: Kind },
+    One(Task),
+    Empty,
+}
diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs
index 3c928c280..06ee76dfd 100644
--- a/index-scheduler/src/task.rs
+++ b/index-scheduler/src/task.rs
@@ -25,7 +25,7 @@ pub struct Task {
     pub finished_at: Option,
 
     pub status: Status,
-    pub kind: Kind,
+    pub kind: KindWithContent,
 }
 
 impl Task {
@@ -40,10 +40,11 @@
 
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
-pub enum Kind {
+pub enum KindWithContent {
     DumpExport {
         output: PathBuf,
     },
+    Snapshot,
     DocumentAddition {
         index_name: String,
         content_file: String,
@@ -80,62 +81,81 @@
     },
 }
 
-impl Kind {
+impl KindWithContent {
+    pub fn as_kind(&self) -> Kind {
+        match self {
+            KindWithContent::DumpExport { .. } => Kind::DumpExport,
+            KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition,
+            KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion,
+            KindWithContent::ClearAllDocuments { .. } => Kind::ClearAllDocuments,
+            KindWithContent::RenameIndex { .. } => Kind::RenameIndex,
+            KindWithContent::CreateIndex { .. } => Kind::CreateIndex,
+            KindWithContent::DeleteIndex { .. } => Kind::DeleteIndex,
+            KindWithContent::SwapIndex { .. } => Kind::SwapIndex,
+            KindWithContent::CancelTask { .. } => Kind::CancelTask,
+            KindWithContent::Snapshot => Kind::Snapshot,
+        }
+    }
+
     pub fn persist(&self) -> Result<()> {
         match self {
-            Kind::DocumentAddition {
-                index_name,
-                content_file,
+            KindWithContent::DocumentAddition {
+                index_name: _,
+                content_file: _,
             } => {
                 // TODO: TAMO: persist the file
                 // content_file.persist();
                 Ok(())
             }
             // There is nothing to persist for all these tasks
-            Kind::DumpExport { .. }
-            | Kind::DocumentDeletion { .. }
-            | Kind::ClearAllDocuments { .. }
-            | Kind::RenameIndex { .. }
-            | Kind::CreateIndex { .. }
-            | Kind::DeleteIndex { .. }
-            | Kind::SwapIndex { .. }
-            | Kind::CancelTask { .. } => Ok(()),
+            KindWithContent::DumpExport { .. }
+            | KindWithContent::DocumentDeletion { .. }
+            | KindWithContent::ClearAllDocuments { .. }
+            | KindWithContent::RenameIndex { .. }
+            | KindWithContent::CreateIndex { .. }
+            | KindWithContent::DeleteIndex { .. }
+            | KindWithContent::SwapIndex { .. }
+            | KindWithContent::CancelTask { .. }
+            | KindWithContent::Snapshot => Ok(()),
         }
     }
 
     pub fn remove_data(&self) -> Result<()> {
         match self {
-            Kind::DocumentAddition {
-                index_name,
-                content_file,
+            KindWithContent::DocumentAddition {
+                index_name: _,
+                content_file: _,
             } => {
                 // TODO: TAMO: delete the file
                 // content_file.delete();
                 Ok(())
             }
             // There is no data associated with all these tasks
-            Kind::DumpExport { .. }
-            | Kind::DocumentDeletion { .. }
-            | Kind::ClearAllDocuments { .. }
-            | Kind::RenameIndex { .. }
-            | Kind::CreateIndex { .. }
-            | Kind::DeleteIndex { .. }
-            | Kind::SwapIndex { .. }
-            | Kind::CancelTask { .. } => Ok(()),
-        }
-    }
-
-    pub fn to_u32(&self) -> u32 {
-        match self {
-            Kind::DumpExport { .. } => 0,
-            Kind::DocumentAddition { .. } => 1,
-            Kind::DocumentDeletion { .. } => 2,
-            Kind::ClearAllDocuments { .. } => 3,
-            Kind::RenameIndex { .. } => 4,
-            Kind::CreateIndex { .. } => 5,
-            Kind::DeleteIndex { .. } => 6,
-            Kind::SwapIndex { .. } => 7,
-            Kind::CancelTask { .. } => 8,
+            KindWithContent::DumpExport { .. }
+            | KindWithContent::DocumentDeletion { .. }
+            | KindWithContent::ClearAllDocuments { .. }
+            | KindWithContent::RenameIndex { .. }
+            | KindWithContent::CreateIndex { .. }
+            | KindWithContent::DeleteIndex { .. }
+            | KindWithContent::SwapIndex { .. }
+            | KindWithContent::CancelTask { .. }
+            | KindWithContent::Snapshot => Ok(()),
         }
     }
 }
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum Kind {
+    CancelTask,
+    ClearAllDocuments,
+    CreateIndex,
+    DeleteIndex,
+    DocumentAddition,
+    DocumentDeletion,
+    DumpExport,
+    RenameIndex,
+    Settings,
+    Snapshot,
+    SwapIndex,
+}
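
Note on the batching logic (not part of the patch): the set algebra in `batch_contiguous_kind` can be exercised on its own with plain roaring bitmaps. The standalone sketch below reuses the [1, 2, 4, 5] example from the comments and assumes a `roaring` release whose `remove_range` accepts a `RangeBounds<u32>`, which is what the patched code already relies on.

    // Keep only the tasks of the wanted kind that come before the first
    // enqueued task of another kind, so the batch stays contiguous.
    use roaring::RoaringBitmap;

    fn main() {
        // Enqueued tasks that touch the index.
        let index_tasks: RoaringBitmap = [1u32, 2, 4, 5].into_iter().collect();
        // Among them, the tasks of the kind we want to batch (e.g. document additions).
        let tasks_kind: RoaringBitmap = [1u32, 2, 5].into_iter().collect();
        // Tasks of any other kind: [4].
        let not_kind = &index_tasks - &tasks_kind;

        // Truncate at the first task of another kind: [1, 2] remain.
        let mut to_process = tasks_kind.clone();
        if let Some(min) = not_kind.min() {
            to_process.remove_range(min..);
        }

        assert_eq!(to_process.into_iter().collect::<Vec<u32>>(), vec![1u32, 2]);
    }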
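
Hypothetical usage (also not in the patch): a worker loop built on top of `get_next_batch` would presumably dispatch on the `Batch` variants along the lines below; every arm body is a placeholder for whatever ends up executing the tasks.

    // Sketch of a consumer for the batches produced by `get_next_batch`.
    fn process_next_batch(scheduler: &IndexScheduler, rtxn: &RoTxn) -> Result<()> {
        match scheduler.get_next_batch(rtxn)? {
            // A pending cancellation wins over every other kind of work.
            Batch::Cancel(_task) => todo!("abort the targeted task"),
            // Snapshots and dumps each run as a single unit of work.
            Batch::Snapshot(_tasks) | Batch::Dump(_tasks) => todo!("run them in one go"),
            // A contiguous run of same-kind tasks on one index can be applied at once.
            Batch::Contiguous { tasks: _tasks, kind: _kind } => todo!("apply the whole batch"),
            // Anything else is processed on its own.
            Batch::One(_task) => todo!("process the lone task"),
            // Nothing is enqueued: wait for `notify` to be called again.
            Batch::Empty => Ok(()),
        }
    }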