109 lines
3.7 KiB
Rust

mod error;
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::str::FromStr;
use actix_web::http::header::ContentType;
use actix_web::web::Data;
use meili_snap::snapshot;
use meilisearch::analytics::Analytics;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{create_app, Opt, SubscriberForSecondLayer};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Layer;
use crate::common::{default_settings, Server};
use crate::json;
#[actix_web::test]
async fn basic_test_log_stream_route() {
let db_path = tempfile::tempdir().unwrap();
let server = Server::new_with_options(Opt {
experimental_enable_logs_route: true,
..default_settings(db_path.path())
})
.await
.unwrap();
let (route_layer, route_layer_handle) =
tracing_subscriber::reload::Layer::new(None.with_filter(
tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF),
));
let (_stderr_layer, stderr_layer_handle) = tracing_subscriber::reload::Layer::new(
(Box::new(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
) as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>)
.with_filter(tracing_subscriber::filter::Targets::new()),
);
let subscriber = tracing_subscriber::registry().with(route_layer).with(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE)
.with_filter(tracing_subscriber::filter::LevelFilter::from_str("OFF").unwrap()),
);
let search_queue = SearchQueue::new(
server.service.options.experimental_search_queue_size,
NonZeroUsize::new(1).unwrap(),
);
let app = actix_web::test::init_service(create_app(
server.service.index_scheduler.clone().into(),
server.service.auth.clone().into(),
Data::new(search_queue),
server.service.options.clone(),
(route_layer_handle, stderr_layer_handle),
Data::new(Analytics::no_analytics()),
true,
))
.await;
// set the subscriber as the default for the application
tracing::subscriber::set_global_default(subscriber).unwrap();
let app = Rc::new(app);
// First, we start listening on the `/logs/stream` route
let handle_app = app.clone();
let handle = tokio::task::spawn_local(async move {
let req = actix_web::test::TestRequest::post()
.uri("/logs/stream")
.insert_header(ContentType::json())
.set_payload(
serde_json::to_vec(&json!({
"mode": "human",
"target": "info",
}))
.unwrap(),
);
let req = req.to_request();
let ret = actix_web::test::call_service(&*handle_app, req).await;
actix_web::test::read_body(ret).await
});
// We're going to create an index to get at least one info log saying we processed a batch of task
let (ret, _code) = server.create_index(json!({ "uid": "tamo" })).await;
snapshot!(ret, @r###"
{
"taskUid": 0,
"indexUid": "tamo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
server.wait_task(ret.uid()).await;
let req = actix_web::test::TestRequest::delete().uri("/logs/stream");
let req = req.to_request();
let ret = actix_web::test::call_service(&*app, req).await;
let code = ret.status();
snapshot!(code, @"204 No Content");
let logs = handle.await.unwrap();
let logs = String::from_utf8(logs.to_vec()).unwrap();
assert!(logs.contains("INFO"), "{logs}");
}