mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-15 13:58:36 +02:00
Add tracing-trace
This commit is contained in:
parent
a616a1d37b
commit
89401d097b
13 changed files with 998 additions and 6 deletions
18
tracing-trace/src/bin/trace-to-firefox.rs
Normal file
18
tracing-trace/src/bin/trace-to-firefox.rs
Normal file
|
@ -0,0 +1,18 @@
|
|||
use std::ffi::OsString;
|
||||
use std::io::Write;
|
||||
|
||||
fn main() {
|
||||
let input_file = std::env::args_os().nth(1).expect("missing <INPUT> file");
|
||||
let input =
|
||||
std::io::BufReader::new(std::fs::File::open(&input_file).expect("could not open <INPUT>"));
|
||||
let trace = tracing_trace::TraceReader::new(input);
|
||||
let profile =
|
||||
tracing_trace::processor::firefox_profiler::to_firefox_profile(trace, "Meilisearch")
|
||||
.unwrap();
|
||||
let mut output_file = OsString::new();
|
||||
output_file.push("firefox-");
|
||||
output_file.push(input_file);
|
||||
let mut output_file = std::io::BufWriter::new(std::fs::File::create(output_file).unwrap());
|
||||
serde_json::to_writer(&mut output_file, &profile).unwrap();
|
||||
output_file.flush().unwrap();
|
||||
}
|
96
tracing-trace/src/entry.rs
Normal file
96
tracing-trace/src/entry.rs
Normal file
|
@ -0,0 +1,96 @@
|
|||
use std::borrow::Cow;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::span::Id as TracingId;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Entry {
|
||||
/// A code location was accessed for the first time
|
||||
NewCallsite(NewCallsite),
|
||||
|
||||
/// A new thread was accessed
|
||||
NewThread(NewThread),
|
||||
|
||||
/// A new call started
|
||||
NewSpan(NewSpan),
|
||||
|
||||
/// An already in-flight call started doing work.
|
||||
///
|
||||
/// For synchronous functions, open should always be followed immediately by enter, exit and close,
|
||||
/// but for asynchronous functions, work can suspend (exiting the span without closing it), and then
|
||||
/// later resume (entering the span again without opening it).
|
||||
///
|
||||
/// The timer for a span only starts when the span is entered.
|
||||
SpanEnter(SpanEnter),
|
||||
|
||||
/// An in-flight call suspended and paused work.
|
||||
///
|
||||
/// For synchronous functions, exit should always be followed immediately by close,
|
||||
/// but for asynchronous functions, work can suspend and then later resume.
|
||||
///
|
||||
/// The timer for a span pauses when the span is exited.
|
||||
SpanExit(SpanExit),
|
||||
|
||||
/// A call ended
|
||||
SpanClose(SpanClose),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct SpanId(u64);
|
||||
|
||||
impl From<&TracingId> for SpanId {
|
||||
fn from(value: &TracingId) -> Self {
|
||||
Self(value.into_u64())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NewCallsite {
|
||||
pub call_id: ResourceId,
|
||||
pub name: Cow<'static, str>,
|
||||
pub module_path: Option<Cow<'static, str>>,
|
||||
pub file: Option<Cow<'static, str>>,
|
||||
pub line: Option<u32>,
|
||||
pub target: Cow<'static, str>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NewThread {
|
||||
pub thread_id: ResourceId,
|
||||
pub name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct SpanEnter {
|
||||
pub id: SpanId,
|
||||
pub time: std::time::Duration,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct SpanExit {
|
||||
pub id: SpanId,
|
||||
pub time: std::time::Duration,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct NewSpan {
|
||||
pub id: SpanId,
|
||||
pub call_id: ResourceId,
|
||||
pub parent_id: Option<SpanId>,
|
||||
pub thread_id: ResourceId,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct SpanClose {
|
||||
pub id: SpanId,
|
||||
pub time: std::time::Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct ResourceId(pub(crate) usize);
|
||||
|
||||
impl ResourceId {
|
||||
pub fn to_usize(self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
19
tracing-trace/src/error.rs
Normal file
19
tracing-trace/src/error.rs
Normal file
|
@ -0,0 +1,19 @@
|
|||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
Json(serde_json::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str("error de/serializing trace entry:")?;
|
||||
match self {
|
||||
Error::Json(error) => std::fmt::Display::fmt(&error, f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for Error {
|
||||
fn from(value: serde_json::Error) -> Self {
|
||||
Self::Json(value)
|
||||
}
|
||||
}
|
152
tracing-trace/src/layer.rs
Normal file
152
tracing-trace/src/layer.rs
Normal file
|
@ -0,0 +1,152 @@
|
|||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use tracing::span::{Attributes, Id as TracingId};
|
||||
use tracing::{Metadata, Subscriber};
|
||||
use tracing_subscriber::layer::Context;
|
||||
use tracing_subscriber::Layer;
|
||||
|
||||
use crate::entry::{
|
||||
Entry, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId,
|
||||
};
|
||||
use crate::{Error, Trace};
|
||||
|
||||
/// Layer that measures the time spent in spans.
|
||||
pub struct TraceLayer {
|
||||
sender: std::sync::mpsc::Sender<Entry>,
|
||||
callsites: RwLock<HashMap<OpaqueIdentifier, ResourceId>>,
|
||||
start_time: std::time::Instant,
|
||||
// TODO: kero add handle to allocator stats here
|
||||
}
|
||||
|
||||
impl<W: Write> Trace<W> {
|
||||
pub fn new(writer: W) -> (Self, TraceLayer) {
|
||||
let (sender, receiver) = std::sync::mpsc::channel();
|
||||
let trace = Trace { writer, receiver };
|
||||
let layer = TraceLayer {
|
||||
sender,
|
||||
callsites: Default::default(),
|
||||
start_time: std::time::Instant::now(),
|
||||
};
|
||||
(trace, layer)
|
||||
}
|
||||
|
||||
pub fn receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
|
||||
let Ok(entry) = self.receiver.recv() else {
|
||||
return Ok(ControlFlow::Break(()));
|
||||
};
|
||||
self.write(entry)?;
|
||||
Ok(ControlFlow::Continue(()))
|
||||
}
|
||||
|
||||
pub fn write(&mut self, entry: Entry) -> Result<(), Error> {
|
||||
Ok(serde_json::ser::to_writer(&mut self.writer, &entry)?)
|
||||
}
|
||||
|
||||
pub fn try_receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
|
||||
let Ok(entry) = self.receiver.try_recv() else {
|
||||
return Ok(ControlFlow::Break(()));
|
||||
};
|
||||
self.write(entry)?;
|
||||
Ok(ControlFlow::Continue(()))
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) -> Result<(), std::io::Error> {
|
||||
self.writer.flush()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Hash)]
|
||||
enum OpaqueIdentifier {
|
||||
Thread(std::thread::ThreadId),
|
||||
Call(tracing::callsite::Identifier),
|
||||
}
|
||||
|
||||
impl TraceLayer {
|
||||
fn resource_id(&self, opaque: OpaqueIdentifier) -> Option<ResourceId> {
|
||||
self.callsites.read().unwrap().get(&opaque).copied()
|
||||
}
|
||||
|
||||
fn register_resource_id(&self, opaque: OpaqueIdentifier) -> ResourceId {
|
||||
let mut map = self.callsites.write().unwrap();
|
||||
let len = map.len();
|
||||
*map.entry(opaque).or_insert(ResourceId(len))
|
||||
}
|
||||
|
||||
fn elapsed(&self) -> std::time::Duration {
|
||||
self.start_time.elapsed()
|
||||
}
|
||||
|
||||
fn send(&self, entry: Entry) {
|
||||
// we never care that the other end hanged on us
|
||||
let _ = self.sender.send(entry);
|
||||
}
|
||||
|
||||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> ResourceId {
|
||||
let call_id = self.register_resource_id(OpaqueIdentifier::Call(metadata.callsite()));
|
||||
|
||||
let module_path = metadata.module_path();
|
||||
let file = metadata.file();
|
||||
let line = metadata.line();
|
||||
let name = metadata.name();
|
||||
let target = metadata.target();
|
||||
|
||||
self.send(Entry::NewCallsite(NewCallsite {
|
||||
call_id,
|
||||
module_path: module_path.map(Cow::Borrowed),
|
||||
file: file.map(Cow::Borrowed),
|
||||
line,
|
||||
name: Cow::Borrowed(name),
|
||||
target: Cow::Borrowed(target),
|
||||
}));
|
||||
call_id
|
||||
}
|
||||
|
||||
fn register_thread(&self) -> ResourceId {
|
||||
let thread_id = std::thread::current().id();
|
||||
let name = std::thread::current().name().map(ToOwned::to_owned);
|
||||
let thread_id = self.register_resource_id(OpaqueIdentifier::Thread(thread_id));
|
||||
self.send(Entry::NewThread(NewThread { thread_id, name }));
|
||||
thread_id
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for TraceLayer
|
||||
where
|
||||
S: Subscriber,
|
||||
{
|
||||
fn on_new_span(&self, attrs: &Attributes<'_>, id: &TracingId, _ctx: Context<'_, S>) {
|
||||
let call_id = self
|
||||
.resource_id(OpaqueIdentifier::Call(attrs.metadata().callsite()))
|
||||
.unwrap_or_else(|| self.register_callsite(attrs.metadata()));
|
||||
|
||||
let thread_id = self
|
||||
.resource_id(OpaqueIdentifier::Thread(std::thread::current().id()))
|
||||
.unwrap_or_else(|| self.register_thread());
|
||||
|
||||
let parent_id = attrs
|
||||
.parent()
|
||||
.cloned()
|
||||
.or_else(|| tracing::Span::current().id())
|
||||
.map(|id| SpanId::from(&id));
|
||||
|
||||
self.send(Entry::NewSpan(NewSpan { id: id.into(), call_id, parent_id, thread_id }));
|
||||
}
|
||||
|
||||
fn on_enter(&self, id: &TracingId, _ctx: Context<'_, S>) {
|
||||
// TODO kero: add memory here
|
||||
self.send(Entry::SpanEnter(SpanEnter { id: id.into(), time: self.elapsed() }))
|
||||
}
|
||||
|
||||
fn on_exit(&self, id: &TracingId, _ctx: Context<'_, S>) {
|
||||
// TODO kero: add memory here
|
||||
self.send(Entry::SpanExit(SpanExit { id: id.into(), time: self.elapsed() }))
|
||||
}
|
||||
|
||||
fn on_close(&self, id: TracingId, _ctx: Context<'_, S>) {
|
||||
self.send(Entry::SpanClose(SpanClose { id: Into::into(&id), time: self.elapsed() }))
|
||||
}
|
||||
}
|
40
tracing-trace/src/lib.rs
Normal file
40
tracing-trace/src/lib.rs
Normal file
|
@ -0,0 +1,40 @@
|
|||
use std::io::{Read, Write};
|
||||
|
||||
use entry::Entry;
|
||||
|
||||
pub mod entry;
|
||||
mod error;
|
||||
pub mod layer;
|
||||
pub mod processor;
|
||||
|
||||
pub use error::Error;
|
||||
|
||||
pub struct Trace<W: Write> {
|
||||
writer: W,
|
||||
receiver: std::sync::mpsc::Receiver<Entry>,
|
||||
}
|
||||
|
||||
pub struct TraceReader<R: Read> {
|
||||
reader: R,
|
||||
}
|
||||
|
||||
impl<R: Read> TraceReader<R> {
|
||||
pub fn new(reader: R) -> Self {
|
||||
Self { reader }
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Option<Result<Entry, Error>> {
|
||||
serde_json::Deserializer::from_reader(&mut self.reader)
|
||||
.into_iter()
|
||||
.next()
|
||||
.map(|res| res.map_err(Into::into))
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> Iterator for TraceReader<R> {
|
||||
type Item = Result<Entry, Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.read()
|
||||
}
|
||||
}
|
133
tracing-trace/src/main.rs
Normal file
133
tracing-trace/src/main.rs
Normal file
|
@ -0,0 +1,133 @@
|
|||
use tracing::{instrument, Span};
|
||||
use tracing_error::{ErrorLayer, InstrumentResult, SpanTrace, TracedError};
|
||||
|
||||
#[instrument(level = "trace", target = "profile::indexing")]
|
||||
fn foo() -> Result<(), TracedError<Error>> {
|
||||
let _ = bar(40, 2);
|
||||
bar(40, 2)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
XTooBig,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str("x too big")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {}
|
||||
|
||||
#[instrument(level = "trace", target = "profile::indexing")]
|
||||
fn bar(x: u32, y: u32) -> Result<(), TracedError<Error>> {
|
||||
let handle_ok = spawn_in_current_scope(move || baz(y));
|
||||
let handle = spawn_in_current_scope(move || baz(x + y));
|
||||
handle_ok.join().unwrap().and(handle.join().unwrap())
|
||||
}
|
||||
|
||||
pub fn spawn_in_current_scope<F, T>(f: F) -> std::thread::JoinHandle<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let current = Span::current();
|
||||
std::thread::spawn(move || {
|
||||
let span = tracing::trace_span!(parent: ¤t, "thread_spawn", id = ?std::thread::current().id(), name = tracing::field::Empty);
|
||||
if let Some(name) = std::thread::current().name() {
|
||||
span.record("name", name);
|
||||
}
|
||||
span.in_scope(f)
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "profile::indexing")]
|
||||
fn baz(x: u32) -> Result<(), TracedError<Error>> {
|
||||
if x > 10 {
|
||||
fibo_recursive(10);
|
||||
return Err(Error::XTooBig).in_current_span();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "profile::indexing")]
|
||||
fn fibo_recursive(n: u32) -> u32 {
|
||||
if n == 0 {
|
||||
return 1;
|
||||
}
|
||||
if n == 1 {
|
||||
return 2;
|
||||
}
|
||||
return fibo_recursive(n - 1) - fibo_recursive(n - 2);
|
||||
}
|
||||
|
||||
use tracing_error::ExtractSpanTrace as _;
|
||||
use tracing_subscriber::layer::SubscriberExt as _;
|
||||
use tracing_trace::processor;
|
||||
|
||||
fn on_panic(info: &std::panic::PanicInfo) {
|
||||
let info = info.to_string();
|
||||
let trace = SpanTrace::capture();
|
||||
tracing::error!(%info, %trace);
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (mut trace, profiling_layer) =
|
||||
tracing_trace::Trace::new(std::fs::File::create("trace.json").unwrap());
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
// any number of other subscriber layers may be added before or
|
||||
// after the `ErrorLayer`...
|
||||
.with(ErrorLayer::default())
|
||||
.with(profiling_layer)
|
||||
/*.with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_line_number(true)
|
||||
.with_span_events(FmtSpan::FULL), /*.with_filter(
|
||||
tracing_subscriber::filter::LevelFilter::from_level(tracing::Level::TRACE).and(
|
||||
tracing_subscriber::filter::Targets::new()
|
||||
.with_target("profile", tracing::Level::TRACE)
|
||||
.not(),
|
||||
),
|
||||
)*/
|
||||
)*/;
|
||||
|
||||
// set the subscriber as the default for the application
|
||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||
|
||||
std::panic::set_hook(Box::new(on_panic));
|
||||
|
||||
let res = foo();
|
||||
|
||||
if let Err(error) = res {
|
||||
print_extracted_spantraces(&error)
|
||||
}
|
||||
|
||||
while trace.try_receive().unwrap().is_continue() {}
|
||||
|
||||
trace.flush().unwrap();
|
||||
|
||||
let trace = tracing_trace::TraceReader::new(std::fs::File::open("trace.json").unwrap());
|
||||
|
||||
let profile = processor::firefox_profiler::to_firefox_profile(trace, "test").unwrap();
|
||||
serde_json::to_writer(std::fs::File::create("processed.json").unwrap(), &profile).unwrap();
|
||||
}
|
||||
|
||||
fn print_extracted_spantraces(error: &(dyn std::error::Error + 'static)) {
|
||||
let mut error = Some(error);
|
||||
let mut ind = 0;
|
||||
|
||||
eprintln!("Error:");
|
||||
|
||||
while let Some(err) = error {
|
||||
if let Some(spantrace) = err.span_trace() {
|
||||
eprintln!("found a spantrace:\n{}", color_spantrace::colorize(spantrace));
|
||||
} else {
|
||||
eprintln!("{:>4}: {}", ind, err);
|
||||
}
|
||||
|
||||
error = err.source();
|
||||
ind += 1;
|
||||
}
|
||||
}
|
255
tracing-trace/src/processor/firefox_profiler.rs
Normal file
255
tracing-trace/src/processor/firefox_profiler.rs
Normal file
|
@ -0,0 +1,255 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use fxprof_processed_profile::{
|
||||
CategoryPairHandle, CpuDelta, Frame, FrameFlags, FrameInfo, MarkerDynamicField,
|
||||
MarkerFieldFormat, MarkerLocation, MarkerSchema, MarkerSchemaField, Profile, ProfilerMarker,
|
||||
ReferenceTimestamp, SamplingInterval, StringHandle, Timestamp,
|
||||
};
|
||||
use serde_json::json;
|
||||
|
||||
use crate::entry::{
|
||||
Entry, NewCallsite, NewSpan, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId,
|
||||
};
|
||||
use crate::{Error, TraceReader};
|
||||
|
||||
pub fn to_firefox_profile<R: std::io::Read>(
|
||||
trace: TraceReader<R>,
|
||||
app: &str,
|
||||
) -> Result<Profile, Error> {
|
||||
let mut profile = Profile::new(
|
||||
app,
|
||||
ReferenceTimestamp::from_millis_since_unix_epoch(0.0),
|
||||
SamplingInterval::from_nanos(15),
|
||||
);
|
||||
|
||||
let mut last_timestamp = Timestamp::from_nanos_since_reference(0);
|
||||
let main = profile.add_process(app, 0, last_timestamp);
|
||||
|
||||
let mut calls = HashMap::new();
|
||||
let mut threads = HashMap::new();
|
||||
let mut spans = HashMap::new();
|
||||
|
||||
let category = profile.add_category("general", fxprof_processed_profile::CategoryColor::Blue);
|
||||
let subcategory = profile.add_subcategory(category, "subcategory");
|
||||
|
||||
// TODO kero: add counters profile.add_counters + last_memory_value
|
||||
|
||||
for entry in trace {
|
||||
let entry = entry?;
|
||||
match entry {
|
||||
Entry::NewCallsite(callsite) => {
|
||||
let string_handle = profile.intern_string(callsite.name.as_ref());
|
||||
calls.insert(callsite.call_id, (callsite, string_handle));
|
||||
}
|
||||
Entry::NewThread(thread) => {
|
||||
let thread_handle = profile.add_thread(
|
||||
main,
|
||||
thread.thread_id.to_usize() as u32,
|
||||
last_timestamp,
|
||||
threads.is_empty(),
|
||||
);
|
||||
if let Some(name) = &thread.name {
|
||||
profile.set_thread_name(thread_handle, name)
|
||||
}
|
||||
threads.insert(thread.thread_id, thread_handle);
|
||||
}
|
||||
Entry::NewSpan(span) => {
|
||||
spans.insert(span.id, (span, SpanStatus::Outside));
|
||||
}
|
||||
Entry::SpanEnter(SpanEnter { id, time }) => {
|
||||
let (_span, status) = spans.get_mut(&id).unwrap();
|
||||
|
||||
let SpanStatus::Outside = status else {
|
||||
continue;
|
||||
};
|
||||
|
||||
*status = SpanStatus::Inside(time);
|
||||
|
||||
last_timestamp = Timestamp::from_nanos_since_reference(time.as_nanos() as u64);
|
||||
|
||||
/* TODO kero: compute delta and update them
|
||||
profile.add_counter_sample(
|
||||
counter,
|
||||
timestamp,
|
||||
value_delta,
|
||||
number_of_operations_delta,
|
||||
)
|
||||
*/
|
||||
}
|
||||
Entry::SpanExit(SpanExit { id, time }) => {
|
||||
let (span, status) = spans.get_mut(&id).unwrap();
|
||||
|
||||
let SpanStatus::Inside(begin) = status else {
|
||||
continue;
|
||||
};
|
||||
last_timestamp = Timestamp::from_nanos_since_reference(time.as_nanos() as u64);
|
||||
|
||||
let begin = *begin;
|
||||
|
||||
*status = SpanStatus::Outside;
|
||||
|
||||
let span = *span;
|
||||
let thread_handle = threads.get(&span.thread_id).unwrap();
|
||||
|
||||
let frames = make_frames(span, &spans, &calls, subcategory);
|
||||
|
||||
profile.add_sample(
|
||||
*thread_handle,
|
||||
to_timestamp(begin),
|
||||
frames.iter().rev().cloned(),
|
||||
CpuDelta::ZERO,
|
||||
1,
|
||||
);
|
||||
profile.add_sample(
|
||||
*thread_handle,
|
||||
to_timestamp(time),
|
||||
frames.iter().rev().cloned(),
|
||||
CpuDelta::from_nanos((time - begin).as_nanos() as u64),
|
||||
1,
|
||||
);
|
||||
|
||||
/* TODO kero: compute delta and update them
|
||||
profile.add_counter_sample(
|
||||
counter,
|
||||
timestamp,
|
||||
value_delta,
|
||||
number_of_operations_delta,
|
||||
)
|
||||
*/
|
||||
|
||||
let (callsite, _) = calls.get(&span.call_id).unwrap();
|
||||
|
||||
let marker = SpanMarker { callsite, span: &span };
|
||||
|
||||
profile.add_marker_with_stack(
|
||||
*thread_handle,
|
||||
&callsite.name,
|
||||
marker,
|
||||
fxprof_processed_profile::MarkerTiming::Interval(
|
||||
to_timestamp(begin),
|
||||
to_timestamp(time),
|
||||
),
|
||||
frames.iter().rev().cloned(),
|
||||
)
|
||||
}
|
||||
Entry::SpanClose(SpanClose { id, time }) => {
|
||||
spans.remove(&id);
|
||||
last_timestamp = Timestamp::from_nanos_since_reference(time.as_nanos() as u64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(profile)
|
||||
}
|
||||
|
||||
fn to_timestamp(time: std::time::Duration) -> Timestamp {
|
||||
Timestamp::from_nanos_since_reference(time.as_nanos() as u64)
|
||||
}
|
||||
|
||||
fn make_frames(
|
||||
span: NewSpan,
|
||||
spans: &HashMap<SpanId, (NewSpan, SpanStatus)>,
|
||||
calls: &HashMap<ResourceId, (NewCallsite, StringHandle)>,
|
||||
subcategory: CategoryPairHandle,
|
||||
) -> Vec<FrameInfo> {
|
||||
let mut frames = Vec::new();
|
||||
let mut current_span = span;
|
||||
loop {
|
||||
let frame = make_frame(current_span, calls, subcategory);
|
||||
frames.push(frame);
|
||||
if let Some(parent) = current_span.parent_id {
|
||||
current_span = spans.get(&parent).unwrap().0;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
frames
|
||||
}
|
||||
|
||||
fn make_frame(
|
||||
span: NewSpan,
|
||||
calls: &HashMap<ResourceId, (NewCallsite, StringHandle)>,
|
||||
subcategory: CategoryPairHandle,
|
||||
) -> FrameInfo {
|
||||
let (_, call) = calls.get(&span.call_id).unwrap();
|
||||
FrameInfo { frame: Frame::Label(*call), category_pair: subcategory, flags: FrameFlags::empty() }
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum SpanStatus {
|
||||
Outside,
|
||||
Inside(std::time::Duration),
|
||||
}
|
||||
|
||||
struct SpanMarker<'a> {
|
||||
span: &'a NewSpan,
|
||||
callsite: &'a NewCallsite,
|
||||
}
|
||||
|
||||
impl<'a> ProfilerMarker for SpanMarker<'a> {
|
||||
const MARKER_TYPE_NAME: &'static str = "span";
|
||||
|
||||
fn schema() -> MarkerSchema {
|
||||
let fields = vec![
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "filename",
|
||||
label: "File name",
|
||||
format: MarkerFieldFormat::FilePath,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "line",
|
||||
label: "Line",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "module_path",
|
||||
label: "Module path",
|
||||
format: MarkerFieldFormat::String,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "span_id",
|
||||
label: "Span ID",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "thread_id",
|
||||
label: "Thread ID",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: true,
|
||||
}),
|
||||
];
|
||||
|
||||
MarkerSchema {
|
||||
type_name: Self::MARKER_TYPE_NAME,
|
||||
locations: vec![
|
||||
MarkerLocation::MarkerTable,
|
||||
MarkerLocation::MarkerChart,
|
||||
MarkerLocation::TimelineOverview,
|
||||
],
|
||||
chart_label: None,
|
||||
tooltip_label: Some("{marker.name} - {marker.data.filename}:{marker.data.line}"),
|
||||
table_label: Some("{marker.data.filename}:{marker.data.line}"),
|
||||
fields,
|
||||
}
|
||||
}
|
||||
|
||||
fn json_marker_data(&self) -> serde_json::Value {
|
||||
let filename = self.callsite.file.as_deref();
|
||||
let line = self.callsite.line;
|
||||
let module_path = self.callsite.module_path.as_deref();
|
||||
let span_id = self.span.id;
|
||||
let thread_id = self.span.thread_id;
|
||||
json!({
|
||||
"type": Self::MARKER_TYPE_NAME,
|
||||
"filename": filename,
|
||||
"line": line,
|
||||
"module_path": module_path,
|
||||
"span_id": span_id,
|
||||
"thread_id": thread_id,
|
||||
})
|
||||
}
|
||||
}
|
128
tracing-trace/src/processor/fmt.rs
Normal file
128
tracing-trace/src/processor/fmt.rs
Normal file
|
@ -0,0 +1,128 @@
|
|||
use std::collections::HashMap;
|
||||
use std::io::Read;
|
||||
|
||||
use crate::entry::{
|
||||
Entry, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId,
|
||||
};
|
||||
use crate::{Error, TraceReader};
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum SpanStatus {
|
||||
Outside,
|
||||
Inside(std::time::Duration),
|
||||
}
|
||||
|
||||
pub fn print_trace<R: Read>(trace: TraceReader<R>) -> Result<(), Error> {
|
||||
let mut calls = HashMap::new();
|
||||
let mut threads = HashMap::new();
|
||||
let mut spans = HashMap::new();
|
||||
for entry in trace {
|
||||
let entry = entry?;
|
||||
match entry {
|
||||
Entry::NewCallsite(callsite) => {
|
||||
calls.insert(callsite.call_id, callsite);
|
||||
}
|
||||
Entry::NewThread(NewThread { thread_id, name }) => {
|
||||
threads.insert(thread_id, name);
|
||||
}
|
||||
Entry::NewSpan(span) => {
|
||||
spans.insert(span.id, (span, SpanStatus::Outside));
|
||||
}
|
||||
Entry::SpanEnter(SpanEnter { id, time }) => {
|
||||
let (span, status) = spans.get_mut(&id).unwrap();
|
||||
|
||||
let SpanStatus::Outside = status else {
|
||||
continue;
|
||||
};
|
||||
|
||||
*status = SpanStatus::Inside(time);
|
||||
|
||||
let span = *span;
|
||||
|
||||
println!(
|
||||
"[{}]{}::{} <-",
|
||||
print_thread(&threads, span.thread_id),
|
||||
print_backtrace(&spans, &calls, &span),
|
||||
print_span(&calls, &span)
|
||||
);
|
||||
}
|
||||
Entry::SpanExit(SpanExit { id, time }) => {
|
||||
let (span, status) = spans.get_mut(&id).unwrap();
|
||||
|
||||
let SpanStatus::Inside(begin) = status else {
|
||||
continue;
|
||||
};
|
||||
let begin = *begin;
|
||||
|
||||
*status = SpanStatus::Outside;
|
||||
|
||||
let span = *span;
|
||||
|
||||
println!(
|
||||
"[{}]{}::{} -> {}",
|
||||
print_thread(&threads, span.thread_id),
|
||||
print_backtrace(&spans, &calls, &span),
|
||||
print_span(&calls, &span),
|
||||
print_duration(time - begin),
|
||||
)
|
||||
}
|
||||
Entry::SpanClose(SpanClose { id, time: _ }) => {
|
||||
spans.remove(&id);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_thread(threads: &HashMap<ResourceId, Option<String>>, thread_id: ResourceId) -> String {
|
||||
let thread = threads.get(&thread_id).unwrap();
|
||||
let thread =
|
||||
thread.as_ref().cloned().unwrap_or_else(|| format!("ThreadId({})", thread_id.to_usize()));
|
||||
thread
|
||||
}
|
||||
|
||||
fn print_backtrace(
|
||||
spans: &HashMap<SpanId, (NewSpan, SpanStatus)>,
|
||||
calls: &HashMap<ResourceId, NewCallsite>,
|
||||
span: &NewSpan,
|
||||
) -> String {
|
||||
let mut parents = Vec::new();
|
||||
let mut current = span.parent_id;
|
||||
while let Some(current_id) = ¤t {
|
||||
let (span, _) = spans.get(current_id).unwrap();
|
||||
let callsite = calls.get(&span.call_id).unwrap();
|
||||
parents.push(callsite.name.clone());
|
||||
|
||||
current = span.parent_id;
|
||||
}
|
||||
|
||||
let x: Vec<String> = parents.into_iter().rev().map(|x| x.to_string()).collect();
|
||||
x.join("::")
|
||||
}
|
||||
|
||||
fn print_span(calls: &HashMap<ResourceId, NewCallsite>, span: &NewSpan) -> String {
|
||||
let callsite = calls.get(&span.call_id).unwrap();
|
||||
match (callsite.file.clone(), callsite.line) {
|
||||
(Some(file), None) => format!("{} ({})", callsite.name, file),
|
||||
(Some(file), Some(line)) => format!("{} ({}:{})", callsite.name, file, line),
|
||||
_ => callsite.name.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn print_duration(duration: std::time::Duration) -> String {
|
||||
if duration.as_nanos() < 1000 {
|
||||
format!("{}ns", duration.as_nanos())
|
||||
} else if duration.as_micros() < 1000 {
|
||||
format!("{}μs", duration.as_micros())
|
||||
} else if duration.as_millis() < 1000 {
|
||||
format!("{}ms", duration.as_millis())
|
||||
} else if duration.as_secs() < 120 {
|
||||
format!("{}s", duration.as_secs())
|
||||
} else if duration.as_secs_f64() / 60.0 < 60.0 {
|
||||
format!("{}min", duration.as_secs_f64() / 60.0)
|
||||
} else if duration.as_secs_f64() / 3600.0 < 8.0 {
|
||||
format!("{}h", duration.as_secs_f64() / 3600.0)
|
||||
} else {
|
||||
format!("{}d", duration.as_secs_f64() / 3600.0 / 24.0)
|
||||
}
|
||||
}
|
2
tracing-trace/src/processor/mod.rs
Normal file
2
tracing-trace/src/processor/mod.rs
Normal file
|
@ -0,0 +1,2 @@
|
|||
pub mod firefox_profiler;
|
||||
pub mod fmt;
|
Loading…
Add table
Add a link
Reference in a new issue