2024-02-05 11:47:56 +01:00
use std ::convert ::Infallible ;
2024-01-25 18:09:50 +01:00
use std ::io ::Write ;
2024-02-05 11:47:56 +01:00
use std ::ops ::ControlFlow ;
2024-01-31 17:47:30 +01:00
use std ::pin ::Pin ;
2024-01-25 18:09:50 +01:00
use std ::str ::FromStr ;
2024-01-30 16:31:42 +01:00
use std ::sync ::Arc ;
2024-01-25 18:09:50 +01:00
use actix_web ::web ::{ Bytes , Data } ;
2024-02-05 11:47:56 +01:00
use actix_web ::{ web , HttpResponse } ;
2024-01-25 18:09:50 +01:00
use deserr ::actix_web ::AwebJson ;
2024-02-05 11:47:56 +01:00
use deserr ::{ DeserializeError , Deserr , ErrorKind , MergeWithError , ValuePointerRef } ;
2024-01-31 17:47:30 +01:00
use futures_util ::Stream ;
2024-02-05 13:29:01 +01:00
use index_scheduler ::IndexScheduler ;
2024-01-25 18:09:50 +01:00
use meilisearch_types ::deserr ::DeserrJsonError ;
use meilisearch_types ::error ::deserr_codes ::* ;
2024-01-31 17:47:30 +01:00
use meilisearch_types ::error ::{ Code , ResponseError } ;
2024-02-06 14:41:14 +01:00
use tokio ::sync ::mpsc ;
2024-02-05 11:47:56 +01:00
use tracing_subscriber ::filter ::Targets ;
2024-01-25 18:09:50 +01:00
use tracing_subscriber ::Layer ;
2024-01-29 17:56:43 +01:00
use crate ::error ::MeilisearchHttpError ;
2024-01-25 18:09:50 +01:00
use crate ::extractors ::authentication ::policies ::* ;
use crate ::extractors ::authentication ::GuardedData ;
use crate ::extractors ::sequential_extractor ::SeqHandler ;
2024-02-12 11:06:37 +01:00
use crate ::{ LogRouteHandle , LogStderrHandle } ;
2024-01-25 18:09:50 +01:00
pub fn configure ( cfg : & mut web ::ServiceConfig ) {
2024-01-30 18:15:53 +01:00
cfg . service (
2024-02-07 12:13:57 +01:00
web ::resource ( " stream " )
2024-01-30 18:15:53 +01:00
. route ( web ::post ( ) . to ( SeqHandler ( get_logs ) ) )
. route ( web ::delete ( ) . to ( SeqHandler ( cancel_logs ) ) ) ,
2024-02-12 11:06:37 +01:00
)
. service ( web ::resource ( " stderr " ) . route ( web ::post ( ) . to ( SeqHandler ( update_stderr_target ) ) ) ) ;
2024-01-25 18:09:50 +01:00
}
2024-02-07 14:45:40 +01:00
/// Output format requested for the streamed logs.
#[derive(Debug, Default, Clone, Copy, Deserr, PartialEq, Eq)]
#[deserr(rename_all = camelCase)]
pub enum LogMode {
    /// Human-readable text output (the default).
    #[default]
    Human,
    /// Newline-delimited JSON output.
    Json,
    /// Profiling trace entries produced by `tracing_trace`.
    Profile,
}
/// Simple wrapper around the `Targets` from `tracing_subscriber` to implement `MergeWithError` on it.
#[derive(Clone, Debug)]
struct MyTargets(Targets);
/// Simple wrapper around the `ParseError` from `tracing_subscriber` to implement `MergeWithError` on it.
#[derive(Debug, thiserror::Error)]
enum MyParseError {
    /// The underlying `tracing_subscriber` filter failed to parse.
    #[error(transparent)]
    ParseError(#[from] tracing_subscriber::filter::ParseError),
    /// Returned for an empty target string; shows usage examples instead of a
    /// cryptic parse error.
    #[error(
        "Empty string is not a valid target. If you want to get no logs use `OFF`. Usage: `info`, `meilisearch=info`, or you can write multiple filters in one target: `index_scheduler=info,milli=trace`"
    )]
    Example,
}
impl FromStr for MyTargets {
    type Err = MyParseError;

    /// Parses a `tracing` filter string; the empty string is rejected with a
    /// dedicated error rather than being forwarded to `Targets::from_str`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "" => Err(MyParseError::Example),
            _ => {
                let targets = Targets::from_str(s).map_err(MyParseError::ParseError)?;
                Ok(MyTargets(targets))
            }
        }
    }
}
impl MergeWithError<MyParseError> for DeserrJsonError<BadRequest> {
    /// Converts a target-parsing failure into a deserr `BadRequest` error
    /// attached to the location of the offending value.
    fn merge(
        _self_: Option<Self>,
        other: MyParseError,
        merge_location: ValuePointerRef,
    ) -> ControlFlow<Self, Self> {
        Self::error::<Infallible>(
            None,
            ErrorKind::Unexpected { msg: other.to_string() },
            merge_location,
        )
    }
}
/// Payload of `POST /logs/stream`: how and what to stream.
#[derive(Debug, Deserr)]
// NOTE(review): validation failures are reported under the
// `InvalidSettingsTypoTolerance` error code, which looks unrelated to the logs
// route — confirm whether a dedicated error code was intended.
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields, validate = validate_get_logs -> DeserrJsonError<InvalidSettingsTypoTolerance>)]
pub struct GetLogs {
    // Filter in `tracing_subscriber` target syntax, e.g. `milli=trace`.
    #[deserr(default = "info".parse().unwrap(), try_from(&String) = MyTargets::from_str -> DeserrJsonError<BadRequest>)]
    target: MyTargets,

