Introduce a notification channel for the UpdateStore

This commit is contained in:
Clément Renault 2020-10-18 16:37:37 +02:00
parent 83c1db8763
commit eca49e3a03
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 98 additions and 21 deletions

69
Cargo.lock generated
View File

@ -219,6 +219,12 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.13"
@ -251,13 +257,19 @@ dependencies = [
"bitflags",
]
[[package]]
name = "const_fn"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce90df4c658c62f12d78f7508cf92f9173e5184a539c10bfe54a3107b3ffd0f2"
[[package]]
name = "crc32fast"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
]
[[package]]
@ -296,6 +308,16 @@ dependencies = [
"itertools",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils 0.8.0",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
@ -303,7 +325,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
@ -314,8 +336,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
dependencies = [
"autocfg 1.0.0",
"cfg-if",
"crossbeam-utils",
"cfg-if 0.1.10",
"crossbeam-utils 0.7.2",
"lazy_static",
"maybe-uninit",
"memoffset",
@ -328,8 +350,8 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab6bffe714b6bb07e42f201352c34f51fefd355ace793f9e638ebd52d23f98d2"
dependencies = [
"cfg-if",
"crossbeam-utils",
"cfg-if 0.1.10",
"crossbeam-utils 0.7.2",
]
[[package]]
@ -339,7 +361,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg 1.0.0",
"cfg-if",
"cfg-if 0.1.10",
"lazy_static",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec91540d98355f690a86367e566ecad2e9e579f230230eb7c21398372be73ea5"
dependencies = [
"autocfg 1.0.0",
"cfg-if 1.0.0",
"const_fn",
"lazy_static",
]
@ -398,7 +432,7 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"crc32fast",
"libc",
"miniz_oxide",
@ -563,7 +597,7 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"wasi",
]
@ -920,7 +954,7 @@ version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
]
[[package]]
@ -960,6 +994,7 @@ dependencies = [
"bstr",
"byteorder",
"criterion",
"crossbeam-channel",
"csv",
"flate2",
"fst",
@ -1041,7 +1076,7 @@ version = "0.6.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
@ -1131,7 +1166,7 @@ version = "0.2.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"winapi 0.3.8",
]
@ -1144,7 +1179,7 @@ checksum = "85db2feff6bf70ebc3a4793191517d5f0331100a2f10f9bf93b5e5214f32b7b7"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"cfg-if 0.1.10",
"libc",
]
@ -1551,7 +1586,7 @@ checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280"
dependencies = [
"crossbeam-deque",
"crossbeam-queue",
"crossbeam-utils",
"crossbeam-utils 0.7.2",
"lazy_static",
"num_cpus",
]
@ -1805,7 +1840,7 @@ version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"redox_syscall",
"winapi 0.3.8",
@ -1888,7 +1923,7 @@ version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"rand 0.7.3",
"redox_syscall",
@ -2246,7 +2281,7 @@ version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"wasm-bindgen-macro",
]

View File

@ -9,6 +9,7 @@ default-run = "indexer"
anyhow = "1.0.28"
bstr = "0.2.13"
byteorder = "1.3.4"
crossbeam-channel = "0.5.0"
csv = "1.1.3"
flate2 = "1.0.17"
fst = "0.4.4"

View File

@ -1,7 +1,10 @@
use std::path::Path;
use std::sync::Arc;
use crossbeam_channel::{bounded, Sender, Receiver};
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
use heed::{EnvOpenOptions, Env, Database};
use once_cell::sync::OnceCell;
use serde::{Serialize, Deserialize};
use crate::BEU64;
@ -12,15 +15,49 @@ pub struct UpdateStore<M> {
pending_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
pending: Database<OwnedType<BEU64>, ByteSlice>,
processed_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
notification_sender: Sender<()>,
}
impl<M: 'static> UpdateStore<M> {
pub fn open<P: AsRef<Path>>(options: EnvOpenOptions, path: P) -> heed::Result<UpdateStore<M>> {
impl<M: 'static + Send + Sync> UpdateStore<M> {
pub fn open<P, F>(
options: EnvOpenOptions,
path: P,
mut update_function: F,
) -> heed::Result<Arc<UpdateStore<M>>>
where
P: AsRef<Path>,
F: FnMut(u64, M, &[u8]) -> heed::Result<M> + Send + 'static,
M: for<'a> Deserialize<'a> + Serialize,
{
let env = options.open(path)?;
let pending_meta = env.create_database(Some("pending-meta"))?;
let pending = env.create_database(Some("pending"))?;
let processed_meta = env.create_database(Some("processed-meta"))?;
Ok(UpdateStore { env, pending, pending_meta, processed_meta })
let (notification_sender, notification_receiver) = bounded(1);
let update_store = Arc::new(UpdateStore {
env,
pending,
pending_meta,
processed_meta,
notification_sender,
});
let update_store_cloned = update_store.clone();
std::thread::spawn(move || {
// Block and wait for something to process.
for () in notification_receiver {
loop {
match update_store_cloned.process_pending_update(&mut update_function) {
Ok(Some(_)) => (),
Ok(None) => break,
Err(e) => eprintln!("error while processing update: {}", e),
}
}
}
});
Ok(update_store)
}
/// Returns the new biggest id to use to store the new update.
@ -64,13 +101,17 @@ impl<M: 'static> UpdateStore<M> {
wtxn.commit()?;
if let Err(e) = self.notification_sender.try_send(()) {
assert!(!e.is_disconnected(), "update notification channel is disconnected");
}
Ok(update_id)
}
/// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed.
pub fn process_pending_update<F>(&self, mut f: F) -> heed::Result<Option<(u64, M)>>
fn process_pending_update<F>(&self, mut f: F) -> heed::Result<Option<(u64, M)>>
where
F: FnMut(u64, M, &[u8]) -> heed::Result<M>,
M: for<'a> Deserialize<'a> + Serialize,