Add s3_backup_mode option in Firehose Redshift destination (#1830)
* Add s3_backup_mode option in Firehose Redshift destination

* Update docs for Firehose Redshift destination configuration

* Changes as per review comments

* Don't export a private function
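
For quick reference, here is a minimal sketch of how the new `s3_backup_mode` and `s3_backup_configuration` settings are used inside a `redshift_configuration` block. It is adapted from the website docs example updated in this commit; the resource, role, and bucket names are illustrative only.

```hcl
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
  name        = "terraform-kinesis-firehose-test-stream"
  destination = "redshift"

  s3_configuration {
    role_arn   = "${aws_iam_role.firehose_role.arn}"
    bucket_arn = "${aws_s3_bucket.bucket.arn}"
  }

  redshift_configuration {
    role_arn        = "${aws_iam_role.firehose_role.arn}"
    cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
    username        = "testuser"
    password        = "T3stPass"
    data_table_name = "test-table"
    copy_options    = "delimiter '|'"

    # New in this change: optionally back up source records to S3.
    s3_backup_mode = "Enabled"

    # Required when s3_backup_mode is "Enabled"; supports the same
    # fields as the s3_configuration block.
    s3_backup_configuration {
      role_arn   = "${aws_iam_role.firehose_role.arn}"
      bucket_arn = "${aws_s3_bucket.bucket.arn}"
    }
  }
}
```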
ApsOps authored and radeksimko committed Oct 25, 2017
1 parent: 1be222f · commit: 44edced
Showing 3 changed files with 155 additions and 53 deletions.
181 changes: 132 additions & 49 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
@@ -41,6 +41,58 @@ func cloudWatchLoggingOptionsSchema() *schema.Schema {
}
}

func s3ConfigurationSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
MaxItems: 1,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket_arn": {
Type: schema.TypeString,
Required: true,
},

"buffer_size": {
Type: schema.TypeInt,
Optional: true,
Default: 5,
},

"buffer_interval": {
Type: schema.TypeInt,
Optional: true,
Default: 300,
},

"compression_format": {
Type: schema.TypeString,
Optional: true,
Default: "UNCOMPRESSED",
},

"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateArn,
},

"role_arn": {
Type: schema.TypeString,
Required: true,
},

"prefix": {
Type: schema.TypeString,
Optional: true,
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
}
}

func processingConfigurationSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
@@ -173,55 +225,7 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},

"s3_configuration": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket_arn": {
Type: schema.TypeString,
Required: true,
},

"buffer_size": {
Type: schema.TypeInt,
Optional: true,
Default: 5,
},

"buffer_interval": {
Type: schema.TypeInt,
Optional: true,
Default: 300,
},

"compression_format": {
Type: schema.TypeString,
Optional: true,
Default: "UNCOMPRESSED",
},

"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateArn,
},

"role_arn": {
Type: schema.TypeString,
Required: true,
},

"prefix": {
Type: schema.TypeString,
Optional: true,
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
},
"s3_configuration": s3ConfigurationSchema(),

"extended_s3_configuration": {
Type: schema.TypeList,
@@ -303,6 +307,22 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Required: true,
},

"s3_backup_mode": {
Type: schema.TypeString,
Optional: true,
Default: "Disabled",
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "Disabled" && value != "Enabled" {
errors = append(errors, fmt.Errorf(
"%q must be one of 'Disabled', 'Enabled'", k))
}
return
},
},

"s3_backup_configuration": s3ConfigurationSchema(),

"retry_duration": {
Type: schema.TypeInt,
Optional: true,
@@ -499,6 +519,33 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration
return configuration
}

func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfiguration {
config := d["s3_backup_configuration"].([]interface{})
if len(config) == 0 {
return nil
}

s3 := config[0].(map[string]interface{})

configuration := &firehose.S3DestinationConfiguration{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationConfiguration {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

@@ -548,6 +595,34 @@ func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
return configuration
}

func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdate {
config := d["s3_backup_configuration"].([]interface{})
if len(config) == 0 {
return nil
}

s3 := config[0].(map[string]interface{})

configuration := &firehose.S3DestinationUpdate{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationUpdate {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

@@ -689,6 +764,10 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupConfiguration = expandS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{}))
}

return configuration, nil
}
@@ -715,6 +794,10 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupUpdate = updateS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{}))
}

return configuration, nil
}
17 changes: 13 additions & 4 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
@@ -264,6 +264,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T)
CopyCommand: &firehose.CopyCommand{
CopyOptions: aws.String("GZIP"),
},
S3BackupMode: aws.String("Enabled"),
}

resource.Test(t, resource.TestCase{
@@ -438,16 +439,19 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
r := redshiftConfig.(*firehose.RedshiftDestinationDescription)
// Range over the Stream Destinations, looking for the matching Redshift
// destination
var match bool
var matchCopyOptions, matchS3BackupMode bool
for _, d := range stream.Destinations {
if d.RedshiftDestinationDescription != nil {
if *d.RedshiftDestinationDescription.CopyCommand.CopyOptions == *r.CopyCommand.CopyOptions {
match = true
matchCopyOptions = true
}
if *d.RedshiftDestinationDescription.S3BackupMode == *r.S3BackupMode {
matchS3BackupMode = true
}
}
}
if !match {
return fmt.Errorf("Mismatch Redshift CopyOptions, expected: %s, got: %s", r, stream.Destinations)
if !matchCopyOptions || !matchS3BackupMode {
return fmt.Errorf("Mismatch Redshift CopyOptions or S3BackupMode, expected: %s, got: %s", r, stream.Destinations)
}
}

@@ -983,6 +987,11 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
username = "testuser"
password = "T3stPass"
s3_backup_mode = "Enabled"
s3_backup_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
data_table_name = "test-table"
copy_options = "GZIP"
data_table_columns = "test-col"
10 changes: 10 additions & 0 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown
@@ -167,6 +167,14 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
data_table_name = "test-table"
copy_options = "delimiter '|'" # the default delimiter
data_table_columns = "test-col"
s3_backup_mode = "Enabled"
s3_backup_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
buffer_size = 15
buffer_interval = 300
compression_format = "GZIP"
}
}
}
```
@@ -244,6 +252,8 @@ The `redshift_configuration` object supports the following:
* `password` - (Required) The password for the username above.
* `retry_duration` - (Optional) The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt. The default value is 3600 seconds (60 minutes). Firehose does not retry if the value of DurationInSeconds is 0 (zero) or if the first delivery attempt takes longer than the current value.
* `role_arn` - (Required) The arn of the role the stream assumes.
* `s3_backup_mode` - (Optional) The Amazon S3 backup mode. Valid values are `Disabled` and `Enabled`. Default value is `Disabled`.
* `s3_backup_configuration` - (Optional) The configuration for backup in Amazon S3. Required if `s3_backup_mode` is `Enabled`. Supports the same fields as `s3_configuration` object.
* `data_table_name` - (Required) The name of the table in the redshift cluster that the s3 bucket will copy to.
* `copy_options` - (Optional) Copy options for copying the data from the s3 intermediate bucket into redshift, for example to change the default delimiter. For valid values, see the [AWS documentation](http://docs.aws.amazon.com/firehose/latest/APIReference/API_CopyCommand.html)
* `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.