Skip to content

Commit

Permalink
Update standard orchestrator retries with token bucket and more tests (
Browse files Browse the repository at this point in the history
…#2764)

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here -->
addresses #2743 

## Description
<!--- Describe your changes in detail -->
- add more standard retry tests
- add optional standard retries token bucket

## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->
tests are included

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: John DiSanti <jdisanti@amazon.com>
  • Loading branch information
Velfi and jdisanti authored Jun 13, 2023
1 parent 5473192 commit 312d190
Show file tree
Hide file tree
Showing 16 changed files with 655 additions and 455 deletions.
1 change: 0 additions & 1 deletion aws/rust-runtime/aws-config/src/profile/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ async fn build_provider_chain(

#[cfg(test)]
mod test {

use crate::profile::credentials::Builder;
use crate::test_case::TestEnvironment;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,6 @@ open class OperationGenerator(
if (codegenContext.smithyRuntimeMode.generateOrchestrator) {
rustTemplate(
"""
pub(crate) fn register_runtime_plugins(
runtime_plugins: #{RuntimePlugins},
handle: #{Arc}<crate::client::Handle>,
config_override: #{Option}<crate::config::Builder>,
) -> #{RuntimePlugins} {
#{register_default_runtime_plugins}(
runtime_plugins,
#{Box}::new(Self::new()) as _,
handle,
config_override
)
#{additional_runtime_plugins}
}
pub(crate) async fn orchestrate(
runtime_plugins: &#{RuntimePlugins},
input: #{Input},
Expand All @@ -186,6 +172,20 @@ open class OperationGenerator(
let input = #{TypedBox}::new(input).erase();
#{invoke_with_stop_point}(input, runtime_plugins, stop_point).await
}
pub(crate) fn register_runtime_plugins(
runtime_plugins: #{RuntimePlugins},
handle: #{Arc}<crate::client::Handle>,
config_override: #{Option}<crate::config::Builder>,
) -> #{RuntimePlugins} {
#{register_default_runtime_plugins}(
runtime_plugins,
#{Box}::new(Self::new()) as _,
handle,
config_override
)
#{additional_runtime_plugins}
}
""",
*codegenScope,
"Error" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::interceptors::context::Error"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class ServiceRuntimePluginGenerator(
"StaticAuthOptionResolver" to runtimeApi.resolve("client::auth::option_resolver::StaticAuthOptionResolver"),
"default_connector" to client.resolve("conns::default_connector"),
"require_connector" to client.resolve("conns::require_connector"),
"TimeoutConfig" to smithyTypes.resolve("timeout::TimeoutConfig"),
"RetryConfig" to smithyTypes.resolve("retry::RetryConfig"),
)
}

Expand Down Expand Up @@ -142,20 +144,17 @@ class ServiceRuntimePluginGenerator(
self.handle.conf.endpoint_resolver());
cfg.set_endpoint_resolver(endpoint_resolver);
// TODO(enableNewSmithyRuntime): Use the `store_append` method of ConfigBag to insert classifiers
let retry_classifiers = #{RetryClassifiers}::new()
#{retry_classifier_customizations};
cfg.set_retry_classifiers(retry_classifiers);
// TODO(enableNewSmithyRuntime): Make it possible to set retry classifiers at the service level.
// Retry classifiers can also be set at the operation level and those should be added to the
// list of classifiers defined here, rather than replacing them.
let sleep_impl = self.handle.conf.sleep_impl();
let timeout_config = self.handle.conf.timeout_config();
let retry_config = self.handle.conf.retry_config();
let timeout_config = self.handle.conf.timeout_config().cloned().unwrap_or_else(|| #{TimeoutConfig}::disabled());
let retry_config = self.handle.conf.retry_config().cloned().unwrap_or_else(|| #{RetryConfig}::disabled());
if let Some(retry_config) = retry_config {
cfg.set_retry_strategy(#{StandardRetryStrategy}::new(retry_config));
}
cfg.set_retry_strategy(#{StandardRetryStrategy}::new(&retry_config));
let connector_settings = timeout_config.map(#{ConnectorSettings}::from_timeout_config).unwrap_or_default();
let connector_settings = #{ConnectorSettings}::from_timeout_config(&timeout_config);
if let Some(connection) = self.handle.conf.http_connector()
.and_then(|c| c.connector(&connector_settings, sleep_impl.clone()))
.or_else(|| #{default_connector}(&connector_settings, sleep_impl)) {
Expand All @@ -180,9 +179,6 @@ class ServiceRuntimePluginGenerator(
"http_auth_scheme_customizations" to writable {
writeCustomizations(customizations, ServiceRuntimePluginSection.HttpAuthScheme("cfg"))
},
"retry_classifier_customizations" to writable {
writeCustomizations(customizations, ServiceRuntimePluginSection.RetryClassifier("cfg"))
},
"additional_config" to writable {
writeCustomizations(customizations, ServiceRuntimePluginSection.AdditionalConfig("cfg"))
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ pub enum OrchestratorError<E> {
Interceptor { err: InterceptorError },
/// An error returned by a service.
Operation { err: E },
/// An error that occurs when a request times out.
Timeout { err: BoxError },
/// An error that occurs when request dispatch fails.
Connector { err: ConnectorError },
/// An error that occurs when a response can't be deserialized.
Response { err: BoxError },
/// A general orchestrator error.
Other { err: BoxError },
}
Expand All @@ -34,11 +40,26 @@ impl<E: Debug> OrchestratorError<E> {
Self::Operation { err }
}

/// Create a new `OrchestratorError` from an [`InterceptorError`].
/// Create a new `OrchestratorError::Interceptor` from an [`InterceptorError`].
pub fn interceptor(err: InterceptorError) -> Self {
Self::Interceptor { err }
}

/// Create a new `OrchestratorError::Timeout` from a [`BoxError`].
pub fn timeout(err: BoxError) -> Self {
Self::Timeout { err }
}

/// Create a new `OrchestratorError::Response` from a [`BoxError`].
pub fn response(err: BoxError) -> Self {
Self::Response { err }
}

/// Create a new `OrchestratorError::Connector` from a [`ConnectorError`].
pub fn connector(err: ConnectorError) -> Self {
Self::Connector { err }
}

/// Convert the `OrchestratorError` into `Some` operation specific error if it is one. Otherwise,
/// return `None`.
pub fn as_operation_error(&self) -> Option<&E> {
Expand Down Expand Up @@ -72,6 +93,9 @@ impl<E: Debug> OrchestratorError<E> {
debug_assert!(phase.is_after_deserialization(), "operation errors are a result of successfully receiving and parsing a response from the server. Therefore, we must be in the 'After Deserialization' phase.");
SdkError::service_error(err, response.expect("phase has a response"))
}
Self::Connector { err } => SdkError::dispatch_failure(err),
Self::Timeout { err } => SdkError::timeout_error(err),
Self::Response { err } => SdkError::response_error(err, response.unwrap()),
Self::Other { err } => {
use Phase::*;
match phase {
Expand Down Expand Up @@ -111,15 +135,6 @@ where
}
}

impl<E> From<BoxError> for OrchestratorError<E>
where
E: Debug + std::error::Error + 'static,
{
fn from(err: BoxError) -> Self {
Self::other(err)
}
}

impl From<TypeErasedError> for OrchestratorError<TypeErasedError> {
fn from(err: TypeErasedError) -> Self {
Self::operation(err)
Expand Down
16 changes: 13 additions & 3 deletions rust-runtime/aws-smithy-runtime-api/src/client/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ pub enum ShouldAttempt {
YesAfterDelay(Duration),
}

#[cfg(feature = "test-util")]
impl ShouldAttempt {
pub fn expect_delay(self) -> Duration {
match self {
ShouldAttempt::YesAfterDelay(delay) => delay,
_ => panic!("Expected this to be the `YesAfterDelay` variant but it was the `{self:?}` variant instead"),
}
}
}

pub trait RetryStrategy: Send + Sync + Debug {
fn should_attempt_initial_request(&self, cfg: &ConfigBag) -> Result<ShouldAttempt, BoxError>;

Expand All @@ -31,7 +41,7 @@ pub trait RetryStrategy: Send + Sync + Debug {
}

#[non_exhaustive]
#[derive(Eq, PartialEq, Debug)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum RetryReason {
Error(ErrorKind),
Explicit(Duration),
Expand Down Expand Up @@ -72,10 +82,10 @@ impl RetryClassifiers {
}

impl ClassifyRetry for RetryClassifiers {
fn classify_retry(&self, error: &InterceptorContext) -> Option<RetryReason> {
fn classify_retry(&self, ctx: &InterceptorContext) -> Option<RetryReason> {
// return the first non-None result
self.inner.iter().find_map(|cr| {
let maybe_reason = cr.classify_retry(error);
let maybe_reason = cr.classify_retry(ctx);

match maybe_reason.as_ref() {
Some(reason) => trace!(
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 312d190

Please sign in to comment.