Do not block when sending update notifications

This commit is contained in:
Tamo 2021-06-30 17:29:22 +02:00
parent a4ca79c9b3
commit a95c44193d
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69

View File

@ -8,6 +8,7 @@ use std::sync::Arc;
use std::{
collections::{BTreeMap, HashSet},
path::PathBuf,
time::Duration,
};
use arc_swap::ArcSwap;
@ -19,6 +20,8 @@ use log::error;
use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::timeout;
use uuid::Uuid;
use codec::*;
@ -120,7 +123,7 @@ impl UpdateStore {
let state = Arc::new(StateLock::from_state(State::Idle));
let (notification_sender, notification_receiver) = mpsc::channel(10);
let (notification_sender, notification_receiver) = mpsc::channel(1);
Ok((
Self {
@ -146,22 +149,22 @@ impl UpdateStore {
let update_store = Arc::new(update_store);
// Send a first notification to trigger the process.
let _ = update_store.notification_sender.send(());
// Init update loop to perform any pending updates at launch.
// Since we just launched the update store, and we still own the receiving end of the
// channel, this call is guaranteed to succeed.
update_store
.notification_sender
.try_send(())
.expect("Failed to init update store");
if let Err(TrySendError::Closed(())) = update_store.notification_sender.try_send(()) {
panic!("Failed to init update store");
}
// We need a weak reference so we can take ownership on the arc later when we
// want to close the index.
let duration = Duration::from_secs(10 * 60); // 10 minutes
let update_store_weak = Arc::downgrade(&update_store);
tokio::task::spawn(async move {
// Block and wait for something to process.
'outer: while notification_receiver.recv().await.is_some() {
// Block and wait for something to process with a timeout. The timeout
// function returns a Result and we must just unlock the loop on Result.
'outer: while timeout(duration, notification_receiver.recv())
.await
.transpose()
.map_or(false, |r| r.is_ok())
{
loop {
match update_store_weak.upgrade() {
Some(update_store) => {
@ -245,9 +248,10 @@ impl UpdateStore {
txn.commit()?;
self.notification_sender
.blocking_send(())
.expect("Update store loop exited.");
if let Err(TrySendError::Closed(())) = self.notification_sender.try_send(()) {
panic!("Update store loop exited");
}
Ok(meta)
}