mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-06-28 15:22:59 +02:00
192 lines
5.7 KiB
Rust
192 lines
5.7 KiB
Rust
use std::fmt;
|
|
use std::io::Write;
|
|
use std::ops::ControlFlow;
|
|
use std::str::FromStr;
|
|
use std::sync::Arc;
|
|
|
|
use actix_web::web::{Bytes, Data};
|
|
use actix_web::{web, HttpRequest, HttpResponse};
|
|
use deserr::actix_web::AwebJson;
|
|
use deserr::Deserr;
|
|
use meilisearch_auth::AuthController;
|
|
use meilisearch_types::deserr::DeserrJsonError;
|
|
use meilisearch_types::error::deserr_codes::*;
|
|
use meilisearch_types::error::ResponseError;
|
|
use tokio::sync::mpsc::{self, UnboundedSender};
|
|
use tracing::instrument::WithSubscriber;
|
|
use tracing_subscriber::filter::Targets;
|
|
use tracing_subscriber::Layer;
|
|
|
|
use crate::error::MeilisearchHttpError;
|
|
use crate::extractors::authentication::policies::*;
|
|
use crate::extractors::authentication::GuardedData;
|
|
use crate::extractors::sequential_extractor::SeqHandler;
|
|
use crate::LogRouteHandle;
|
|
|
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
|
cfg.service(web::resource("").route(web::post().to(SeqHandler(get_logs))));
|
|
}
|
|
|
|
#[derive(Debug, Default, Clone, Copy, Deserr)]
|
|
#[deserr(rename_all = lowercase)]
|
|
pub enum LogLevel {
|
|
Error,
|
|
Warn,
|
|
#[default]
|
|
Info,
|
|
Debug,
|
|
Trace,
|
|
}
|
|
|
|
#[derive(Debug, Default, Clone, Copy, Deserr)]
|
|
#[deserr(rename_all = lowercase)]
|
|
pub enum LogMode {
|
|
#[default]
|
|
Fmt,
|
|
Profile,
|
|
}
|
|
|
|
#[derive(Debug, Deserr)]
|
|
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
|
pub struct GetLogs {
|
|
#[deserr(default, error = DeserrJsonError<BadRequest>)]
|
|
pub target: String,
|
|
|
|
#[deserr(default, error = DeserrJsonError<BadRequest>)]
|
|
pub mode: LogMode,
|
|
}
|
|
|
|
impl fmt::Display for LogLevel {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
LogLevel::Error => f.write_str("error"),
|
|
LogLevel::Warn => f.write_str("warn"),
|
|
LogLevel::Info => f.write_str("info"),
|
|
LogLevel::Debug => f.write_str("debug"),
|
|
LogLevel::Trace => f.write_str("trace"),
|
|
}
|
|
}
|
|
}
|
|
|
|
struct LogWriter {
|
|
sender: mpsc::UnboundedSender<Vec<u8>>,
|
|
}
|
|
|
|
impl Drop for LogWriter {
|
|
fn drop(&mut self) {
|
|
println!("hello");
|
|
}
|
|
}
|
|
|
|
impl Write for LogWriter {
|
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
self.sender.send(buf.to_vec()).map_err(std::io::Error::other)?;
|
|
Ok(buf.len())
|
|
}
|
|
|
|
fn flush(&mut self) -> std::io::Result<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
struct LogStreamer {
|
|
receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
|
/// We need to keep an handle on the logs to make it available again when the streamer is dropped
|
|
logs: Arc<LogRouteHandle>,
|
|
}
|
|
|
|
impl Drop for LogStreamer {
|
|
fn drop(&mut self) {
|
|
println!("log streamer being dropped");
|
|
if let Err(e) = self.logs.modify(|layer| *layer.inner_mut() = None) {
|
|
tracing::error!("Could not free the logs route: {e}");
|
|
}
|
|
}
|
|
}
|
|
|
|
impl LogStreamer {
|
|
pub fn into_stream(self) -> impl futures_util::Stream<Item = Result<Bytes, ResponseError>> {
|
|
futures_util::stream::unfold(self, move |mut this| async move {
|
|
let vec = this.receiver.recv().await;
|
|
|
|
vec.map(From::from).map(Ok).map(|a| (a, this))
|
|
})
|
|
}
|
|
}
|
|
|
|
pub fn make_layer<
|
|
S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
|
|
>(
|
|
opt: &GetLogs,
|
|
sender: UnboundedSender<Vec<u8>>,
|
|
) -> Box<dyn Layer<S> + Send + Sync> {
|
|
match opt.mode {
|
|
LogMode::Fmt => {
|
|
let fmt_layer = tracing_subscriber::fmt::layer()
|
|
.with_line_number(true)
|
|
.with_writer(move || LogWriter { sender: sender.clone() })
|
|
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE);
|
|
|
|
Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>
|
|
}
|
|
LogMode::Profile => {
|
|
let (mut trace, layer) =
|
|
tracing_trace::Trace::new(LogWriter { sender: sender.clone() });
|
|
|
|
tokio::task::spawn(async move {
|
|
loop {
|
|
match tokio::time::timeout(std::time::Duration::from_secs(1), trace.receive())
|
|
.await
|
|
{
|
|
Ok(Ok(ControlFlow::Continue(()))) => continue,
|
|
Ok(Ok(ControlFlow::Break(_))) => break,
|
|
// the other half of the channel was dropped
|
|
Ok(Err(_)) => break,
|
|
Err(_) => trace.flush().unwrap(),
|
|
}
|
|
}
|
|
while trace.try_receive().is_ok() {}
|
|
trace.flush().unwrap();
|
|
});
|
|
|
|
Box::new(layer) as Box<dyn Layer<S> + Send + Sync>
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn get_logs(
|
|
_auth_controller: GuardedData<ActionPolicy<{ actions::METRICS_ALL }>, Data<AuthController>>,
|
|
logs: Data<LogRouteHandle>,
|
|
body: AwebJson<GetLogs, DeserrJsonError>,
|
|
_req: HttpRequest,
|
|
) -> Result<HttpResponse, ResponseError> {
|
|
let opt = body.into_inner();
|
|
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
|
|
|
|
let mut was_available = false;
|
|
|
|
logs.modify(|layer| match layer.inner_mut() {
|
|
None => {
|
|
// there is no one getting logs
|
|
was_available = true;
|
|
*layer.filter_mut() =
|
|
tracing_subscriber::filter::Targets::from_str(&opt.target).unwrap();
|
|
let new_layer = make_layer(&opt, sender);
|
|
|
|
*layer.inner_mut() = Some(new_layer)
|
|
}
|
|
Some(_) => {
|
|
// there is already someone getting logs
|
|
was_available = false;
|
|
}
|
|
})
|
|
.unwrap();
|
|
|
|
if was_available {
|
|
Ok(HttpResponse::Ok()
|
|
.streaming(LogStreamer { receiver, logs: logs.into_inner() }.into_stream()))
|
|
} else {
|
|
Err(MeilisearchHttpError::AlreadyUsedLogRoute.into())
|
|
}
|
|
}
|