Skip to content

Commit

Permalink
resource/aws_kinesis_firehose_delivery_stream: Add elasticsearch_conf…
Browse files Browse the repository at this point in the history
…iguration vpc_config configuration block (support Elasticsearch VPC configuration) (#13269)

Output from acceptance testing:

```
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3KinesisStreamSource (102.19s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3basicWithTags (106.75s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_ParquetSerDe_Empty (118.90s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_OpenXJsonSerDe_Empty (123.17s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3basic (133.80s)
--- FAIL: TestAccAWSKinesisFirehoseDeliveryStream_basic (146.83s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_OrcSerDe_Empty (148.34s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_disappears (149.37s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_HiveJsonSerDe_Empty (149.70s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_Serializer_Update (152.19s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_ExternalUpdate (163.95s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3WithCloudwatchLogging (167.63s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3basic (168.05s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3KmsKeyArn (173.94s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates (175.01s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_ProcessingConfiguration_Empty (176.11s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_ErrorOutputPrefix (180.04s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_Deserializer_Update (187.17s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_Enabled (197.56s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_KinesisStreamSource (98.68s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3basicWithSSE (253.21s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_missingProcessingConfiguration (107.01s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_SplunkConfigUpdates (139.39s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3Updates (163.46s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates (340.62s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigUpdates (991.76s)
--- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchWithVpcConfigUpdates (1574.83s)
```
  • Loading branch information
rajholla authored Aug 31, 2020
1 parent e208508 commit 4d3799e
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 1 deletion.
68 changes: 68 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func flattenFirehoseElasticsearchConfiguration(description *firehose.Elasticsear
"index_name": aws.StringValue(description.IndexName),
"s3_backup_mode": aws.StringValue(description.S3BackupMode),
"index_rotation_period": aws.StringValue(description.IndexRotationPeriod),
"vpc_config": flattenVpcConfiguration(description.VpcConfigurationDescription),
"processing_configuration": flattenProcessingConfiguration(description.ProcessingConfiguration, aws.StringValue(description.RoleARN)),
}

Expand All @@ -198,6 +199,21 @@ func flattenFirehoseElasticsearchConfiguration(description *firehose.Elasticsear
return []map[string]interface{}{m}
}

func flattenVpcConfiguration(description *firehose.VpcConfigurationDescription) []map[string]interface{} {
if description == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"vpc_id": aws.StringValue(description.VpcId),
"subnet_ids": flattenStringSet(description.SubnetIds),
"security_group_ids": flattenStringSet(description.SecurityGroupIds),
"role_arn": aws.StringValue(description.RoleARN),
}

return []map[string]interface{}{m}
}

func flattenFirehoseExtendedS3Configuration(description *firehose.ExtendedS3DestinationDescription) []map[string]interface{} {
if description == nil {
return []map[string]interface{}{}
Expand Down Expand Up @@ -1249,6 +1265,39 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},

"vpc_config": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"vpc_id": {
Type: schema.TypeString,
Computed: true,
},
"subnet_ids": {
Type: schema.TypeSet,
Required: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"security_group_ids": {
Type: schema.TypeSet,
Required: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"role_arn": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validateArn,
},
},
},
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),

"processing_configuration": processingConfigurationSchema(),
Expand Down Expand Up @@ -1792,6 +1841,21 @@ func extractCloudWatchLoggingConfiguration(s3 map[string]interface{}) *firehose.

}

func extractVpcConfiguration(es map[string]interface{}) *firehose.VpcConfiguration {
config := es["vpc_config"].([]interface{})
if len(config) == 0 {
return nil
}

vpcConfig := config[0].(map[string]interface{})

return &firehose.VpcConfiguration{
RoleARN: aws.String(vpcConfig["role_arn"].(string)),
SubnetIds: expandStringSet(vpcConfig["subnet_ids"].(*schema.Set)),
SecurityGroupIds: expandStringSet(vpcConfig["security_group_ids"].(*schema.Set)),
}
}

