Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify waiting logic in Rust DynamoDB examples #3593

Merged
merged 2 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 21 additions & 76 deletions rust_dev_preview/dynamodb/src/bin/crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,13 @@
*/

use aws_config::meta::region::RegionProviderChain;
use aws_http::retry::AwsErrorRetryPolicy;
use aws_sdk_dynamodb::error::DescribeTableError;
use aws_sdk_dynamodb::input::DescribeTableInput;
use aws_sdk_dynamodb::middleware::DefaultMiddleware;
use aws_sdk_dynamodb::model::{
AttributeDefinition, AttributeValue, KeySchemaElement, KeyType, ProvisionedThroughput,
ScalarAttributeType, Select, TableStatus,
};
use aws_sdk_dynamodb::operation::DescribeTable;
use aws_sdk_dynamodb::output::DescribeTableOutput;
use aws_sdk_dynamodb::{Client, Config, Error, Region, PKG_VERSION};
use aws_smithy_client::erase::DynConnector;
use aws_smithy_http::result::{SdkError, SdkSuccess};

use aws_smithy_http::operation::Operation;
use aws_smithy_http::retry::ClassifyResponse;
use aws_smithy_types::retry::RetryKind;
use aws_sdk_dynamodb::{Client, Error, Region, PKG_VERSION};
use aws_smithy_http::result::SdkError;

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::io::{stdin, Read};
Expand Down Expand Up @@ -195,46 +185,6 @@ async fn remove_table(client: &Client, table: &str) -> Result<(), Error> {
}
// snippet-end:[dynamodb.rust.crud-remove_table]

/// Hand-written waiter to retry every second until the table is out of `Creating` state
#[derive(Clone)]
struct WaitForReadyTable<R> {
inner: R,
}

impl<R> ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>
for WaitForReadyTable<R>
where
R: ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>,
{
fn classify(
&self,
response: Result<&SdkSuccess<DescribeTableOutput>, &SdkError<DescribeTableError>>,
) -> RetryKind {
match self.inner.classify(response) {
RetryKind::UnretryableFailure | RetryKind::Unnecessary => (),
other => return other,
};
match response {
Ok(SdkSuccess { parsed, .. }) => {
if parsed
.table
.as_ref()
.unwrap()
.table_status
.as_ref()
.unwrap()
== &TableStatus::Creating
{
RetryKind::Explicit(Duration::from_secs(1))
} else {
RetryKind::Unnecessary
}
}
_ => RetryKind::UnretryableFailure,
}
}
}

