Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource/aws_kinesis_firehose_delivery_stream: Add Splunk destination… #3944

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func processingConfigurationSchema() *schema.Schema {
Required: true,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "LambdaArn" && value != "NumberOfRetries" {
if value != "LambdaArn" && value != "NumberOfRetries" && value != "RoleArn" && value != "BufferSizeInMBs" && value != "BufferIntervalInSeconds" {
errors = append(errors, fmt.Errorf(
"%q must be one of 'LambdaArn', 'NumberOfRetries'", k))
"%q must be one of 'LambdaArn', 'NumberOfRetries', 'RoleArn', 'BufferSizeInMBs', 'BufferIntervalInSeconds'", k))
}
return
},
Expand Down Expand Up @@ -208,7 +208,7 @@ func flattenFirehoseS3Configuration(s3 firehose.S3DestinationDescription) []inte
func flattenProcessingConfiguration(pc firehose.ProcessingConfiguration, roleArn string) []map[string]interface{} {
processingConfiguration := make([]map[string]interface{}, 1)

// It is necessary to explicitely filter this out
// It is necessary to explicitly filter this out
// to prevent diffs during routine use and retain the ability
// to show diffs if any field has drifted
defaultLambdaParams := map[string]string{
Expand Down Expand Up @@ -321,6 +321,10 @@ func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.De
"retry_duration": *destination.SplunkDestinationDescription.RetryOptions.DurationInSeconds,
}

if v := destination.SplunkDestinationDescription.ProcessingConfiguration; v != nil {
splunkConfiguration["processing_configuration"] = v
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed this before merging. This should be = flattenProcessingConfiguration(*v, roleArn). I will fix this in an upcoming PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about that, where is RoleArn supposed to come from in the Splunk Configuration?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its only used as a default value to filter out a difference if it is undefined in the Terraform configuration under the parameters. In this case, passing it "" is fine. I have a PR almost ready that includes this fix.

}

if v := destination.SplunkDestinationDescription.CloudWatchLoggingOptions; v != nil {
splunkConfiguration["cloudwatch_logging_options"] = flattenCloudwatchLoggingOptions(*v)
}
Expand Down Expand Up @@ -1170,6 +1174,10 @@ func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3Destination
S3Configuration: s3Config,
}

if _, ok := splunk["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk)
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(splunk)
}
Expand Down Expand Up @@ -1198,6 +1206,10 @@ func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3Destination
S3Update: s3Update,
}

if _, ok := splunk["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk)
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(splunk)
}
Expand Down
62 changes: 59 additions & 3 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,15 +329,36 @@ func TestAccAWSKinesisFirehoseDeliveryStream_SplunkConfigUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription

ri := acctest.RandInt()

rString := acctest.RandString(8)
funcName := fmt.Sprintf("aws_kinesis_firehose_delivery_stream_test_%s", rString)
policyName := fmt.Sprintf("tf_acc_policy_%s", rString)
roleName := fmt.Sprintf("tf_acc_role_%s", rString)

preConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_SplunkBasic,
ri, ri, ri, ri)
postConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_SplunkUpdates,
ri, ri, ri, ri)
postConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) +
fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_SplunkUpdates,
ri, ri, ri, ri)

updatedSplunkConfig := &firehose.SplunkDestinationDescription{
HECEndpointType: aws.String("Event"),
HECAcknowledgmentTimeoutInSeconds: aws.Int64(600),
S3BackupMode: aws.String("FailedEventsOnly"),
ProcessingConfiguration: &firehose.ProcessingConfiguration{
Enabled: aws.Bool(true),
Processors: []*firehose.Processor{
&firehose.Processor{
Type: aws.String("Lambda"),
Parameters: []*firehose.ProcessorParameter{
&firehose.ProcessorParameter{
ParameterName: aws.String("LambdaArn"),
ParameterValue: aws.String("valueNotTested"),
},
},
},
},
},
}

