start implementing some logic to test the internal states of the scheduler

This commit is contained in:
Tamo 2022-09-26 17:36:06 +02:00 committed by Clément Renault
parent 84cd5cef0b
commit 6f4dcc0c38
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 164 additions and 32 deletions

15
Cargo.lock generated
View File

@ -889,6 +889,20 @@ dependencies = [
"riscv", "riscv",
] ]
[[package]]
name = "crossbeam"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.6" version = "0.5.6"
@ -1826,6 +1840,7 @@ dependencies = [
"anyhow", "anyhow",
"big_s", "big_s",
"bincode", "bincode",
"crossbeam",
"csv", "csv",
"derive_builder", "derive_builder",
"document-formats", "document-formats",

View File

@ -25,6 +25,7 @@ synchronoise = "1.0.1"
derive_builder = "0.11.2" derive_builder = "0.11.2"
[dev-dependencies] [dev-dependencies]
crossbeam = "0.8.2"
nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"} nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"}
insta = { version = "1.19.1", features = ["json", "redactions"] } insta = { version = "1.19.1", features = ["json", "redactions"] }
big_s = "1.0.2" big_s = "1.0.2"

View File

@ -5,6 +5,7 @@ use file_store::{File, FileStore};
use index::Index; use index::Index;
use milli::update::IndexerConfig; use milli::update::IndexerConfig;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use tempfile::TempDir;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
@ -120,11 +121,33 @@ pub struct IndexScheduler {
/// In charge of creating, opening, storing and returning indexes. /// In charge of creating, opening, storing and returning indexes.
pub(crate) index_mapper: IndexMapper, pub(crate) index_mapper: IndexMapper,
// set to true when there is work to do. /// Get a signal when a batch needs to be processed.
pub(crate) wake_up: Arc<SignalEvent>, pub(crate) wake_up: Arc<SignalEvent>,
// ================= tests
// The next entries are dedicated to the tests.
// It helps us to stop the scheduler and check what it is doing efficiently
/// Provide a way to break in multiple part of the scheduler.
#[cfg(test)]
test_breakpoint_rcv: crossbeam::channel::Receiver<Breakpoint>,
#[cfg(test)]
test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
/// Hold a reference to its own tempdir to delete itself once dropped.
#[cfg(test)]
test_tempdir: Arc<TempDir>,
}
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Breakpoint {
Start,
BatchCreated,
BatchProcessed,
} }
impl IndexScheduler { impl IndexScheduler {
#[cfg(not(test))]
pub fn new( pub fn new(
tasks_path: PathBuf, tasks_path: PathBuf,
update_file_path: PathBuf, update_file_path: PathBuf,
@ -140,9 +163,6 @@ impl IndexScheduler {
options.max_dbs(6); options.max_dbs(6);
let env = options.open(tasks_path)?; let env = options.open(tasks_path)?;
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
let wake_up = SignalEvent::auto(true);
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
let file_store = FileStore::new(&update_file_path)?; let file_store = FileStore::new(&update_file_path)?;
@ -156,10 +176,52 @@ impl IndexScheduler {
index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?, index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?,
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?, index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?,
env, env,
wake_up: Arc::new(wake_up), // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
}) })
} }
#[cfg(test)]
pub fn test() -> Self {
let dir = TempDir::new().unwrap();
let tasks_path = dir.path().join("db_path");
let update_file_path = dir.path().join("file_store");
let indexes_path = dir.path().join("indexes");
let index_size = 1024 * 1024;
let indexer_config = IndexerConfig::default();
std::fs::create_dir_all(&tasks_path).unwrap();
std::fs::create_dir_all(&update_file_path).unwrap();
std::fs::create_dir_all(&indexes_path).unwrap();
let mut options = heed::EnvOpenOptions::new();
options.max_dbs(6);
let env = options.open(tasks_path).unwrap();
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
let file_store = FileStore::new(&update_file_path).unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
Self {
// by default there is no processing tasks
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store,
all_tasks: env.create_database(Some(db_name::ALL_TASKS)).unwrap(),
status: env.create_database(Some(db_name::STATUS)).unwrap(),
kind: env.create_database(Some(db_name::KIND)).unwrap(),
index_tasks: env.create_database(Some(db_name::INDEX_TASKS)).unwrap(),
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config).unwrap(),
env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
test_breakpoint_rcv: receiver,
test_breakpoint_sdr: sender,
test_tempdir: Arc::new(dir),
}
}
/// Return the index corresponding to the name. If it wasn't opened before /// Return the index corresponding to the name. If it wasn't opened before
/// it'll be opened. But if it doesn't exist on disk it'll throw an /// it'll be opened. But if it doesn't exist on disk it'll throw an
/// `IndexNotFound` error. /// `IndexNotFound` error.
@ -221,7 +283,7 @@ impl IndexScheduler {
.map_err(|_| Error::CorruptedTaskQueue)? .map_err(|_| Error::CorruptedTaskQueue)?
.clone(); .clone();
let mut ret = tasks.into_iter().map(|task| task.as_task_view()); let ret = tasks.into_iter().map(|task| task.as_task_view());
if processing.is_empty() { if processing.is_empty() {
Ok(ret.collect()) Ok(ret.collect())
} else { } else {
@ -309,6 +371,10 @@ impl IndexScheduler {
/// Create and execute and store the result of one batch of registered tasks. /// Create and execute and store the result of one batch of registered tasks.
fn tick(&self) -> Result<()> { fn tick(&self) -> Result<()> {
// We notifiy we're starting a tick.
#[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::Start);
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let batch = match self.create_next_batch(&wtxn)? { let batch = match self.create_next_batch(&wtxn)? {
Some(batch) => batch, Some(batch) => batch,
@ -322,6 +388,10 @@ impl IndexScheduler {
let started_at = OffsetDateTime::now_utc(); let started_at = OffsetDateTime::now_utc();
*self.processing_tasks.write().unwrap() = (started_at, processing_tasks); *self.processing_tasks.write().unwrap() = (started_at, processing_tasks);
// We notifiy we've finished creating the tasks.
#[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::BatchCreated);
// 2. process the tasks // 2. process the tasks
let res = self.process_batch(&mut wtxn, batch); let res = self.process_batch(&mut wtxn, batch);
@ -358,6 +428,11 @@ impl IndexScheduler {
wtxn.commit()?; wtxn.commit()?;
log::info!("A batch of tasks was successfully completed."); log::info!("A batch of tasks was successfully completed.");
// We notifiy we finished processing the tasks.
#[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::BatchProcessed);
Ok(()) Ok(())
} }
@ -365,6 +440,28 @@ impl IndexScheduler {
pub fn notify(&self) { pub fn notify(&self) {
self.wake_up.signal() self.wake_up.signal()
} }
/// /!\ Used only for tests purposes.
/// Wait until the provided breakpoint is reached.
#[cfg(test)]
fn test_wait_till(&self, breakpoint: Breakpoint) {
self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
}
/// /!\ Used only for tests purposes.
/// Wait until the provided breakpoint is reached.
#[cfg(test)]
fn test_next_breakpoint(&self) -> Breakpoint {
self.test_breakpoint_rcv.recv().unwrap()
}
/// /!\ Used only for tests purposes.
/// The scheduler will not stop on breakpoints.
#[cfg(test)]
fn test_dont_block(&self) {
// unroll and ignore all the state the scheduler is going to send us.
self.test_breakpoint_rcv.iter().last();
}
} }
#[cfg(test)] #[cfg(test)]
@ -373,13 +470,13 @@ mod tests {
use insta::*; use insta::*;
use uuid::Uuid; use uuid::Uuid;
use crate::{assert_smol_debug_snapshot, tests::index_scheduler}; use crate::assert_smol_debug_snapshot;
use super::*; use super::*;
#[test] #[test]
fn register() { fn register() {
let (index_scheduler, _) = index_scheduler(); let index_scheduler = IndexScheduler::test();
let kinds = [ let kinds = [
KindWithContent::IndexCreation { KindWithContent::IndexCreation {
index_uid: S("catto"), index_uid: S("catto"),
@ -453,7 +550,7 @@ mod tests {
#[test] #[test]
fn document_addition() { fn document_addition() {
let (index_scheduler, _dir) = index_scheduler(); let index_scheduler = IndexScheduler::test();
let content = r#" let content = r#"
{ {
@ -474,7 +571,47 @@ mod tests {
.unwrap(); .unwrap();
file.persist().unwrap(); file.persist().unwrap();
index_scheduler.tick().unwrap(); // After registering the task we should see the update being enqueued
let task = index_scheduler.get_tasks(Query::default()).unwrap();
assert_json_snapshot!(task,
{ "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" }
,@r###"
[
{
"uid": 0,
"indexUid": "doggos",
"status": "enqueued",
"type": "documentAddition",
"enqueuedAt": "date"
}
]
"###);
let t_index_scheduler = index_scheduler.clone();
std::thread::spawn(move || t_index_scheduler.tick().unwrap());
index_scheduler.test_wait_till(Breakpoint::BatchCreated);
// Once the task has started being batched it should be marked as processing
let task = index_scheduler.get_tasks(Query::default()).unwrap();
assert_json_snapshot!(task,
{ "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" }
,@r###"
[
{
"uid": 0,
"indexUid": "doggos",
"status": "processing",
"type": "documentAddition",
"enqueuedAt": "date",
"startedAt": "date"
}
]
"###);
assert_eq!(
index_scheduler.test_next_breakpoint(),
Breakpoint::BatchProcessed
);
let task = index_scheduler.get_tasks(Query::default()).unwrap(); let task = index_scheduler.get_tasks(Query::default()).unwrap();
assert_json_snapshot!(task, assert_json_snapshot!(task,

View File

@ -15,11 +15,6 @@ pub use task::{Kind, KindWithContent, Status, TaskView};
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use milli::update::IndexerConfig;
use tempfile::TempDir;
use crate::IndexScheduler;
#[macro_export] #[macro_export]
macro_rules! assert_smol_debug_snapshot { macro_rules! assert_smol_debug_snapshot {
($value:expr, @$snapshot:literal) => {{ ($value:expr, @$snapshot:literal) => {{
@ -36,24 +31,8 @@ mod tests {
}}; }};
} }
pub fn index_scheduler() -> (IndexScheduler, TempDir) {
let dir = TempDir::new().unwrap();
(
IndexScheduler::new(
dir.path().join("db_path"),
dir.path().join("file_store"),
dir.path().join("indexes"),
1024 * 1024,
IndexerConfig::default(),
)
.unwrap(),
dir,
)
}
#[test] #[test]
fn simple_new() { fn simple_new() {
index_scheduler(); crate::IndexScheduler::test();
} }
} }