/// Wait for the user to press Enter.
fn pause() {
println!("Press Enter to continue.");
Expand Down Expand Up @@ -317,12 +267,7 @@ async fn main() -> Result<(), Error> {

println!("Waiting for table to be ready.");

let raw_client = aws_smithy_client::Client::<DynConnector, DefaultMiddleware>::dyn_https();

raw_client
.call(wait_for_ready_table(&table, client.conf()).await)
.await
.expect("table should become ready.");
wait_for_ready_table(&client, &table).await?;

println!("Table is now ready to use.");

Expand Down Expand Up @@ -387,21 +332,21 @@ async fn main() -> Result<(), Error> {
Ok(())
}

/// Construct a `DescribeTable` request with a policy to retry every second until the table
/// is ready
async fn wait_for_ready_table(
table_name: &str,
conf: &Config,
) -> Operation<DescribeTable, WaitForReadyTable<AwsErrorRetryPolicy>> {
let operation = DescribeTableInput::builder()
.table_name(table_name)
.build()
.expect("valid input")
.make_operation(conf)
.await
.expect("valid operation");
let waiting_policy = WaitForReadyTable {
inner: operation.retry_policy().clone(),
};
operation.with_retry_policy(waiting_policy)
/// Poll the DescribeTable operation once per second until the table exists.
async fn wait_for_ready_table(client: &Client, table_name: &str) -> Result<(), Error> {
loop {
if let Some(table) = client
.describe_table()
.table_name(table_name)
.send()
.await?
.table()
{
if !matches!(table.table_status, Some(TableStatus::Creating)) {
break;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok(())
}
88 changes: 16 additions & 72 deletions rust_dev_preview/dynamodb/src/bin/movies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,12 @@ use std::collections::HashMap;
use std::time::Duration;

use aws_config::meta::region::RegionProviderChain;
use aws_http::retry::AwsErrorRetryPolicy;
use aws_sdk_dynamodb::client::fluent_builders::Query;
use aws_sdk_dynamodb::error::DescribeTableError;
use aws_sdk_dynamodb::input::DescribeTableInput;
use aws_sdk_dynamodb::middleware::DefaultMiddleware;
use aws_sdk_dynamodb::model::{
AttributeDefinition, AttributeValue, KeySchemaElement, KeyType, ProvisionedThroughput,
ScalarAttributeType, TableStatus,
};
use aws_sdk_dynamodb::operation::DescribeTable;
use aws_sdk_dynamodb::output::DescribeTableOutput;
use aws_sdk_dynamodb::{Client, Config, Error, Region, PKG_VERSION};
use aws_smithy_client::erase::DynConnector;
use aws_smithy_http::operation::Operation;
use aws_smithy_http::result::{SdkError, SdkSuccess};
use aws_smithy_http::retry::ClassifyResponse;
use aws_smithy_types::retry::RetryKind;
use aws_sdk_dynamodb::{Client, Error, Region, PKG_VERSION};
use serde_json::Value;
use structopt::StructOpt;

Expand Down Expand Up @@ -79,8 +68,6 @@ async fn main() -> Result<(), Error> {

let client = Client::new(&shared_config);

let raw_client = aws_smithy_client::Client::<DynConnector, DefaultMiddleware>::dyn_https();

let table_exists = does_table_exist(&client, &table).await?;

if !table_exists {
Expand All @@ -92,10 +79,7 @@ async fn main() -> Result<(), Error> {
.expect("failed to create table");
}

raw_client
.call(wait_for_ready_table(&table.to_string(), client.conf()).await)
.await
.expect("table should become ready");
wait_for_ready_table(&client, &table.to_string()).await?;

// data.json contains 2 movies from 2013
let data = match serde_json::from_str(include_str!("data.json")).expect("should be valid JSON")
Expand Down Expand Up @@ -259,61 +243,21 @@ async fn delete_table(client: &Client, table: &str) -> Result<(), Error> {
}
// snippet-end:[dynamodb.rust.movies-delete_table]

/// Hand-written waiter to retry every second until the table is out of `Creating` state
#[derive(Clone)]
struct WaitForReadyTable<R> {
inner: R,
}

impl<R> ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>
for WaitForReadyTable<R>
where
R: ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>,
{
fn classify(
&self,
response: Result<&SdkSuccess<DescribeTableOutput>, &SdkError<DescribeTableError>>,
) -> RetryKind {
match self.inner.classify(response) {
RetryKind::UnretryableFailure | RetryKind::Unnecessary => (),
other => return other,
};
match response {
Ok(SdkSuccess { parsed, .. }) => {
if parsed
.table
.as_ref()
.unwrap()
.table_status
.as_ref()
.unwrap()
== &TableStatus::Creating
{
RetryKind::Explicit(Duration::from_secs(1))
} else {
RetryKind::Unnecessary
}
/// Poll the DescribeTable operation once per second until the table exists.
async fn wait_for_ready_table(client: &Client, table_name: &str) -> Result<(), Error> {
loop {
if let Some(table) = client
.describe_table()
.table_name(table_name)
.send()
.await?
.table()
{
if !matches!(table.table_status, Some(TableStatus::Creating)) {
break;
}
_ => RetryKind::UnretryableFailure,
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

/// Construct a `DescribeTable` request with a policy to retry every second until the table
/// is ready
async fn wait_for_ready_table(
table_name: &str,
conf: &Config,
) -> Operation<DescribeTable, WaitForReadyTable<AwsErrorRetryPolicy>> {
let operation = DescribeTableInput::builder()
.table_name(table_name)
.build()
.expect("valid input")
.make_operation(conf)
.await
.expect("valid operation");
let waiting_policy = WaitForReadyTable {
inner: operation.retry_policy().clone(),
};
operation.with_retry_policy(waiting_policy)
Ok(())
}