resource.Test(t, resource.TestCase{
Expand Down Expand Up @@ -544,9 +565,10 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
if splunkConfig != nil {
s := splunkConfig.(*firehose.SplunkDestinationDescription)
// Range over the Stream Destinations, looking for the matching Splunk destination
var matchHECEndpointType, matchHECAcknowledgmentTimeoutInSeconds, matchS3BackupMode bool
var matchHECEndpointType, matchHECAcknowledgmentTimeoutInSeconds, matchS3BackupMode, processingConfigMatch bool
for _, d := range stream.Destinations {
if d.SplunkDestinationDescription != nil {

if *d.SplunkDestinationDescription.HECEndpointType == *s.HECEndpointType {
matchHECEndpointType = true
}
Expand All @@ -556,11 +578,17 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
if *d.SplunkDestinationDescription.S3BackupMode == *s.S3BackupMode {
matchS3BackupMode = true
}

processingConfigMatch = len(s.ProcessingConfiguration.Processors) == len(d.SplunkDestinationDescription.ProcessingConfiguration.Processors)

}
}
if !matchHECEndpointType || !matchHECAcknowledgmentTimeoutInSeconds || !matchS3BackupMode {
return fmt.Errorf("Mismatch Splunk HECEndpointType or HECAcknowledgmentTimeoutInSeconds or S3BackupMode, expected: %s, got: %s", s, stream.Destinations)
}
if !processingConfigMatch {
return fmt.Errorf("Mismatch extended splunk ProcessingConfiguration.Processors count, expected: %s, got: %s", s, stream.Destinations)
}
}
}
return nil
Expand Down Expand Up @@ -1147,6 +1175,34 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
hec_acknowledgment_timeout = 600
hec_endpoint_type = "Event"
s3_backup_mode = "FailedEventsOnly"
processing_configuration = [
{
enabled = "true"
processors = [
{
type = "Lambda"
parameters = [
{
parameter_name = "LambdaArn"
parameter_value = "${aws_lambda_function.lambda_function_test.arn}:$LATEST"
},
{
parameter_name = "RoleArn"
parameter_value = "${aws_iam_role.firehose.arn}"
},
{
parameter_name = "BufferSizeInMBs"
parameter_value = 1
},
{
parameter_name = "BufferIntervalInSeconds"
parameter_value = 60
}
]
}
]
}
]
}
}`

Expand Down
4 changes: 2 additions & 2 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ The `splunk_configuration` objects supports the following:
* `hec_endpoint` - (Required) The HTTP Event Collector (HEC) endpoint to which Kinesis Firehose sends your data.
* `hec_endpoint_type` - (Optional) The HEC endpoint type. Valid values are `Raw` or `Event`. The default value is `Raw`.
* `hec_token` - The GUID that you obtain from your Splunk cluster when you create a new HEC endpoint.
* `s3_backup_mode` - (Optional) Defines how documents should be delivered to Amazon S3. Valid values are `FailedDocumentsOnly` and `AllDocuments`. Default value is `FailedDocumentsOnly`.
* `s3_backup_mode` - (Optional) Defines how documents should be delivered to Amazon S3. Valid values are `FailedEventsOnly` and `AllEvents`. Default value is `FailedEventsOnly`.
* `retry_duration` - (Optional) After an initial failure to deliver to Amazon Elasticsearch, the total amount of time, in seconds between 0 to 7200, during which Firehose re-attempts delivery (including the first attempt). After this time has elapsed, the failed documents are written to Amazon S3. The default value is 300s. There will be no retry if the value is 0.
* `cloudwatch_logging_options` - (Optional) The CloudWatch Logging Options for the delivery stream. More details are given below.

Expand All @@ -325,7 +325,7 @@ The `processors` array objects support the following:

The `parameters` array objects support the following:

* `parameter_name` - (Required) Parameter name. Valid Values: `LambdaArn`, `NumberOfRetries`
* `parameter_name` - (Required) Parameter name. Valid Values: `LambdaArn`, `NumberOfRetries`, `RoleArn`, `BufferSizeInMBs`, `BufferIntervalInSeconds`
* `parameter_value` - (Required) Parameter value. Must be between 1 and 512 length (inclusive). When providing a Lambda ARN, you should specify the resource version as well.

## Attributes Reference
Expand Down