diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream.go b/aws/resource_aws_kinesis_firehose_delivery_stream.go index 5b7755a0ebb..5a9f9f4bd95 100644 --- a/aws/resource_aws_kinesis_firehose_delivery_stream.go +++ b/aws/resource_aws_kinesis_firehose_delivery_stream.go @@ -19,6 +19,14 @@ const ( firehoseDeliveryStreamStatusDeleted = "DESTROYED" ) +const ( + firehoseDestinationTypeS3 = "s3" + firehoseDestinationTypeExtendedS3 = "extended_s3" + firehoseDestinationTypeElasticsearch = "elasticsearch" + firehoseDestinationTypeRedshift = "redshift" + firehoseDestinationTypeSplunk = "splunk" +) + func cloudWatchLoggingOptionsSchema() *schema.Schema { return &schema.Schema{ Type: schema.TypeList, @@ -55,26 +63,30 @@ func s3ConfigurationSchema() *schema.Schema { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "bucket_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, }, "buffer_size": { - Type: schema.TypeInt, - Optional: true, - Default: 5, + Type: schema.TypeInt, + Optional: true, + Default: 5, + ValidateFunc: validation.IntAtLeast(1), }, "buffer_interval": { - Type: schema.TypeInt, - Optional: true, - Default: 300, + Type: schema.TypeInt, + Optional: true, + Default: 300, + ValidateFunc: validation.IntAtLeast(60), }, "compression_format": { - Type: schema.TypeString, - Optional: true, - Default: firehose.CompressionFormatUncompressed, + Type: schema.TypeString, + Optional: true, + Default: firehose.CompressionFormatUncompressed, + ValidateFunc: validation.StringInSlice(firehose.CompressionFormat_Values(), false), }, "kms_key_arn": { @@ -84,8 +96,9 @@ func s3ConfigurationSchema() *schema.Schema { }, "role_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, }, "prefix": { @@ -122,15 +135,9 @@ func processingConfigurationSchema() *schema.Schema { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "parameter_name": { - Type: schema.TypeString, - Required: true, - ValidateFunc: validation.StringInSlice([]string{ - firehose.ProcessorParameterNameLambdaArn, - firehose.ProcessorParameterNameNumberOfRetries, - firehose.ProcessorParameterNameRoleArn, - firehose.ProcessorParameterNameBufferSizeInMbs, - firehose.ProcessorParameterNameBufferIntervalInSeconds, - }, false), + Type: schema.TypeString, + Required: true, + ValidateFunc: validation.StringInSlice(firehose.ProcessorParameterName_Values(), false), }, "parameter_value": { Type: schema.TypeString, @@ -141,11 +148,9 @@ func processingConfigurationSchema() *schema.Schema { }, }, "type": { - Type: schema.TypeString, - Required: true, - ValidateFunc: validation.StringInSlice([]string{ - firehose.ProcessorTypeLambda, - }, false), + Type: schema.TypeString, + Required: true, + ValidateFunc: validation.StringInSlice(firehose.ProcessorType_Values(), false), }, }, }, @@ -177,7 +182,6 @@ func flattenFirehoseElasticsearchConfiguration(description *firehose.Elasticsear m := map[string]interface{}{ "cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions), - "domain_arn": aws.StringValue(description.DomainARN), "role_arn": aws.StringValue(description.RoleARN), "type_name": aws.StringValue(description.TypeName), "index_name": aws.StringValue(description.IndexName), @@ -187,6 +191,14 @@ func flattenFirehoseElasticsearchConfiguration(description *firehose.Elasticsear "processing_configuration": flattenProcessingConfiguration(description.ProcessingConfiguration, aws.StringValue(description.RoleARN)), } + if description.DomainARN != nil { + m["domain_arn"] = aws.StringValue(description.DomainARN) + } + + if description.ClusterEndpoint != nil { + m["cluster_endpoint"] = aws.StringValue(description.ClusterEndpoint) + } + if description.BufferingHints != nil { m["buffering_interval"] = int(aws.Int64Value(description.BufferingHints.IntervalInSeconds)) m["buffering_size"] = int(aws.Int64Value(description.BufferingHints.SizeInMBs)) @@ -624,7 +636,7 @@ func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.De if len(s.Destinations) > 0 { destination := s.Destinations[0] if destination.RedshiftDestinationDescription != nil { - d.Set("destination", "redshift") + d.Set("destination", firehoseDestinationTypeRedshift) configuredPassword := d.Get("redshift_configuration.0.password").(string) if err := d.Set("redshift_configuration", flattenFirehoseRedshiftConfiguration(destination.RedshiftDestinationDescription, configuredPassword)); err != nil { return fmt.Errorf("error setting redshift_configuration: %s", err) @@ -633,7 +645,7 @@ func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.De return fmt.Errorf("error setting s3_configuration: %s", err) } } else if destination.ElasticsearchDestinationDescription != nil { - d.Set("destination", "elasticsearch") + d.Set("destination", firehoseDestinationTypeElasticsearch) if err := d.Set("elasticsearch_configuration", flattenFirehoseElasticsearchConfiguration(destination.ElasticsearchDestinationDescription)); err != nil { return fmt.Errorf("error setting elasticsearch_configuration: %s", err) } @@ -641,20 +653,20 @@ func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.De return fmt.Errorf("error setting s3_configuration: %s", err) } } else if destination.SplunkDestinationDescription != nil { - d.Set("destination", "splunk") + d.Set("destination", firehoseDestinationTypeSplunk) if err := d.Set("splunk_configuration", flattenFirehoseSplunkConfiguration(destination.SplunkDestinationDescription)); err != nil { return fmt.Errorf("error setting splunk_configuration: %s", err) } if err := d.Set("s3_configuration", flattenFirehoseS3Configuration(destination.SplunkDestinationDescription.S3DestinationDescription)); err != nil { return fmt.Errorf("error setting s3_configuration: %s", err) } - } else if d.Get("destination").(string) == "s3" { - d.Set("destination", "s3") + } else if d.Get("destination").(string) == firehoseDestinationTypeS3 { + d.Set("destination", firehoseDestinationTypeS3) if err := d.Set("s3_configuration", flattenFirehoseS3Configuration(destination.S3DestinationDescription)); err != nil { return fmt.Errorf("error setting s3_configuration: %s", err) } } else { - d.Set("destination", "extended_s3") + d.Set("destination", firehoseDestinationTypeExtendedS3) if err := d.Set("extended_s3_configuration", flattenFirehoseExtendedS3Configuration(destination.ExtendedS3DestinationDescription)); err != nil { return fmt.Errorf("error setting extended_s3_configuration: %s", err) } @@ -752,11 +764,11 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { return strings.ToLower(value) }, ValidateFunc: validation.StringInSlice([]string{ - "elasticsearch", - "extended_s3", - "redshift", - "s3", - "splunk", + firehoseDestinationTypeS3, + firehoseDestinationTypeExtendedS3, + firehoseDestinationTypeRedshift, + firehoseDestinationTypeElasticsearch, + firehoseDestinationTypeSplunk, }, false), }, @@ -770,8 +782,9 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "bucket_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, }, "buffer_size": { @@ -787,9 +800,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { }, "compression_format": { - Type: schema.TypeString, - Optional: true, - Default: firehose.CompressionFormatUncompressed, + Type: schema.TypeString, + Optional: true, + Default: firehose.CompressionFormatUncompressed, + ValidateFunc: validation.StringInSlice(firehose.CompressionFormat_Values(), false), }, "data_format_conversion_configuration": { @@ -899,14 +913,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { Default: 0.05, }, "compression": { - Type: schema.TypeString, - Optional: true, - Default: firehose.OrcCompressionSnappy, - ValidateFunc: validation.StringInSlice([]string{ - firehose.OrcCompressionNone, - firehose.OrcCompressionSnappy, - firehose.OrcCompressionZlib, - }, false), + Type: schema.TypeString, + Optional: true, + Default: firehose.OrcCompressionSnappy, + ValidateFunc: validation.StringInSlice(firehose.OrcCompression_Values(), false), }, "dictionary_key_threshold": { Type: schema.TypeFloat, @@ -919,13 +929,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { Default: false, }, "format_version": { - Type: schema.TypeString, - Optional: true, - Default: firehose.OrcFormatVersionV012, - ValidateFunc: validation.StringInSlice([]string{ - firehose.OrcFormatVersionV011, - firehose.OrcFormatVersionV012, - }, false), + Type: schema.TypeString, + Optional: true, + Default: firehose.OrcFormatVersionV012, + ValidateFunc: validation.StringInSlice(firehose.OrcFormatVersion_Values(), false), }, "padding_tolerance": { Type: schema.TypeFloat, @@ -965,14 +972,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { ValidateFunc: validation.IntAtLeast(67108864), }, "compression": { - Type: schema.TypeString, - Optional: true, - Default: firehose.ParquetCompressionSnappy, - ValidateFunc: validation.StringInSlice([]string{ - firehose.ParquetCompressionGzip, - firehose.ParquetCompressionSnappy, - firehose.ParquetCompressionUncompressed, - }, false), + Type: schema.TypeString, + Optional: true, + Default: firehose.ParquetCompressionSnappy, + ValidateFunc: validation.StringInSlice(firehose.ParquetCompression_Values(), false), }, "enable_dictionary_compression": { Type: schema.TypeBool, @@ -993,13 +996,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { ValidateFunc: validation.IntAtLeast(65536), }, "writer_version": { - Type: schema.TypeString, - Optional: true, - Default: firehose.ParquetWriterVersionV1, - ValidateFunc: validation.StringInSlice([]string{ - firehose.ParquetWriterVersionV1, - firehose.ParquetWriterVersionV2, - }, false), + Type: schema.TypeString, + Optional: true, + Default: firehose.ParquetWriterVersionV1, + ValidateFunc: validation.StringInSlice(firehose.ParquetWriterVersion_Values(), false), }, }, }, @@ -1031,8 +1031,9 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { Computed: true, }, "role_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, }, "table_name": { Type: schema.TypeString, @@ -1062,8 +1063,9 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { }, "role_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, }, "prefix": { @@ -1072,13 +1074,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { }, "s3_backup_mode": { - Type: schema.TypeString, - Optional: true, - Default: firehose.S3BackupModeDisabled, - ValidateFunc: validation.StringInSlice([]string{ - firehose.S3BackupModeDisabled, - firehose.S3BackupModeEnabled, - }, false), + Type: schema.TypeString, + Optional: true, + Default: firehose.S3BackupModeDisabled, + ValidateFunc: validation.StringInSlice(firehose.S3BackupMode_Values(), false), }, "s3_backup_configuration": s3ConfigurationSchema(), @@ -1115,34 +1114,25 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { "processing_configuration": processingConfigurationSchema(), "role_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, }, "s3_backup_mode": { - Type: schema.TypeString, - Optional: true, - Default: firehose.S3BackupModeDisabled, - ValidateFunc: validation.StringInSlice([]string{ - firehose.S3BackupModeDisabled, - firehose.S3BackupModeEnabled, - }, false), + Type: schema.TypeString, + Optional: true, + Default: firehose.S3BackupModeDisabled, + ValidateFunc: validation.StringInSlice(firehose.S3BackupMode_Values(), false), }, "s3_backup_configuration": s3ConfigurationSchema(), "retry_duration": { - Type: schema.TypeInt, - Optional: true, - Default: 3600, - ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { - value := v.(int) - if value < 0 || value > 7200 { - errors = append(errors, fmt.Errorf( - "%q must be in the range from 0 to 7200 seconds.", k)) - } - return - }, + Type: schema.TypeInt, + Optional: true, + Default: 3600, + ValidateFunc: validation.IntBetween(0, 7200), }, "copy_options": { @@ -1172,36 +1162,24 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "buffering_interval": { - Type: schema.TypeInt, - Optional: true, - Default: 300, - ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { - value := v.(int) - if value < 60 || value > 900 { - errors = append(errors, fmt.Errorf( - "%q must be in the range from 60 to 900 seconds.", k)) - } - return - }, + Type: schema.TypeInt, + Optional: true, + Default: 300, + ValidateFunc: validation.IntBetween(60, 900), }, "buffering_size": { - Type: schema.TypeInt, - Optional: true, - Default: 5, - ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { - value := v.(int) - if value < 1 || value > 100 { - errors = append(errors, fmt.Errorf( - "%q must be in the range from 1 to 100 MB.", k)) - } - return - }, + Type: schema.TypeInt, + Optional: true, + Default: 5, + ValidateFunc: validation.IntBetween(1, 100), }, "domain_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Optional: true, + ValidateFunc: validateArn, + ConflictsWith: []string{"elasticsearch_configuration.0.cluster_endpoint"}, }, "index_name": { @@ -1210,59 +1188,37 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { }, "index_rotation_period": { - Type: schema.TypeString, - Optional: true, - Default: firehose.ElasticsearchIndexRotationPeriodOneDay, - ValidateFunc: validation.StringInSlice([]string{ - firehose.ElasticsearchIndexRotationPeriodNoRotation, - firehose.ElasticsearchIndexRotationPeriodOneHour, - firehose.ElasticsearchIndexRotationPeriodOneDay, - firehose.ElasticsearchIndexRotationPeriodOneWeek, - firehose.ElasticsearchIndexRotationPeriodOneMonth, - }, false), + Type: schema.TypeString, + Optional: true, + Default: firehose.ElasticsearchIndexRotationPeriodOneDay, + ValidateFunc: validation.StringInSlice(firehose.ElasticsearchIndexRotationPeriod_Values(), false), }, "retry_duration": { - Type: schema.TypeInt, - Optional: true, - Default: 300, - ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { - value := v.(int) - if value < 0 || value > 7200 { - errors = append(errors, fmt.Errorf( - "%q must be in the range from 0 to 7200 seconds.", k)) - } - return - }, + Type: schema.TypeInt, + Optional: true, + Default: 300, + ValidateFunc: validation.IntBetween(0, 7200), }, "role_arn": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, }, "s3_backup_mode": { - Type: schema.TypeString, - ForceNew: true, - Optional: true, - Default: firehose.ElasticsearchS3BackupModeFailedDocumentsOnly, - ValidateFunc: validation.StringInSlice([]string{ - firehose.ElasticsearchS3BackupModeFailedDocumentsOnly, - firehose.ElasticsearchS3BackupModeAllDocuments, - }, false), + Type: schema.TypeString, + ForceNew: true, + Optional: true, + Default: firehose.ElasticsearchS3BackupModeFailedDocumentsOnly, + ValidateFunc: validation.StringInSlice(firehose.ElasticsearchS3BackupMode_Values(), false), }, "type_name": { - Type: schema.TypeString, - Optional: true, - ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { - value := v.(string) - if len(value) > 100 { - errors = append(errors, fmt.Errorf( - "%q cannot be longer than 100 characters", k)) - } - return - }, + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringLenBetween(0, 100), }, "vpc_config": { @@ -1301,6 +1257,11 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { "cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(), "processing_configuration": processingConfigurationSchema(), + "cluster_endpoint": { + Type: schema.TypeString, + Optional: true, + ConflictsWith: []string{"elasticsearch_configuration.0.domain_arn"}, + }, }, }, }, @@ -1814,7 +1775,7 @@ func extractEncryptionConfiguration(s3 map[string]interface{}) *firehose.Encrypt } return &firehose.EncryptionConfiguration{ - NoEncryptionConfig: aws.String("NoEncryption"), + NoEncryptionConfig: aws.String(firehose.NoEncryptionConfigNoEncryption), } } @@ -1941,7 +1902,6 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest config := &firehose.ElasticsearchDestinationConfiguration{ BufferingHints: extractBufferingHints(es), - DomainARN: aws.String(es["domain_arn"].(string)), IndexName: aws.String(es["index_name"].(string)), RetryOptions: extractElasticSearchRetryOptions(es), RoleARN: aws.String(es["role_arn"].(string)), @@ -1949,6 +1909,14 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest S3Configuration: s3Config, } + if v, ok := es["domain_arn"]; ok && v.(string) != "" { + config.DomainARN = aws.String(v.(string)) + } + + if v, ok := es["cluster_endpoint"]; ok && v.(string) != "" { + config.ClusterEndpoint = aws.String(v.(string)) + } + if _, ok := es["cloudwatch_logging_options"]; ok { config.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(es) } @@ -1982,7 +1950,6 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest update := &firehose.ElasticsearchDestinationUpdate{ BufferingHints: extractBufferingHints(es), - DomainARN: aws.String(es["domain_arn"].(string)), IndexName: aws.String(es["index_name"].(string)), RetryOptions: extractElasticSearchRetryOptions(es), RoleARN: aws.String(es["role_arn"].(string)), @@ -1990,6 +1957,14 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest S3Update: s3Update, } + if v, ok := es["domain_arn"]; ok && v.(string) != "" { + update.DomainARN = aws.String(v.(string)) + } + + if v, ok := es["cluster_endpoint"]; ok && v.(string) != "" { + update.ClusterEndpoint = aws.String(v.(string)) + } + if _, ok := es["cloudwatch_logging_options"]; ok { update.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(es) } @@ -2149,27 +2124,27 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta createInput.DeliveryStreamType = aws.String(firehose.DeliveryStreamTypeDirectPut) } - if d.Get("destination").(string) == "extended_s3" { + if d.Get("destination").(string) == firehoseDestinationTypeExtendedS3 { extendedS3Config := createExtendedS3Config(d) createInput.ExtendedS3DestinationConfiguration = extendedS3Config } else { s3Config := createS3Config(d) - if d.Get("destination").(string) == "s3" { + if d.Get("destination").(string) == firehoseDestinationTypeS3 { createInput.S3DestinationConfiguration = s3Config - } else if d.Get("destination").(string) == "elasticsearch" { + } else if d.Get("destination").(string) == firehoseDestinationTypeElasticsearch { esConfig, err := createElasticsearchConfig(d, s3Config) if err != nil { return err } createInput.ElasticsearchDestinationConfiguration = esConfig - } else if d.Get("destination").(string) == "redshift" { + } else if d.Get("destination").(string) == firehoseDestinationTypeRedshift { rc, err := createRedshiftConfig(d, s3Config) if err != nil { return err } createInput.RedshiftDestinationConfiguration = rc - } else if d.Get("destination").(string) == "splunk" { + } else if d.Get("destination").(string) == firehoseDestinationTypeSplunk { rc, err := createSplunkConfig(d, s3Config) if err != nil { return err @@ -2243,7 +2218,7 @@ func validateAwsKinesisFirehoseSchema(d *schema.ResourceData) error { _, s3Exists := d.GetOk("s3_configuration") _, extendedS3Exists := d.GetOk("extended_s3_configuration") - if d.Get("destination").(string) == "extended_s3" { + if d.Get("destination").(string) == firehoseDestinationTypeExtendedS3 { if !extendedS3Exists { return fmt.Errorf( "When destination is 'extended_s3', extended_s3_configuration is required", @@ -2285,27 +2260,27 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta DestinationId: aws.String(d.Get("destination_id").(string)), } - if d.Get("destination").(string) == "extended_s3" { + if d.Get("destination").(string) == firehoseDestinationTypeExtendedS3 { extendedS3Config := updateExtendedS3Config(d) updateInput.ExtendedS3DestinationUpdate = extendedS3Config } else { s3Config := updateS3Config(d) - if d.Get("destination").(string) == "s3" { + if d.Get("destination").(string) == firehoseDestinationTypeS3 { updateInput.S3DestinationUpdate = s3Config - } else if d.Get("destination").(string) == "elasticsearch" { + } else if d.Get("destination").(string) == firehoseDestinationTypeElasticsearch { esUpdate, err := updateElasticsearchConfig(d, s3Config) if err != nil { return err } updateInput.ElasticsearchDestinationUpdate = esUpdate - } else if d.Get("destination").(string) == "redshift" { + } else if d.Get("destination").(string) == firehoseDestinationTypeRedshift { rc, err := updateRedshiftConfig(d, s3Config) if err != nil { return err } updateInput.RedshiftDestinationUpdate = rc - } else if d.Get("destination").(string) == "splunk" { + } else if d.Get("destination").(string) == firehoseDestinationTypeSplunk { rc, err := updateSplunkConfig(d, s3Config) if err != nil { return err diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go index f5a163989e1..cd4387d59a9 100644 --- a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go +++ b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go @@ -1050,10 +1050,73 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigUpdates(t *testi policyName := fmt.Sprintf("tf_acc_policy_%s", rString) roleName := fmt.Sprintf("tf_acc_role_%s", rString) preConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchBasic, - ri, ri, ri, ri, ri) + ri, ri, ri, ri, ri, ri) postConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) + fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchUpdate, - ri, ri, ri, ri, ri) + ri, ri, ri, ri, ri, ri) + + updatedElasticSearchConfig := &firehose.ElasticsearchDestinationDescription{ + BufferingHints: &firehose.ElasticsearchBufferingHints{ + IntervalInSeconds: aws.Int64(500), + }, + ProcessingConfiguration: &firehose.ProcessingConfiguration{ + Enabled: aws.Bool(true), + Processors: []*firehose.Processor{ + { + Type: aws.String("Lambda"), + Parameters: []*firehose.ProcessorParameter{ + { + ParameterName: aws.String("LambdaArn"), + ParameterValue: aws.String("valueNotTested"), + }, + }, + }, + }, + }, + } + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy, + Steps: []resource.TestStep{ + { + Config: preConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: postConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, updatedElasticSearchConfig, nil), + ), + }, + }, + }) +} + +func TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigEndpointUpdates(t *testing.T) { + var stream firehose.DeliveryStreamDescription + + resourceName := "aws_kinesis_firehose_delivery_stream.test" + ri := acctest.RandInt() + rString := acctest.RandString(8) + funcName := fmt.Sprintf("aws_kinesis_firehose_delivery_stream_test_%s", rString) + policyName := fmt.Sprintf("tf_acc_policy_%s", rString) + roleName := fmt.Sprintf("tf_acc_role_%s", rString) + preConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchEndpoint, + ri, ri, ri, ri, ri, ri) + postConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) + + fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchEndpointUpdate, + ri, ri, ri, ri, ri, ri) updatedElasticSearchConfig := &firehose.ElasticsearchDestinationDescription{ BufferingHints: &firehose.ElasticsearchBufferingHints{ @@ -1083,7 +1146,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigUpdates(t *testi { Config: preConfig, Check: resource.ComposeTestCheckFunc( - testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test", &stream), + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil), ), }, @@ -1095,7 +1158,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigUpdates(t *testi { Config: postConfig, Check: resource.ComposeTestCheckFunc( - testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test", &stream), + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, updatedElasticSearchConfig, nil), ), }, @@ -1629,7 +1692,7 @@ data "aws_caller_identity" "current" {} data "aws_partition" "current" {} resource "aws_iam_role" "firehose" { - name = "tf_acctest_firehose_delivery_role_%d" + name = "tf-acc-test-%[1]d" assume_role_policy = <