move as many fields as possible out of the IndexScheduler

This commit is contained in:
Tamo 2022-09-26 19:56:06 +02:00 committed by Clément Renault
parent 9e1f38ec7c
commit 64e132ce53
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -5,13 +5,11 @@ use file_store::{File, FileStore};
use index::Index; use index::Index;
use milli::update::IndexerConfig; use milli::update::IndexerConfig;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use tempfile::TempDir;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::{Arc, RwLock};
use std::sync::RwLock;
use milli::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use milli::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use milli::heed::{self, Database, Env}; use milli::heed::{self, Database, Env};
@ -124,18 +122,11 @@ pub struct IndexScheduler {
/// Get a signal when a batch needs to be processed. /// Get a signal when a batch needs to be processed.
pub(crate) wake_up: Arc<SignalEvent>, pub(crate) wake_up: Arc<SignalEvent>,
// ================= tests // ================= test
// The next entries are dedicated to the tests. /// The next entry is dedicated to the tests.
// It helps us to stop the scheduler and check what it is doing efficiently /// It provides a way to break in multiple parts of the scheduler.
/// Provides a way to break in multiple parts of the scheduler.
#[cfg(test)]
test_breakpoint_rcv: crossbeam::channel::Receiver<Breakpoint>,
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>, test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
/// Hold a reference to its own tempdir to delete itself once dropped.
#[cfg(test)]
test_tempdir: Arc<TempDir>,
} }
#[cfg(test)] #[cfg(test)]
@ -147,7 +138,6 @@ enum Breakpoint {
} }
impl IndexScheduler { impl IndexScheduler {
#[cfg(not(test))]
pub fn new( pub fn new(
tasks_path: PathBuf, tasks_path: PathBuf,
update_file_path: PathBuf, update_file_path: PathBuf,
@ -166,6 +156,8 @@ impl IndexScheduler {
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
let file_store = FileStore::new(&update_file_path)?; let file_store = FileStore::new(&update_file_path)?;
// allow unreachable_code to get rid of the warning in the case of a test build.
#[allow(unreachable_code)]
Ok(Self { Ok(Self {
// by default there is no processing tasks // by default there is no processing tasks
processing_tasks: Arc::new(RwLock::new(processing_tasks)), processing_tasks: Arc::new(RwLock::new(processing_tasks)),
@ -178,6 +170,11 @@ impl IndexScheduler {
env, env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)), wake_up: Arc::new(SignalEvent::auto(true)),
#[cfg(test)]
test_breakpoint_sdr: panic!(
"Can't use `IndexScheduler::new` in the tests. See `IndexScheduler::test`."
),
}) })
} }
@ -403,34 +400,15 @@ impl IndexScheduler {
pub fn notify(&self) { pub fn notify(&self) {
self.wake_up.signal() self.wake_up.signal()
} }
/// /!\ Used only for test purposes.
/// Block the calling thread until the provided breakpoint is received.
///
/// Every breakpoint received before the matching one is discarded.
#[cfg(test)]
fn test_wait_till(&self, breakpoint: Breakpoint) {
// `find` pulls breakpoints off the channel one by one and stops at the first match.
self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
}
/// /!\ Used only for test purposes.
/// Block until the next breakpoint is received and return it.
///
/// Panics (through `unwrap`) if `recv` returns an error.
#[cfg(test)]
fn test_next_breakpoint(&self) -> Breakpoint {
self.test_breakpoint_rcv.recv().unwrap()
}
/// /!\ Used only for test purposes.
/// The scheduler will not stop on breakpoints.
///
/// Every breakpoint sent on the channel is received and thrown away.
/// NOTE(review): this call itself keeps receiving until the channel's
/// iterator ends — presumably once all senders are dropped; confirm.
#[cfg(test)]
fn test_dont_block(&self) {
// unroll and ignore all the state the scheduler is going to send us.
self.test_breakpoint_rcv.iter().last();
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use big_s::S; use big_s::S;
use insta::*; use insta::*;
use tempfile::TempDir;
use uuid::Uuid; use uuid::Uuid;
use crate::assert_smol_debug_snapshot; use crate::assert_smol_debug_snapshot;
@ -438,11 +416,11 @@ mod tests {
use super::*; use super::*;
impl IndexScheduler { impl IndexScheduler {
pub fn test() -> Self { pub fn test() -> (Self, IndexSchedulerHandle) {
let dir = TempDir::new().unwrap(); let tempdir = TempDir::new().unwrap();
let tasks_path = dir.path().join("db_path"); let tasks_path = tempdir.path().join("db_path");
let update_file_path = dir.path().join("file_store"); let update_file_path = tempdir.path().join("file_store");
let indexes_path = dir.path().join("indexes"); let indexes_path = tempdir.path().join("indexes");
let index_size = 1024 * 1024; let index_size = 1024 * 1024;
let indexer_config = IndexerConfig::default(); let indexer_config = IndexerConfig::default();
@ -459,7 +437,7 @@ mod tests {
let (sender, receiver) = crossbeam::channel::bounded(0); let (sender, receiver) = crossbeam::channel::bounded(0);
Self { let index_scheduler = Self {
// by default there is no processing tasks // by default there is no processing tasks
processing_tasks: Arc::new(RwLock::new(processing_tasks)), processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store, file_store,
@ -473,16 +451,43 @@ mod tests {
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)), wake_up: Arc::new(SignalEvent::auto(true)),
test_breakpoint_rcv: receiver,
test_breakpoint_sdr: sender, test_breakpoint_sdr: sender,
test_tempdir: Arc::new(dir), };
let index_scheduler_handle = IndexSchedulerHandle {
_tempdir: tempdir,
test_breakpoint_rcv: receiver,
};
(index_scheduler, index_scheduler_handle)
} }
} }
/// Test-side companion of an `IndexScheduler` built with `IndexScheduler::test`.
///
/// It owns the pieces the tests need but the scheduler itself does not:
/// the temporary directory backing the scheduler and the receiving end of
/// the breakpoint channel.
pub struct IndexSchedulerHandle {
// Never read; kept only so the temporary directory lives as long as the handle.
// NOTE(review): relies on `TempDir` removing the directory when dropped — confirm.
_tempdir: TempDir,
// Receiving half of the breakpoint channel; the sending half is stored in the
// scheduler as `test_breakpoint_sdr`.
test_breakpoint_rcv: crossbeam::channel::Receiver<Breakpoint>,
}
impl IndexSchedulerHandle {
/// Block the calling thread until `breakpoint` is received.
///
/// Breakpoints received before the matching one are discarded. The wait
/// also ends, without a match, if the channel's iterator finishes.
fn test_wait_till(&self, breakpoint: Breakpoint) {
    for reached in self.test_breakpoint_rcv.iter() {
        if reached == breakpoint {
            break;
        }
    }
}
/// Block until the next breakpoint is received and return it.
///
/// Unlike `test_wait_till`, this does not look for a specific breakpoint:
/// whatever arrives first on the channel is handed back to the caller.
///
/// # Panics
///
/// Panics if the channel is disconnected, i.e. every sender was dropped
/// before another breakpoint could be received.
fn test_next_breakpoint(&self) -> Breakpoint {
    self.test_breakpoint_rcv
        .recv()
        .expect("breakpoint channel disconnected: every sender was dropped before sending a breakpoint")
}
/// Make sure the scheduler is never held at a breakpoint.
///
/// Every breakpoint sent on the channel is received and immediately
/// dropped. The call returns only once the channel's iterator ends.
fn test_dont_block(&self) {
    // Receive and discard every state the scheduler sends us.
    self.test_breakpoint_rcv.iter().for_each(drop);
}
} }
#[test] #[test]
fn register() { fn register() {
let index_scheduler = IndexScheduler::test(); let (index_scheduler, _handle) = IndexScheduler::test();
let kinds = [ let kinds = [
KindWithContent::IndexCreation { KindWithContent::IndexCreation {
index_uid: S("catto"), index_uid: S("catto"),
@ -556,7 +561,7 @@ mod tests {
#[test] #[test]
fn document_addition() { fn document_addition() {
let index_scheduler = IndexScheduler::test(); let (index_scheduler, handle) = IndexScheduler::test();
let content = r#" let content = r#"
{ {
@ -596,7 +601,7 @@ mod tests {
let t_index_scheduler = index_scheduler.clone(); let t_index_scheduler = index_scheduler.clone();
std::thread::spawn(move || t_index_scheduler.tick().unwrap()); std::thread::spawn(move || t_index_scheduler.tick().unwrap());
index_scheduler.test_wait_till(Breakpoint::BatchCreated); handle.test_wait_till(Breakpoint::BatchCreated);
// Once the task has started being batched it should be marked as processing // Once the task has started being batched it should be marked as processing
let task = index_scheduler.get_tasks(Query::default()).unwrap(); let task = index_scheduler.get_tasks(Query::default()).unwrap();
@ -615,7 +620,7 @@ mod tests {
] ]
"###); "###);
assert_eq!( assert_eq!(
index_scheduler.test_next_breakpoint(), handle.test_next_breakpoint(),
Breakpoint::BatchProcessed Breakpoint::BatchProcessed
); );