From 3c3a258a22d80c6617dd09adea4b86e3c8fccdf3 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 30 Jan 2024 14:19:46 +0100 Subject: [PATCH] start exposing the profiling layer --- meilisearch/src/routes/logs.rs | 105 +++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 38 deletions(-) diff --git a/meilisearch/src/routes/logs.rs b/meilisearch/src/routes/logs.rs index 2bf4ecce7..f8fa5a301 100644 --- a/meilisearch/src/routes/logs.rs +++ b/meilisearch/src/routes/logs.rs @@ -1,34 +1,32 @@ use std::fmt; use std::io::Write; -use std::pin::Pin; +use std::ops::ControlFlow; use std::str::FromStr; -use std::task::Poll; use actix_web::web::{Bytes, Data}; use actix_web::{web, HttpRequest, HttpResponse}; use deserr::actix_web::AwebJson; use deserr::Deserr; -use futures_util::{pin_mut, FutureExt}; use meilisearch_auth::AuthController; use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::ResponseError; use tokio::sync::mpsc::{self, UnboundedSender}; -use tracing_subscriber::layer::SubscriberExt; +use tracing::instrument::WithSubscriber; use tracing_subscriber::Layer; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; -use crate::{LogRouteHandle, LogRouteType}; +use crate::LogRouteHandle; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(get_logs)))); } #[derive(Debug, Default, Clone, Copy, Deserr)] -#[serde(rename_all = "lowercase")] +#[deserr(rename_all = lowercase)] pub enum LogLevel { Error, Warn, @@ -38,11 +36,22 @@ pub enum LogLevel { Trace, } +#[derive(Debug, Default, Clone, Copy, Deserr)] +#[deserr(rename_all = lowercase)] +pub enum LogMode { + #[default] + Fmt, + Profile, +} + #[derive(Debug, Deserr)] #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] pub struct GetLogs { #[deserr(default, error = DeserrJsonError)] pub level: LogLevel, + + #[deserr(default, error = DeserrJsonError)] + pub mode: LogMode, } impl fmt::Display for LogLevel { @@ -76,29 +85,6 @@ struct LogStreamer { receiver: mpsc::UnboundedReceiver>, } -impl futures_util::Stream for LogStreamer { - type Item = Result; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let future = self.get_mut().receiver.recv(); - pin_mut!(future); - - match future.poll_unpin(cx) { - std::task::Poll::Ready(recv) => match recv { - Some(buf) => { - // let bytes = Bytes::copy_from_slice(buf.as_slice()); - Poll::Ready(Some(Ok(buf.into()))) - } - None => Poll::Ready(None), - }, - Poll::Pending => Poll::Pending, - } - } -} - impl LogStreamer { pub fn into_stream(self) -> impl futures_util::Stream> { futures_util::stream::unfold(self, move |mut this| async move { @@ -115,13 +101,38 @@ pub fn make_layer< opt: &GetLogs, sender: UnboundedSender>, ) -> Box + Send + Sync> { - let fmt_layer = tracing_subscriber::fmt::layer() - .with_line_number(true) - .with_writer(move || LogWriter { sender: sender.clone() }) - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE); - // let subscriber = tracing_subscriber::registry().with(fmt_layer); + match opt.mode { + LogMode::Fmt => { + let fmt_layer = tracing_subscriber::fmt::layer() + .with_line_number(true) + .with_writer(move || LogWriter { sender: sender.clone() }) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE); - Box::new(fmt_layer) as Box + Send + Sync> + Box::new(fmt_layer) as Box + Send + Sync> + } + LogMode::Profile => { + let (mut trace, layer) = + tracing_trace::Trace::new(LogWriter { sender: sender.clone() }); + + tokio::task::spawn(async move { + loop { + match tokio::time::timeout(std::time::Duration::from_secs(1), trace.receive()) + .await + { + Ok(Ok(ControlFlow::Continue(()))) => continue, + Ok(Ok(ControlFlow::Break(_))) => break, + // the other half of the channel was dropped + Ok(Err(_)) => break, + Err(_) => trace.flush().unwrap(), + } + } + while trace.try_receive().is_ok() {} + trace.flush().unwrap(); + }); + + Box::new(layer) as Box + Send + Sync> + } + } } pub async fn get_logs( @@ -153,10 +164,28 @@ pub async fn get_logs( logs.modify(|layer| match layer.inner_mut() { None => { - was_available = true; - *layer.filter_mut() = - tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap(); // there is no one getting logs + was_available = true; + match opt.mode { + LogMode::Fmt => { + *layer.filter_mut() = + tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()) + .unwrap(); + } + LogMode::Profile => { + *layer.filter_mut() = + tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()) + .unwrap(); + // *layer.filter_mut() = tracing_subscriber::filter::Targets::new() + // .with_target("indexing::", tracing::Level::TRACE) + // .with_filter( + // tracing_subscriber::filter::LevelFilter::from_str( + // &opt.level.to_string(), + // ) + // .unwrap(), + // ) + } + } let new_layer = make_layer(&opt, sender); *layer.inner_mut() = Some(new_layer)