From f42ef0864055e4e7589ea950605160f9414ed1ee Mon Sep 17 00:00:00 2001 From: Dylan Frankland Date: Thu, 17 Oct 2024 10:11:27 -0700 Subject: [PATCH] create api_client for unified way of calling apis (#118) --- Cargo.lock | 2 + cli/Cargo.toml | 9 ++ cli/src/api_client/call_api.rs | 220 ++++++++++++++++++++++++++ cli/src/api_client/mod.rs | 276 +++++++++++++++++++++++++++++++++ cli/src/clients.rs | 187 ---------------------- cli/src/constants.rs | 1 + cli/src/lib.rs | 2 +- cli/src/main.rs | 176 ++++++++------------- cli/src/runner.rs | 42 ++--- cli/src/utils.rs | 20 --- 10 files changed, 593 insertions(+), 342 deletions(-) create mode 100644 cli/src/api_client/call_api.rs create mode 100644 cli/src/api_client/mod.rs delete mode 100644 cli/src/clients.rs diff --git a/Cargo.lock b/Cargo.lock index 8f420d11..5001b8f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3059,6 +3059,8 @@ dependencies = [ "env_logger", "exitcode", "glob", + "http", + "lazy_static", "log", "openssl", "quick-junit", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 789c3976..e8ef93a3 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -20,6 +20,7 @@ context = { path = "../context" } env_logger = { version = "0.11.0", default-features = false } log = "0.4.14" exitcode = "1.1.1" +http = "1.1.0" tokio = { version = "*", default-features = false, features = [ "rt-multi-thread", "macros", @@ -44,6 +45,14 @@ openssl = { version = "0.10.66", features = ["vendored"] } uuid = { version = "1.10.0", features = ["v5"] } quick-junit = "0.5.0" +[dev-dependencies] +lazy_static = "1.5.0" +tokio = { version = "*", default-features = false, features = [ + "rt-multi-thread", + "macros", + "test-util", +] } + [build-dependencies] vergen = { version = "8.3.1", features = [ "build", diff --git a/cli/src/api_client/call_api.rs b/cli/src/api_client/call_api.rs new file mode 100644 index 00000000..5206457b --- /dev/null +++ b/cli/src/api_client/call_api.rs @@ -0,0 +1,220 @@ +use std::time::Duration; + +use tokio::time::{self, Instant}; +use tokio_retry::{strategy::ExponentialBackoff, Action, Retry}; + +// Tokio-retry uses base ^ retry * factor formula. +// This will give us 8ms, 64ms, 512ms, 4096ms, 32768ms +const RETRY_BASE_MS: u64 = 8; +const RETRY_FACTOR: u64 = 1; +const RETRY_COUNT: usize = 5; + +const CHECK_PROGRESS_INTERVAL_SECS: u64 = 2; +const REPORT_SLOW_PROGRESS_TIMEOUT_SECS: u64 = enforce_increment_check_progress_interval_secs(10); + +const fn enforce_increment_check_progress_interval_secs( + report_slow_progress_timeout_secs: u64, +) -> u64 { + if report_slow_progress_timeout_secs % CHECK_PROGRESS_INTERVAL_SECS == 0 { + return report_slow_progress_timeout_secs; + } + // NOTE: This is a build time error due to `const fn` + panic!("`report_slow_progress_timeout_secs` must be an increment of `CHECK_PROGRESS_INTERVAL_SECS`") +} + +fn default_delay() -> std::iter::Take { + ExponentialBackoff::from_millis(RETRY_BASE_MS) + .factor(RETRY_FACTOR) + .take(RETRY_COUNT) +} + +pub struct CallApi +where + A: Action, + L: (FnOnce(Duration, usize) -> String) + Copy + Send + 'static, + R: (FnOnce(Duration) -> String) + Copy + Send + 'static, +{ + pub action: A, + pub log_progress_message: L, + pub report_slow_progress_message: R, +} + +impl CallApi +where + A: Action, + L: (FnOnce(Duration, usize) -> String) + Copy + Send + 'static, + R: (FnOnce(Duration) -> String) + Copy + Send + 'static, +{ + pub async fn call_api(&mut self) -> Result { + let report_slow_progress_start = time::Instant::now(); + let report_slow_progress_message = self.report_slow_progress_message; + let report_slow_progress_handle = tokio::spawn(async move { + let duration = Duration::from_secs(REPORT_SLOW_PROGRESS_TIMEOUT_SECS); + time::sleep(duration).await; + let time_elapsed = Instant::now().duration_since(report_slow_progress_start); + sentry::capture_message( + report_slow_progress_message(time_elapsed).as_ref(), + sentry::Level::Error, + ); + }); + + let check_progress_start = time::Instant::now(); + let log_progress_message = self.log_progress_message; + let check_progress_handle = tokio::spawn(async move { + let mut log_count = 0; + let duration = Duration::from_secs(CHECK_PROGRESS_INTERVAL_SECS); + let mut interval = time::interval_at(Instant::now() + duration, duration); + + loop { + let instant = interval.tick().await; + let time_elapsed = instant.duration_since(check_progress_start); + let log_message = log_progress_message(time_elapsed, log_count); + log::info!("{}", log_message); + log_count += 1; + } + }); + + let result = Retry::spawn(default_delay(), || (&mut self.action).run()).await; + report_slow_progress_handle.abort(); + check_progress_handle.abort(); + + result + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration, + }; + + use lazy_static::lazy_static; + use tokio::time; + + use super::{ + CallApi, CHECK_PROGRESS_INTERVAL_SECS, REPORT_SLOW_PROGRESS_TIMEOUT_SECS, RETRY_COUNT, + }; + + #[tokio::test(start_paused = true)] + async fn logs_progress_and_reports_slow_progress() { + lazy_static! { + static ref LOG_PROGRESS_TIME_ELAPSED_AND_LOG_COUNT: Arc>> = + Arc::new(Mutex::new(vec![])); + static ref REPORT_SLOW_PROGRESS_TIME_ELAPSED: Arc>> = + Arc::new(Mutex::new(vec![])); + } + + const DURATION: u64 = 20; + + CallApi { + action: || async { + time::sleep(Duration::from_secs(DURATION)).await; + Result::<(), ()>::Ok(()) + }, + log_progress_message: |time_elapsed, log_count| { + LOG_PROGRESS_TIME_ELAPSED_AND_LOG_COUNT + .lock() + .unwrap() + .push((time_elapsed, log_count)); + String::new() + }, + report_slow_progress_message: |time_elapsed| { + REPORT_SLOW_PROGRESS_TIME_ELAPSED + .lock() + .unwrap() + .push(time_elapsed); + String::new() + }, + } + .call_api() + .await + .unwrap(); + + assert_eq!( + LOG_PROGRESS_TIME_ELAPSED_AND_LOG_COUNT + .lock() + .unwrap() + .iter() + .map(|(ts, count)| (ts.as_secs(), *count)) + .collect::>(), + (0..(DURATION / CHECK_PROGRESS_INTERVAL_SECS).saturating_sub(1)) + .map(|i| ((i + 1) * CHECK_PROGRESS_INTERVAL_SECS, i as usize)) + .collect::>() + ); + assert_eq!( + *REPORT_SLOW_PROGRESS_TIME_ELAPSED + .lock() + .unwrap() + .iter() + .map(|ts| ts.as_secs()) + .collect::>(), + vec![REPORT_SLOW_PROGRESS_TIMEOUT_SECS] + ); + } + + #[tokio::test(start_paused = true)] + async fn does_not_log_after_action_completes() { + lazy_static! { + static ref LOG_PROGRESS_TIME_ELAPSED_AND_LOG_COUNT: Arc>> = + Arc::new(Mutex::new(vec![])); + static ref REPORT_SLOW_PROGRESS_TIME_ELAPSED: Arc>> = + Arc::new(Mutex::new(vec![])); + } + + CallApi { + action: || async { + time::sleep(Duration::from_secs(CHECK_PROGRESS_INTERVAL_SECS - 1)).await; + Result::<(), ()>::Ok(()) + }, + log_progress_message: |time_elapsed, log_count| { + LOG_PROGRESS_TIME_ELAPSED_AND_LOG_COUNT + .lock() + .unwrap() + .push((time_elapsed, log_count)); + String::new() + }, + report_slow_progress_message: |time_elapsed| { + REPORT_SLOW_PROGRESS_TIME_ELAPSED + .lock() + .unwrap() + .push(time_elapsed); + String::new() + }, + } + .call_api() + .await + .unwrap(); + + assert_eq!( + *LOG_PROGRESS_TIME_ELAPSED_AND_LOG_COUNT.lock().unwrap(), + Vec::new() + ); + assert_eq!( + *REPORT_SLOW_PROGRESS_TIME_ELAPSED.lock().unwrap(), + Vec::::new() + ); + } + + #[tokio::test(start_paused = true)] + async fn retries() { + let retry_count = AtomicUsize::new(0); + + let _ = CallApi { + action: || async { + time::sleep(Duration::from_secs(CHECK_PROGRESS_INTERVAL_SECS - 1)).await; + retry_count.fetch_add(1, Ordering::Relaxed); + Result::<(), ()>::Err(()) + }, + log_progress_message: |_, _| String::new(), + report_slow_progress_message: |_| String::new(), + } + .call_api() + .await; + + assert_eq!(retry_count.into_inner(), RETRY_COUNT + 1); + } +} diff --git a/cli/src/api_client/mod.rs b/cli/src/api_client/mod.rs new file mode 100644 index 00000000..ffc00268 --- /dev/null +++ b/cli/src/api_client/mod.rs @@ -0,0 +1,276 @@ +use std::path::Path; + +use anyhow::Context; +use api; +use call_api::CallApi; +use http::{header::HeaderMap, HeaderValue}; +use reqwest::{header, Client, Response, StatusCode}; +use tokio::fs; + +use crate::constants::{DEFAULT_ORIGIN, TRUNK_PUBLIC_API_ADDRESS_ENV}; + +mod call_api; + +pub struct ApiClient { + host: String, + s3_client: Client, + trunk_client: Client, +} + +impl ApiClient { + const TRUNK_API_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + const TRUNK_API_TOKEN_HEADER: &str = "x-api-token"; + + pub fn new(api_token: String) -> anyhow::Result { + let trimmed_token = api_token.trim(); + if trimmed_token.is_empty() { + return Err(anyhow::anyhow!("Trunk API token is required.")); + } + let api_token_header_value = HeaderValue::from_str(&api_token) + .map_err(|_| anyhow::Error::msg("Trunk API token is not ASCII"))?; + + let host = std::env::var(TRUNK_PUBLIC_API_ADDRESS_ENV) + .ok() + .and_then(|s| if s.is_empty() { None } else { Some(s) }) + .unwrap_or_else(|| DEFAULT_ORIGIN.to_string()); + + let mut trunk_client_default_headers = HeaderMap::new(); + trunk_client_default_headers.append( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + trunk_client_default_headers.append(Self::TRUNK_API_TOKEN_HEADER, api_token_header_value); + + let trunk_client = Client::builder() + .timeout(Self::TRUNK_API_TIMEOUT) + .default_headers(trunk_client_default_headers) + .build()?; + + let mut s3_client_default_headers = HeaderMap::new(); + s3_client_default_headers.append( + header::CONTENT_TYPE, + HeaderValue::from_static("application/octet-stream"), + ); + let s3_client = Client::builder() + .default_headers(s3_client_default_headers) + .build()?; + + Ok(Self { + host, + s3_client, + trunk_client, + }) + } + + pub async fn create_trunk_repo(&self, request: &api::CreateRepoRequest) -> anyhow::Result<()> { + CallApi { + action: || async { + let response = self + .trunk_client + .post(format!("{}/v1/repo/create", self.host)) + .json(&request) + .send() + .await?; + + status_code_help( + &response, + CheckUnauthorized::Check, + CheckNotFound::DoNotCheck, + |_| format!("Failed to create repo."), + ) + }, + log_progress_message: |time_elapsed, _| { + format!("Communicating with Trunk services is taking longer than expected. It has taken {} seconds so far.", time_elapsed.as_secs()) + }, + report_slow_progress_message: |time_elapsed| { + format!("Creating a Trunk repo is taking longer than {} seconds", time_elapsed.as_secs()) + }, + } + .call_api() + .await + } + + pub async fn create_bundle_upload_intent( + &self, + request: &api::CreateBundleUploadRequest, + ) -> anyhow::Result { + CallApi { + action: || async { + let response = self + .trunk_client + .post(format!("{}/v1/metrics/createBundleUpload", self.host)) + .json(&request) + .send() + .await?; + + status_code_help( + &response, + CheckUnauthorized::Check, + CheckNotFound::Check, + |_| String::from("Failed to create bundle upload."), + )?; + + response + .json::() + .await + .context("Failed to get response body as json.") + }, + log_progress_message: |time_elapsed, _| { + format!("Reporting bundle upload initiation to Trunk services is taking longer than expected. It has taken {} seconds so far.", time_elapsed.as_secs()) + }, + report_slow_progress_message: |time_elapsed| { + format!("Creating a Trunk upload intent is taking longer than {} seconds", time_elapsed.as_secs()) + }, + } + .call_api() + .await + } + + pub async fn get_quarantining_config( + &self, + request: &api::GetQuarantineBulkTestStatusRequest, + ) -> anyhow::Result { + CallApi { + action: || async { + let response = self + .trunk_client + .post(format!("{}/v1/metrics/getQuarantineConfig", self.host)) + .json(&request) + .send() + .await?; + + status_code_help( + &response, + CheckUnauthorized::Check, + CheckNotFound::Check, + |_| String::from("Failed to get quarantine bulk test."), + )?; + + response + .json::() + .await + .context("Failed to get response body as json.") + }, + log_progress_message: |time_elapsed, _| { + format!("Getting quarantine configuration from Trunk services is taking longer than expected. It has taken {} seconds so far.", time_elapsed.as_secs()) + }, + report_slow_progress_message: |time_elapsed| { + format!("Getting a Trunk quarantine configuration is taking longer than {} seconds", time_elapsed.as_secs()) + }, + } + .call_api() + .await + } + + pub async fn put_bundle_to_s3, B: AsRef>( + &self, + url: U, + bundle_path: B, + ) -> anyhow::Result<()> { + CallApi { + action: || async { + let file = fs::File::open(bundle_path.as_ref()).await?; + let file_size = file.metadata().await?.len(); + + let response = self + .s3_client + .put(url.as_ref()) + .header(header::CONTENT_LENGTH, file_size) + .body(file) + .send() + .await?; + + status_code_help( + &response, + CheckUnauthorized::DoNotCheck, + CheckNotFound::DoNotCheck, + |_| String::from("Failed to upload bundle to S3."), + ) + }, + log_progress_message: |time_elapsed, _| { + format!("Uploading bundle to S3 is taking longer than expected. It has taken {} seconds so far.", time_elapsed.as_secs()) + }, + report_slow_progress_message: |time_elapsed| { + format!("Uploading bundle to S3 is taking longer than {} seconds", time_elapsed.as_secs()) + }, + } + .call_api() + .await + } + + pub async fn update_bundle_upload_status( + &self, + request: &api::UpdateBundleUploadRequest, + ) -> anyhow::Result<()> { + CallApi { + action: || async { + let response = self + .trunk_client + .patch(format!("{}/v1/metrics/updateBundleUpload", self.host)) + .json(request) + .send() + .await?; + + status_code_help( + &response, + CheckUnauthorized::Check, + CheckNotFound::Check, + |_| { + format!( + "Failed to update bundle upload status to {:#?}", + request.upload_status + ) + }, + ) + }, + log_progress_message: |time_elapsed, _| { + format!("Communicating with Trunk services is taking longer than expected. It has taken {} seconds so far.", time_elapsed.as_secs()) + }, + report_slow_progress_message: |time_elapsed| { + format!("Updating a bundle upload status is taking longer than {} seconds", time_elapsed.as_secs()) + }, + } + .call_api() + .await + } +} + +#[derive(Debug, Clone, Copy)] +enum CheckUnauthorized { + Check, + DoNotCheck, +} + +#[derive(Debug, Clone, Copy)] +enum CheckNotFound { + Check, + DoNotCheck, +} + +fn status_code_help String>( + response: &Response, + check_unauthorized: CheckUnauthorized, + check_not_found: CheckNotFound, + mut create_error_message: T, +) -> anyhow::Result<()> { + if !response.status().is_client_error() { + return Ok(()); + } + + let error_message = match (response.status(), check_unauthorized, check_not_found) { + (StatusCode::UNAUTHORIZED, CheckUnauthorized::Check, _) => concat!( + "Your Trunk token may be incorrect - find it on the Trunk app ", + "(Settings -> Manage Organization -> Organization API Token -> View).", + ), + (StatusCode::NOT_FOUND, _, CheckNotFound::Check) => concat!( + "Your Trunk organization URL slug may be incorrect - find it on the Trunk app ", + "(Settings -> Manage Organization -> Organization Slug).", + ), + _ => &create_error_message(response), + }; + + let error_message_with_help = + format!("{error_message}\n\nFor more help, contact us at https://slack.trunk.io/"); + + Err(anyhow::Error::msg(error_message_with_help)) +} diff --git a/cli/src/clients.rs b/cli/src/clients.rs deleted file mode 100644 index 8fba987f..00000000 --- a/cli/src/clients.rs +++ /dev/null @@ -1,187 +0,0 @@ -use std::{format, path::PathBuf}; - -use anyhow::Context; -use api::{ - BundleUploadStatus, CreateBundleUploadRequest, CreateBundleUploadResponse, CreateRepoRequest, - GetQuarantineBulkTestStatusRequest, QuarantineConfig, UpdateBundleUploadRequest, -}; -use context::repo::RepoUrlParts as Repo; - -use crate::utils::status_code_help; - -pub const TRUNK_API_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); -pub const TRUNK_API_TOKEN_HEADER: &str = "x-api-token"; - -pub async fn create_trunk_repo( - origin: &str, - api_token: &str, - org_slug: &str, - repo: &Repo, - remote_urls: &[String], -) -> anyhow::Result<()> { - let client = reqwest::Client::new(); - let resp = match client - .post(format!("{}/v1/repo/create", origin)) - .timeout(TRUNK_API_TIMEOUT) - .header(reqwest::header::CONTENT_TYPE, "application/json") - .header(TRUNK_API_TOKEN_HEADER, api_token) - .json(&CreateRepoRequest { - org_url_slug: org_slug.to_owned(), - repo: repo.clone(), - remote_urls: remote_urls.to_vec(), - }) - .send() - .await - { - Ok(resp) => resp, - Err(e) => return Err(anyhow::anyhow!(e).context("Failed to validate trunk repo")), - }; - - if resp.status().is_client_error() { - return Err(anyhow::anyhow!( - "Organization not found. Please double check the provided organization token and url slug: {}", - org_slug - ) - .context("Failed to validate trunk repo")); - } - - Ok(()) -} - -pub async fn update_bundle_upload_status( - origin: &str, - api_token: &str, - id: &str, - upload_status: &BundleUploadStatus, -) -> anyhow::Result<()> { - let client = reqwest::Client::new(); - let resp = client - .patch(format!("{}/v1/metrics/updateBundleUpload", origin)) - .timeout(TRUNK_API_TIMEOUT) - .header(reqwest::header::CONTENT_TYPE, "application/json") - .header(TRUNK_API_TOKEN_HEADER, api_token) - .json(&UpdateBundleUploadRequest { - id: id.to_owned(), - upload_status: upload_status.to_owned(), - }) - .send() - .await - .map_err(|e| anyhow::anyhow!(e).context("Failed to update bundle upload status"))?; - - if resp.status() != reqwest::StatusCode::OK { - return Err( - anyhow::anyhow!("{}: {}", resp.status(), status_code_help(resp.status())) - .context("Failed to update bundle upload status"), - ); - } - - Ok(()) -} - -pub async fn create_bundle_upload_intent( - origin: &str, - api_token: &str, - org_slug: &str, - repo: &Repo, - client_version: &str, -) -> anyhow::Result { - let client = reqwest::Client::new(); - let resp = match client - .post(format!("{}/v1/metrics/createBundleUpload", origin)) - .timeout(TRUNK_API_TIMEOUT) - .header(reqwest::header::CONTENT_TYPE, "application/json") - .header(TRUNK_API_TOKEN_HEADER, api_token) - .json(&CreateBundleUploadRequest { - org_url_slug: org_slug.to_owned(), - repo: repo.clone(), - client_version: client_version.to_owned(), - }) - .send() - .await - { - Ok(resp) => resp, - Err(e) => return Err(anyhow::anyhow!(e).context("Failed to create bundle upload")), - }; - - if resp.status() != reqwest::StatusCode::OK { - return Err( - anyhow::anyhow!("{}: {}", resp.status(), status_code_help(resp.status())) - .context("Failed to create bundle upload"), - ); - } - - resp.json::() - .await - .context("Failed to get response body as json") -} - -pub async fn get_quarantining_config( - origin: &str, - api_token: &str, - org_slug: &str, - repo: &Repo, -) -> anyhow::Result { - let client = reqwest::Client::new(); - let resp = match client - .post(format!("{}/v1/metrics/getQuarantineConfig", origin)) - .timeout(TRUNK_API_TIMEOUT) - .header(reqwest::header::CONTENT_TYPE, "application/json") - .header(TRUNK_API_TOKEN_HEADER, api_token) - .json(&GetQuarantineBulkTestStatusRequest { - org_url_slug: org_slug.to_owned(), - repo: repo.clone(), - }) - .send() - .await - { - Ok(resp) => resp, - Err(e) => return Err(anyhow::anyhow!(e).context("Failed to get quarantine bulk test")), - }; - - if resp.status() != reqwest::StatusCode::OK { - return Err( - anyhow::anyhow!("{}: {}", resp.status(), status_code_help(resp.status())) - .context("Failed to get quarantine bulk test"), - ); - } - - resp.json::() - .await - .context("Failed to get response body as json") -} - -/// Puts file to S3 using pre-signed link. -/// -pub async fn put_bundle_to_s3(url: &str, bundle_path: &PathBuf) -> anyhow::Result<()> { - let file_size = bundle_path.metadata()?.len(); - let file = tokio::fs::File::open(bundle_path).await?; - let client = reqwest::Client::new(); - let resp = match client - .put(url) - .header(reqwest::header::CONTENT_TYPE, "application/octet-stream") - .header(reqwest::header::CONTENT_LENGTH, file_size) - .body(reqwest::Body::from(file)) - .send() - .await - { - Ok(resp) => resp, - Err(e) => { - log::error!("Failed to upload bundle to S3. Status: {:?}", e.status()); - return Err(anyhow::anyhow!( - "Failed to upload bundle to S3. Error: {}", - e - )); - } - }; - - if !resp.status().is_success() { - log::error!("Failed to upload bundle to S3. Code: {:?}", resp.status()); - return Err(anyhow::anyhow!( - "Failed to upload bundle to S3. Code={}: {}", - resp.status(), - resp.text().await? - )); - } - - Ok(()) -} diff --git a/cli/src/constants.rs b/cli/src/constants.rs index 183a426c..d6aafec0 100644 --- a/cli/src/constants.rs +++ b/cli/src/constants.rs @@ -18,6 +18,7 @@ pub const EXIT_FAILURE: i32 = 1; // .bitbucket pub const CODEOWNERS_LOCATIONS: &[&str] = &[".github", ".bitbucket", ".", "docs", ".gitlab"]; +pub const DEFAULT_ORIGIN: &str = "https://api.trunk.io"; pub const TRUNK_PUBLIC_API_ADDRESS_ENV: &str = "TRUNK_PUBLIC_API_ADDRESS"; pub const ENVS_TO_GET: &[&str] = &[ "CI", diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 56dec4cc..10597fc9 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -1,5 +1,5 @@ +pub mod api_client; pub mod bundler; -pub mod clients; pub mod codeowners; pub mod constants; pub mod runner; diff --git a/cli/src/main.rs b/cli/src/main.rs index cc12b814..442e5afd 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,30 +1,22 @@ -use std::env; use std::io::Write; -use std::time::{SystemTime, UNIX_EPOCH}; -#[cfg(target_os = "macos")] -use xcresult::XCResult; +use std::time::SystemTime; +use std::{env, time::UNIX_EPOCH}; use api::BundleUploadStatus; use clap::{Args, Parser, Subcommand}; use context::repo::BundleRepo; -use tokio_retry::strategy::ExponentialBackoff; -use tokio_retry::Retry; -use trunk_analytics_cli::bundler::BundlerUtil; -use trunk_analytics_cli::clients::{ - create_bundle_upload_intent, create_trunk_repo, put_bundle_to_s3, update_bundle_upload_status, -}; -use trunk_analytics_cli::codeowners::CodeOwners; -use trunk_analytics_cli::constants::{ - EXIT_FAILURE, EXIT_SUCCESS, SENTRY_DSN, TRUNK_PUBLIC_API_ADDRESS_ENV, -}; -use trunk_analytics_cli::runner::{ - build_filesets, extract_failed_tests, run_quarantine, run_test_command, -}; -use trunk_analytics_cli::scanner::EnvScanner; -use trunk_analytics_cli::types::{ - BundleMeta, QuarantineBulkTestStatus, QuarantineRunResult, RunResult, META_VERSION, +use trunk_analytics_cli::{ + api_client::ApiClient, + bundler::BundlerUtil, + codeowners::CodeOwners, + constants::{EXIT_FAILURE, EXIT_SUCCESS, SENTRY_DSN}, + runner::{build_filesets, extract_failed_tests, run_quarantine, run_test_command}, + scanner::EnvScanner, + types::{BundleMeta, QuarantineBulkTestStatus, QuarantineRunResult, RunResult, META_VERSION}, + utils::parse_custom_tags, }; -use trunk_analytics_cli::utils::parse_custom_tags; +#[cfg(target_os = "macos")] +use xcresult::XCResult; #[derive(Debug, Parser)] #[command( @@ -115,13 +107,6 @@ enum Commands { Test(TestArgs), } -const DEFAULT_ORIGIN: &str = "https://api.trunk.io"; -// Tokio-retry uses base ^ retry * factor formula. -// This will give us 8ms, 64ms, 512ms, 4096ms, 32768ms -const RETRY_BASE_MS: u64 = 8; -const RETRY_FACTOR: u64 = 1; -const RETRY_COUNT: usize = 5; - // "the Sentry client must be initialized before starting an async runtime or spawning threads" // https://docs.sentry.io/platforms/rust/#async-main-function fn main() -> anyhow::Result<()> { @@ -223,7 +208,7 @@ async fn run_upload( repo_head_commit_epoch, )?; - let api_address = get_api_address(); + let api_client = ApiClient::new(token)?; let codeowners = codeowners.or_else(|| CodeOwners::find_file(&repo.repo_root, &codeowners_path)); @@ -235,10 +220,6 @@ async fn run_upload( env!("VERGEN_RUSTC_SEMVER") ); - if token.trim().is_empty() { - return Err(anyhow::anyhow!("Trunk API token is required.")); - } - let tags = parse_custom_tags(&tags)?; #[cfg(target_os = "macos")] let junit_temp_dir = tempfile::tempdir()?; @@ -255,7 +236,7 @@ async fn run_upload( return Err(anyhow::anyhow!("No JUnit files found to upload.")); } - let failures = extract_failed_tests(&repo, &org_url_slug, &file_sets).await?; + let failures = extract_failed_tests(&repo, &org_url_slug, &file_sets).await; // Run the quarantine step and update the exit code. let exit_code = if failures.is_empty() { @@ -266,15 +247,15 @@ async fn run_upload( let quarantine_run_results = if use_quarantining && quarantine_results.is_none() { Some( run_quarantine( - exit_code, + &api_client, + &api::GetQuarantineBulkTestStatusRequest { + repo: repo.repo.clone(), + org_url_slug: org_url_slug.clone(), + }, failures, - &api_address, - &token, - &org_url_slug, - &repo, - default_delay(), + exit_code, ) - .await?, + .await, ) } else { quarantine_results @@ -303,16 +284,13 @@ async fn run_upload( env!("VERGEN_RUSTC_SEMVER") ); let client_version = format!("trunk-analytics-cli {}", cli_version); - let upload = Retry::spawn(default_delay(), || { - create_bundle_upload_intent( - &api_address, - &token, - &org_url_slug, - &repo.repo, - &client_version, - ) - }) - .await?; + let upload = api_client + .create_bundle_upload_intent(&api::CreateBundleUploadRequest { + repo: repo.repo.clone(), + org_url_slug: org_url_slug.clone(), + client_version, + }) + .await?; let meta = BundleMeta { version: META_VERSION.to_string(), @@ -358,15 +336,14 @@ async fn run_upload( log::info!("Flushed temporary tarball to {:?}", bundle_time_file); if dry_run { - if let Err(e) = update_bundle_upload_status( - &api_address, - &token, - &upload.id, - &BundleUploadStatus::DryRun, - ) - .await + if let Err(e) = api_client + .update_bundle_upload_status(&api::UpdateBundleUploadRequest { + id: upload.id.clone(), + upload_status: BundleUploadStatus::DryRun, + }) + .await { - log::warn!("Failed to update bundle upload status: {}", e); + log::warn!("{}", e); } else { log::debug!("Updated bundle upload status to DRY_RUN"); } @@ -374,38 +351,32 @@ async fn run_upload( return Ok(exit_code); } - let upload_status = Retry::spawn(default_delay(), || { - put_bundle_to_s3(&upload.url, &bundle_time_file) - }) - .await - .map(|_| BundleUploadStatus::UploadComplete) - .unwrap_or_else(|e| { - log::error!("Failed to upload bundle to S3 after retries: {}", e); - BundleUploadStatus::UploadFailed - }); - if let Err(e) = - update_bundle_upload_status(&api_address, &token, &upload.id, &upload_status).await + api_client + .put_bundle_to_s3(&upload.url, &bundle_time_file) + .await?; + + if let Err(e) = api_client + .update_bundle_upload_status(&api::UpdateBundleUploadRequest { + id: upload.id.clone(), + upload_status: BundleUploadStatus::UploadComplete, + }) + .await { - log::warn!( - "Failed to update bundle upload status to {:#?}: {}", - upload_status, - e - ) + log::warn!("{}", e) } else { - log::debug!("Updated bundle upload status to {:#?}", upload_status) + log::debug!( + "Updated bundle upload status to {:#?}", + BundleUploadStatus::UploadComplete + ) } - let remote_urls = vec![repo.repo_url.clone()]; - Retry::spawn(default_delay(), || { - create_trunk_repo( - &api_address, - &token, - &org_url_slug, - &repo.repo, - &remote_urls, - ) - }) - .await?; + api_client + .create_trunk_repo(&api::CreateRepoRequest { + repo: repo.repo, + org_url_slug, + remote_urls: vec![repo.repo_url.clone()], + }) + .await?; log::info!("Done"); Ok(exit_code) @@ -443,14 +414,14 @@ async fn run_test(test_args: TestArgs) -> anyhow::Result { return Err(anyhow::anyhow!("No junit paths provided.")); } - let api_address = get_api_address(); + let api_client = ApiClient::new(String::from(token))?; let codeowners = CodeOwners::find_file(&repo.repo_root, codeowners_path); log::info!("running command: {:?}", command); let run_result = run_test_command( &repo, - org_url_slug, + &org_url_slug, command.first().unwrap(), command.iter().skip(1).collect(), junit_paths, @@ -473,15 +444,15 @@ async fn run_test(test_args: TestArgs) -> anyhow::Result { let quarantine_run_result = if *use_quarantining { Some( run_quarantine( - run_exit_code, + &api_client, + &api::GetQuarantineBulkTestStatusRequest { + repo: repo.repo, + org_url_slug: org_url_slug.clone(), + }, failures, - &api_address, - token, - org_url_slug, - &repo, - default_delay(), + run_exit_code, ) - .await?, + .await, ) } else { None @@ -518,12 +489,6 @@ async fn run(cli: Cli) -> anyhow::Result { } } -fn default_delay() -> std::iter::Take { - ExponentialBackoff::from_millis(RETRY_BASE_MS) - .factor(RETRY_FACTOR) - .take(RETRY_COUNT) -} - fn setup_logger() -> anyhow::Result<()> { let mut builder = env_logger::Builder::new(); builder @@ -543,10 +508,3 @@ fn setup_logger() -> anyhow::Result<()> { builder.init(); Ok(()) } - -fn get_api_address() -> String { - std::env::var(TRUNK_PUBLIC_API_ADDRESS_ENV) - .ok() - .and_then(|s| if s.is_empty() { None } else { Some(s) }) - .unwrap_or_else(|| DEFAULT_ORIGIN.to_string()) -} diff --git a/cli/src/runner.rs b/cli/src/runner.rs index fbb8e4a1..2da5f8d4 100644 --- a/cli/src/runner.rs +++ b/cli/src/runner.rs @@ -2,13 +2,10 @@ use quick_junit::TestCaseStatus; use std::process::{Command, Stdio}; use std::time::SystemTime; -use api::QuarantineConfig; -use context::junit::parser::JunitParser; -use context::repo::BundleRepo; -use tokio_retry::strategy::ExponentialBackoff; -use tokio_retry::Retry; +use api; +use context::{junit::parser::JunitParser, repo::BundleRepo}; -use crate::clients::get_quarantining_config; +use crate::api_client::ApiClient; use crate::codeowners::CodeOwners; use crate::constants::{EXIT_FAILURE, EXIT_SUCCESS}; use crate::scanner::{FileSet, FileSetCounter}; @@ -28,7 +25,7 @@ pub async fn run_test_command( log::info!("Command exit code: {}", exit_code); let (file_sets, ..) = build_filesets(repo, output_paths, team, codeowners, Some(start))?; let failures = if exit_code != EXIT_SUCCESS { - extract_failed_tests(repo, org_slug, &file_sets).await? + extract_failed_tests(repo, org_slug, &file_sets).await } else { Vec::new() }; @@ -114,7 +111,7 @@ pub async fn extract_failed_tests( repo: &BundleRepo, org_slug: &str, file_sets: &[FileSet], -) -> anyhow::Result> { +) -> Vec { let mut failures = Vec::::new(); for file_set in file_sets { for file in &file_set.files { @@ -162,32 +159,27 @@ pub async fn extract_failed_tests( } } } - Ok(failures) + failures } pub async fn run_quarantine( - exit_code: i32, + api_client: &ApiClient, + request: &api::GetQuarantineBulkTestStatusRequest, failures: Vec, - api_address: &str, - token: &str, - org_url_slug: &str, - repo: &BundleRepo, - delay: std::iter::Take, -) -> anyhow::Result { - let quarantine_config: QuarantineConfig = if !failures.is_empty() { + exit_code: i32, +) -> QuarantineRunResult { + let quarantine_config: api::QuarantineConfig = if !failures.is_empty() { log::info!("Quarantining failed tests"); - let result = Retry::spawn(delay, || { - get_quarantining_config(api_address, token, org_url_slug, &repo.repo) - }) - .await; + let result = api_client.get_quarantining_config(request).await; if let Err(ref err) = result { - log::error!("Failed to get quarantine results: {:?}", err); + log::error!("{}", err); } + result.unwrap_or_default() } else { log::debug!("No failed tests to quarantine"); - QuarantineConfig::default() + api::QuarantineConfig::default() }; // quarantine the failed tests @@ -232,8 +224,8 @@ pub async fn run_quarantine( exit_code }; - Ok(QuarantineRunResult { + QuarantineRunResult { exit_code, quarantine_status: quarantine_results, - }) + } } diff --git a/cli/src/utils.rs b/cli/src/utils.rs index 30467d70..a99dad3b 100644 --- a/cli/src/utils.rs +++ b/cli/src/utils.rs @@ -3,26 +3,6 @@ use crate::types::CustomTag; pub const MAX_KEY_LEN: usize = 32; pub const MAX_VAL_LEN: usize = 1024 * 8; -pub fn status_code_help(status: reqwest::StatusCode) -> String { - match status { - reqwest::StatusCode::UNAUTHORIZED => { - "Your Trunk token may be incorrect - \ - find it on the Trunk app (Settings -> \ - Manage Organization -> Organization \ - API Token -> View)." - } - reqwest::StatusCode::NOT_FOUND => { - "Your Trunk organization URL \ - slug may be incorrect - find \ - it on the Trunk app (Settings \ - -> Manage Organization -> \ - Organization Slug)." - } - _ => "For more help, contact us at https://slack.trunk.io/", - } - .to_string() -} - pub fn parse_custom_tags(tags: &[String]) -> anyhow::Result> { let parsed = tags.iter() .filter(|tag_str| !tag_str.trim().is_empty())