Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws_lambda_event_source_mapping: Add Kafka config blocks + consumer_group_id #26560

Merged
merged 22 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3236261
SelfManagedKafkaConfiguration
A122943 Aug 29, 2022
55d19f4
Update event_source_mapping.go
A122943 Aug 29, 2022
e24b9ac
rename self_managed_kafka_config
A122943 Aug 31, 2022
783b9f5
update test
A122943 Aug 31, 2022
f4e99b5
correct consumer_group_id schema
A122943 Aug 31, 2022
45ad907
add amazon_managed_kafka_config
A122943 Aug 31, 2022
22374c7
add amazon_managed_kafka_config tests
A122943 Aug 31, 2022
841f3f6
Create 26560.txt
A122943 Aug 31, 2022
2989eff
update website/docs
A122943 Aug 31, 2022
616451c
Merge branch 'main' into patch-1
sallaben Aug 31, 2022
953a829
Merge branch 'hashicorp:main' into patch-1
A122943 Aug 31, 2022
018e799
Tweak CHANGELOG entry.
ewbankkit Sep 1, 2022
6c79e1b
Fix terrafmt errors.
ewbankkit Sep 1, 2022
9116af4
r/aws_lambda_event_source_mapping: Rename 'amazon_managed_kafka_confi…
ewbankkit Sep 1, 2022
9f79636
r/aws_lambda_event_source_mapping: Rename 'self_managed_kafka_config'…
ewbankkit Sep 1, 2022
747c090
Remove whitespace.
ewbankkit Sep 1, 2022
3bb0b1f
r/aws_lambda_event_source_mapping: Add ForceNew to 'amazon_managed_ka…
ewbankkit Sep 1, 2022
b8c3543
r/aws_lambda_event_source_mapping: Test with and without 'amazon_mana…
ewbankkit Sep 1, 2022
11d9c4f
r/aws_lambda_event_source_mapping: Make 'self_managed_kafka_event_sou…
ewbankkit Sep 1, 2022
bda0e3d
Merge branch 'hashicorp:main' into patch-1
A122943 Sep 1, 2022
a8d6b46
r/aws_lambda_event_source_mapping: Make 'amazon_managed_kafka_event_s…
ewbankkit Sep 1, 2022
55837a4
Merge commit 'a8d6b461c84e743d8c2a463b32fc49cce145408c' into HEAD
ewbankkit Sep 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/26560.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `amazon_managed_kafka_event_source_config` and `self_managed_kafka_event_source_config` configuration blocks
```
140 changes: 116 additions & 24 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ func ResourceEventSourceMapping() *schema.Resource {
},

Schema: map[string]*schema.Schema{
"amazon_managed_kafka_event_source_config": {
Type: schema.TypeList,
Optional: true,
Computed: true,
ForceNew: true,
MaxItems: 1,
ConflictsWith: []string{"self_managed_event_source", "self_managed_kafka_event_source_config"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"consumer_group_id": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ForceNew: true,
ValidateFunc: validation.StringLenBetween(1, 200),
},
},
},
},
"batch_size": {
Type: schema.TypeInt,
Optional: true,
Expand Down Expand Up @@ -68,12 +87,10 @@ func ResourceEventSourceMapping() *schema.Resource {
return old == new
},
},

"bisect_batch_on_function_error": {
Type: schema.TypeBool,
Optional: true,
},

"destination_config": {
Type: schema.TypeList,
Optional: true,
Expand All @@ -98,20 +115,17 @@ func ResourceEventSourceMapping() *schema.Resource {
},
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
},

"enabled": {
Type: schema.TypeBool,
Optional: true,
Default: true,
},

"event_source_arn": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"},
},

"filter_criteria": {
Type: schema.TypeList,
Optional: true,
Expand All @@ -135,12 +149,10 @@ func ResourceEventSourceMapping() *schema.Resource {
},
},
},

"function_arn": {
Type: schema.TypeString,
Computed: true,
},

"function_name": {
Type: schema.TypeString,
Required: true,
Expand All @@ -152,7 +164,6 @@ func ResourceEventSourceMapping() *schema.Resource {
return (oldFunctionName == new && oldFunctionNameErr == nil) || (newFunctionName == old && newFunctionNameErr == nil)
},
},

"function_response_types": {
Type: schema.TypeSet,
Optional: true,
Expand All @@ -161,22 +172,18 @@ func ResourceEventSourceMapping() *schema.Resource {
ValidateFunc: validation.StringInSlice(lambda.FunctionResponseType_Values(), false),
},
},

"last_modified": {
Type: schema.TypeString,
Computed: true,
},

"last_processing_result": {
Type: schema.TypeString,
Computed: true,
},

"maximum_batching_window_in_seconds": {
Type: schema.TypeInt,
Optional: true,
},

"maximum_record_age_in_seconds": {
Type: schema.TypeInt,
Optional: true,
Expand All @@ -186,21 +193,18 @@ func ResourceEventSourceMapping() *schema.Resource {
validation.IntBetween(60, 604_800),
),
},

"maximum_retry_attempts": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntBetween(-1, 10_000),
},

"parallelization_factor": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validation.IntBetween(1, 10),
Computed: true,
},

"queues": {
Type: schema.TypeSet,
Optional: true,
Expand All @@ -210,7 +214,6 @@ func ResourceEventSourceMapping() *schema.Resource {
ValidateFunc: validation.StringLenBetween(1, 1000),
},
},

"self_managed_event_source": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -241,7 +244,25 @@ func ResourceEventSourceMapping() *schema.Resource {
},
ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"},
},

"self_managed_kafka_event_source_config": {
Type: schema.TypeList,
Optional: true,
Computed: true,
ForceNew: true,
MaxItems: 1,
ConflictsWith: []string{"event_source_arn", "amazon_managed_kafka_event_source_config"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"consumer_group_id": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ForceNew: true,
ValidateFunc: validation.StringLenBetween(1, 200),
},
},
},
},
"source_access_configuration": {
Type: schema.TypeSet,
Optional: true,
Expand All @@ -260,31 +281,26 @@ func ResourceEventSourceMapping() *schema.Resource {
},
},
},

"starting_position": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice(lambda.EventSourcePosition_Values(), false),
},

"starting_position_timestamp": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.IsRFC3339Time,
},

"state": {
Type: schema.TypeString,
Computed: true,
},

"state_transition_reason": {
Type: schema.TypeString,
Computed: true,
},

"topics": {
Type: schema.TypeSet,
Optional: true,
Expand All @@ -294,13 +310,11 @@ func ResourceEventSourceMapping() *schema.Resource {
ValidateFunc: validation.StringLenBetween(1, 249),
},
},

"tumbling_window_in_seconds": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validation.IntBetween(0, 900),
},

"uuid": {
Type: schema.TypeString,
Computed: true,
Expand All @@ -320,6 +334,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})

var target string

if v, ok := d.GetOk("amazon_managed_kafka_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.AmazonManagedKafkaEventSourceConfig = expandAmazonManagedKafkaEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("batch_size"); ok {
input.BatchSize = aws.Int64(int64(v.(int)))
}
Expand Down Expand Up @@ -373,6 +391,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})
target = "Self-Managed Apache Kafka"
}

if v, ok := d.GetOk("self_managed_kafka_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.SelfManagedKafkaEventSourceConfig = expandSelfManagedKafkaEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 {
input.SourceAccessConfigurations = expandSourceAccessConfigurations(v.(*schema.Set).List())
}
Expand Down Expand Up @@ -460,6 +482,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
return fmt.Errorf("error reading Lambda Event Source Mapping (%s): %w", d.Id(), err)
}

if eventSourceMappingConfiguration.AmazonManagedKafkaEventSourceConfig != nil {
if err := d.Set("amazon_managed_kafka_event_source_config", []interface{}{flattenAmazonManagedKafkaEventSourceConfig(eventSourceMappingConfiguration.AmazonManagedKafkaEventSourceConfig)}); err != nil {
return fmt.Errorf("error setting amazon_managed_kafka_event_source_config: %w", err)
}
} else {
d.Set("amazon_managed_kafka_event_source_config", nil)
}
d.Set("batch_size", eventSourceMappingConfiguration.BatchSize)
d.Set("bisect_batch_on_function_error", eventSourceMappingConfiguration.BisectBatchOnFunctionError)
if eventSourceMappingConfiguration.DestinationConfig != nil {
Expand Down Expand Up @@ -498,6 +527,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
} else {
d.Set("self_managed_event_source", nil)
}
if eventSourceMappingConfiguration.SelfManagedKafkaEventSourceConfig != nil {
if err := d.Set("self_managed_kafka_event_source_config", []interface{}{flattenSelfManagedKafkaEventSourceConfig(eventSourceMappingConfiguration.SelfManagedKafkaEventSourceConfig)}); err != nil {
return fmt.Errorf("error setting self_managed_kafka_event_source_config: %w", err)
}
} else {
d.Set("self_managed_kafka_event_source_config", nil)
}
if err := d.Set("source_access_configuration", flattenSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations)); err != nil {
return fmt.Errorf("error setting source_access_configuration: %w", err)
}
Expand Down Expand Up @@ -763,6 +799,62 @@ func flattenSelfManagedEventSource(apiObject *lambda.SelfManagedEventSource) map
return tfMap
}

func expandAmazonManagedKafkaEventSourceConfig(tfMap map[string]interface{}) *lambda.AmazonManagedKafkaEventSourceConfig {
if tfMap == nil {
return nil
}

apiObject := &lambda.AmazonManagedKafkaEventSourceConfig{}

if v, ok := tfMap["consumer_group_id"].(string); ok && v != "" {
apiObject.ConsumerGroupId = aws.String(v)
}

return apiObject
}

func flattenAmazonManagedKafkaEventSourceConfig(apiObject *lambda.AmazonManagedKafkaEventSourceConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.ConsumerGroupId; v != nil {
tfMap["consumer_group_id"] = aws.StringValue(v)
}

return tfMap
}

func expandSelfManagedKafkaEventSourceConfig(tfMap map[string]interface{}) *lambda.SelfManagedKafkaEventSourceConfig {
if tfMap == nil {
return nil
}

apiObject := &lambda.SelfManagedKafkaEventSourceConfig{}

if v, ok := tfMap["consumer_group_id"].(string); ok && v != "" {
apiObject.ConsumerGroupId = aws.String(v)
}

return apiObject
}

func flattenSelfManagedKafkaEventSourceConfig(apiObject *lambda.SelfManagedKafkaEventSourceConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.ConsumerGroupId; v != nil {
tfMap["consumer_group_id"] = aws.StringValue(v)
}

return tfMap
}

func expandSourceAccessConfiguration(tfMap map[string]interface{}) *lambda.SourceAccessConfiguration {
if tfMap == nil {
return nil
Expand Down
Loading