diff --git a/Cargo.lock b/Cargo.lock index 74db62fda..43e15d05f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1791,6 +1791,7 @@ dependencies = [ "nelson", "roaring 0.9.0", "serde", + "synchronoise", "tempfile", "thiserror", "time", diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index 770ae4424..d6080ca4b 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -19,6 +19,7 @@ tempfile = "3.3.0" thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } uuid = { version = "1.1.2", features = ["serde", "v4"] } +synchronoise = "1.0.1" [dev-dependencies] nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"} diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 9a056ec82..6acfee57a 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -10,6 +10,7 @@ pub use error::Error; use file_store::FileStore; use index::Index; use index_mapper::IndexMapper; +use synchronoise::SignalEvent; pub use task::Task; use task::{Kind, KindWithContent, Status}; use time::OffsetDateTime; @@ -73,10 +74,16 @@ pub struct IndexScheduler { index_mapper: IndexMapper, // set to true when there is work to do. - wake_up: Arc, + wake_up: Arc, } impl IndexScheduler { + pub fn new() -> Self { + // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things + let wake_up = SignalEvent::auto(true); + todo!() + } + /// Return the index corresponding to the name. If it wasn't opened before /// it'll be opened. But if it doesn't exist on disk it'll throw an /// `IndexNotFound` error. @@ -166,8 +173,7 @@ impl IndexScheduler { /// This worker function must be run in a different thread and must be run only once. fn run(&self) { loop { - // TODO: TAMO: remove this horrible spinlock in favor of a sleep / channel / we’ll see - while !self.wake_up.swap(false, Ordering::Relaxed) {} + self.wake_up.wait(); let mut wtxn = match self.env.write_txn() { Ok(wtxn) => wtxn, @@ -370,7 +376,6 @@ impl IndexScheduler { /// Notify the scheduler there is or may be work to do. pub fn notify(&self) { - self.wake_up - .store(true, std::sync::atomic::Ordering::Relaxed); + self.wake_up.signal() } }