diff --git a/momento-cli-opts/src/lib.rs b/momento-cli-opts/src/lib.rs index 8c42b3a..aec8741 100644 --- a/momento-cli-opts/src/lib.rs +++ b/momento-cli-opts/src/lib.rs @@ -246,6 +246,16 @@ to help find opportunities for optimizations with Momento. default_value = "10" )] metric_collection_rate: u32, + #[arg( + long = "start-date", + help = "The UTC start date of the metric collection period. Will use (end-date - 30 days) if not provided. (YYYY-MM-DD)", + )] + metric_start_date: Option, + #[arg( + long = "end-date", + help = "The UTC end date of the metric collection period. Will use the current date if not provided. (YYYY-MM-DD)", + )] + metric_end_date: Option, }, } diff --git a/momento/src/commands/cloud_linter/api_gateway.rs b/momento/src/commands/cloud_linter/api_gateway.rs index 4931581..89e3ea3 100644 --- a/momento/src/commands/cloud_linter/api_gateway.rs +++ b/momento/src/commands/cloud_linter/api_gateway.rs @@ -74,6 +74,8 @@ pub(crate) async fn process_api_gateway_resources( control_plane_limiter: Arc, metrics_limiter: Arc, sender: Sender, + metrics_start_millis: i64, + metrics_end_millis: i64, ) -> Result<(), CliError> { let region = config.region().map(|r| r.as_ref()).ok_or(CliError { msg: "No region configured for client".to_string(), @@ -103,11 +105,13 @@ pub(crate) async fn process_api_gateway_resources( list_apis_bar.finish(); process_apis( apig_client.clone(), - &apis, - region, - sender, &metrics_client, &metrics_limiter, + region, + metrics_start_millis, + metrics_end_millis, + &apis, + sender, ) .await?; @@ -116,11 +120,13 @@ pub(crate) async fn process_api_gateway_resources( async fn process_apis( apig_client: aws_sdk_apigateway::Client, - apis: &[RestApi], - region: &str, - sender: Sender, metrics_client: &aws_sdk_cloudwatch::Client, metrics_limiter: &Arc, + region: &str, + metrics_start_millis: i64, + metrics_end_millis: i64, + apis: &[RestApi], + sender: Sender, ) -> Result<(), CliError> { let mut resources: Vec = Vec::with_capacity(apis.len()); let get_apis_bar = @@ -155,7 +161,12 @@ async fn process_apis( match resource { Resource::ApiGateway(mut apig_resource) => { apig_resource - .append_metrics(metrics_client, Arc::clone(metrics_limiter)) + .append_metrics( + metrics_client, + Arc::clone(metrics_limiter), + metrics_start_millis, + metrics_end_millis, + ) .await?; sender .send(Resource::ApiGateway(apig_resource)) diff --git a/momento/src/commands/cloud_linter/dynamodb.rs b/momento/src/commands/cloud_linter/dynamodb.rs index 5a6c923..d4e5d73 100644 --- a/momento/src/commands/cloud_linter/dynamodb.rs +++ b/momento/src/commands/cloud_linter/dynamodb.rs @@ -212,6 +212,8 @@ pub(crate) async fn process_ddb_resources( metrics_limiter: Arc, describe_ttl_limiter: Arc, sender: Sender, + metrics_start_millis: i64, + metrics_end_millis: i64, enable_ddb_ttl_check: bool, enable_gsi: bool, ) -> Result<(), CliError> { @@ -249,13 +251,15 @@ pub(crate) async fn process_ddb_resources( let res = process_table_resources( &ddb_client_clone, &metrics_client_clone, - &table_name_clone, control_plane_limiter_clone, metrics_limiter_clone, describe_ttl_limiter_clone, sender_clone, + metrics_start_millis, + metrics_end_millis, enable_ddb_ttl_check, enable_gsi, + &table_name_clone, ) .await; progress_bar_clone.inc(1); @@ -361,13 +365,15 @@ async fn is_ddb_ttl_enabled( async fn process_table_resources( ddb_client: &aws_sdk_dynamodb::Client, metrics_client: &aws_sdk_cloudwatch::Client, - table_name: &str, control_plane_limiter: Arc, metrics_limiter: Arc, describe_ttl_limiter: Arc, sender: Sender, + metrics_start_millis: i64, + metrics_end_millis: i64, enable_ddb_ttl_check: bool, enable_gsi: bool, + table_name: &str, ) -> Result<(), CliError> { let region = ddb_client .config() @@ -537,7 +543,12 @@ async fn process_table_resources( continue; } resource - .append_metrics(metrics_client, Arc::clone(&metrics_limiter)) + .append_metrics( + metrics_client, + Arc::clone(&metrics_limiter), + metrics_start_millis, + metrics_end_millis, + ) .await?; let ttl_enabled = match enable_ddb_ttl_check { true => { diff --git a/momento/src/commands/cloud_linter/elasticache.rs b/momento/src/commands/cloud_linter/elasticache.rs index 1efef1e..ccacc5f 100644 --- a/momento/src/commands/cloud_linter/elasticache.rs +++ b/momento/src/commands/cloud_linter/elasticache.rs @@ -111,6 +111,8 @@ pub(crate) async fn process_elasticache_resources( control_plane_limiter: Arc, metrics_limiter: Arc, sender: Sender, + metrics_start_millis: i64, + metrics_end_millis: i64, ) -> Result<(), CliError> { let region = config.region().map(|r| r.as_ref()).ok_or(CliError { msg: "No region configured for client".to_string(), @@ -123,8 +125,10 @@ pub(crate) async fn process_elasticache_resources( &metrics_client, control_plane_limiter, metrics_limiter, - region, sender, + region, + metrics_start_millis, + metrics_end_millis, ) .await?; @@ -136,8 +140,10 @@ async fn process_resources( metrics_client: &aws_sdk_cloudwatch::Client, control_plane_limiter: Arc, metrics_limiter: Arc, - region: &str, sender: Sender, + region: &str, + metrics_start_millis: i64, + metrics_end_millis: i64, ) -> Result<(), CliError> { let describe_bar = ProgressBar::new_spinner().with_message("Listing ElastiCache resources"); describe_bar.enable_steady_tick(Duration::from_millis(100)); @@ -165,7 +171,12 @@ async fn process_resources( futures.push(tokio::spawn(async move { resource - .append_metrics(&metrics_client_clone, metrics_limiter_clone) + .append_metrics( + &metrics_client_clone, + metrics_limiter_clone, + metrics_start_millis, + metrics_end_millis, + ) .await?; let wrapped_resource = Resource::ElastiCache(resource); diff --git a/momento/src/commands/cloud_linter/linter_cli.rs b/momento/src/commands/cloud_linter/linter_cli.rs index 86776f4..c5aef71 100644 --- a/momento/src/commands/cloud_linter/linter_cli.rs +++ b/momento/src/commands/cloud_linter/linter_cli.rs @@ -6,6 +6,7 @@ use std::time::Duration; use crate::commands::cloud_linter::api_gateway::process_api_gateway_resources; use aws_config::retry::RetryConfig; use aws_config::{BehaviorVersion, Region}; +use chrono::{NaiveDate, NaiveDateTime, Utc}; use flate2::write::GzEncoder; use flate2::Compression; use governor::{Quota, RateLimiter}; @@ -30,12 +31,16 @@ pub async fn run_cloud_linter( enable_gsi: bool, only_collect_for_resource: Option, metric_collection_rate: u32, + start_date: Option, + end_date: Option, ) -> Result<(), CliError> { let (tx, mut rx) = mpsc::channel::(32); let file_path = "linter_results.json"; // first we check to make sure we have perms to write files to the current directory check_output_is_writable(file_path).await?; + let (metric_start_time, metric_end_time) = get_metric_time_range(start_date, end_date)?; + // here we write the unzipped json file, containing all the linter results let unzipped_tokio_file = File::create(file_path).await?; let mut unzipped_file = unzipped_tokio_file.into_std().await; @@ -51,6 +56,8 @@ pub async fn run_cloud_linter( enable_gsi, only_collect_for_resource, metric_collection_rate, + metric_start_time, + metric_end_time, ) .await; }); @@ -87,6 +94,8 @@ async fn process_data( enable_gsi: bool, only_collect_for_resource: Option, metric_collection_rate: u32, + metrics_start_millis: i64, + metrics_end_millis: i64, ) -> Result<(), CliError> { let retry_config = RetryConfig::adaptive() .with_initial_backoff(Duration::from_millis(250)) @@ -122,6 +131,8 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; Ok(()) @@ -132,6 +143,8 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; Ok(()) @@ -143,6 +156,8 @@ async fn process_data( Arc::clone(&metrics_limiter), Arc::clone(&describe_ttl_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, enable_ddb_ttl_check, enable_gsi, ) @@ -155,6 +170,8 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; @@ -163,6 +180,8 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; Ok(()) @@ -175,6 +194,8 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; @@ -183,6 +204,8 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; @@ -192,6 +215,8 @@ async fn process_data( Arc::clone(&metrics_limiter), Arc::clone(&describe_ttl_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, enable_ddb_ttl_check, enable_gsi, ) @@ -202,6 +227,8 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; @@ -210,12 +237,68 @@ async fn process_data( Arc::clone(&control_plane_limiter), Arc::clone(&metrics_limiter), sender.clone(), + metrics_start_millis, + metrics_end_millis, ) .await?; Ok(()) } +/// Calculates a metric time range based on optional start and end dates. +/// If start_date is not provided, it defaults to end_date - 30 days. +/// If end_date is not provided, it defaults to now. +/// +/// # Arguments +/// +/// * `start_date` - An optional String representing the start date in "YYYY-MM-DD" format. +/// * `end_date` - An optional String representing the end date in "YYYY-MM-DD" format. +/// +/// # Returns +/// +/// A Result containing a tuple of the start and end timestamps in millis, or a CliError +/// if date parsing fails. +fn get_metric_time_range( + start_date: Option, + end_date: Option, +) -> Result<(i64, i64), CliError> { + let now = Utc::now(); + let thirty_days = chrono::Duration::days(30); + + let parse_date = |date_str: &str| -> Result { + let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")?; + date.and_hms_opt(0, 0, 0).ok_or_else(|| CliError { + msg: "invalid time".to_string(), + }) + }; + + match (start_date.as_deref(), end_date.as_deref()) { + (Some(s), Some(e)) => { + let start = parse_date(s)?; + let end = parse_date(e)?; + Ok((start.timestamp_millis(), end.timestamp_millis())) + } + (Some(s), None) => { + let start = parse_date(s)?; + Ok(( + start.timestamp_millis(), + now.timestamp_millis(), + )) + } + (None, Some(e)) => { + let end = parse_date(e)?; + Ok(( + (end - thirty_days).timestamp_millis(), + end.timestamp_millis(), + )) + } + (None, None) => Ok(( + (now - thirty_days).timestamp_millis(), + now.timestamp_millis(), + )), + } +} + async fn check_output_is_writable(file_path: &str) -> Result<(), CliError> { let path = Path::new(file_path); diff --git a/momento/src/commands/cloud_linter/metrics.rs b/momento/src/commands/cloud_linter/metrics.rs index 4049619..46042d9 100644 --- a/momento/src/commands/cloud_linter/metrics.rs +++ b/momento/src/commands/cloud_linter/metrics.rs @@ -5,7 +5,6 @@ use aws_sdk_cloudwatch::primitives::DateTime; use aws_sdk_cloudwatch::types::Metric as CloudwatchMetric; use aws_sdk_cloudwatch::types::{Dimension, MetricDataQuery, MetricStat}; use aws_sdk_cloudwatch::Client; -use chrono::{Duration, Utc}; use futures::stream::FuturesUnordered; use futures::StreamExt; use governor::DefaultDirectRateLimiter; @@ -42,32 +41,11 @@ pub(crate) trait AppendMetrics { &mut self, config: &Client, limiter: Arc, + start_millis: i64, + end_millis: i64, ) -> Result<(), CliError>; } -// impl AppendMetrics for T -// where -// T: ResourceWithMetrics, -// { -// async fn append_metrics( -// &mut self, -// metrics_client: &Client, -// limiter: Arc, -// ) -> Result<(), CliError> { -// let metric_targets = self.create_metric_targets()?; -// let mut metrics: Vec> = Vec::new(); -// for target in metric_targets { -// metrics.push( -// query_metrics_for_target(metrics_client, Arc::clone(&limiter), target).await?, -// ); -// } -// self.set_metrics(metrics.into_iter().flatten().collect()); -// self.set_metric_period_seconds(60 * 60 * 24); - -// Ok(()) -// } -// } - impl AppendMetrics for T where T: ResourceWithMetrics, @@ -76,6 +54,8 @@ where &mut self, metrics_client: &Client, limiter: Arc, + start_millis: i64, + end_millis: i64, ) -> Result<(), CliError> { let metric_targets = self.create_metric_targets()?; let mut metrics: Vec> = Vec::new(); @@ -85,7 +65,8 @@ where let client = metrics_client.clone(); let moved_limiter = Arc::clone(&limiter); let spawn = tokio::spawn(async move { - query_metrics_for_target(&client, moved_limiter, target).await + query_metrics_for_target(&client, moved_limiter, start_millis, end_millis, target) + .await }); futures.push(spawn); } @@ -113,6 +94,8 @@ where async fn query_metrics_for_target( client: &Client, limiter: Arc, + start_millis: i64, + end_millis: i64, metric_target: MetricTarget, ) -> Result, CliError> { let mut metric_results: Vec = Vec::new(); @@ -167,10 +150,8 @@ async fn query_metrics_for_target( let mut metric_stream = client .get_metric_data() - .start_time(DateTime::from_millis( - (Utc::now() - Duration::days(30)).timestamp_millis(), - )) - .end_time(DateTime::from_millis(Utc::now().timestamp_millis())) + .start_time(DateTime::from_millis(start_millis)) + .end_time(DateTime::from_millis(end_millis)) .set_metric_data_queries(Some(metric_data_queries)) .into_paginator() .send(); diff --git a/momento/src/commands/cloud_linter/s3.rs b/momento/src/commands/cloud_linter/s3.rs index c26de1e..c941440 100644 --- a/momento/src/commands/cloud_linter/s3.rs +++ b/momento/src/commands/cloud_linter/s3.rs @@ -129,6 +129,8 @@ pub(crate) async fn process_s3_resources( control_plane_limiter: Arc, metrics_limiter: Arc, sender: Sender, + metrics_start_millis: i64, + metrics_end_millis: i64, ) -> Result<(), CliError> { let region = config.region().map(|r| r.as_ref()).ok_or(CliError { msg: "No region configured for client".to_string(), @@ -146,12 +148,14 @@ pub(crate) async fn process_s3_resources( process_buckets( s3client.clone(), - bucket_names, - region, - sender.clone(), &metrics_client, metrics_limiter.clone(), control_plane_limiter.clone(), + sender.clone(), + region, + metrics_start_millis, + metrics_end_millis, + bucket_names, ) .await?; @@ -235,12 +239,14 @@ async fn try_get_bucket_metrics_filter( async fn process_buckets( s3client: aws_sdk_s3::Client, - buckets: Vec, - region: &str, - sender: Sender, metrics_client: &aws_sdk_cloudwatch::Client, metrics_limiter: Arc, control_plane_limiter: Arc, + sender: Sender, + region: &str, + metrics_start_millis: i64, + metrics_end_millis: i64, + buckets: Vec, ) -> Result<(), CliError> { let process_buckets_bar = ProgressBar::new(buckets.len() as u64).with_message("Processing S3 Buckets"); @@ -260,12 +266,14 @@ async fn process_buckets( let spawn = tokio::spawn(async move { let res = process_bucket( s3_client_clone, - bucket, - region_clone.as_str(), - sender_clone, &metrics_client_clone, metrics_limiter_clone, control_plane_limiter_clone, + sender_clone, + region_clone.as_str(), + metrics_start_millis, + metrics_end_millis, + bucket, ) .await; progress_bar_clone.inc(1); @@ -294,12 +302,14 @@ async fn process_buckets( async fn process_bucket( s3client: aws_sdk_s3::Client, - bucket: String, - region: &str, - sender: Sender, metrics_client: &aws_sdk_cloudwatch::Client, metrics_limiter: Arc, control_plane_limiter: Arc, + sender: Sender, + region: &str, + metrics_start_millis: i64, + metrics_end_millis: i64, + bucket: String, ) -> Result<(), CliError> { let filter_id = try_get_bucket_metrics_filter( s3client.clone(), @@ -321,7 +331,12 @@ async fn process_bucket( metadata, }; s3_resource - .append_metrics(metrics_client, Arc::clone(&metrics_limiter)) + .append_metrics( + metrics_client, + Arc::clone(&metrics_limiter), + metrics_start_millis, + metrics_end_millis, + ) .await?; sender .send(Resource::S3(s3_resource)) diff --git a/momento/src/commands/cloud_linter/serverless_elasticache.rs b/momento/src/commands/cloud_linter/serverless_elasticache.rs index 21e5db0..42b6c39 100644 --- a/momento/src/commands/cloud_linter/serverless_elasticache.rs +++ b/momento/src/commands/cloud_linter/serverless_elasticache.rs @@ -124,6 +124,8 @@ pub(crate) async fn process_serverless_elasticache_resources( control_plane_limiter: Arc, metrics_limiter: Arc, sender: Sender, + metrics_start_millis: i64, + metrics_end_millis: i64, ) -> Result<(), CliError> { let region = config.region().map(|r| r.as_ref()).ok_or(CliError { msg: "No region configured for client".to_string(), @@ -136,8 +138,10 @@ pub(crate) async fn process_serverless_elasticache_resources( &metrics_client, control_plane_limiter, metrics_limiter, - region, sender, + region, + metrics_start_millis, + metrics_end_millis, ) .await?; @@ -149,8 +153,10 @@ async fn process_resources( metrics_client: &aws_sdk_cloudwatch::Client, control_plane_limiter: Arc, metrics_limiter: Arc, - region: &str, sender: Sender, + region: &str, + metrics_start_millis: i64, + metrics_end_millis: i64, ) -> Result<(), CliError> { let describe_bar = ProgressBar::new_spinner().with_message("Listing Serverless ElastiCache resources"); @@ -178,7 +184,12 @@ async fn process_resources( futures.push(tokio::spawn(async move { resource - .append_metrics(&metrics_client_clone, metrics_limiter_clone) + .append_metrics( + &metrics_client_clone, + metrics_limiter_clone, + metrics_start_millis, + metrics_end_millis, + ) .await?; let wrapped_resource = Resource::ServerlessElastiCache(resource); diff --git a/momento/src/commands/cloud_linter/utils.rs b/momento/src/commands/cloud_linter/utils.rs index 882be03..cbab0e9 100644 --- a/momento/src/commands/cloud_linter/utils.rs +++ b/momento/src/commands/cloud_linter/utils.rs @@ -58,6 +58,14 @@ impl From for CliError { } } +impl From for CliError { + fn from(val: chrono::ParseError) -> Self { + CliError { + msg: format!("{val:?}"), + } + } +} + pub(crate) async fn check_aws_credentials(config: &SdkConfig) -> Result<(), CliError> { if let Some(credentials_provider) = config.credentials_provider() { let credentials = credentials_provider diff --git a/momento/src/main.rs b/momento/src/main.rs index 3b20cea..2f77f50 100644 --- a/momento/src/main.rs +++ b/momento/src/main.rs @@ -269,6 +269,8 @@ async fn run_momento_command(args: momento_cli_opts::Momento) -> Result<(), CliE resource, metric_collection_rate, enable_gsi, + metric_start_date, + metric_end_date, } => { commands::cloud_linter::linter_cli::run_cloud_linter( region, @@ -276,6 +278,8 @@ async fn run_momento_command(args: momento_cli_opts::Momento) -> Result<(), CliE enable_gsi, resource, metric_collection_rate, + metric_start_date, + metric_end_date, ) .await?; }