    // Output format of the stream; defaults to `LogMode::Human`.
    #[deserr(default, error = DeserrJsonError<BadRequest>)]
    mode: LogMode,

    // Only valid together with `LogMode::Profile` — enforced by `validate_get_logs`.
    #[deserr(default = false, error = DeserrJsonError<BadRequest>)]
    profile_memory: bool,
}
fn validate_get_logs < E : DeserializeError > (
logs : GetLogs ,
location : ValuePointerRef ,
) -> Result < GetLogs , E > {
if logs . profile_memory & & logs . mode ! = LogMode ::Profile {
Err ( deserr ::take_cf_content ( E ::error ::< Infallible > (
None ,
ErrorKind ::Unexpected {
msg : format ! ( " `profile_memory` can only be used while profiling code and is not compatible with the {:?} mode. " , logs . mode ) ,
} ,
location ,
) ) )
} else {
Ok ( logs )
}
2024-01-25 18:09:50 +01:00
}
/// `io::Write` adapter that forwards each written buffer to an unbounded
/// channel, letting the `tracing_subscriber` fmt layer feed the HTTP stream.
struct LogWriter {
    sender: mpsc::UnboundedSender<Vec<u8>>,
}
impl Write for LogWriter {
fn write ( & mut self , buf : & [ u8 ] ) -> std ::io ::Result < usize > {
self . sender . send ( buf . to_vec ( ) ) . map_err ( std ::io ::Error ::other ) ? ;
Ok ( buf . len ( ) )
}
fn flush ( & mut self ) -> std ::io ::Result < ( ) > {
Ok ( ( ) )
}
}
2024-01-31 17:47:30 +01:00
/// Guard owned by the log byte stream; dropping it frees the log route again.
struct HandleGuard {
    /// We need to keep an handle on the logs to make it available again when the streamer is dropped
    logs: Arc<LogRouteHandle>,
}
impl Drop for HandleGuard {
2024-01-30 16:31:42 +01:00
fn drop ( & mut self ) {
if let Err ( e ) = self . logs . modify ( | layer | * layer . inner_mut ( ) = None ) {
tracing ::error! ( " Could not free the logs route: {e} " ) ;
}
}
2024-01-25 18:09:50 +01:00
}
2024-01-31 17:47:30 +01:00
fn byte_stream (
receiver : mpsc ::UnboundedReceiver < Vec < u8 > > ,
guard : HandleGuard ,
) -> impl futures_util ::Stream < Item = Result < Bytes , ResponseError > > {
futures_util ::stream ::unfold ( ( receiver , guard ) , move | ( mut receiver , guard ) | async move {
let vec = receiver . recv ( ) . await ;
2024-01-30 12:27:49 +01:00
2024-01-31 17:47:30 +01:00
vec . map ( From ::from ) . map ( Ok ) . map ( | a | ( a , ( receiver , guard ) ) )
} )
2024-01-30 12:27:49 +01:00
}
2024-01-31 17:47:30 +01:00
/// Boxed, pinned byte stream as handed to actix for the streaming response.
type PinnedByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, ResponseError>>>>;

/// Builds the tracing layer matching the requested [`LogMode`] together with
/// the byte stream carrying the formatted output to the HTTP client.
///
/// The `HandleGuard` is moved into the stream so the log route is freed as
/// soon as the stream is dropped.
fn make_layer<
    S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
