diff --git a/.changelog/17784.txt b/.changelog/17784.txt new file mode 100644 index 00000000000..b794982a476 --- /dev/null +++ b/.changelog/17784.txt @@ -0,0 +1,7 @@ +```release-note:enhancement +resource/aws_kinesis_analytics_application: Add `start_application` attribute +``` + +```release-note:enhancement +resource/aws_kinesis_analytics_application: `starting_position_configuration` can be specified when starting an application +``` diff --git a/aws/internal/service/kinesisanalytics/finder/finder.go b/aws/internal/service/kinesisanalytics/finder/finder.go index 1cbe195c57f..9f6de54f0a0 100644 --- a/aws/internal/service/kinesisanalytics/finder/finder.go +++ b/aws/internal/service/kinesisanalytics/finder/finder.go @@ -3,18 +3,42 @@ package finder import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesisanalytics" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" ) -// ApplicationByName returns the application corresponding to the specified name. -func ApplicationByName(conn *kinesisanalytics.KinesisAnalytics, name string) (*kinesisanalytics.ApplicationDetail, error) { +// ApplicationDetailByName returns the application corresponding to the specified name. +// Returns NotFoundError if no application is found. +func ApplicationDetailByName(conn *kinesisanalytics.KinesisAnalytics, name string) (*kinesisanalytics.ApplicationDetail, error) { input := &kinesisanalytics.DescribeApplicationInput{ ApplicationName: aws.String(name), } + return ApplicationDetail(conn, input) +} + +// ApplicationDetail returns the application details corresponding to the specified name. +// Returns NotFoundError if no application is found. +func ApplicationDetail(conn *kinesisanalytics.KinesisAnalytics, input *kinesisanalytics.DescribeApplicationInput) (*kinesisanalytics.ApplicationDetail, error) { output, err := conn.DescribeApplication(input) + + if tfawserr.ErrCodeEquals(err, kinesisanalytics.ErrCodeResourceNotFoundException) { + return nil, &resource.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + if err != nil { return nil, err } + if output == nil || output.ApplicationDetail == nil { + return nil, &resource.NotFoundError{ + Message: "Empty result", + LastRequest: input, + } + } + return output.ApplicationDetail, nil } diff --git a/aws/internal/service/kinesisanalytics/waiter/status.go b/aws/internal/service/kinesisanalytics/waiter/status.go index 0a326c79d21..a8d537b0296 100644 --- a/aws/internal/service/kinesisanalytics/waiter/status.go +++ b/aws/internal/service/kinesisanalytics/waiter/status.go @@ -3,29 +3,24 @@ package waiter import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesisanalytics" - "github.com/hashicorp/aws-sdk-go-base/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesisanalytics/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) -const ( - applicationStatusNotFound = "NotFound" - applicationStatusUnknown = "Unknown" -) - -// ApplicationStatus fetches the Application and its Status +// ApplicationStatus fetches the ApplicationDetail and its Status func ApplicationStatus(conn *kinesisanalytics.KinesisAnalytics, name string) resource.StateRefreshFunc { return func() (interface{}, string, error) { - application, err := finder.ApplicationByName(conn, name) + applicationDetail, err := finder.ApplicationDetailByName(conn, name) - if tfawserr.ErrCodeEquals(err, kinesisanalytics.ErrCodeResourceNotFoundException) { - return nil, applicationStatusNotFound, nil + if tfresource.NotFound(err) { + return nil, "", nil } if err != nil { - return nil, applicationStatusUnknown, err + return nil, "", err } - return application, aws.StringValue(application.ApplicationStatus), nil + return applicationDetail, aws.StringValue(applicationDetail.ApplicationStatus), nil } } diff --git a/aws/internal/service/kinesisanalytics/waiter/waiter.go b/aws/internal/service/kinesisanalytics/waiter/waiter.go index 0c9b9e290e5..79cc43ad6f7 100644 --- a/aws/internal/service/kinesisanalytics/waiter/waiter.go +++ b/aws/internal/service/kinesisanalytics/waiter/waiter.go @@ -10,18 +10,79 @@ import ( "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) +const ( + ApplicationDeletedTimeout = 5 * time.Minute + ApplicationStartedTimeout = 5 * time.Minute + ApplicationStoppedTimeout = 5 * time.Minute + ApplicationUpdatedTimeout = 5 * time.Minute +) + // ApplicationDeleted waits for an Application to return Deleted -func ApplicationDeleted(conn *kinesisanalytics.KinesisAnalytics, name string, timeout time.Duration) (*kinesisanalytics.ApplicationSummary, error) { +func ApplicationDeleted(conn *kinesisanalytics.KinesisAnalytics, name string) (*kinesisanalytics.ApplicationDetail, error) { stateConf := &resource.StateChangeConf{ - Pending: []string{kinesisanalytics.ApplicationStatusRunning, kinesisanalytics.ApplicationStatusDeleting}, + Pending: []string{kinesisanalytics.ApplicationStatusDeleting}, Target: []string{}, Refresh: ApplicationStatus(conn, name), - Timeout: timeout, + Timeout: ApplicationDeletedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if v, ok := outputRaw.(*kinesisanalytics.ApplicationDetail); ok { + return v, err + } + + return nil, err +} + +// ApplicationStarted waits for an Application to start +func ApplicationStarted(conn *kinesisanalytics.KinesisAnalytics, name string) (*kinesisanalytics.ApplicationDetail, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kinesisanalytics.ApplicationStatusStarting}, + Target: []string{kinesisanalytics.ApplicationStatusRunning}, + Refresh: ApplicationStatus(conn, name), + Timeout: ApplicationStartedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if v, ok := outputRaw.(*kinesisanalytics.ApplicationDetail); ok { + return v, err + } + + return nil, err +} + +// ApplicationStopped waits for an Application to stop +func ApplicationStopped(conn *kinesisanalytics.KinesisAnalytics, name string) (*kinesisanalytics.ApplicationDetail, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kinesisanalytics.ApplicationStatusStopping}, + Target: []string{kinesisanalytics.ApplicationStatusReady}, + Refresh: ApplicationStatus(conn, name), + Timeout: ApplicationStoppedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if v, ok := outputRaw.(*kinesisanalytics.ApplicationDetail); ok { + return v, err + } + + return nil, err +} + +// ApplicationUpdated waits for an Application to update +func ApplicationUpdated(conn *kinesisanalytics.KinesisAnalytics, name string) (*kinesisanalytics.ApplicationDetail, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kinesisanalytics.ApplicationStatusUpdating}, + Target: []string{kinesisanalytics.ApplicationStatusReady, kinesisanalytics.ApplicationStatusRunning}, + Refresh: ApplicationStatus(conn, name), + Timeout: ApplicationUpdatedTimeout, } outputRaw, err := stateConf.WaitForState() - if v, ok := outputRaw.(*kinesisanalytics.ApplicationSummary); ok { + if v, ok := outputRaw.(*kinesisanalytics.ApplicationDetail); ok { return v, err } diff --git a/aws/resource_aws_kinesis_analytics_application.go b/aws/resource_aws_kinesis_analytics_application.go index 8930fc77bca..0080c57dbfe 100644 --- a/aws/resource_aws_kinesis_analytics_application.go +++ b/aws/resource_aws_kinesis_analytics_application.go @@ -11,12 +11,14 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/service/kinesisanalytics" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags" "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesisanalytics/finder" "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesisanalytics/waiter" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) func resourceAwsKinesisAnalyticsApplication() *schema.Resource { @@ -316,12 +318,15 @@ func resourceAwsKinesisAnalyticsApplication() *schema.Resource { "starting_position_configuration": { Type: schema.TypeList, + Optional: true, Computed: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "starting_position": { - Type: schema.TypeString, - Computed: true, + Type: schema.TypeString, + Optional: true, + Computed: true, + ValidateFunc: validation.StringInSlice(kinesisanalytics.InputStartingPosition_Values(), false), }, }, }, @@ -427,7 +432,7 @@ func resourceAwsKinesisAnalyticsApplication() *schema.Resource { Schema: map[string]*schema.Schema{ "record_format_type": { Type: schema.TypeString, - Optional: true, + Required: true, ValidateFunc: validation.StringInSlice(kinesisanalytics.RecordFormatType_Values(), false), }, }, @@ -586,6 +591,11 @@ func resourceAwsKinesisAnalyticsApplication() *schema.Resource { }, }, + "start_application": { + Type: schema.TypeBool, + Optional: true, + }, + "status": { Type: schema.TypeString, Computed: true, @@ -603,11 +613,12 @@ func resourceAwsKinesisAnalyticsApplication() *schema.Resource { func resourceAwsKinesisAnalyticsApplicationCreate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).kinesisanalyticsconn + applicationName := d.Get("name").(string) input := &kinesisanalytics.CreateApplicationInput{ ApplicationCode: aws.String(d.Get("code").(string)), ApplicationDescription: aws.String(d.Get("description").(string)), - ApplicationName: aws.String(d.Get("name").(string)), + ApplicationName: aws.String(applicationName), CloudWatchLoggingOptions: expandKinesisAnalyticsCloudWatchLoggingOptions(d.Get("cloudwatch_logging_options").([]interface{})), Inputs: expandKinesisAnalyticsInputs(d.Get("inputs").([]interface{})), Outputs: expandKinesisAnalyticsOutputs(d.Get("outputs").(*schema.Set).List()), @@ -624,7 +635,7 @@ func resourceAwsKinesisAnalyticsApplicationCreate(d *schema.ResourceData, meta i }) if err != nil { - return fmt.Errorf("error creating Kinesis Analytics Application: %w", err) + return fmt.Errorf("error creating Kinesis Analytics Application (%s): %w", applicationName, err) } applicationSummary := outputRaw.(*kinesisanalytics.CreateApplicationOutput).ApplicationSummary @@ -634,7 +645,7 @@ func resourceAwsKinesisAnalyticsApplicationCreate(d *schema.ResourceData, meta i if v := d.Get("reference_data_sources").([]interface{}); len(v) > 0 && v[0] != nil { // Add new reference data source. input := &kinesisanalytics.AddApplicationReferenceDataSourceInput{ - ApplicationName: applicationSummary.ApplicationName, + ApplicationName: aws.String(applicationName), CurrentApplicationVersionId: aws.Int64(1), // Newly created application version. ReferenceDataSource: expandKinesisAnalyticsReferenceDataSource(v), } @@ -650,6 +661,36 @@ func resourceAwsKinesisAnalyticsApplicationCreate(d *schema.ResourceData, meta i } } + if _, ok := d.GetOk("start_application"); ok { + if v, ok := d.GetOk("inputs"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + tfMap := v.([]interface{})[0].(map[string]interface{}) + + var inputStartingPosition string + + if v, ok := tfMap["starting_position_configuration"].([]interface{}); ok && len(v) > 0 && v[0] != nil { + tfMap := v[0].(map[string]interface{}) + + if v, ok := tfMap["starting_position"].(string); ok && v != "" { + inputStartingPosition = v + } + } + + application, err := finder.ApplicationDetailByName(conn, applicationName) + + if err != nil { + return fmt.Errorf("error reading Kinesis Analytics Application (%s): %w", d.Id(), err) + } + + err = kinesisAnalyticsStartApplication(conn, application, inputStartingPosition) + + if err != nil { + return err + } + } else { + log.Printf("[DEBUG] Kinesis Analytics Application (%s) has no inputs", d.Id()) + } + } + return resourceAwsKinesisAnalyticsApplicationRead(d, meta) } @@ -657,9 +698,9 @@ func resourceAwsKinesisAnalyticsApplicationRead(d *schema.ResourceData, meta int conn := meta.(*AWSClient).kinesisanalyticsconn ignoreTagsConfig := meta.(*AWSClient).IgnoreTagsConfig - application, err := finder.ApplicationByName(conn, d.Get("name").(string)) + application, err := finder.ApplicationDetailByName(conn, d.Get("name").(string)) - if isAWSErr(err, kinesisanalytics.ErrCodeResourceNotFoundException, "") { + if !d.IsNewResource() && tfresource.NotFound(err) { log.Printf("[WARN] Kinesis Analytics Application (%s) not found, removing from state", d.Id()) d.SetId("") return nil @@ -747,6 +788,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error adding Kinesis Analytics Application (%s) CloudWatch logging option: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } else if len(n.([]interface{})) == 0 { // Delete existing CloudWatch logging options. @@ -768,6 +813,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error deleting Kinesis Analytics Application (%s) CloudWatch logging option: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } else { // Update existing CloudWatch logging options. @@ -813,6 +862,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error adding Kinesis Analytics Application (%s) input: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } else if len(n.([]interface{})) == 0 { // The existing input cannot be deleted. @@ -846,6 +899,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error adding Kinesis Analytics Application (%s) input processing configuration: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } else if len(n.([]interface{})) == 0 { // Delete existing input processing configuration. @@ -865,6 +922,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error deleting Kinesis Analytics Application (%s) input processing configuration: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } } @@ -921,6 +982,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error deleting Kinesis Analytics Application (%s) output: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } @@ -942,6 +1007,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error adding Kinesis Analytics Application (%s) output: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } } @@ -967,6 +1036,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error adding Kinesis Analytics Application (%s) reference data source: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } else if len(n.([]interface{})) == 0 { // Delete existing reference data source. @@ -988,6 +1061,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i return fmt.Errorf("error deleting Kinesis Analytics Application (%s) reference data source: %w", d.Id(), err) } + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } + currentApplicationVersionId += 1 } else { // Update existing reference data source. @@ -1011,6 +1088,10 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i if err != nil { return fmt.Errorf("error updating Kinesis Analytics Application (%s): %w", d.Id(), err) } + + if _, err := waiter.ApplicationUpdated(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to update: %w", d.Id(), err) + } } } @@ -1022,6 +1103,44 @@ func resourceAwsKinesisAnalyticsApplicationUpdate(d *schema.ResourceData, meta i } } + if d.HasChange("start_application") { + application, err := finder.ApplicationDetailByName(conn, d.Get("name").(string)) + + if err != nil { + return fmt.Errorf("error reading Kinesis Analytics Application (%s): %w", d.Id(), err) + } + + if _, ok := d.GetOk("start_application"); ok { + if v, ok := d.GetOk("inputs"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + tfMap := v.([]interface{})[0].(map[string]interface{}) + + var inputStartingPosition string + + if v, ok := tfMap["starting_position_configuration"].([]interface{}); ok && len(v) > 0 && v[0] != nil { + tfMap := v[0].(map[string]interface{}) + + if v, ok := tfMap["starting_position"].(string); ok && v != "" { + inputStartingPosition = v + } + } + + err = kinesisAnalyticsStartApplication(conn, application, inputStartingPosition) + + if err != nil { + return err + } + } else { + log.Printf("[DEBUG] Kinesis Analytics Application (%s) has no inputs", d.Id()) + } + } else { + err = kinesisAnalyticsStopApplication(conn, application) + + if err != nil { + return err + } + } + } + return resourceAwsKinesisAnalyticsApplicationRead(d, meta) } @@ -1041,7 +1160,7 @@ func resourceAwsKinesisAnalyticsApplicationDelete(d *schema.ResourceData, meta i CreateTimestamp: aws.Time(createTimestamp), }) - if isAWSErr(err, kinesisanalytics.ErrCodeResourceNotFoundException, "") { + if tfawserr.ErrCodeEquals(err, kinesisanalytics.ErrCodeResourceNotFoundException) { return nil } @@ -1049,7 +1168,7 @@ func resourceAwsKinesisAnalyticsApplicationDelete(d *schema.ResourceData, meta i return fmt.Errorf("error deleting Kinesis Analytics Application (%s): %w", d.Id(), err) } - _, err = waiter.ApplicationDeleted(conn, applicationName, d.Timeout(schema.TimeoutDelete)) + _, err = waiter.ApplicationDeleted(conn, applicationName) if err != nil { return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) deletion: %w", d.Id(), err) @@ -1075,6 +1194,71 @@ func resourceAwsKinesisAnalyticsApplicationImport(d *schema.ResourceData, meta i return []*schema.ResourceData{d}, nil } +func kinesisAnalyticsStartApplication(conn *kinesisanalytics.KinesisAnalytics, application *kinesisanalytics.ApplicationDetail, inputStartingPosition string) error { + applicationARN := aws.StringValue(application.ApplicationARN) + applicationName := aws.StringValue(application.ApplicationName) + + if actual, expected := aws.StringValue(application.ApplicationStatus), kinesisanalytics.ApplicationStatusReady; actual != expected { + log.Printf("[DEBUG] Kinesis Analytics Application (%s) has status %s. An application can only be started if it's in the %s state", applicationARN, actual, expected) + return nil + } + + if len(application.InputDescriptions) == 0 { + log.Printf("[DEBUG] Kinesis Analytics Application (%s) has no input description", applicationARN) + return nil + } + + input := &kinesisanalytics.StartApplicationInput{ + ApplicationName: aws.String(applicationName), + InputConfigurations: []*kinesisanalytics.InputConfiguration{{ + Id: application.InputDescriptions[0].InputId, + InputStartingPositionConfiguration: &kinesisanalytics.InputStartingPositionConfiguration{}, + }}, + } + + if inputStartingPosition != "" { + input.InputConfigurations[0].InputStartingPositionConfiguration.InputStartingPosition = aws.String(inputStartingPosition) + } + + log.Printf("[DEBUG] Starting Kinesis Analytics Application (%s): %s", applicationARN, input) + + if _, err := conn.StartApplication(input); err != nil { + return fmt.Errorf("error starting Kinesis Analytics Application (%s): %w", applicationARN, err) + } + + if _, err := waiter.ApplicationStarted(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to start: %w", applicationARN, err) + } + + return nil +} + +func kinesisAnalyticsStopApplication(conn *kinesisanalytics.KinesisAnalytics, application *kinesisanalytics.ApplicationDetail) error { + applicationARN := aws.StringValue(application.ApplicationARN) + applicationName := aws.StringValue(application.ApplicationName) + + if actual, expected := aws.StringValue(application.ApplicationStatus), kinesisanalytics.ApplicationStatusRunning; actual != expected { + log.Printf("[DEBUG] Kinesis Analytics Application (%s) has status %s. An application can only be stopped if it's in the %s state", applicationARN, actual, expected) + return nil + } + + input := &kinesisanalytics.StopApplicationInput{ + ApplicationName: aws.String(applicationName), + } + + log.Printf("[DEBUG] Stopping Kinesis Analytics Application (%s): %s", applicationARN, input) + + if _, err := conn.StopApplication(input); err != nil { + return fmt.Errorf("error stopping Kinesis Analytics Application (%s): %w", applicationARN, err) + } + + if _, err := waiter.ApplicationStopped(conn, applicationName); err != nil { + return fmt.Errorf("error waiting for Kinesis Analytics Application (%s) to stop: %w", applicationARN, err) + } + + return nil +} + func expandKinesisAnalyticsCloudWatchLoggingOptions(vCloudWatchLoggingOptions []interface{}) []*kinesisanalytics.CloudWatchLoggingOption { if len(vCloudWatchLoggingOptions) == 0 || vCloudWatchLoggingOptions[0] == nil { return nil diff --git a/aws/resource_aws_kinesis_analytics_application_test.go b/aws/resource_aws_kinesis_analytics_application_test.go index 0e3c3b514bd..ffa8781b592 100644 --- a/aws/resource_aws_kinesis_analytics_application_test.go +++ b/aws/resource_aws_kinesis_analytics_application_test.go @@ -3,6 +3,7 @@ package aws import ( "fmt" "log" + "strconv" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesisanalytics/finder" "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesisanalytics/lister" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) func init() { @@ -42,7 +44,7 @@ func testSweepKinesisAnalyticsApplications(region string) error { arn := aws.StringValue(applicationSummary.ApplicationARN) name := aws.StringValue(applicationSummary.ApplicationName) - application, err := finder.ApplicationByName(conn, name) + application, err := finder.ApplicationDetailByName(conn, name) if tfawserr.ErrMessageContains(err, kinesisanalytics.ErrCodeUnsupportedOperationException, "was created/updated by kinesisanalyticsv2 SDK") { continue @@ -108,6 +110,7 @@ func TestAccAWSKinesisAnalyticsApplication_basic(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -215,6 +218,7 @@ func TestAccAWSKinesisAnalyticsApplication_Code_Update(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -234,6 +238,7 @@ func TestAccAWSKinesisAnalyticsApplication_Code_Update(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -274,6 +279,7 @@ func TestAccAWSKinesisAnalyticsApplication_CloudWatchLoggingOptions_Add(t *testi resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -295,6 +301,7 @@ func TestAccAWSKinesisAnalyticsApplication_CloudWatchLoggingOptions_Add(t *testi resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -337,6 +344,7 @@ func TestAccAWSKinesisAnalyticsApplication_CloudWatchLoggingOptions_Delete(t *te resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -356,6 +364,7 @@ func TestAccAWSKinesisAnalyticsApplication_CloudWatchLoggingOptions_Delete(t *te resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -400,6 +409,7 @@ func TestAccAWSKinesisAnalyticsApplication_CloudWatchLoggingOptions_Update(t *te resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -421,6 +431,7 @@ func TestAccAWSKinesisAnalyticsApplication_CloudWatchLoggingOptions_Update(t *te resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -461,6 +472,7 @@ func TestAccAWSKinesisAnalyticsApplication_Input_Add(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -500,8 +512,11 @@ func TestAccAWSKinesisAnalyticsApplication_Input_Add(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRoleResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -564,8 +579,11 @@ func TestAccAWSKinesisAnalyticsApplication_Input_Update(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -608,8 +626,11 @@ func TestAccAWSKinesisAnalyticsApplication_Input_Update(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "1"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_stream.0.resource_arn", streamsResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_stream.0.role_arn", iamRole2ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -671,8 +692,11 @@ func TestAccAWSKinesisAnalyticsApplication_InputProcessingConfiguration_Add(t *t resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRoleResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -715,8 +739,11 @@ func TestAccAWSKinesisAnalyticsApplication_InputProcessingConfiguration_Add(t *t resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRoleResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "3"), // Add input processing configuration + update input. @@ -781,8 +808,11 @@ func TestAccAWSKinesisAnalyticsApplication_InputProcessingConfiguration_Delete(t resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRoleResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -822,8 +852,11 @@ func TestAccAWSKinesisAnalyticsApplication_InputProcessingConfiguration_Delete(t resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRoleResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "3"), // Delete input processing configuration + update input. @@ -890,8 +923,11 @@ func TestAccAWSKinesisAnalyticsApplication_InputProcessingConfiguration_Update(t resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -934,8 +970,11 @@ func TestAccAWSKinesisAnalyticsApplication_InputProcessingConfiguration_Update(t resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -968,7 +1007,7 @@ func TestAccAWSKinesisAnalyticsApplication_Multiple_Update(t *testing.T) { CheckDestroy: testAccCheckKinesisAnalyticsApplicationDestroy, Steps: []resource.TestStep{ { - Config: testAccKinesisAnalyticsApplicationConfigMultiple(rName), + Config: testAccKinesisAnalyticsApplicationConfigMultiple(rName, "", ""), Check: resource.ComposeTestCheckFunc( testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), @@ -1006,6 +1045,8 @@ func TestAccAWSKinesisAnalyticsApplication_Multiple_Update(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "1"), resource.TestCheckTypeSetElemNestedAttrs(resourceName, "outputs.*", map[string]string{ "name": "OUTPUT_1", @@ -1025,7 +1066,7 @@ func TestAccAWSKinesisAnalyticsApplication_Multiple_Update(t *testing.T) { ), }, { - Config: testAccKinesisAnalyticsApplicationConfigMultipleUpdated(rName), + Config: testAccKinesisAnalyticsApplicationConfigMultipleUpdated(rName, "", ""), Check: resource.ComposeTestCheckFunc( testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), @@ -1061,6 +1102,8 @@ func TestAccAWSKinesisAnalyticsApplication_Multiple_Update(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "1"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_stream.0.resource_arn", streamsResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_stream.0.role_arn", iamRole2ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), resource.TestCheckResourceAttr(resourceName, "outputs.#", "2"), resource.TestCheckTypeSetElemNestedAttrs(resourceName, "outputs.*", map[string]string{ "name": "OUTPUT_2", @@ -1156,6 +1199,7 @@ func TestAccAWSKinesisAnalyticsApplication_Output_Update(t *testing.T) { resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -1195,6 +1239,7 @@ func TestAccAWSKinesisAnalyticsApplication_Output_Update(t *testing.T) { resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.lambda.0.resource_arn", lambdaResourceName, "arn"), resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.lambda.0.role_arn", iamRole1ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "4"), // 1 * output deletion + 2 * output addition. @@ -1219,6 +1264,7 @@ func TestAccAWSKinesisAnalyticsApplication_Output_Update(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "6"), // 2 * output deletion. @@ -1254,6 +1300,7 @@ func TestAccAWSKinesisAnalyticsApplication_ReferenceDataSource_Add(t *testing.T) resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "1"), @@ -1291,6 +1338,7 @@ func TestAccAWSKinesisAnalyticsApplication_ReferenceDataSource_Add(t *testing.T) resource.TestCheckResourceAttrPair(resourceName, "reference_data_sources.0.s3.0.role_arn", iamRoleResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.table_name", "TABLE-1"), resource.TestCheckResourceAttrSet(resourceName, "reference_data_sources.0.id"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -1349,6 +1397,7 @@ func TestAccAWSKinesisAnalyticsApplication_ReferenceDataSource_Delete(t *testing resource.TestCheckResourceAttrPair(resourceName, "reference_data_sources.0.s3.0.role_arn", iamRoleResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.table_name", "TABLE-1"), resource.TestCheckResourceAttrSet(resourceName, "reference_data_sources.0.id"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -1368,6 +1417,7 @@ func TestAccAWSKinesisAnalyticsApplication_ReferenceDataSource_Delete(t *testing resource.TestCheckResourceAttr(resourceName, "inputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "3"), @@ -1427,6 +1477,7 @@ func TestAccAWSKinesisAnalyticsApplication_ReferenceDataSource_Update(t *testing resource.TestCheckResourceAttrPair(resourceName, "reference_data_sources.0.s3.0.role_arn", iamRole1ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.table_name", "TABLE-1"), resource.TestCheckResourceAttrSet(resourceName, "reference_data_sources.0.id"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "2"), @@ -1467,6 +1518,7 @@ func TestAccAWSKinesisAnalyticsApplication_ReferenceDataSource_Update(t *testing resource.TestCheckResourceAttrPair(resourceName, "reference_data_sources.0.s3.0.role_arn", iamRole2ResourceName, "arn"), resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.table_name", "TABLE-2"), resource.TestCheckResourceAttrSet(resourceName, "reference_data_sources.0.id"), + resource.TestCheckNoResourceAttr(resourceName, "start_application"), resource.TestCheckResourceAttr(resourceName, "status", "READY"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), resource.TestCheckResourceAttr(resourceName, "version", "3"), @@ -1481,6 +1533,400 @@ func TestAccAWSKinesisAnalyticsApplication_ReferenceDataSource_Update(t *testing }) } +func TestAccAWSKinesisAnalyticsApplication_StartApplication_OnCreate(t *testing.T) { + var v kinesisanalytics.ApplicationDetail + resourceName := "aws_kinesis_analytics_application.test" + iamRole1ResourceName := "aws_iam_role.test.0" + firehoseResourceName := "aws_kinesis_firehose_delivery_stream.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSKinesisAnalytics(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisAnalyticsApplicationDestroy, + Steps: []resource.TestStep{ + { + Config: testAccKinesisAnalyticsApplicationConfigStartApplication(rName, true), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), + testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "code", ""), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "inputs.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.stream_names.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "inputs.0.id"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.name", "COLUMN_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.sql_type", "INTEGER"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_encoding", ""), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_column_delimiter", ","), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_row_delimiter", "|"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.json.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.record_format_type", "CSV"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.name_prefix", "NAME_PREFIX_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.0.count", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.processing_configuration.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_firehose.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", "NOW"), + resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckResourceAttr(resourceName, "start_application", "true"), + resource.TestCheckResourceAttr(resourceName, "status", "RUNNING"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "version", "1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"start_application"}, + }, + }, + }) +} + +func TestAccAWSKinesisAnalyticsApplication_StartApplication_OnUpdate(t *testing.T) { + var v kinesisanalytics.ApplicationDetail + resourceName := "aws_kinesis_analytics_application.test" + iamRole1ResourceName := "aws_iam_role.test.0" + firehoseResourceName := "aws_kinesis_firehose_delivery_stream.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSKinesisAnalytics(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisAnalyticsApplicationDestroy, + Steps: []resource.TestStep{ + { + Config: testAccKinesisAnalyticsApplicationConfigStartApplication(rName, false), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), + testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "code", ""), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "inputs.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.stream_names.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "inputs.0.id"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.name", "COLUMN_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.sql_type", "INTEGER"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_encoding", ""), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_column_delimiter", ","), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_row_delimiter", "|"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.json.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.record_format_type", "CSV"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.name_prefix", "NAME_PREFIX_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.0.count", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.processing_configuration.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_firehose.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", ""), + resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckResourceAttr(resourceName, "start_application", "false"), + resource.TestCheckResourceAttr(resourceName, "status", "READY"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "version", "1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"start_application"}, + }, + { + Config: testAccKinesisAnalyticsApplicationConfigStartApplication(rName, true), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), + testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "code", ""), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "inputs.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.stream_names.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "inputs.0.id"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.name", "COLUMN_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.sql_type", "INTEGER"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_encoding", ""), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_column_delimiter", ","), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_row_delimiter", "|"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.json.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.record_format_type", "CSV"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.name_prefix", "NAME_PREFIX_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.0.count", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.processing_configuration.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_firehose.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", "NOW"), + resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckResourceAttr(resourceName, "start_application", "true"), + resource.TestCheckResourceAttr(resourceName, "status", "RUNNING"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "version", "2"), + ), + }, + { + Config: testAccKinesisAnalyticsApplicationConfigStartApplication(rName, false), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), + testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "code", ""), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "inputs.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.stream_names.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "inputs.0.id"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.name", "COLUMN_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.sql_type", "INTEGER"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_encoding", ""), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_column_delimiter", ","), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_row_delimiter", "|"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.json.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.record_format_type", "CSV"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.name_prefix", "NAME_PREFIX_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.0.count", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.processing_configuration.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_firehose.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", "NOW"), + resource.TestCheckResourceAttr(resourceName, "outputs.#", "0"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckResourceAttr(resourceName, "start_application", "false"), + resource.TestCheckResourceAttr(resourceName, "status", "READY"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "version", "2"), + ), + }, + }, + }) +} + +func TestAccAWSKinesisAnalyticsApplication_StartApplication_Update(t *testing.T) { + var v kinesisanalytics.ApplicationDetail + resourceName := "aws_kinesis_analytics_application.test" + iamRole1ResourceName := "aws_iam_role.test.0" + iamRole2ResourceName := "aws_iam_role.test.1" + cloudWatchLogStreamResourceName := "aws_cloudwatch_log_stream.test" + lambdaResourceName := "aws_lambda_function.test.0" + firehoseResourceName := "aws_kinesis_firehose_delivery_stream.test" + streamsResourceName := "aws_kinesis_stream.test" + s3BucketResourceName := "aws_s3_bucket.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSKinesisAnalytics(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisAnalyticsApplicationDestroy, + Steps: []resource.TestStep{ + { + Config: testAccKinesisAnalyticsApplicationConfigMultiple(rName, "true", "LAST_STOPPED_POINT"), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), + testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "cloudwatch_logging_options.0.log_stream_arn", cloudWatchLogStreamResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "cloudwatch_logging_options.0.role_arn", iamRole2ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "code", ""), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "inputs.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.stream_names.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "inputs.0.id"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.name", "COLUMN_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.sql_type", "INTEGER"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_encoding", ""), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_column_delimiter", ","), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_row_delimiter", "|"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.json.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.record_format_type", "CSV"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.name_prefix", "NAME_PREFIX_1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.0.count", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.processing_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.processing_configuration.0.lambda.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.processing_configuration.0.lambda.0.resource_arn", lambdaResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.processing_configuration.0.lambda.0.role_arn", iamRole1ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_firehose.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_firehose.0.role_arn", iamRole1ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", "LAST_STOPPED_POINT"), + resource.TestCheckResourceAttr(resourceName, "outputs.#", "1"), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "outputs.*", map[string]string{ + "name": "OUTPUT_1", + "schema.#": "1", + "schema.0.record_format_type": "CSV", + "kinesis_firehose.#": "1", + "kinesis_stream.#": "0", + "lambda.#": "0", + }), + resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.kinesis_firehose.0.resource_arn", firehoseResourceName, "arn"), + resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.kinesis_firehose.0.role_arn", iamRole2ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "0"), + resource.TestCheckResourceAttr(resourceName, "start_application", "true"), + resource.TestCheckResourceAttr(resourceName, "status", "RUNNING"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.Key1", "Value1"), + resource.TestCheckResourceAttr(resourceName, "version", "1"), + ), + }, + { + Config: testAccKinesisAnalyticsApplicationConfigMultipleUpdated(rName, "true", "LAST_STOPPED_POINT"), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisAnalyticsApplicationExists(resourceName, &v), + testAccCheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "code", ""), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "inputs.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.stream_names.#", "42"), + resource.TestCheckResourceAttrSet(resourceName, "inputs.0.id"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.#", "2"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.mapping", "MAPPING-2"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.name", "COLUMN_2"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.0.sql_type", "VARCHAR(8)"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.1.mapping", "MAPPING-3"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.1.name", "COLUMN_3"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_columns.1.sql_type", "DOUBLE"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_encoding", "UTF-8"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.csv.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.json.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.mapping_parameters.0.json.0.record_row_path", "$path.to.record"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.schema.0.record_format.0.record_format_type", "JSON"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.name_prefix", "NAME_PREFIX_2"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.parallelism.0.count", "42"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.processing_configuration.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_firehose.#", "0"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.kinesis_stream.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_stream.0.resource_arn", streamsResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "inputs.0.kinesis_stream.0.role_arn", iamRole2ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "inputs.0.starting_position_configuration.0.starting_position", "LAST_STOPPED_POINT"), + resource.TestCheckResourceAttr(resourceName, "outputs.#", "2"), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "outputs.*", map[string]string{ + "name": "OUTPUT_2", + "schema.#": "1", + "schema.0.record_format_type": "JSON", + "kinesis_firehose.#": "0", + "kinesis_stream.#": "1", + "lambda.#": "0", + }), + resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.kinesis_stream.0.resource_arn", streamsResourceName, "arn"), + resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.kinesis_stream.0.role_arn", iamRole2ResourceName, "arn"), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "outputs.*", map[string]string{ + "name": "OUTPUT_3", + "schema.#": "1", + "schema.0.record_format_type": "CSV", + "kinesis_firehose.#": "0", + "kinesis_stream.#": "0", + "lambda.#": "1", + }), + resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.lambda.0.resource_arn", lambdaResourceName, "arn"), + resource.TestCheckTypeSetElemAttrPair(resourceName, "outputs.*.lambda.0.role_arn", iamRole1ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.#", "1"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.#", "1"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_columns.#", "1"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_columns.0.name", "COLUMN_1"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_columns.0.sql_type", "INTEGER"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_encoding", ""), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_format.#", "1"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_format.0.mapping_parameters.#", "1"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_format.0.mapping_parameters.0.csv.#", "1"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_column_delimiter", ","), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_format.0.mapping_parameters.0.csv.0.record_row_delimiter", "|"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_format.0.mapping_parameters.0.json.#", "0"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.schema.0.record_format.0.record_format_type", "CSV"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.s3.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "reference_data_sources.0.s3.0.bucket_arn", s3BucketResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.s3.0.file_key", "KEY-1"), + resource.TestCheckResourceAttrPair(resourceName, "reference_data_sources.0.s3.0.role_arn", iamRole2ResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "reference_data_sources.0.table_name", "TABLE-1"), + resource.TestCheckResourceAttrSet(resourceName, "reference_data_sources.0.id"), + resource.TestCheckResourceAttr(resourceName, "start_application", "true"), + resource.TestCheckResourceAttr(resourceName, "status", "RUNNING"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), + resource.TestCheckResourceAttr(resourceName, "tags.Key2", "Value2"), + resource.TestCheckResourceAttr(resourceName, "tags.Key3", "Value3"), + resource.TestCheckResourceAttr(resourceName, "version", "8"), // Delete CloudWatch logging options + add reference data source + delete input processing configuration+ update application + delete output + 2 * add output. + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"start_application"}, + }, + }, + }) +} + func testAccCheckKinesisAnalyticsApplicationDestroy(s *terraform.State) error { conn := testAccProvider.Meta().(*AWSClient).kinesisanalyticsconn @@ -1489,10 +1935,12 @@ func testAccCheckKinesisAnalyticsApplicationDestroy(s *terraform.State) error { continue } - _, err := finder.ApplicationByName(conn, rs.Primary.Attributes["name"]) - if isAWSErr(err, kinesisanalytics.ErrCodeResourceNotFoundException, "") { + _, err := finder.ApplicationDetailByName(conn, rs.Primary.Attributes["name"]) + + if tfresource.NotFound(err) { continue } + if err != nil { return err } @@ -1515,7 +1963,8 @@ func testAccCheckKinesisAnalyticsApplicationExists(n string, v *kinesisanalytics conn := testAccProvider.Meta().(*AWSClient).kinesisanalyticsconn - application, err := finder.ApplicationByName(conn, rs.Primary.Attributes["name"]) + application, err := finder.ApplicationDetailByName(conn, rs.Primary.Attributes["name"]) + if err != nil { return err } @@ -1821,7 +2270,16 @@ resource "aws_kinesis_analytics_application" "test" { `, rName, lambdaIndex)) } -func testAccKinesisAnalyticsApplicationConfigMultiple(rName string) string { +func testAccKinesisAnalyticsApplicationConfigMultiple(rName, startApplication, startingPosition string) string { + if startApplication == "" { + startApplication = "null" + } + if startingPosition == "" { + startingPosition = "null" + } else { + startingPosition = strconv.Quote(startingPosition) + } + return composeConfig( testAccKinesisAnalyticsApplicationConfigBaseIamRole(rName), testAccKinesisAnalyticsApplicationConfigBaseInputOutput(rName), @@ -1873,6 +2331,10 @@ resource "aws_kinesis_analytics_application" "test" { resource_arn = aws_kinesis_firehose_delivery_stream.test.arn role_arn = aws_iam_role.test[0].arn } + + starting_position_configuration { + starting_position = %[3]s + } } outputs { @@ -1891,11 +2353,22 @@ resource "aws_kinesis_analytics_application" "test" { tags = { Key1 = "Value1" } + + start_application = %[2]s } -`, rName)) +`, rName, startApplication, startingPosition)) } -func testAccKinesisAnalyticsApplicationConfigMultipleUpdated(rName string) string { +func testAccKinesisAnalyticsApplicationConfigMultipleUpdated(rName, startApplication, startingPosition string) string { + if startApplication == "" { + startApplication = "null" + } + if startingPosition == "" { + startingPosition = "null" + } else { + startingPosition = strconv.Quote(startingPosition) + } + return composeConfig( testAccKinesisAnalyticsApplicationConfigBaseIamRole(rName), testAccKinesisAnalyticsApplicationConfigBaseInputOutput(rName), @@ -1938,6 +2411,10 @@ resource "aws_kinesis_analytics_application" "test" { resource_arn = aws_kinesis_stream.test.arn role_arn = aws_iam_role.test[1].arn } + + starting_position_configuration { + starting_position = %[3]s + } } outputs { @@ -1996,8 +2473,10 @@ resource "aws_kinesis_analytics_application" "test" { Key2 = "Value2" Key3 = "Value3" } + + start_application = %[2]s } -`, rName)) +`, rName, startApplication, startingPosition)) } func testAccKinesisAnalyticsApplicationOutput(rName string) string { @@ -2149,6 +2628,48 @@ resource "aws_kinesis_analytics_application" "test" { `, rName)) } +func testAccKinesisAnalyticsApplicationConfigStartApplication(rName string, start bool) string { + return composeConfig( + testAccKinesisAnalyticsApplicationConfigBaseIamRole(rName), + testAccKinesisAnalyticsApplicationConfigBaseInputOutput(rName), + fmt.Sprintf(` +resource "aws_kinesis_analytics_application" "test" { + name = %[1]q + + inputs { + name_prefix = "NAME_PREFIX_1" + + schema { + record_columns { + name = "COLUMN_1" + sql_type = "INTEGER" + } + + record_format { + mapping_parameters { + csv { + record_column_delimiter = "," + record_row_delimiter = "|" + } + } + } + } + + kinesis_firehose { + resource_arn = aws_kinesis_firehose_delivery_stream.test.arn + role_arn = aws_iam_role.test[0].arn + } + + starting_position_configuration { + starting_position = (%[2]t ? "NOW" : null) + } + } + + start_application = %[2]t +} +`, rName, start)) +} + func testAccKinesisAnalyticsApplicationConfigTags1(rName, tagKey1, tagValue1 string) string { return fmt.Sprintf(` resource "aws_kinesis_analytics_application" "test" { diff --git a/website/docs/r/kinesis_analytics_application.html.markdown b/website/docs/r/kinesis_analytics_application.html.markdown index a16fbd6552d..0ae26ed8bbb 100644 --- a/website/docs/r/kinesis_analytics_application.html.markdown +++ b/website/docs/r/kinesis_analytics_application.html.markdown @@ -17,6 +17,8 @@ For more details, see the [Amazon Kinesis Analytics Documentation][1]. ## Example Usage +### Kinesis Stream Input + ```hcl resource "aws_kinesis_stream" "test_stream" { name = "terraform-kinesis-test" @@ -59,6 +61,87 @@ resource "aws_kinesis_analytics_application" "test_application" { } ``` +### Starting An Application + +```hcl +resource "aws_cloudwatch_log_group" "example" { + name = "analytics" +} + +resource "aws_cloudwatch_log_stream" "example" { + name = "example-kinesis-application" + log_group_name = aws_cloudwatch_log_group.example.name +} + +resource "aws_kinesis_stream" "example" { + name = "example-kinesis-stream" + shard_count = 1 +} + +resource "aws_kinesis_firehose_delivery_stream" "example" { + name = "example-kinesis-delivery-stream" + destination = "extended_s3" + + extended_s3_configuration { + bucket_arn = aws_s3_bucket.example.arn + role_arn = aws_iam_role.example.arn + } +} + +resource "aws_kinesis_analytics_application" "test" { + name = "example-application" + + cloudwatch_logging_options { + log_stream_arn = aws_cloudwatch_log_stream.example.arn + role_arn = aws_iam_role.example.arn + } + + inputs { + name_prefix = "example_prefix" + + schema { + record_columns { + name = "COLUMN_1" + sql_type = "INTEGER" + } + + record_format { + mapping_parameters { + csv { + record_column_delimiter = "," + record_row_delimiter = "|" + } + } + } + } + + kinesis_stream { + resource_arn = aws_kinesis_stream.example.arn + role_arn = aws_iam_role.example.arn + } + + starting_position_configuration { + starting_position = "NOW" + } + } + + outputs { + name = "OUTPUT_1" + + schema { + record_format_type = "CSV" + } + + kinesis_firehose { + resource_arn = aws_kinesis_firehose_delivery_stream.example.arn + role_arn = aws_iam_role.example.arn + } + } + + start_application = true +} +``` + ## Argument Reference The following arguments are supported: @@ -72,6 +155,8 @@ See [CloudWatch Logging Options](#cloudwatch-logging-options) below for more det * `outputs` - (Optional) Output destination configuration of the application. See [Outputs](#outputs) below for more details. * `reference_data_sources` - (Optional) An S3 Reference Data Source for the application. See [Reference Data Sources](#reference-data-sources) below for more details. +* `start_application` - (Optional) Whether to start or stop the Kinesis Analytics Application. To start an application, an input with a defined `starting_position` must be configured. +To modify an application's starting position, first stop the application by setting `start_application = false`, then update `starting_position` and set `start_application = true`. * `tags` - Key-value map of tags for the Kinesis Analytics Application. ### CloudWatch Logging Options @@ -99,6 +184,8 @@ See [Kinesis Stream](#kinesis-stream) below for more details. See [Parallelism](#parallelism) below for more details. * `processing_configuration` - (Optional) The Processing Configuration to transform records as they are received from the stream. See [Processing Configuration](#processing-configuration) below for more details. +* `starting_position_configuration` (Optional) The point at which the application starts processing records from the streaming source. +See [Starting Position Configuration](#starting-position-configuration) below for more details. ### Outputs @@ -187,6 +274,14 @@ The `lambda` block supports the following: * `resource_arn` - (Required) The ARN of the Lambda function. * `role_arn` - (Required) The ARN of the IAM Role used to access the Lambda function. +#### Starting Position Configuration + +The point at which the application reads from the streaming source. + +The `starting_position_configuration` block supports the following: + +* `starting_position` - (Required) The starting position on the stream. Valid values: `LAST_STOPPED_POINT`, `NOW`, `TRIM_HORIZON`. + #### Record Columns The Column mapping of each data element in the streaming source to the corresponding column in the in-application stream.