From 94edc7113d708d2fdd34caed0ea98953c05cc0e7 Mon Sep 17 00:00:00 2001 From: Nate Anderson Date: Tue, 10 Sep 2024 14:48:18 -0700 Subject: [PATCH] chore: switch metrics & elasticache calls to manual pagination Manually paginate the metrics and elasticache/serverless elasticache calls, since the rate limiting code does not work properly with the paginator stream AWS client code. --- .../src/commands/cloud_linter/elasticache.rs | 59 ++++++++----------- momento/src/commands/cloud_linter/metrics.rs | 39 ++++++------ .../cloud_linter/serverless_elasticache.rs | 55 ++++++++--------- 3 files changed, 69 insertions(+), 84 deletions(-) diff --git a/momento/src/commands/cloud_linter/elasticache.rs b/momento/src/commands/cloud_linter/elasticache.rs index 10f3679..ec81df4 100644 --- a/momento/src/commands/cloud_linter/elasticache.rs +++ b/momento/src/commands/cloud_linter/elasticache.rs @@ -217,45 +217,38 @@ async fn describe_clusters( control_plane_limiter: Arc, region: &str, ) -> Result, CliError> { - let mut resources = Vec::new(); - let mut elasticache_stream = elasticache_client - .describe_cache_clusters() - .show_cache_node_info(true) - .into_paginator() - .send(); - - while let Some(result) = rate_limit(Arc::clone(&control_plane_limiter), || { - elasticache_stream.next() - }) - .await - { - match result { - Ok(result) => { - if let Some(aws_clusters) = result.cache_clusters { - let mut chunks = Vec::new(); - for chunk in aws_clusters.chunks(10) { - chunks.push(chunk.to_owned()); - } - for clusters in chunks { - for cluster in clusters { - let cluster_resources = convert_to_resources(cluster, region).await?; - resources.extend(cluster_resources); - } - } - } - } - Err(err) => { - return Err(CliError { - msg: format!("Failed to describe cache clusters: {}", err), - }); + let mut clusters = Vec::new(); + let mut next_marker: Option = None; + loop { + let response = rate_limit(Arc::clone(&control_plane_limiter), || { + let mut req = elasticache_client + .describe_cache_clusters() + .show_cache_node_info(true); + if let Some(marker) = &next_marker { + req = req.marker(marker); } + req.send() + }) + .await?; + + if let Some(aws_clusters) = response.cache_clusters.as_ref() { + clusters.extend_from_slice(aws_clusters); + } + + next_marker = response.marker().map(String::from); + if next_marker.is_none() { + break; } } - Ok(resources) + clusters + .into_iter() + .map(|cluster| convert_to_resources(cluster, region)) + .collect::, _>>() + .map(|vec| vec.into_iter().flatten().collect()) } -async fn convert_to_resources( +fn convert_to_resources( cluster: CacheCluster, region: &str, ) -> Result, CliError> { diff --git a/momento/src/commands/cloud_linter/metrics.rs b/momento/src/commands/cloud_linter/metrics.rs index 46042d9..d95622e 100644 --- a/momento/src/commands/cloud_linter/metrics.rs +++ b/momento/src/commands/cloud_linter/metrics.rs @@ -148,26 +148,20 @@ async fn query_metrics_for_target( metric_data_queries.push(metric_data_query); } - let mut metric_stream = client - .get_metric_data() - .start_time(DateTime::from_millis(start_millis)) - .end_time(DateTime::from_millis(end_millis)) - .set_metric_data_queries(Some(metric_data_queries)) - .into_paginator() - .send(); - - while let Some(result) = rate_limit(Arc::clone(&limiter), || metric_stream.next()).await { - let result = match result { - Ok(res) => res, - Err(e) => { - println!("get_metric_data_error: {:?}", e); - return Err(CliError { - msg: "error from aws api while querying metrics".to_string(), - }); - } - }; - // let result = result?; - if let Some(mdr_vec) = result.metric_data_results { + let mut next_token: Option = None; + loop { + let response = rate_limit(Arc::clone(&limiter), || { + client + .get_metric_data() + .start_time(DateTime::from_millis(start_millis)) + .end_time(DateTime::from_millis(end_millis)) + .set_metric_data_queries(Some(metric_data_queries.clone())) + .set_next_token(next_token) + .send() + }) + .await?; + + if let Some(mdr_vec) = response.metric_data_results { for mdr in mdr_vec { let name = mdr.id.ok_or_else(|| CliError { msg: "Metric has no id".to_string(), @@ -178,6 +172,11 @@ async fn query_metrics_for_target( metric_results.push(Metric { name, values }); } } + + next_token = response.next_token; + if next_token.is_none() { + break; + } } } diff --git a/momento/src/commands/cloud_linter/serverless_elasticache.rs b/momento/src/commands/cloud_linter/serverless_elasticache.rs index 454081d..bd30fb6 100644 --- a/momento/src/commands/cloud_linter/serverless_elasticache.rs +++ b/momento/src/commands/cloud_linter/serverless_elasticache.rs @@ -231,42 +231,35 @@ async fn describe_caches( control_plane_limiter: Arc, region: &str, ) -> Result, CliError> { - let mut resources = Vec::new(); - let mut elasticache_stream = elasticache_client - .describe_serverless_caches() - .into_paginator() - .send(); - - while let Some(result) = rate_limit(Arc::clone(&control_plane_limiter), || { - elasticache_stream.next() - }) - .await - { - match result { - Ok(result) => { - if let Some(aws_caches) = result.serverless_caches { - let mut chunks = Vec::new(); - for chunk in aws_caches.chunks(10) { - chunks.push(chunk.to_owned()); - } - for clusters in chunks { - for cluster in clusters { - resources.push(convert_to_resource(cluster, region).await?); - } - } - } - } - Err(err) => { - return Err(CliError { - msg: format!("Failed to describe serverless caches: {}", err), - }); + let mut caches = Vec::new(); + let mut next_token: Option = None; + loop { + let response = rate_limit(Arc::clone(&control_plane_limiter), || { + let mut req = elasticache_client.describe_serverless_caches(); + if let Some(token) = &next_token { + req = req.next_token(token); } + req.send() + }) + .await?; + + if let Some(aws_caches) = response.serverless_caches.as_ref() { + caches.extend_from_slice(aws_caches); + } + + next_token = response.next_token().map(String::from); + if next_token.is_none() { + break; } } - Ok(resources) + + caches + .into_iter() + .map(|cluster| convert_to_resource(cluster, region)) + .collect::>() } -async fn convert_to_resource( +fn convert_to_resource( cache: ServerlessCache, region: &str, ) -> Result {