>(
    opt: &GetLogs,
    logs: Data<LogRouteHandle>,
) -> (Box<dyn Layer<S> + Send + Sync>, PinnedByteStream) {
    let guard = HandleGuard { logs: logs.into_inner() };

    match opt.mode {
        LogMode::Human => {
            let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

            // Human-readable text output; span-close events are also emitted.
            let fmt_layer = tracing_subscriber::fmt::layer()
                .with_writer(move || LogWriter { sender: sender.clone() })
                .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);

            let stream = byte_stream(receiver, guard);
            (Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>, Box::pin(stream))
        }
        LogMode::Json => {
            let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

            // Same wiring as `Human`, but with newline-delimited JSON formatting.
            let fmt_layer = tracing_subscriber::fmt::layer()
                .with_writer(move || LogWriter { sender: sender.clone() })
                .json()
                .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);

            let stream = byte_stream(receiver, guard);
            (Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>, Box::pin(stream))
        }
        LogMode::Profile => {
            // Profiling traces; `profile_memory` is forwarded to tracing_trace.
            let (trace, layer) = tracing_trace::Trace::new(opt.profile_memory);

            let stream = entry_stream(trace, guard);
            (Box::new(layer) as Box<dyn Layer<S> + Send + Sync>, Box::pin(stream))
        }
    }
}
fn entry_stream (
trace : tracing_trace ::Trace ,
guard : HandleGuard ,
) -> impl Stream < Item = Result < Bytes , ResponseError > > {
let receiver = trace . into_receiver ( ) ;
let entry_buf = Vec ::new ( ) ;
futures_util ::stream ::unfold (
( receiver , entry_buf , guard ) ,
move | ( mut receiver , mut entry_buf , guard ) | async move {
let mut bytes = Vec ::new ( ) ;
while bytes . len ( ) < 8192 {
entry_buf . clear ( ) ;
let Ok ( count ) = tokio ::time ::timeout (
std ::time ::Duration ::from_secs ( 1 ) ,
receiver . recv_many ( & mut entry_buf , 100 ) ,
)
. await
else {
break ;
} ;
if count = = 0 {
2024-02-06 18:05:02 +01:00
if ! bytes . is_empty ( ) {
break ;
}
2024-01-31 17:47:30 +01:00
// channel closed, exit
return None ;
}
for entry in & entry_buf {
if let Err ( error ) = serde_json ::to_writer ( & mut bytes , entry ) {
tracing ::error! (
error = & error as & dyn std ::error ::Error ,
" deserializing entry "
) ;
return Some ( (
Err ( ResponseError ::from_msg (
format! ( " error deserializing entry: {error} " ) ,
Code ::Internal ,
) ) ,
( receiver , entry_buf , guard ) ,
) ) ;
}
}
}
Some ( ( Ok ( bytes . into ( ) ) , ( receiver , entry_buf , guard ) ) )
} ,
)
}
2024-01-25 18:09:50 +01:00
/// `POST /logs/stream`: starts streaming logs to the client.
///
/// Only one client may stream at a time: the layer is installed only when no
/// stream is currently attached; otherwise the request is rejected with
/// `AlreadyUsedLogRoute`.
pub async fn get_logs(
    index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
    logs: Data<LogRouteHandle>,
    body: AwebJson<GetLogs, DeserrJsonError>,
) -> Result<HttpResponse, ResponseError> {
    // The logs route is gated behind an experimental feature flag.
    index_scheduler.features().check_logs_route()?;

    let opt = body.into_inner();
    let mut stream = None;

    logs.modify(|layer| match layer.inner_mut() {
        None => {
            // there is no one getting logs
            *layer.filter_mut() = opt.target.0.clone();
            let (new_layer, new_stream) = make_layer(&opt, logs.clone());

            *layer.inner_mut() = Some(new_layer);
            stream = Some(new_stream);
        }
        Some(_) => {
            // there is already someone getting logs
        }
    })
    .unwrap();

    if let Some(stream) = stream {
        Ok(HttpResponse::Ok().streaming(stream))
    } else {
        Err(MeilisearchHttpError::AlreadyUsedLogRoute.into())
    }
}
pub async fn cancel_logs (
2024-02-05 14:14:13 +01:00
index_scheduler : GuardedData < ActionPolicy < { actions ::METRICS_GET } > , Data < IndexScheduler > > ,
2024-01-30 18:15:53 +01:00
logs : Data < LogRouteHandle > ,
) -> Result < HttpResponse , ResponseError > {
2024-02-05 13:29:01 +01:00
index_scheduler . features ( ) . check_logs_route ( ) ? ;
2024-01-30 18:15:53 +01:00
if let Err ( e ) = logs . modify ( | layer | * layer . inner_mut ( ) = None ) {
tracing ::error! ( " Could not free the logs route: {e} " ) ;
}
Ok ( HttpResponse ::NoContent ( ) . finish ( ) )
}
2024-02-12 11:06:37 +01:00
/// Payload of `POST /logs/stderr`: the new filter for the stderr log layer.
#[derive(Debug, Deserr)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct UpdateStderrLogs {
    // Filter in `tracing_subscriber` target syntax, e.g. `milli=trace`.
    #[deserr(default = "info".parse().unwrap(), try_from(&String) = MyTargets::from_str -> DeserrJsonError<BadRequest>)]
    target: MyTargets,
}
pub async fn update_stderr_target (
index_scheduler : GuardedData < ActionPolicy < { actions ::METRICS_GET } > , Data < IndexScheduler > > ,
logs : Data < LogStderrHandle > ,
body : AwebJson < UpdateStderrLogs , DeserrJsonError > ,
) -> Result < HttpResponse , ResponseError > {
index_scheduler . features ( ) . check_logs_route ( ) ? ;
let opt = body . into_inner ( ) ;
logs . modify ( | layer | {
* layer . filter_mut ( ) = opt . target . 0. clone ( ) ;
} )
. unwrap ( ) ;
Ok ( HttpResponse ::NoContent ( ) . finish ( ) )
}