From 8febbf64ceb2173996931ad58ff5e1bb253b740c Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 23 Jan 2024 16:52:48 +0100 Subject: [PATCH] Switch to tokio channel --- Cargo.lock | 1 + tracing-trace/Cargo.toml | 1 + tracing-trace/src/layer.rs | 19 ++++++++++++++----- tracing-trace/src/lib.rs | 2 +- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ef99a0cc..f3821c94a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5716,6 +5716,7 @@ dependencies = [ "serde", "serde_json", "stats_alloc", + "tokio", "tracing", "tracing-error", "tracing-subscriber", diff --git a/tracing-trace/Cargo.toml b/tracing-trace/Cargo.toml index 04b7494d4..da5e2b36c 100644 --- a/tracing-trace/Cargo.toml +++ b/tracing-trace/Cargo.toml @@ -18,4 +18,5 @@ byte-unit = { version = "4.0.19", default-features = false, features = [ "std", "serde", ] } +tokio = { version = "1.35.1", features = ["sync"] } diff --git a/tracing-trace/src/layer.rs b/tracing-trace/src/layer.rs index e3573fdd3..1b9aadfa7 100644 --- a/tracing-trace/src/layer.rs +++ b/tracing-trace/src/layer.rs @@ -19,7 +19,7 @@ use crate::{Error, Trace}; /// Layer that measures the time spent in spans. pub struct TraceLayer { - sender: std::sync::mpsc::Sender, + sender: tokio::sync::mpsc::UnboundedSender, callsites: RwLock>, start_time: std::time::Instant, memory_allocator: Option<&'static StatsAlloc>, @@ -27,7 +27,7 @@ pub struct TraceLayer { impl Trace { pub fn new(writer: W) -> (Self, TraceLayer) { - let (sender, receiver) = std::sync::mpsc::channel(); + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let trace = Trace { writer, receiver }; let layer = TraceLayer { sender, @@ -42,7 +42,7 @@ impl Trace { writer: W, stats_alloc: &'static StatsAlloc, ) -> (Self, TraceLayer) { - let (sender, receiver) = std::sync::mpsc::channel(); + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let trace = Trace { writer, receiver }; let layer = TraceLayer { sender, @@ -53,8 +53,17 @@ impl Trace { (trace, layer) } - pub fn receive(&mut self) -> Result, Error> { - let Ok(entry) = self.receiver.recv() else { + pub async fn receive(&mut self) -> Result, Error> { + let Some(entry) = self.receiver.recv().await else { + return Ok(ControlFlow::Break(())); + }; + self.write(entry)?; + Ok(ControlFlow::Continue(())) + } + + /// Panics if called from an asynchronous context + pub fn blocking_receive(&mut self) -> Result, Error> { + let Some(entry) = self.receiver.blocking_recv() else { return Ok(ControlFlow::Break(())); }; self.write(entry)?; diff --git a/tracing-trace/src/lib.rs b/tracing-trace/src/lib.rs index 5e0f46d47..3d00eef10 100644 --- a/tracing-trace/src/lib.rs +++ b/tracing-trace/src/lib.rs @@ -11,7 +11,7 @@ pub use error::Error; pub struct Trace { writer: W, - receiver: std::sync::mpsc::Receiver, + receiver: tokio::sync::mpsc::UnboundedReceiver, } pub struct TraceReader {