func extractPrefixConfiguration(s3 map[string]interface{}) *string {
if v, ok := s3["prefix"]; ok {
return aws.String(v.(string))
Expand Down Expand Up @@ -1900,6 +1964,10 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest
config.S3BackupMode = aws.String(s3BackupMode.(string))
}

if _, ok := es["vpc_config"]; ok {
config.VpcConfiguration = extractVpcConfiguration(es)
}

return config, nil
}

Expand Down
232 changes: 232 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,78 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigUpdates(t *testi
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchWithVpcConfigUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription

resourceName := "aws_kinesis_firehose_delivery_stream.test"
ri := acctest.RandInt()
rString := acctest.RandString(8)
funcName := fmt.Sprintf("aws_kinesis_firehose_delivery_stream_test_%s", rString)
policyName := fmt.Sprintf("tf_acc_policy_%s", rString)
roleName := fmt.Sprintf("tf_acc_role_%s", rString)
preConfigWithVpc := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchVpcBasic,
ri, ri, ri, ri, ri)

postConfigWithVpc := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) +
fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchVpcUpdate,
ri, ri, ri, ri, ri)

updatedElasticSearchConfig := &firehose.ElasticsearchDestinationDescription{
BufferingHints: &firehose.ElasticsearchBufferingHints{
IntervalInSeconds: aws.Int64(500),
},
ProcessingConfiguration: &firehose.ProcessingConfiguration{
Enabled: aws.Bool(true),
Processors: []*firehose.Processor{
{
Type: aws.String("Lambda"),
Parameters: []*firehose.ProcessorParameter{
{
ParameterName: aws.String("LambdaArn"),
ParameterValue: aws.String("valueNotTested"),
},
},
},
},
},
}

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckIamServiceLinkedRoleEs(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
{
Config: preConfigWithVpc,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil),
resource.TestCheckResourceAttrPair(resourceName, "elasticsearch_configuration.0.vpc_config.0.vpc_id", "aws_vpc.elasticsearch_in_vpc", "id"),
resource.TestCheckResourceAttr(resourceName, "elasticsearch_configuration.0.vpc_config.0.subnet_ids.#", "2"),
resource.TestCheckResourceAttr(resourceName, "elasticsearch_configuration.0.vpc_config.0.security_group_ids.#", "2"),
resource.TestCheckResourceAttrPair(resourceName, "elasticsearch_configuration.0.vpc_config.0.role_arn", "aws_iam_role.firehose", "arn"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
{
Config: postConfigWithVpc,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, updatedElasticSearchConfig, nil),
resource.TestCheckResourceAttrPair(resourceName, "elasticsearch_configuration.0.vpc_config.0.vpc_id", "aws_vpc.elasticsearch_in_vpc", "id"),
resource.TestCheckResourceAttr(resourceName, "elasticsearch_configuration.0.vpc_config.0.subnet_ids.#", "2"),
resource.TestCheckResourceAttr(resourceName, "elasticsearch_configuration.0.vpc_config.0.security_group_ids.#", "2"),
resource.TestCheckResourceAttrPair(resourceName, "elasticsearch_configuration.0.vpc_config.0.role_arn", "aws_iam_role.firehose", "arn"),
),
},
},
})
}

