Skip to content

Commit

Permalink
chore: rate limiting wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pgautier404 committed May 22, 2024
1 parent 5c03423 commit d841feb
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 16 deletions.
8 changes: 7 additions & 1 deletion momento/src/commands/cloud_linter/linter_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), Cl
Quota::per_second(core::num::NonZeroU32::new(20).expect("should create non-zero quota"));
let metrics_limiter = Arc::new(RateLimiter::direct(metrics_quota));

process_s3_resources(&config, Arc::clone(&metrics_limiter), sender.clone()).await?;
process_s3_resources(
&config,
Arc::clone(&metrics_limiter),
Arc::clone(&control_plane_limiter),
sender.clone(),
)
.await?;

process_ddb_resources(
&config,
Expand Down
67 changes: 52 additions & 15 deletions momento/src/commands/cloud_linter/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::commands::cloud_linter::metrics::{
AppendMetrics, Metric, MetricTarget, ResourceWithMetrics,
};
use crate::commands::cloud_linter::resource::{Resource, ResourceType, S3Resource};
use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;
use aws_config::SdkConfig;
use aws_sdk_s3::types::MetricsConfiguration;
Expand Down Expand Up @@ -126,6 +127,7 @@ impl ResourceWithMetrics for S3Resource {
pub(crate) async fn process_s3_resources(
config: &SdkConfig,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
) -> Result<(), CliError> {
let region = config.region().map(|r| r.as_ref()).ok_or(CliError {
Expand All @@ -137,7 +139,10 @@ pub(crate) async fn process_s3_resources(
let list_buckets_bar =
ProgressBar::new_spinner().with_message("Listing S3 General Purpose Buckets");
list_buckets_bar.enable_steady_tick(std::time::Duration::from_millis(100));
let bucket_names = list_buckets(&s3client).await?;
let bucket_names = list_buckets(&s3client).await.unwrap_or_else(|err| {
eprint!("{}", err);
vec![]
});
list_buckets_bar.finish();

process_buckets(
Expand All @@ -147,13 +152,17 @@ pub(crate) async fn process_s3_resources(
region,
sender.clone(),
&metrics_client,
&metrics_limiter,
metrics_limiter.clone(),
control_plane_limiter.clone(),
)
.await?;

let list_buckets_bar = ProgressBar::new_spinner().with_message("Listing S3 Directory Buckets");
list_buckets_bar.enable_steady_tick(std::time::Duration::from_millis(100));
let bucket_names = list_directory_buckets(&s3client).await?;
let bucket_names = list_directory_buckets(&s3client).await.unwrap_or_else(|err| {
eprint!("{}", err);
vec![]
});
list_buckets_bar.finish();

process_buckets(
Expand All @@ -163,7 +172,8 @@ pub(crate) async fn process_s3_resources(
region,
sender,
&metrics_client,
&metrics_limiter,
Arc::clone(&metrics_limiter),
Arc::clone(&control_plane_limiter),
)
.await?;

Expand All @@ -173,16 +183,20 @@ pub(crate) async fn process_s3_resources(
async fn list_bucket_metrics_configs(
s3client: aws_sdk_s3::Client,
bucket: String,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<Vec<MetricsConfiguration>, CliError> {
let mut all_configs: Vec<MetricsConfiguration> = Vec::new();
let mut continuation_token: Option<String> = None;
loop {
let configs = s3client
.list_bucket_metrics_configurations()
.bucket(&bucket)
.continuation_token(continuation_token.unwrap_or_default())
.send()
.await;
let configs = rate_limit(Arc::clone(&control_plane_limiter), || async {
s3client
.list_bucket_metrics_configurations()
.bucket(&bucket)
.continuation_token(continuation_token.unwrap_or_default())
.send()
.await
})
.await;
match configs {
Ok(configs) => {
if configs.metrics_configuration_list.is_none() {
Expand Down Expand Up @@ -210,8 +224,14 @@ async fn list_bucket_metrics_configs(
async fn try_get_bucket_metrics_filter(
s3client: aws_sdk_s3::Client,
bucket: String,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<String, CliError> {
let bucket_metrics = list_bucket_metrics_configs(s3client.clone(), bucket.clone()).await;
let bucket_metrics = list_bucket_metrics_configs(
s3client.clone(),
bucket.clone(),
Arc::clone(&control_plane_limiter),
)
.await;
match bucket_metrics {
Ok(bucket_metrics) => {
for config in bucket_metrics {
Expand All @@ -223,21 +243,23 @@ async fn try_get_bucket_metrics_filter(
}
Err(err) => {
return Err(CliError {
msg: format!("Failed to get bucket metrics configuration: {}", err),
msg: format!("{}", err),
});
}
}
Ok("".to_string())
}

#[allow(clippy::too_many_arguments)]
async fn process_buckets(
s3client: aws_sdk_s3::Client,
buckets: Vec<String>,
bucket_type: &str,
region: &str,
sender: Sender<Resource>,
metrics_client: &aws_sdk_cloudwatch::Client,
metrics_limiter: &Arc<DefaultDirectRateLimiter>,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<(), CliError> {
let mut resources: Vec<Resource> = Vec::new();

Expand All @@ -246,9 +268,23 @@ async fn process_buckets(
process_buckets_bar
.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));
for bucket in buckets {
println!("{}", bucket);
let mut all_objects_filter: String = "".to_string();
if bucket_type == "general_purpose" {
let filter_id = try_get_bucket_metrics_filter(s3client.clone(), bucket.clone()).await?;
let filter_id = try_get_bucket_metrics_filter(
s3client.clone(),
bucket.clone(),
Arc::clone(&control_plane_limiter),
)
.await;
let filter_id = match filter_id {
Ok(filter_id) => filter_id,
Err(err) => {
eprint!("{}", err);
// panic!("Failed to get bucket metrics filter for {}", bucket);
continue;
}
};
all_objects_filter = filter_id;
}

Expand All @@ -273,7 +309,7 @@ async fn process_buckets(
match resource {
Resource::S3(mut my_resource) => {
my_resource
.append_metrics(metrics_client, Arc::clone(metrics_limiter))
.append_metrics(metrics_client, Arc::clone(&metrics_limiter))
.await?;
sender
.send(Resource::S3(my_resource))
Expand All @@ -291,6 +327,7 @@ async fn process_buckets(
}
}
process_buckets_bar.finish();
println!("Finished processing S3 buckets");
Ok(())
}

Expand Down

0 comments on commit d841feb

Please sign in to comment.