Brodacast the updates infos to every ws clients

This commit is contained in:
Clément Renault 2020-10-20 11:19:34 +02:00
parent 56c3a61d83
commit 35c9a3c558
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 22 additions and 45 deletions

33
Cargo.lock generated
View File

@ -77,17 +77,6 @@ dependencies = [
"warp", "warp",
] ]
[[package]]
name = "async-channel"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.11" version = "0.2.11"
@ -206,12 +195,6 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]] [[package]]
name = "cast" name = "cast"
version = "0.2.3" version = "0.2.3"
@ -274,15 +257,6 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]] [[package]]
name = "const_fn" name = "const_fn"
version = "0.4.2" version = "0.4.2"
@ -446,12 +420,6 @@ version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3"
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]] [[package]]
name = "fake-simd" name = "fake-simd"
version = "0.1.2" version = "0.1.2"
@ -1033,7 +1001,6 @@ dependencies = [
"anyhow", "anyhow",
"askama", "askama",
"askama_warp", "askama_warp",
"async-channel",
"bstr", "bstr",
"byteorder", "byteorder",
"bytes", "bytes",

View File

@ -6,7 +6,6 @@ edition = "2018"
[dependencies] [dependencies]
anyhow = "1.0.28" anyhow = "1.0.28"
async-channel = "1.5.1"
bstr = "0.2.13" bstr = "0.2.13"
byteorder = "1.3.4" byteorder = "1.3.4"
crossbeam-channel = "0.5.0" crossbeam-channel = "0.5.0"

View File

@ -8,13 +8,14 @@ use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use askama_warp::Template; use askama_warp::Template;
use futures::FutureExt; use futures::{FutureExt, StreamExt};
use futures::StreamExt; use futures::stream;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use serde::Deserialize; use serde::Deserialize;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::fs::File as TFile; use tokio::fs::File as TFile;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast;
use warp::filters::ws::Message; use warp::filters::ws::Message;
use warp::{Filter, http::Response}; use warp::{Filter, http::Response};
@ -110,15 +111,15 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let update_store_path = opt.database.join("updates.mdb"); let update_store_path = opt.database.join("updates.mdb");
create_dir_all(&update_store_path)?; create_dir_all(&update_store_path)?;
let (update_status_sender, update_status_receiver) = async_channel::unbounded(); let (update_status_sender, _) = broadcast::channel(100);
let update_status_sender_cloned = update_status_sender.clone(); let update_status_sender_cloned = update_status_sender.clone();
let update_store = UpdateStore::open( let update_store = UpdateStore::open(
update_store_options, update_store_options,
update_store_path, update_store_path,
move |uid, meta: String, _content| { move |uid, meta: String, _content| {
let _ = update_status_sender_cloned.try_send(format!("processing update {}", uid)); let _ = update_status_sender_cloned.send(format!("processing update {}", uid));
std::thread::sleep(Duration::from_secs(3)); std::thread::sleep(Duration::from_secs(3));
let _ = update_status_sender_cloned.try_send(format!("update {} processed", uid)); let _ = update_status_sender_cloned.send(format!("update {} processed", uid));
Ok(meta) Ok(meta)
})?; })?;
@ -288,7 +289,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
async fn buf_stream( async fn buf_stream(
update_store: Arc<UpdateStore<String>>, update_store: Arc<UpdateStore<String>>,
update_status_sender: async_channel::Sender<String>, update_status_sender: broadcast::Sender<String>,
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin, mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
) -> Result<impl warp::Reply, warp::Rejection> ) -> Result<impl warp::Reply, warp::Rejection>
{ {
@ -305,29 +306,39 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let meta = String::from("I am the metadata"); let meta = String::from("I am the metadata");
let uid = update_store.register_update(&meta, &mmap[..]).unwrap(); let uid = update_store.register_update(&meta, &mmap[..]).unwrap();
update_status_sender.try_send(format!("update {} pending", uid)).unwrap(); update_status_sender.send(format!("update {} pending", uid)).unwrap();
eprintln!("Registering update {}", uid); eprintln!("Registering update {}", uid);
Ok(warp::reply()) Ok(warp::reply())
} }
let update_store_cloned = update_store.clone(); let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let indexing_route = warp::filters::method::post() let indexing_route = warp::filters::method::post()
.and(warp::path!("documents")) .and(warp::path!("documents"))
.and(warp::body::stream()) .and(warp::body::stream())
.and_then(move |stream| { .and_then(move |stream| {
buf_stream(update_store_cloned.clone(), update_status_sender.clone(), stream) buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream)
}); });
let update_ws_route = warp::ws() let update_ws_route = warp::ws()
.and(warp::path!("updates" / "ws")) .and(warp::path!("updates" / "ws"))
.map(move |ws: warp::ws::Ws| { .map(move |ws: warp::ws::Ws| {
// And then our closure will be called when it completes... // And then our closure will be called when it completes...
let update_status_receiver_cloned = update_status_receiver.clone(); let update_status_receiver = update_status_sender.subscribe();
ws.on_upgrade(|websocket| { ws.on_upgrade(|websocket| {
// Just echo all updates messages... // Just echo all updates messages...
update_status_receiver_cloned update_status_receiver
.map(|msg| Ok(Message::text(msg))) .into_stream()
.flat_map(|result| {
match result{
Ok(msg) => stream::iter(Some(Ok(Message::text(msg)))),
Err(e) => {
eprintln!("channel error: {:?}", e);
stream::iter(None)
},
}
})
.forward(websocket) .forward(websocket)
.map(|result| { .map(|result| {
if let Err(e) = result { if let Err(e) = result {