// Regression test for https://github.com/terraform-providers/terraform-provider-aws/issues/1657
func TestAccAWSKinesisFirehoseDeliveryStream_missingProcessingConfiguration(t *testing.T) {
var stream firehose.DeliveryStreamDescription
Expand Down Expand Up @@ -2452,6 +2524,108 @@ EOF
}
`

// ElasticSearch associated with VPC
var testAccKinesisFirehoseDeliveryStreamBaseElasticsearchVpcConfig = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
data "aws_availability_zones" "available" {
state = "available"
filter {
name = "opt-in-status"
values = ["opt-in-not-required"]
}
}
resource "aws_vpc" "elasticsearch_in_vpc" {
cidr_block = "192.168.0.0/22"
tags = {
Name = "terraform-testacc-elasticsearch-domain-in-vpc"
}
}
resource "aws_subnet" "first" {
vpc_id = aws_vpc.elasticsearch_in_vpc.id
availability_zone = data.aws_availability_zones.available.names[0]
cidr_block = "192.168.0.0/24"
tags = {
Name = "tf-acc-elasticsearch-domain-in-vpc-first"
}
}
resource "aws_subnet" "second" {
vpc_id = aws_vpc.elasticsearch_in_vpc.id
availability_zone = data.aws_availability_zones.available.names[1]
cidr_block = "192.168.1.0/24"
tags = {
Name = "tf-acc-elasticsearch-domain-in-vpc-second"
}
}
resource "aws_security_group" "first" {
vpc_id = aws_vpc.elasticsearch_in_vpc.id
}
resource "aws_security_group" "second" {
vpc_id = aws_vpc.elasticsearch_in_vpc.id
}
resource "aws_elasticsearch_domain" "test_cluster" {
domain_name = "es-test-%d"
cluster_config {
instance_count = 2
zone_awareness_enabled = true
instance_type = "t2.small.elasticsearch"
}
ebs_options {
ebs_enabled = true
volume_size = 10
}
vpc_options {
security_group_ids = [aws_security_group.first.id, aws_security_group.second.id]
subnet_ids = [aws_subnet.first.id, aws_subnet.second.id]
}
}
resource "aws_iam_role_policy" "firehose-elasticsearch" {
name = "elasticsearch"
role = aws_iam_role.firehose.id
policy = <<EOF
{
"Version":"2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Action":[
"es:*"
],
"Resource":[
"${aws_elasticsearch_domain.test_cluster.arn}",
"${aws_elasticsearch_domain.test_cluster.arn}/*"
]
},
{
"Effect":"Allow",
"Action":[
"ec2:Describe*",
"ec2:CreateNetworkInterface",
"ec2:CreateNetworkInterfacePermission",
"ec2:DeleteNetworkInterface"
],
"Resource":[
"*"
]
}
]
}
EOF
}
`

var testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchBasic = testAccKinesisFirehoseDeliveryStreamBaseElasticsearchConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test" {
depends_on = [aws_iam_role_policy.firehose-elasticsearch]
Expand All @@ -2473,6 +2647,30 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
}
`

var testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchVpcBasic = testAccKinesisFirehoseDeliveryStreamBaseElasticsearchVpcConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test" {
depends_on = [aws_iam_role_policy.firehose-elasticsearch]
name = "terraform-kinesis-firehose-es-%d"
destination = "elasticsearch"
s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.bucket.arn
}
elasticsearch_configuration {
domain_arn = aws_elasticsearch_domain.test_cluster.arn
role_arn = aws_iam_role.firehose.arn
index_name = "test"
type_name = "test"
vpc_config {
subnet_ids = [aws_subnet.first.id, aws_subnet.second.id]
security_group_ids = [aws_security_group.first.id, aws_security_group.second.id]
role_arn = aws_iam_role.firehose.arn
}
}
}
`
var testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchUpdate = testAccKinesisFirehoseDeliveryStreamBaseElasticsearchConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test" {
depends_on = [aws_iam_role_policy.firehose-elasticsearch]
Expand Down Expand Up @@ -2508,6 +2706,40 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
}
`

var testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchVpcUpdate = testAccKinesisFirehoseDeliveryStreamBaseElasticsearchVpcConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test" {
depends_on = [aws_iam_role_policy.firehose-elasticsearch]
name = "terraform-kinesis-firehose-es-%d"
destination = "elasticsearch"
s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.bucket.arn
}
elasticsearch_configuration {
domain_arn = aws_elasticsearch_domain.test_cluster.arn
role_arn = aws_iam_role.firehose.arn
index_name = "test"
type_name = "test"
buffering_interval = 500
vpc_config {
subnet_ids = [aws_subnet.first.id, aws_subnet.second.id]
security_group_ids = [aws_security_group.first.id, aws_security_group.second.id]
role_arn = aws_iam_role.firehose.arn
}
processing_configuration {
enabled = false
processors {
type = "Lambda"
parameters {
parameter_name = "LambdaArn"
parameter_value = "${aws_lambda_function.lambda_function_test.arn}:$LATEST"
}
}
}
}
}`

func testAccKinesisFirehoseDeliveryStreamConfig_missingProcessingConfiguration(rInt int) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}
Expand Down
Loading

0 comments on commit 4d3799e

Please sign in to comment.