From 600f8191a8fe169eb38c429958dd59714349acb4 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Wed, 2 Aug 2023 14:09:58 -0400 Subject: [PATCH] chore(api): Refactor top and tap for library use (#18129) * Expose tap EventFormatter and run * Shorten comment * Add default_graphql_url * Move healthcheck error outside * Refactor tap pattern creation * Expose RECONNECT_DELAY * Refactor tap to one exportable function * Add url() method to tap Opts * Refactor core top logic into function * Refactor web socket URL creation * Adjust error message * Publicize tap/top * Use cmd::tap * Use cmd::top * Allow customizing dashboard title * Apply PR suggestion --- lib/vector-api-client/src/client.rs | 33 +----- src/config/api.rs | 8 ++ src/lib.rs | 4 +- src/tap/cmd.rs | 63 +++++----- src/tap/mod.rs | 47 +++++++- src/top/cmd.rs | 173 +++++++++++++++------------- src/top/dashboard.rs | 11 +- src/top/mod.rs | 30 ++++- 8 files changed, 215 insertions(+), 154 deletions(-) diff --git a/lib/vector-api-client/src/client.rs b/lib/vector-api-client/src/client.rs index 01159be74d01f..941ffa7963f27 100644 --- a/lib/vector-api-client/src/client.rs +++ b/lib/vector-api-client/src/client.rs @@ -1,8 +1,9 @@ use anyhow::Context; use graphql_client::GraphQLQuery; -use indoc::indoc; use url::Url; +use crate::gql::HealthQueryExt; + /// Wrapped `Result` type, that returns deserialized GraphQL response data. pub type QueryResult = anyhow::Result::ResponseData>>; @@ -19,33 +20,9 @@ impl Client { Self { url } } - pub async fn new_with_healthcheck(url: Url) -> Option { - #![allow(clippy::print_stderr)] - - use crate::gql::HealthQueryExt; - - // Create a new API client for connecting to the local/remote Vector instance. - let client = Self::new(url.clone()); - - // Check that the GraphQL server is reachable - match client.health_query().await { - Ok(_) => Some(client), - _ => { - eprintln!( - indoc! {" - Vector API server isn't reachable ({}). - - Have you enabled the API? - - To enable the API, add the following to your `vector.toml` config file: - - [api] - enabled = true"}, - url - ); - None - } - } + /// Send a health query + pub async fn healthcheck(&self) -> Result<(), ()> { + self.health_query().await.map(|_| ()).map_err(|_| ()) } /// Issue a GraphQL query using Reqwest, serializing the response to the associated diff --git a/src/config/api.rs b/src/config/api.rs index 7024b08ee8586..0429f6be3662c 100644 --- a/src/config/api.rs +++ b/src/config/api.rs @@ -1,5 +1,6 @@ use std::net::{Ipv4Addr, SocketAddr}; +use url::Url; use vector_config::configurable_component; /// API options. @@ -41,6 +42,13 @@ pub fn default_address() -> Option { Some(SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8686)) } +/// Default GraphQL API address +pub fn default_graphql_url() -> Url { + let addr = default_address().unwrap(); + Url::parse(&format!("http://{}/graphql", addr)) + .expect("Couldn't parse default API URL. Please report this.") +} + const fn default_playground() -> bool { true } diff --git a/src/lib.rs b/src/lib.rs index 37d71f87922ec..d16cc17ce9a46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,12 +101,12 @@ pub mod sources; pub mod stats; #[cfg(feature = "api-client")] #[allow(unreachable_pub)] -mod tap; +pub mod tap; pub mod template; pub mod test_util; #[cfg(feature = "api-client")] #[allow(unreachable_pub)] -pub(crate) mod top; +pub mod top; #[allow(unreachable_pub)] pub mod topology; pub mod trace; diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 1b7c0a999b281..9a0eccc5700c1 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -12,60 +12,49 @@ use vector_api_client::{ Client, }; -use crate::{ - config, - signal::{SignalRx, SignalTo}, -}; +use crate::signal::{SignalRx, SignalTo}; /// Delay (in milliseconds) before attempting to reconnect to the Vector API const RECONNECT_DELAY: u64 = 5000; /// CLI command func for issuing 'tap' queries, and communicating with a local/remote /// Vector API server via HTTP/WebSockets. -pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode { - // Use the provided URL as the Vector GraphQL API server, or default to the local port - // provided by the API config. This will work despite `api` and `api-client` being distinct - // features; the config is available even if `api` is disabled. - let mut url = opts.url.clone().unwrap_or_else(|| { - let addr = config::api::default_address().unwrap(); - Url::parse(&format!("http://{}/graphql", addr)) - .expect("Couldn't parse default API URL. Please report this.") - }); - +pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode { + let url = opts.url(); // Return early with instructions for enabling the API if the endpoint isn't reachable // via a healthcheck. - if Client::new_with_healthcheck(url.clone()).await.is_none() { + let client = Client::new(url.clone()); + #[allow(clippy::print_stderr)] + if client.healthcheck().await.is_err() { + eprintln!( + indoc::indoc! {" + Vector API server isn't reachable ({}). + + Have you enabled the API? + + To enable the API, add the following to your `vector.toml` config file: + + [api] + enabled = true"}, + url + ); return exitcode::UNAVAILABLE; } - // Change the HTTP schema to WebSockets. - url.set_scheme(match url.scheme() { - "https" => "wss", - _ => "ws", - }) - .expect("Couldn't build WebSocket URL. Please report."); - - // If no patterns are provided, tap all components' outputs - let outputs_patterns = if opts.component_id_patterns.is_empty() - && opts.outputs_of.is_empty() - && opts.inputs_of.is_empty() - { - vec!["*".to_string()] - } else { - opts.outputs_of - .iter() - .cloned() - .chain(opts.component_id_patterns.iter().cloned()) - .collect() - }; + tap(opts, signal_rx).await +} +/// Observe event flow from specified components +pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode { + let subscription_url = opts.web_socket_url(); let formatter = EventFormatter::new(opts.meta, opts.format); + let outputs_patterns = opts.outputs_patterns(); loop { tokio::select! { biased; Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break, - status = run(url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => { + status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => { if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect { #[allow(clippy::print_stderr)] { @@ -93,7 +82,7 @@ async fn run( Err(e) => { #[allow(clippy::print_stderr)] { - eprintln!("[tap] Couldn't connect to Vector API via WebSockets: {}", e); + eprintln!("[tap] Couldn't connect to API via WebSockets: {}", e); } return exitcode::UNAVAILABLE; } diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 5864660b444f9..81b4d5a8d5733 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -1,10 +1,15 @@ +//! Tap subcommand mod cmd; use clap::Parser; pub(crate) use cmd::cmd; +pub use cmd::tap; use url::Url; use vector_api_client::gql::TapEncodingFormat; +use crate::config::api::default_graphql_url; + +/// Tap options #[derive(Parser, Debug, Clone)] #[command(rename_all = "kebab-case")] pub struct Opts { @@ -12,7 +17,7 @@ pub struct Opts { #[arg(default_value = "500", short = 'i', long)] interval: u32, - /// Vector GraphQL API server endpoint + /// GraphQL API server endpoint #[arg(short, long)] url: Option, @@ -44,7 +49,45 @@ pub struct Opts { #[arg(short, long)] meta: bool, - /// Whether to reconnect if the underlying Vector API connection drops. By default, tap will attempt to reconnect if the connection drops. + /// Whether to reconnect if the underlying API connection drops. By default, tap will attempt to reconnect if the connection drops. #[arg(short, long)] no_reconnect: bool, } + +impl Opts { + /// Component ID patterns to tap + /// + /// If no patterns are provided, tap all components' outputs + pub fn outputs_patterns(&self) -> Vec { + if self.component_id_patterns.is_empty() + && self.outputs_of.is_empty() + && self.inputs_of.is_empty() + { + vec!["*".to_string()] + } else { + self.outputs_of + .iter() + .cloned() + .chain(self.component_id_patterns.iter().cloned()) + .collect() + } + } + + /// Use the provided URL as the Vector GraphQL API server, or default to the local port + /// provided by the API config. + pub fn url(&self) -> Url { + self.url.clone().unwrap_or_else(default_graphql_url) + } + + /// URL with scheme set to WebSockets + pub fn web_socket_url(&self) -> Url { + let mut url = self.url(); + url.set_scheme(match url.scheme() { + "https" => "wss", + _ => "ws", + }) + .expect("Couldn't build WebSocket URL. Please report."); + + url + } +} diff --git a/src/top/cmd.rs b/src/top/cmd.rs index 16aa38126548a..a774ca7bcc448 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -2,8 +2,7 @@ use std::time::Duration; use chrono::Local; use futures_util::future::join_all; -use tokio::sync::oneshot; -use url::Url; +use tokio::sync::{mpsc, oneshot}; use vector_api_client::{connect_subscription_client, Client}; use super::{ @@ -11,7 +10,6 @@ use super::{ metrics, state::{self, ConnectionStatus, EventType}, }; -use crate::config; /// Delay (in milliseconds) before attempting to reconnect to the Vector API const RECONNECT_DELAY: u64 = 5000; @@ -28,88 +26,49 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { return exitcode::IOERR; } - // Use the provided URL as the Vector GraphQL API server, or default to the local port - // provided by the API config. This will work despite `api` and `api-client` being distinct - // features; the config is available even if `api` is disabled - let url = opts.url.clone().unwrap_or_else(|| { - let addr = config::api::default_address().unwrap(); - Url::parse(&format!("http://{}/graphql", addr)) - .expect("Couldn't parse default API URL. Please report this.") - }); - + let url = opts.url(); // Create a new API client for connecting to the local/remote Vector instance. - let client = match Client::new_with_healthcheck(url.clone()).await { - Some(client) => client, - None => return exitcode::UNAVAILABLE, - }; + let client = Client::new(url.clone()); + #[allow(clippy::print_stderr)] + if client.healthcheck().await.is_err() { + eprintln!( + indoc::indoc! {" + Vector API server isn't reachable ({}). - // Create a channel for updating state via event messages - let (tx, rx) = tokio::sync::mpsc::channel(20); - let state_rx = state::updater(rx).await; + Have you enabled the API? - // Change the HTTP schema to WebSockets - let mut ws_url = url.clone(); - ws_url - .set_scheme(match url.scheme() { - "https" => "wss", - _ => "ws", - }) - .expect("Couldn't build WebSocket URL. Please report."); + To enable the API, add the following to your `vector.toml` config file: - let opts_clone = opts.clone(); + [api] + enabled = true"}, + url + ); + return exitcode::UNAVAILABLE; + } + + top(opts, client, "Vector").await +} + +/// General monitoring +pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode { + // Channel for updating state via event messages + let (tx, rx) = tokio::sync::mpsc::channel(20); + let state_rx = state::updater(rx).await; + // Channel for shutdown signal let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - // This task handles reconnecting the subscription client and all - // subscriptions in the case of a web socket disconnect - let connection = tokio::spawn(async move { - loop { - // Initialize state. On future reconnects, we re-initialize state in - // order to accurately capture added, removed, and edited - // components. - let state = match metrics::init_components(&client).await { - Ok(state) => state, - Err(_) => { - tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; - continue; - } - }; - _ = tx.send(EventType::InitializeState(state)).await; - - let subscription_client = match connect_subscription_client(ws_url.clone()).await { - Ok(c) => c, - Err(_) => { - tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; - continue; - } - }; - - // Subscribe to updated metrics - let finished = - metrics::subscribe(subscription_client, tx.clone(), opts_clone.interval as i64); - - _ = tx - .send(EventType::ConnectionUpdated(ConnectionStatus::Connected( - Local::now(), - ))) - .await; - // Tasks spawned in metrics::subscribe finish when the subscription - // streams have completed. Currently, subscription streams only - // complete when the underlying web socket connection to the GraphQL - // server drops. - _ = join_all(finished).await; - _ = tx - .send(EventType::ConnectionUpdated( - ConnectionStatus::Disconnected(RECONNECT_DELAY), - )) - .await; - if opts_clone.no_reconnect { - _ = shutdown_tx.send(()); - break; - } - } - }); + + let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx)); // Initialize the dashboard - match init_dashboard(url.as_str(), opts, state_rx, shutdown_rx).await { + match init_dashboard( + dashboard_title, + opts.url().as_str(), + opts, + state_rx, + shutdown_rx, + ) + .await + { Ok(_) => { connection.abort(); exitcode::OK @@ -117,10 +76,66 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { Err(err) => { #[allow(clippy::print_stderr)] { - eprintln!("Encountered error: {}", err); + eprintln!("[top] Encountered shutdown error: {}", err); } connection.abort(); exitcode::IOERR } } } + +// This task handles reconnecting the subscription client and all +// subscriptions in the case of a web socket disconnect +async fn subscription( + opts: super::Opts, + client: Client, + tx: mpsc::Sender, + shutdown_tx: oneshot::Sender<()>, +) { + let ws_url = opts.web_socket_url(); + + loop { + // Initialize state. On future reconnects, we re-initialize state in + // order to accurately capture added, removed, and edited + // components. + let state = match metrics::init_components(&client).await { + Ok(state) => state, + Err(_) => { + tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; + continue; + } + }; + _ = tx.send(EventType::InitializeState(state)).await; + + let subscription_client = match connect_subscription_client(ws_url.clone()).await { + Ok(c) => c, + Err(_) => { + tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; + continue; + } + }; + + // Subscribe to updated metrics + let finished = metrics::subscribe(subscription_client, tx.clone(), opts.interval as i64); + + _ = tx + .send(EventType::ConnectionUpdated(ConnectionStatus::Connected( + Local::now(), + ))) + .await; + // Tasks spawned in metrics::subscribe finish when the subscription + // streams have completed. Currently, subscription streams only + // complete when the underlying web socket connection to the GraphQL + // server drops. + _ = join_all(finished).await; + _ = tx + .send(EventType::ConnectionUpdated( + ConnectionStatus::Disconnected(RECONNECT_DELAY), + )) + .await; + if opts.no_reconnect { + _ = shutdown_tx.send(()); + break; + } + } +} diff --git a/src/top/dashboard.rs b/src/top/dashboard.rs index ccf0b49085a5c..e38505555dce0 100644 --- a/src/top/dashboard.rs +++ b/src/top/dashboard.rs @@ -143,11 +143,12 @@ struct Widgets<'a> { constraints: Vec, url_string: &'a str, opts: &'a super::Opts, + title: &'a str, } impl<'a> Widgets<'a> { /// Creates a new Widgets, containing constraints to re-use across renders. - pub fn new(url_string: &'a str, opts: &'a super::Opts) -> Self { + pub fn new(title: &'a str, url_string: &'a str, opts: &'a super::Opts) -> Self { let constraints = vec![ Constraint::Length(3), Constraint::Max(90), @@ -158,10 +159,11 @@ impl<'a> Widgets<'a> { constraints, url_string, opts, + title, } } - /// Renders a title showing 'Vector', and the URL the dashboard is currently connected to. + /// Renders a title and the URL the dashboard is currently connected to. fn title( &'a self, f: &mut Frame, @@ -181,7 +183,7 @@ impl<'a> Widgets<'a> { let text = vec![Spans::from(text)]; let block = Block::default().borders(Borders::ALL).title(Span::styled( - "Vector", + self.title, Style::default() .fg(Color::Green) .add_modifier(Modifier::BOLD), @@ -353,6 +355,7 @@ pub fn is_tty() -> bool { /// as well as entering an 'alternate screen' to overlay the console. This ensures that when /// the dashboard is exited, the user's previous terminal session can commence, unaffected. pub async fn init_dashboard<'a>( + title: &'a str, url: &'a str, opts: &'a super::Opts, mut state_rx: state::StateRx, @@ -377,7 +380,7 @@ pub async fn init_dashboard<'a>( // Clear the screen, readying it for output terminal.clear()?; - let widgets = Widgets::new(url, opts); + let widgets = Widgets::new(title, url, opts); loop { tokio::select! { diff --git a/src/top/mod.rs b/src/top/mod.rs index 24a0e8b48c99c..790d7c6025dc0 100644 --- a/src/top/mod.rs +++ b/src/top/mod.rs @@ -1,3 +1,4 @@ +//! Top subcommand mod cmd; mod dashboard; mod events; @@ -6,8 +7,13 @@ mod state; use clap::Parser; pub use cmd::cmd; +pub use cmd::top; +pub use dashboard::is_tty; use url::Url; +use crate::config::api::default_graphql_url; + +/// Top options #[derive(Parser, Debug, Clone)] #[command(rename_all = "kebab-case")] pub struct Opts { @@ -15,7 +21,7 @@ pub struct Opts { #[arg(default_value = "1000", short = 'i', long)] interval: u32, - /// Vector GraphQL API server endpoint + /// GraphQL API server endpoint #[arg(short, long)] url: Option, @@ -23,9 +29,29 @@ pub struct Opts { #[arg(short = 'H', long, default_value_t = true)] human_metrics: bool, - /// Whether to reconnect if the underlying Vector API connection drops. + /// Whether to reconnect if the underlying API connection drops. /// /// By default, top will attempt to reconnect if the connection drops. #[arg(short, long)] no_reconnect: bool, } + +impl Opts { + /// Use the provided URL as the Vector GraphQL API server, or default to the local port + /// provided by the API config. + pub fn url(&self) -> Url { + self.url.clone().unwrap_or_else(default_graphql_url) + } + + /// URL with scheme set to WebSockets + pub fn web_socket_url(&self) -> Url { + let mut url = self.url(); + url.set_scheme(match url.scheme() { + "https" => "wss", + _ => "ws", + }) + .expect("Couldn't build WebSocket URL. Please report."); + + url + } +}