use std::fmt; use std::io::Write; use std::ops::ControlFlow; use std::str::FromStr; use std::sync::Arc; use actix_web::web::{Bytes, Data}; use actix_web::{web, HttpRequest, HttpResponse}; use deserr::actix_web::AwebJson; use deserr::Deserr; use meilisearch_auth::AuthController; use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::ResponseError; use tokio::sync::mpsc::{self, UnboundedSender}; use tracing_subscriber::Layer; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; use crate::LogRouteHandle; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::resource("") .route(web::post().to(SeqHandler(get_logs))) .route(web::delete().to(SeqHandler(cancel_logs))), ); } #[derive(Debug, Default, Clone, Copy, Deserr)] #[deserr(rename_all = lowercase)] pub enum LogLevel { Error, Warn, #[default] Info, Debug, Trace, } #[derive(Debug, Default, Clone, Copy, Deserr)] #[deserr(rename_all = lowercase)] pub enum LogMode { #[default] Fmt, Profile, } #[derive(Debug, Deserr)] #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] pub struct GetLogs { #[deserr(default, error = DeserrJsonError)] pub target: String, #[deserr(default, error = DeserrJsonError)] pub mode: LogMode, } impl fmt::Display for LogLevel { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { LogLevel::Error => f.write_str("error"), LogLevel::Warn => f.write_str("warn"), LogLevel::Info => f.write_str("info"), LogLevel::Debug => f.write_str("debug"), LogLevel::Trace => f.write_str("trace"), } } } struct LogWriter { sender: mpsc::UnboundedSender>, } impl Drop for LogWriter { fn drop(&mut self) { println!("hello"); } } impl Write for LogWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.sender.send(buf.to_vec()).map_err(std::io::Error::other)?; Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { Ok(()) } } struct LogStreamer { receiver: mpsc::UnboundedReceiver>, /// We need to keep an handle on the logs to make it available again when the streamer is dropped logs: Arc, } impl Drop for LogStreamer { fn drop(&mut self) { println!("log streamer being dropped"); if let Err(e) = self.logs.modify(|layer| *layer.inner_mut() = None) { tracing::error!("Could not free the logs route: {e}"); } } } impl LogStreamer { pub fn into_stream(self) -> impl futures_util::Stream> { futures_util::stream::unfold(self, move |mut this| async move { let vec = this.receiver.recv().await; vec.map(From::from).map(Ok).map(|a| (a, this)) }) } } pub fn make_layer< S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, >( opt: &GetLogs, sender: UnboundedSender>, ) -> Box + Send + Sync> { match opt.mode { LogMode::Fmt => { let fmt_layer = tracing_subscriber::fmt::layer() .with_line_number(true) .with_writer(move || LogWriter { sender: sender.clone() }) .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE); Box::new(fmt_layer) as Box + Send + Sync> } LogMode::Profile => { let (mut trace, layer) = tracing_trace::Trace::new(LogWriter { sender: sender.clone() }); tokio::task::spawn(async move { loop { match tokio::time::timeout(std::time::Duration::from_secs(1), trace.receive()) .await { Ok(Ok(ControlFlow::Continue(()))) => continue, Ok(Ok(ControlFlow::Break(_))) => break, // the other half of the channel was dropped Ok(Err(_)) => break, Err(_) => trace.flush().unwrap(), } } while trace.try_receive().is_ok() {} trace.flush().unwrap(); }); Box::new(layer) as Box + Send + Sync> } } } pub async fn get_logs( _auth_controller: GuardedData, Data>, logs: Data, body: AwebJson, _req: HttpRequest, ) -> Result { let opt = body.into_inner(); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let mut was_available = false; logs.modify(|layer| match layer.inner_mut() { None => { // there is no one getting logs was_available = true; *layer.filter_mut() = tracing_subscriber::filter::Targets::from_str(&opt.target).unwrap(); let new_layer = make_layer(&opt, sender); *layer.inner_mut() = Some(new_layer) } Some(_) => { // there is already someone getting logs was_available = false; } }) .unwrap(); if was_available { Ok(HttpResponse::Ok() .streaming(LogStreamer { receiver, logs: logs.into_inner() }.into_stream())) } else { Err(MeilisearchHttpError::AlreadyUsedLogRoute.into()) } } pub async fn cancel_logs( _auth_controller: GuardedData, Data>, logs: Data, _req: HttpRequest, ) -> Result { if let Err(e) = logs.modify(|layer| *layer.inner_mut() = None) { tracing::error!("Could not free the logs route: {e}"); } Ok(HttpResponse::NoContent().finish()) }