From 6f4dcc0c387409d64930b3e2fbbe66f10fc509e1 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 26 Sep 2022 17:36:06 +0200 Subject: [PATCH] start implementing some logic to test the internal states of the scheduler --- Cargo.lock | 15 +++ index-scheduler/Cargo.toml | 1 + index-scheduler/src/index_scheduler.rs | 157 +++++++++++++++++++++++-- index-scheduler/src/lib.rs | 23 +--- 4 files changed, 164 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88e9b64b2..55e514aca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -889,6 +889,20 @@ dependencies = [ "riscv", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1826,6 +1840,7 @@ dependencies = [ "anyhow", "big_s", "bincode", + "crossbeam", "csv", "derive_builder", "document-formats", diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index 1b2818f00..a9df99866 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -25,6 +25,7 @@ synchronoise = "1.0.1" derive_builder = "0.11.2" [dev-dependencies] +crossbeam = "0.8.2" nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"} insta = { version = "1.19.1", features = ["json", "redactions"] } big_s = "1.0.2" diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index b6b256ddb..d814aa72e 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -5,6 +5,7 @@ use file_store::{File, FileStore}; use index::Index; use milli::update::IndexerConfig; use synchronoise::SignalEvent; +use tempfile::TempDir; use time::OffsetDateTime; use uuid::Uuid; @@ -120,11 +121,33 @@ pub struct IndexScheduler { /// In charge of creating, opening, storing and returning indexes. pub(crate) index_mapper: IndexMapper, - // set to true when there is work to do. + /// Get a signal when a batch needs to be processed. pub(crate) wake_up: Arc, + + // ================= tests + // The next entries are dedicated to the tests. + // It helps us to stop the scheduler and check what it is doing efficiently + /// Provide a way to break in multiple part of the scheduler. + #[cfg(test)] + test_breakpoint_rcv: crossbeam::channel::Receiver, + #[cfg(test)] + test_breakpoint_sdr: crossbeam::channel::Sender, + + /// Hold a reference to its own tempdir to delete itself once dropped. + #[cfg(test)] + test_tempdir: Arc, +} + +#[cfg(test)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Breakpoint { + Start, + BatchCreated, + BatchProcessed, } impl IndexScheduler { + #[cfg(not(test))] pub fn new( tasks_path: PathBuf, update_file_path: PathBuf, @@ -140,9 +163,6 @@ impl IndexScheduler { options.max_dbs(6); let env = options.open(tasks_path)?; - // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things - let wake_up = SignalEvent::auto(true); - let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); let file_store = FileStore::new(&update_file_path)?; @@ -156,10 +176,52 @@ impl IndexScheduler { index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?, index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?, env, - wake_up: Arc::new(wake_up), + // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things + wake_up: Arc::new(SignalEvent::auto(true)), }) } + #[cfg(test)] + pub fn test() -> Self { + let dir = TempDir::new().unwrap(); + let tasks_path = dir.path().join("db_path"); + let update_file_path = dir.path().join("file_store"); + let indexes_path = dir.path().join("indexes"); + let index_size = 1024 * 1024; + let indexer_config = IndexerConfig::default(); + + std::fs::create_dir_all(&tasks_path).unwrap(); + std::fs::create_dir_all(&update_file_path).unwrap(); + std::fs::create_dir_all(&indexes_path).unwrap(); + + let mut options = heed::EnvOpenOptions::new(); + options.max_dbs(6); + + let env = options.open(tasks_path).unwrap(); + let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); + let file_store = FileStore::new(&update_file_path).unwrap(); + + let (sender, receiver) = crossbeam::channel::bounded(0); + + Self { + // by default there is no processing tasks + processing_tasks: Arc::new(RwLock::new(processing_tasks)), + file_store, + all_tasks: env.create_database(Some(db_name::ALL_TASKS)).unwrap(), + status: env.create_database(Some(db_name::STATUS)).unwrap(), + kind: env.create_database(Some(db_name::KIND)).unwrap(), + index_tasks: env.create_database(Some(db_name::INDEX_TASKS)).unwrap(), + index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config).unwrap(), + env, + // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things + wake_up: Arc::new(SignalEvent::auto(true)), + + test_breakpoint_rcv: receiver, + test_breakpoint_sdr: sender, + test_tempdir: Arc::new(dir), + } + } + /// Return the index corresponding to the name. If it wasn't opened before /// it'll be opened. But if it doesn't exist on disk it'll throw an /// `IndexNotFound` error. @@ -221,7 +283,7 @@ impl IndexScheduler { .map_err(|_| Error::CorruptedTaskQueue)? .clone(); - let mut ret = tasks.into_iter().map(|task| task.as_task_view()); + let ret = tasks.into_iter().map(|task| task.as_task_view()); if processing.is_empty() { Ok(ret.collect()) } else { @@ -309,6 +371,10 @@ impl IndexScheduler { /// Create and execute and store the result of one batch of registered tasks. fn tick(&self) -> Result<()> { + // We notifiy we're starting a tick. + #[cfg(test)] + self.test_breakpoint_sdr.send(Breakpoint::Start); + let mut wtxn = self.env.write_txn()?; let batch = match self.create_next_batch(&wtxn)? { Some(batch) => batch, @@ -322,6 +388,10 @@ impl IndexScheduler { let started_at = OffsetDateTime::now_utc(); *self.processing_tasks.write().unwrap() = (started_at, processing_tasks); + // We notifiy we've finished creating the tasks. + #[cfg(test)] + self.test_breakpoint_sdr.send(Breakpoint::BatchCreated); + // 2. process the tasks let res = self.process_batch(&mut wtxn, batch); @@ -358,6 +428,11 @@ impl IndexScheduler { wtxn.commit()?; log::info!("A batch of tasks was successfully completed."); + + // We notifiy we finished processing the tasks. + #[cfg(test)] + self.test_breakpoint_sdr.send(Breakpoint::BatchProcessed); + Ok(()) } @@ -365,6 +440,28 @@ impl IndexScheduler { pub fn notify(&self) { self.wake_up.signal() } + + /// /!\ Used only for tests purposes. + /// Wait until the provided breakpoint is reached. + #[cfg(test)] + fn test_wait_till(&self, breakpoint: Breakpoint) { + self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint); + } + + /// /!\ Used only for tests purposes. + /// Wait until the provided breakpoint is reached. + #[cfg(test)] + fn test_next_breakpoint(&self) -> Breakpoint { + self.test_breakpoint_rcv.recv().unwrap() + } + + /// /!\ Used only for tests purposes. + /// The scheduler will not stop on breakpoints. + #[cfg(test)] + fn test_dont_block(&self) { + // unroll and ignore all the state the scheduler is going to send us. + self.test_breakpoint_rcv.iter().last(); + } } #[cfg(test)] @@ -373,13 +470,13 @@ mod tests { use insta::*; use uuid::Uuid; - use crate::{assert_smol_debug_snapshot, tests::index_scheduler}; + use crate::assert_smol_debug_snapshot; use super::*; #[test] fn register() { - let (index_scheduler, _) = index_scheduler(); + let index_scheduler = IndexScheduler::test(); let kinds = [ KindWithContent::IndexCreation { index_uid: S("catto"), @@ -453,7 +550,7 @@ mod tests { #[test] fn document_addition() { - let (index_scheduler, _dir) = index_scheduler(); + let index_scheduler = IndexScheduler::test(); let content = r#" { @@ -474,7 +571,47 @@ mod tests { .unwrap(); file.persist().unwrap(); - index_scheduler.tick().unwrap(); + // After registering the task we should see the update being enqueued + let task = index_scheduler.get_tasks(Query::default()).unwrap(); + assert_json_snapshot!(task, + { "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" } + ,@r###" + [ + { + "uid": 0, + "indexUid": "doggos", + "status": "enqueued", + "type": "documentAddition", + "enqueuedAt": "date" + } + ] + "###); + + let t_index_scheduler = index_scheduler.clone(); + std::thread::spawn(move || t_index_scheduler.tick().unwrap()); + + index_scheduler.test_wait_till(Breakpoint::BatchCreated); + + // Once the task has started being batched it should be marked as processing + let task = index_scheduler.get_tasks(Query::default()).unwrap(); + assert_json_snapshot!(task, + { "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" } + ,@r###" + [ + { + "uid": 0, + "indexUid": "doggos", + "status": "processing", + "type": "documentAddition", + "enqueuedAt": "date", + "startedAt": "date" + } + ] + "###); + assert_eq!( + index_scheduler.test_next_breakpoint(), + Breakpoint::BatchProcessed + ); let task = index_scheduler.get_tasks(Query::default()).unwrap(); assert_json_snapshot!(task, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8d0544331..a26d61213 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -15,11 +15,6 @@ pub use task::{Kind, KindWithContent, Status, TaskView}; #[cfg(test)] mod tests { - use milli::update::IndexerConfig; - use tempfile::TempDir; - - use crate::IndexScheduler; - #[macro_export] macro_rules! assert_smol_debug_snapshot { ($value:expr, @$snapshot:literal) => {{ @@ -36,24 +31,8 @@ mod tests { }}; } - pub fn index_scheduler() -> (IndexScheduler, TempDir) { - let dir = TempDir::new().unwrap(); - - ( - IndexScheduler::new( - dir.path().join("db_path"), - dir.path().join("file_store"), - dir.path().join("indexes"), - 1024 * 1024, - IndexerConfig::default(), - ) - .unwrap(), - dir, - ) - } - #[test] fn simple_new() { - index_scheduler(); + crate::IndexScheduler::test(); } }