From 7b670a4afadb132ac4a01b6403108700501a391d Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Fri, 8 Mar 2024 21:28:08 +0100 Subject: [PATCH] Allow dry runs for benchmarks where reports are generated but not sent to the dashboard --- xtask/src/bench/dashboard.rs | 312 +++++++++++++++++++---------------- xtask/src/bench/mod.rs | 24 +-- xtask/src/bench/workload.rs | 16 +- 3 files changed, 189 insertions(+), 163 deletions(-) diff --git a/xtask/src/bench/dashboard.rs b/xtask/src/bench/dashboard.rs index 833426207..3ba0ca58b 100644 --- a/xtask/src/bench/dashboard.rs +++ b/xtask/src/bench/dashboard.rs @@ -11,157 +11,179 @@ use super::client::Client; use super::env_info; use super::workload::Workload; -pub async fn cancel_on_ctrl_c( - invocation_uuid: Uuid, - dashboard_client: Client, - abort_handle: AbortHandle, -) { - tracing::info!("press Ctrl-C to cancel the invocation"); - match ctrl_c().await { - Ok(()) => { - tracing::info!(%invocation_uuid, "received Ctrl-C, cancelling invocation"); - mark_as_failed(dashboard_client, invocation_uuid, None).await; - abort_handle.abort(); +#[derive(Debug, Clone)] +pub enum DashboardClient { + Client(Client), + Dry, +} + +impl DashboardClient { + pub fn new(dashboard_url: &str, api_key: Option<&str>) -> anyhow::Result { + let dashboard_client = Client::new( + Some(format!("{}/api/v1", dashboard_url)), + api_key, + Some(std::time::Duration::from_secs(60)), + )?; + + Ok(Self::Client(dashboard_client)) + } + + pub fn new_dry() -> Self { + Self::Dry + } + + pub async fn send_machine_info(&self, env: &env_info::Environment) -> anyhow::Result<()> { + let Self::Client(dashboard_client) = self else { return Ok(()) }; + + let response = dashboard_client + .put("machine") + .json(&json!({"hostname": env.hostname})) + .send() + .await + .context("sending machine information")?; + if !response.status().is_success() { + bail!( + "could not send machine information: {} {}", + response.status(), + response.text().await.unwrap_or_else(|_| "unknown".into()) + ); } - Err(error) => tracing::warn!( - error = &error as &dyn std::error::Error, - "failed to listen to Ctrl-C signal, invocation won't be canceled on Ctrl-C" - ), + Ok(()) } -} -pub async fn mark_as_failed( - dashboard_client: Client, - invocation_uuid: Uuid, - failure_reason: Option, -) { - let response = dashboard_client - .post("cancel-invocation") - .json(&json!({ - "invocation_uuid": invocation_uuid, - "failure_reason": failure_reason, - })) - .send() - .await; - let response = match response { - Ok(response) => response, - Err(response_error) => { - tracing::error!(error = &response_error as &dyn std::error::Error, %invocation_uuid, "could not mark invocation as failed"); - return; + pub async fn create_invocation( + &self, + build_info: build_info::BuildInfo, + commit_message: &str, + env: env_info::Environment, + max_workloads: usize, + reason: Option<&str>, + ) -> anyhow::Result { + let Self::Client(dashboard_client) = self else { return Ok(Uuid::now_v7()) }; + + let response = dashboard_client + .put("invocation") + .json(&json!({ + "commit": { + "sha1": build_info.commit_sha1, + "message": commit_message, + "commit_date": build_info.commit_timestamp, + "branch": build_info.branch, + "tag": build_info.describe.and_then(|describe| describe.as_tag()), + }, + "machine_hostname": env.hostname, + "max_workloads": max_workloads, + "reason": reason + })) + .send() + .await + .context("sending invocation")?; + if !response.status().is_success() { + bail!( + "could not send new invocation: {}", + response.text().await.unwrap_or_else(|_| "unknown".into()) + ); } - }; - - if !response.status().is_success() { - tracing::error!( - %invocation_uuid, - "could not mark invocation as failed: {}", - response.text().await.unwrap() - ); - return; - } - tracing::warn!(%invocation_uuid, "marked invocation as failed or canceled"); -} - -pub async fn send_machine_info( - dashboard_client: &Client, - env: &env_info::Environment, -) -> anyhow::Result<()> { - let response = dashboard_client - .put("machine") - .json(&json!({"hostname": env.hostname})) - .send() - .await - .context("sending machine information")?; - if !response.status().is_success() { - bail!( - "could not send machine information: {} {}", - response.status(), - response.text().await.unwrap_or_else(|_| "unknown".into()) - ); - } - Ok(()) -} - -pub async fn create_invocation( - dashboard_client: &Client, - build_info: build_info::BuildInfo, - commit_message: &str, - env: env_info::Environment, - max_workloads: usize, - reason: Option<&str>, -) -> anyhow::Result { - let response = dashboard_client - .put("invocation") - .json(&json!({ - "commit": { - "sha1": build_info.commit_sha1, - "message": commit_message, - "commit_date": build_info.commit_timestamp, - "branch": build_info.branch, - "tag": build_info.describe.and_then(|describe| describe.as_tag()), - }, - "machine_hostname": env.hostname, - "max_workloads": max_workloads, - "reason": reason - })) - .send() - .await - .context("sending invocation")?; - if !response.status().is_success() { - bail!( - "could not send new invocation: {}", - response.text().await.unwrap_or_else(|_| "unknown".into()) - ); - } - let invocation_uuid: Uuid = - response.json().await.context("could not deserialize invocation response as JSON")?; - Ok(invocation_uuid) -} - -pub async fn create_workload( - dashboard_client: &Client, - invocation_uuid: Uuid, - workload: &Workload, -) -> anyhow::Result { - let response = dashboard_client - .put("workload") - .json(&json!({ - "invocation_uuid": invocation_uuid, - "name": &workload.name, - "max_runs": workload.run_count, - })) - .send() - .await - .context("could not create new workload")?; - - if !response.status().is_success() { - bail!("creating new workload failed: {}", response.text().await.unwrap()) + let invocation_uuid: Uuid = + response.json().await.context("could not deserialize invocation response as JSON")?; + Ok(invocation_uuid) } - let workload_uuid: Uuid = - response.json().await.context("could not deserialize JSON as UUID")?; - Ok(workload_uuid) -} + pub async fn create_workload( + &self, + invocation_uuid: Uuid, + workload: &Workload, + ) -> anyhow::Result { + let Self::Client(dashboard_client) = self else { return Ok(Uuid::now_v7()) }; -pub async fn create_run( - dashboard_client: Client, - workload_uuid: Uuid, - report: &BTreeMap, -) -> anyhow::Result<()> { - let response = dashboard_client - .put("run") - .json(&json!({ - "workload_uuid": workload_uuid, - "data": report - })) - .send() - .await - .context("sending new run")?; - if !response.status().is_success() { - bail!( - "sending new run failed: {}", - response.text().await.unwrap_or_else(|_| "unknown".into()) - ) + let response = dashboard_client + .put("workload") + .json(&json!({ + "invocation_uuid": invocation_uuid, + "name": &workload.name, + "max_runs": workload.run_count, + })) + .send() + .await + .context("could not create new workload")?; + + if !response.status().is_success() { + bail!("creating new workload failed: {}", response.text().await.unwrap()) + } + + let workload_uuid: Uuid = + response.json().await.context("could not deserialize JSON as UUID")?; + Ok(workload_uuid) + } + + pub async fn create_run( + &self, + workload_uuid: Uuid, + report: &BTreeMap, + ) -> anyhow::Result<()> { + let Self::Client(dashboard_client) = self else { return Ok(()) }; + + let response = dashboard_client + .put("run") + .json(&json!({ + "workload_uuid": workload_uuid, + "data": report + })) + .send() + .await + .context("sending new run")?; + if !response.status().is_success() { + bail!( + "sending new run failed: {}", + response.text().await.unwrap_or_else(|_| "unknown".into()) + ) + } + Ok(()) + } + + pub async fn cancel_on_ctrl_c(self, invocation_uuid: Uuid, abort_handle: AbortHandle) { + tracing::info!("press Ctrl-C to cancel the invocation"); + match ctrl_c().await { + Ok(()) => { + tracing::info!(%invocation_uuid, "received Ctrl-C, cancelling invocation"); + self.mark_as_failed(invocation_uuid, None).await; + abort_handle.abort(); + } + Err(error) => tracing::warn!( + error = &error as &dyn std::error::Error, + "failed to listen to Ctrl-C signal, invocation won't be canceled on Ctrl-C" + ), + } + } + + pub async fn mark_as_failed(&self, invocation_uuid: Uuid, failure_reason: Option) { + if let DashboardClient::Client(client) = self { + let response = client + .post("cancel-invocation") + .json(&json!({ + "invocation_uuid": invocation_uuid, + "failure_reason": failure_reason, + })) + .send() + .await; + let response = match response { + Ok(response) => response, + Err(response_error) => { + tracing::error!(error = &response_error as &dyn std::error::Error, %invocation_uuid, "could not mark invocation as failed"); + return; + } + }; + + if !response.status().is_success() { + tracing::error!( + %invocation_uuid, + "could not mark invocation as failed: {}", + response.text().await.unwrap() + ); + return; + } + } + + tracing::warn!(%invocation_uuid, "marked invocation as failed or canceled"); } - Ok(()) } diff --git a/xtask/src/bench/mod.rs b/xtask/src/bench/mod.rs index 62c11b604..844b64f63 100644 --- a/xtask/src/bench/mod.rs +++ b/xtask/src/bench/mod.rs @@ -50,6 +50,10 @@ pub struct BenchDeriveArgs { #[arg(long, default_value_t = default_dashboard_url())] dashboard_url: String, + /// Don't actually send results to the dashboard + #[arg(long)] + no_dashboard: bool, + /// Directory to output reports. #[arg(long, default_value_t = default_report_folder())] report_folder: String, @@ -103,11 +107,11 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { let assets_client = Client::new(None, args.assets_key.as_deref(), Some(std::time::Duration::from_secs(3600)))?; // 1h - let dashboard_client = Client::new( - Some(format!("{}/api/v1", args.dashboard_url)), - args.api_key.as_deref(), - Some(std::time::Duration::from_secs(60)), - )?; + let dashboard_client = if args.no_dashboard { + dashboard::DashboardClient::new_dry() + } else { + dashboard::DashboardClient::new(&args.dashboard_url, args.api_key.as_deref())? + }; // reporting uses its own client because keeping the stream open to wait for entries // blocks any other requests @@ -127,12 +131,12 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { // enter runtime rt.block_on(async { - dashboard::send_machine_info(&dashboard_client, &env).await?; + dashboard_client.send_machine_info(&env).await?; let commit_message = build_info.commit_msg.context("missing commit message")?.split('\n').next().unwrap(); let max_workloads = args.workload_file.len(); let reason: Option<&str> = args.reason.as_deref(); - let invocation_uuid = dashboard::create_invocation(&dashboard_client, build_info, commit_message, env, max_workloads, reason).await?; + let invocation_uuid = dashboard_client.create_invocation( build_info, commit_message, env, max_workloads, reason).await?; tracing::info!(workload_count = args.workload_file.len(), "handling workload files"); @@ -167,7 +171,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { let abort_handle = workload_runs.abort_handle(); tokio::spawn({ let dashboard_client = dashboard_client.clone(); - dashboard::cancel_on_ctrl_c(invocation_uuid, dashboard_client, abort_handle) + dashboard_client.cancel_on_ctrl_c(invocation_uuid, abort_handle) }); // wait for the end of the main task, handle result @@ -178,7 +182,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { } Ok(Err(error)) => { tracing::error!(%invocation_uuid, error = %error, "invocation failed, attempting to report the failure to dashboard"); - dashboard::mark_as_failed(dashboard_client, invocation_uuid, Some(error.to_string())).await; + dashboard_client.mark_as_failed(invocation_uuid, Some(error.to_string())).await; tracing::warn!(%invocation_uuid, "invocation marked as failed following error"); Err(error) }, @@ -186,7 +190,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { match join_error.try_into_panic() { Ok(panic) => { tracing::error!("invocation panicked, attempting to report the failure to dashboard"); - dashboard::mark_as_failed(dashboard_client, invocation_uuid, Some("Panicked".into())).await; + dashboard_client.mark_as_failed( invocation_uuid, Some("Panicked".into())).await; std::panic::resume_unwind(panic) } Err(_) => { diff --git a/xtask/src/bench/workload.rs b/xtask/src/bench/workload.rs index b3e952f29..d82c5ad19 100644 --- a/xtask/src/bench/workload.rs +++ b/xtask/src/bench/workload.rs @@ -12,8 +12,9 @@ use uuid::Uuid; use super::assets::Asset; use super::client::Client; use super::command::SyncMode; +use super::dashboard::DashboardClient; use super::BenchDeriveArgs; -use crate::bench::{assets, dashboard, meili_process}; +use crate::bench::{assets, meili_process}; #[derive(Deserialize)] pub struct Workload { @@ -25,7 +26,7 @@ pub struct Workload { } async fn run_commands( - dashboard_client: &Client, + dashboard_client: &DashboardClient, logs_client: &Client, meili_client: &Client, workload_uuid: Uuid, @@ -64,7 +65,7 @@ async fn run_commands( #[tracing::instrument(skip(assets_client, dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = workload.name))] pub async fn execute( assets_client: &Client, - dashboard_client: &Client, + dashboard_client: &DashboardClient, logs_client: &Client, meili_client: &Client, invocation_uuid: Uuid, @@ -74,8 +75,7 @@ pub async fn execute( ) -> anyhow::Result<()> { assets::fetch_assets(assets_client, &workload.assets, &args.asset_folder).await?; - let workload_uuid = - dashboard::create_workload(dashboard_client, invocation_uuid, &workload).await?; + let workload_uuid = dashboard_client.create_workload(invocation_uuid, &workload).await?; let mut tasks = Vec::new(); @@ -113,7 +113,7 @@ pub async fn execute( #[allow(clippy::too_many_arguments)] // not best code quality, but this is a benchmark runner #[tracing::instrument(skip(dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = %workload.name))] async fn execute_run( - dashboard_client: &Client, + dashboard_client: &DashboardClient, logs_client: &Client, meili_client: &Client, workload_uuid: Uuid, @@ -202,7 +202,7 @@ async fn start_report( } async fn stop_report( - dashboard_client: &Client, + dashboard_client: &DashboardClient, logs_client: &Client, workload_uuid: Uuid, filename: String, @@ -232,7 +232,7 @@ async fn stop_report( .context("could not convert trace to report")?; let context = || format!("writing report to {filename}"); - dashboard::create_run(dashboard_client, workload_uuid, &report).await?; + dashboard_client.create_run(workload_uuid, &report).await?; let mut output_file = std::io::BufWriter::new( std::fs::File::options()