diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 1fdbeca07b2..5752cad2b95 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -3,16 +3,14 @@ package aws import ( "fmt" "log" - "strconv" + "reflect" + "sort" "strings" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/arn" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/lambda" - "github.com/aws/aws-sdk-go/service/sqs" "github.com/hashicorp/aws-sdk-go-base/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -34,42 +32,6 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, Schema: map[string]*schema.Schema{ - "event_source_arn": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, - }, - "function_name": { - Type: schema.TypeString, - Required: true, - DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { - // Using function name or ARN should not be shown as a diff. - // Try to convert the old and new values from ARN to function name - oldFunctionName, oldFunctionNameErr := getFunctionNameFromLambdaArn(old) - newFunctionName, newFunctionNameErr := getFunctionNameFromLambdaArn(new) - return (oldFunctionName == new && oldFunctionNameErr == nil) || (newFunctionName == old && newFunctionNameErr == nil) - }, - }, - "starting_position": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - ValidateFunc: validation.StringInSlice(lambda.EventSourcePosition_Values(), false), - }, - "starting_position_timestamp": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - ValidateFunc: validation.IsRFC3339Time, - }, - "topics": { - Type: schema.TypeSet, - Optional: true, - ForceNew: true, - Elem: &schema.Schema{Type: schema.TypeString}, - Set: schema.HashString, - }, "batch_size": { Type: schema.TypeInt, Optional: true, @@ -83,69 +45,37 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { return false } - serviceName := "" + var serviceName string if v, ok := d.GetOk("event_source_arn"); ok { eventSourceARN, err := arn.Parse(v.(string)) if err != nil { return false } + serviceName = eventSourceARN.Service - } else { - // self managed kafka does not have an event_source_arn + } else if _, ok := d.GetOk("self_managed_event_source"); ok { serviceName = "kafka" } + switch serviceName { - // kafka.ServiceName is "kafka". - case dynamodb.ServiceName, kinesis.ServiceName, "kafka": - if old == "100" { - return true - } - case sqs.ServiceName: - if old == "10" { - return true - } + case "dynamodb", "kinesis", "kafka": + return old == "100" + case "sqs": + return old == "10" } - return false + + return old == new }, }, - "enabled": { - Type: schema.TypeBool, - Optional: true, - Default: true, - }, - "maximum_batching_window_in_seconds": { - Type: schema.TypeInt, - Optional: true, - }, - "parallelization_factor": { - Type: schema.TypeInt, - Optional: true, - ValidateFunc: validation.IntBetween(1, 10), - Computed: true, - }, - "maximum_retry_attempts": { - Type: schema.TypeInt, - Optional: true, - Computed: true, - ValidateFunc: validation.IntBetween(-1, 10_000), - }, - "maximum_record_age_in_seconds": { - Type: schema.TypeInt, - Optional: true, - Computed: true, - ValidateFunc: validation.Any( - validation.IntInSlice([]int{-1}), - validation.IntBetween(60, 604_800), - ), - }, + "bisect_batch_on_function_error": { Type: schema.TypeBool, Optional: true, }, + "destination_config": { Type: schema.TypeList, Optional: true, - MinItems: 1, MaxItems: 1, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ @@ -165,14 +95,83 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, }, + DiffSuppressFunc: suppressMissingOptionalConfigurationBlock, }, - "self_managed_event_source": { - Type: schema.TypeList, + + "enabled": { + Type: schema.TypeBool, + Optional: true, + Default: true, + }, + + "event_source_arn": { + Type: schema.TypeString, Optional: true, - MinItems: 1, - MaxItems: 1, + ForceNew: true, ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, - RequiredWith: []string{"self_managed_event_source", "source_access_configuration"}, + }, + + "function_arn": { + Type: schema.TypeString, + Computed: true, + }, + + "function_name": { + Type: schema.TypeString, + Required: true, + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + // Using function name or ARN should not be shown as a diff. + // Try to convert the old and new values from ARN to function name + oldFunctionName, oldFunctionNameErr := getFunctionNameFromLambdaArn(old) + newFunctionName, newFunctionNameErr := getFunctionNameFromLambdaArn(new) + return (oldFunctionName == new && oldFunctionNameErr == nil) || (newFunctionName == old && newFunctionNameErr == nil) + }, + }, + + "last_modified": { + Type: schema.TypeString, + Computed: true, + }, + + "last_processing_result": { + Type: schema.TypeString, + Computed: true, + }, + + "maximum_batching_window_in_seconds": { + Type: schema.TypeInt, + Optional: true, + }, + + "maximum_record_age_in_seconds": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.Any( + validation.IntInSlice([]int{-1}), + validation.IntBetween(60, 604_800), + ), + }, + + "maximum_retry_attempts": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.IntBetween(-1, 10_000), + }, + + "parallelization_factor": { + Type: schema.TypeInt, + Optional: true, + ValidateFunc: validation.IntBetween(1, 10), + Computed: true, + }, + + "self_managed_event_source": { + Type: schema.TypeList, + Optional: true, + ForceNew: true, + MaxItems: 1, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "endpoints": { @@ -180,20 +179,36 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Required: true, ForceNew: true, Elem: &schema.Schema{Type: schema.TypeString}, + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + if k == "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS" { + // AWS returns the bootstrap brokers in sorted order. + olds := strings.Split(old, ",") + sort.Strings(olds) + news := strings.Split(new, ",") + sort.Strings(news) + + return reflect.DeepEqual(olds, news) + } + + return old == new + }, }, }, }, + ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, + RequiredWith: []string{"source_access_configuration"}, }, + "source_access_configuration": { - Type: schema.TypeList, - Optional: true, - MinItems: 1, - RequiredWith: []string{"self_managed_event_source", "source_access_configuration"}, + Type: schema.TypeSet, + Optional: true, + MaxItems: 22, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "type": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validation.StringInSlice(lambda.SourceAccessType_Values(), false), }, "uri": { Type: schema.TypeString, @@ -201,27 +216,40 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, }, + RequiredWith: []string{"self_managed_event_source"}, }, - "function_arn": { - Type: schema.TypeString, - Computed: true, - }, - "last_modified": { - Type: schema.TypeString, - Computed: true, + + "starting_position": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + ValidateFunc: validation.StringInSlice(lambda.EventSourcePosition_Values(), false), }, - "last_processing_result": { - Type: schema.TypeString, - Computed: true, + + "starting_position_timestamp": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + ValidateFunc: validation.IsRFC3339Time, }, + "state": { Type: schema.TypeString, Computed: true, }, + "state_transition_reason": { Type: schema.TypeString, Computed: true, }, + + "topics": { + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + "uuid": { Type: schema.TypeString, Computed: true, @@ -230,18 +258,16 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { } } -// resourceAwsLambdaEventSourceMappingCreate maps to: -// CreateEventSourceMapping in the API / SDK func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).lambdaconn + functionName := d.Get("function_name").(string) input := &lambda.CreateEventSourceMappingInput{ - Enabled: aws.Bool(d.Get("enabled").(bool)), + Enabled: aws.Bool(d.Get("enabled").(bool)), + FunctionName: aws.String(functionName), } - if v, ok := d.GetOk("function_name"); ok { - input.FunctionName = aws.String(v.(string)) - } + var target string if v, ok := d.GetOk("batch_size"); ok { input.BatchSize = aws.Int64(int64(v.(int))) @@ -251,12 +277,15 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.BisectBatchOnFunctionError = aws.Bool(v.(bool)) } - if vDest, ok := d.GetOk("destination_config"); ok { - input.DestinationConfig = expandLambdaEventSourceMappingDestinationConfig(vDest.([]interface{})) + if v, ok := d.GetOk("destination_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.DestinationConfig = expandLambdaDestinationConfig(v.([]interface{})[0].(map[string]interface{})) } if v, ok := d.GetOk("event_source_arn"); ok { - input.EventSourceArn = aws.String(v.(string)) + v := v.(string) + + input.EventSourceArn = aws.String(v) + target = v } if v, ok := d.GetOk("maximum_batching_window_in_seconds"); ok { @@ -275,6 +304,16 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.ParallelizationFactor = aws.Int64(int64(v.(int))) } + if v, ok := d.GetOk("self_managed_event_source"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.SelfManagedEventSource = expandLambdaSelfManagedEventSource(v.([]interface{})[0].(map[string]interface{})) + + target = "Self-Managed Apache Kafka" + } + + if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 { + input.SourceAccessConfigurations = expandLambdaSourceAccessConfigurations(v.(*schema.Set).List()) + } + if v, ok := d.GetOk("starting_position"); ok { input.StartingPosition = aws.String(v.(string)) } @@ -285,21 +324,10 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.StartingPositionTimestamp = aws.Time(t) } - if v, ok := d.GetOk("self_managed_event_source"); ok { - input.SelfManagedEventSource = expandLambdaEventSourceMappingSelfManagedEventSource(v.([]interface{})) - } - - if v, ok := d.GetOk("source_access_configuration"); ok { - input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(v.([]interface{})) - } - if v, ok := d.GetOk("topics"); ok && v.(*schema.Set).Len() > 0 { input.Topics = expandStringSet(v.(*schema.Set)) } - // When non-ARN targets are supported, set target to the non-nil value. - target := input.EventSourceArn - log.Printf("[DEBUG] Creating Lambda Event Source Mapping: %s", input) // IAM profiles and roles can take some time to propagate in AWS: @@ -330,7 +358,7 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte } if err != nil { - return fmt.Errorf("error creating Lambda Event Source Mapping (%s): %w", aws.StringValue(target), err) + return fmt.Errorf("error creating Lambda Event Source Mapping (%s): %w", target, err) } d.SetId(aws.StringValue(eventSourceMappingConfiguration.UUID)) @@ -342,8 +370,6 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte return resourceAwsLambdaEventSourceMappingRead(d, meta) } -// resourceAwsLambdaEventSourceMappingRead maps to: -// GetEventSourceMapping in the API / SDK func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).lambdaconn @@ -360,103 +386,61 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf } d.Set("batch_size", eventSourceMappingConfiguration.BatchSize) - d.Set("maximum_batching_window_in_seconds", eventSourceMappingConfiguration.MaximumBatchingWindowInSeconds) + d.Set("bisect_batch_on_function_error", eventSourceMappingConfiguration.BisectBatchOnFunctionError) + if eventSourceMappingConfiguration.DestinationConfig != nil { + if err := d.Set("destination_config", []interface{}{flattenLambdaDestinationConfig(eventSourceMappingConfiguration.DestinationConfig)}); err != nil { + return fmt.Errorf("error setting destination_config: %w", err) + } + } else { + d.Set("destination_config", nil) + } d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn) d.Set("function_arn", eventSourceMappingConfiguration.FunctionArn) - d.Set("last_modified", aws.TimeValue(eventSourceMappingConfiguration.LastModified).Format(time.RFC3339)) - d.Set("last_processing_result", eventSourceMappingConfiguration.LastProcessingResult) - d.Set("state", eventSourceMappingConfiguration.State) - d.Set("state_transition_reason", eventSourceMappingConfiguration.StateTransitionReason) - d.Set("uuid", eventSourceMappingConfiguration.UUID) d.Set("function_name", eventSourceMappingConfiguration.FunctionArn) - d.Set("parallelization_factor", eventSourceMappingConfiguration.ParallelizationFactor) - d.Set("maximum_retry_attempts", eventSourceMappingConfiguration.MaximumRetryAttempts) - d.Set("maximum_record_age_in_seconds", eventSourceMappingConfiguration.MaximumRecordAgeInSeconds) - d.Set("bisect_batch_on_function_error", eventSourceMappingConfiguration.BisectBatchOnFunctionError) - if err := d.Set("destination_config", flattenLambdaEventSourceMappingDestinationConfig(eventSourceMappingConfiguration.DestinationConfig)); err != nil { - return fmt.Errorf("error setting destination_config: %w", err) - } - if err := d.Set("topics", flattenStringSet(eventSourceMappingConfiguration.Topics)); err != nil { - return fmt.Errorf("error setting topics: %w", err) + if eventSourceMappingConfiguration.LastModified != nil { + d.Set("last_modified", aws.TimeValue(eventSourceMappingConfiguration.LastModified).Format(time.RFC3339)) + } else { + d.Set("last_modified", nil) } - if err := d.Set("self_managed_event_source", flattenLambdaEventSourceMappingSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource, d)); err != nil { - return fmt.Errorf("error setting self_managed_event_source: %w", err) + d.Set("last_processing_result", eventSourceMappingConfiguration.LastProcessingResult) + d.Set("maximum_batching_window_in_seconds", eventSourceMappingConfiguration.MaximumBatchingWindowInSeconds) + d.Set("maximum_record_age_in_seconds", eventSourceMappingConfiguration.MaximumRecordAgeInSeconds) + d.Set("maximum_retry_attempts", eventSourceMappingConfiguration.MaximumRetryAttempts) + d.Set("parallelization_factor", eventSourceMappingConfiguration.ParallelizationFactor) + if eventSourceMappingConfiguration.SelfManagedEventSource != nil { + if err := d.Set("self_managed_event_source", []interface{}{flattenLambdaSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)}); err != nil { + return fmt.Errorf("error setting self_managed_event_source: %w", err) + } + } else { + d.Set("self_managed_event_source", nil) } - if err := d.Set("source_access_configuration", flattenLambdaEventSourceMappingSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations, d)); err != nil { + if err := d.Set("source_access_configuration", flattenLambdaSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations)); err != nil { return fmt.Errorf("error setting source_access_configuration: %w", err) } - d.Set("starting_position", eventSourceMappingConfiguration.StartingPosition) if eventSourceMappingConfiguration.StartingPositionTimestamp != nil { d.Set("starting_position_timestamp", aws.TimeValue(eventSourceMappingConfiguration.StartingPositionTimestamp).Format(time.RFC3339)) } else { d.Set("starting_position_timestamp", nil) } + d.Set("state", eventSourceMappingConfiguration.State) + d.Set("state_transition_reason", eventSourceMappingConfiguration.StateTransitionReason) + d.Set("topics", aws.StringValueSlice(eventSourceMappingConfiguration.Topics)) + d.Set("uuid", eventSourceMappingConfiguration.UUID) - state := aws.StringValue(eventSourceMappingConfiguration.State) - - switch state { + switch state := d.Get("state").(string); state { case waiter.EventSourceMappingStateEnabled, waiter.EventSourceMappingStateEnabling: d.Set("enabled", true) case waiter.EventSourceMappingStateDisabled, waiter.EventSourceMappingStateDisabling: d.Set("enabled", false) default: - log.Printf("[WARN] Lambda Event Source Mapping is neither enabled nor disabled but %s", state) - } - - return nil -} - -// resourceAwsLambdaEventSourceMappingDelete maps to: -// DeleteEventSourceMapping in the API / SDK -func resourceAwsLambdaEventSourceMappingDelete(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).lambdaconn - - log.Printf("[INFO] Deleting Lambda Event Source Mapping: %s", d.Id()) - - input := &lambda.DeleteEventSourceMappingInput{ - UUID: aws.String(d.Id()), - } - - err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { - _, err := conn.DeleteEventSourceMapping(input) - - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) { - return nil - } - - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) { - return resource.RetryableError(err) - } - - if err != nil { - return resource.NonRetryableError(err) - } - - return nil - }) - - if tfresource.TimedOut(err) { - _, err = conn.DeleteEventSourceMapping(input) - } - - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) { - return nil - } - - if err != nil { - return fmt.Errorf("error deleting Lambda Event Source Mapping (%s): %w", d.Id(), err) - } - - if _, err := waiter.EventSourceMappingDelete(conn, d.Id()); err != nil { - return fmt.Errorf("error waiting for Lambda Event Source Mapping (%s) to delete: %w", d.Id(), err) + log.Printf("[WARN] Lambda Event Source Mapping (%s) is neither enabled nor disabled, but %s", d.Id(), state) + d.Set("enabled", nil) } return nil } -// resourceAwsLambdaEventSourceMappingUpdate maps to: -// UpdateEventSourceMapping in the API / SDK func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).lambdaconn @@ -475,7 +459,9 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte } if d.HasChange("destination_config") { - input.DestinationConfig = expandLambdaEventSourceMappingDestinationConfig(d.Get("destination_config").([]interface{})) + if v, ok := d.GetOk("destination_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.DestinationConfig = expandLambdaDestinationConfig(v.([]interface{})[0].(map[string]interface{})) + } } if d.HasChange("enabled") { @@ -503,7 +489,9 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte } if d.HasChange("source_access_configuration") { - input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(d.Get("source_access_configuration").([]interface{})) + if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 { + input.SourceAccessConfigurations = expandLambdaSourceAccessConfigurations(v.(*schema.Set).List()) + } } err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { @@ -539,111 +527,220 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte return resourceAwsLambdaEventSourceMappingRead(d, meta) } -func expandLambdaEventSourceMappingSelfManagedEventSource(configured []interface{}) *lambda.SelfManagedEventSource { - if len(configured) == 0 { - return nil +func resourceAwsLambdaEventSourceMappingDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).lambdaconn + + log.Printf("[INFO] Deleting Lambda Event Source Mapping: %s", d.Id()) + + input := &lambda.DeleteEventSourceMappingInput{ + UUID: aws.String(d.Id()), } - source := &lambda.SelfManagedEventSource{} - source.Endpoints = map[string][]*string{} + err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { + _, err := conn.DeleteEventSourceMapping(input) + + if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) { + return resource.RetryableError(err) + } - if config, ok := configured[0].(map[string]interface{}); ok { - if endpoints, ok := config["endpoints"].(map[string]interface{}); ok { - for key, value := range endpoints { - values := strings.Split(value.(string), ",") - source.Endpoints[key] = make([]*string, len(values)) - for i, value := range values { - valueCopy := value - source.Endpoints[key][i] = &valueCopy - } - } + if err != nil { + return resource.NonRetryableError(err) } + + return nil + }) + + if tfresource.TimedOut(err) { + _, err = conn.DeleteEventSourceMapping(input) + } + + if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) { + return nil + } + + if err != nil { + return fmt.Errorf("error deleting Lambda Event Source Mapping (%s): %w", d.Id(), err) + } + + if _, err := waiter.EventSourceMappingDelete(conn, d.Id()); err != nil { + return fmt.Errorf("error waiting for Lambda Event Source Mapping (%s) to delete: %w", d.Id(), err) + } + + return nil +} + +func expandLambdaDestinationConfig(tfMap map[string]interface{}) *lambda.DestinationConfig { + if tfMap == nil { + return nil + } + + apiObject := &lambda.DestinationConfig{} + + if v, ok := tfMap["on_failure"].([]interface{}); ok && len(v) > 0 { + apiObject.OnFailure = expandLambdaOnFailure(v[0].(map[string]interface{})) + } + + return apiObject +} + +func expandLambdaOnFailure(tfMap map[string]interface{}) *lambda.OnFailure { + if tfMap == nil { + return nil + } + + apiObject := &lambda.OnFailure{} + + if v, ok := tfMap["destination_arn"].(string); ok && v != "" { + apiObject.Destination = aws.String(v) + } + + return apiObject +} + +func flattenLambdaDestinationConfig(apiObject *lambda.DestinationConfig) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.OnFailure; v != nil { + tfMap["on_failure"] = []interface{}{flattenLambdaOnFailure(v)} + } + + return tfMap +} + +func flattenLambdaOnFailure(apiObject *lambda.OnFailure) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.Destination; v != nil { + tfMap["destination_arn"] = aws.StringValue(v) } - return source + + return tfMap } -func flattenLambdaEventSourceMappingSelfManagedEventSource(source *lambda.SelfManagedEventSource, d *schema.ResourceData) []interface{} { - if source == nil { +func expandLambdaSelfManagedEventSource(tfMap map[string]interface{}) *lambda.SelfManagedEventSource { + if tfMap == nil { return nil } - if source.Endpoints == nil { + apiObject := &lambda.SelfManagedEventSource{} + + if v, ok := tfMap["endpoints"].(map[string]interface{}); ok && len(v) > 0 { + m := map[string][]*string{} + + for k, v := range v { + m[k] = aws.StringSlice(strings.Split(v.(string), ",")) + } + + apiObject.Endpoints = m + } + + return apiObject +} + +func flattenLambdaSelfManagedEventSource(apiObject *lambda.SelfManagedEventSource) map[string]interface{} { + if apiObject == nil { return nil } - endpoints := map[string]string{} - for key, values := range source.Endpoints { - sValues := make([]string, len(values)) - for i, value := range values { - sValues[i] = *value + tfMap := map[string]interface{}{} + + if v := apiObject.Endpoints; v != nil { + m := map[string]string{} + + for k, v := range v { + m[k] = strings.Join(aws.StringValueSlice(v), ",") } - // The AWS API sorts the list of brokers so try to order the string by what - // is in the TF file to prevent spurious diffs. - curValue, ok := d.Get("self_managed_event_source.0.endpoints." + key).(string) + + tfMap["endpoints"] = m + } + + return tfMap +} + +func expandLambdaSourceAccessConfiguration(tfMap map[string]interface{}) *lambda.SourceAccessConfiguration { + if tfMap == nil { + return nil + } + + apiObject := &lambda.SourceAccessConfiguration{} + + if v, ok := tfMap["type"].(string); ok && v != "" { + apiObject.Type = aws.String(v) + } + + if v, ok := tfMap["uri"].(string); ok && v != "" { + apiObject.URI = aws.String(v) + } + + return apiObject +} + +func expandLambdaSourceAccessConfigurations(tfList []interface{}) []*lambda.SourceAccessConfiguration { + if len(tfList) == 0 { + return nil + } + + var apiObjects []*lambda.SourceAccessConfiguration + + for _, tfMapRaw := range tfList { + tfMap, ok := tfMapRaw.(map[string]interface{}) + if !ok { - curValue = "" + continue } - curValues := strings.Split(curValue, ",") - if len(sValues) == len(curValues) { - for i := 0; i < len(curValues); i++ { - for j := 0; j < len(sValues); j++ { - if curValues[i] == sValues[j] { - sValues[i], sValues[j] = sValues[j], sValues[i] - break - } - } - } + + apiObject := expandLambdaSourceAccessConfiguration(tfMap) + + if apiObject == nil { + continue } - endpoints[key] = strings.Join(sValues, ",") + + apiObjects = append(apiObjects, apiObject) } - if len(endpoints) == 0 { + return apiObjects +} + +func flattenLambdaSourceAccessConfiguration(apiObject *lambda.SourceAccessConfiguration) map[string]interface{} { + if apiObject == nil { return nil } - config := map[string]interface{}{} - config["endpoints"] = endpoints + tfMap := map[string]interface{}{} - return []interface{}{config} -} + if v := apiObject.Type; v != nil { + tfMap["type"] = aws.StringValue(v) + } -func expandLambdaEventSourceMappingSourceAccessConfigurations(configured []interface{}) []*lambda.SourceAccessConfiguration { - accesses := make([]*lambda.SourceAccessConfiguration, 0, len(configured)) - for _, m := range configured { - config := m.(map[string]interface{}) - accesses = append(accesses, &lambda.SourceAccessConfiguration{ - Type: aws.String(config["type"].(string)), - URI: aws.String(config["uri"].(string)), - }) + if v := apiObject.URI; v != nil { + tfMap["uri"] = aws.StringValue(v) } - return accesses + + return tfMap } -func flattenLambdaEventSourceMappingSourceAccessConfigurations(accesses []*lambda.SourceAccessConfiguration, d *schema.ResourceData) []map[string]interface{} { - if accesses == nil { +func flattenLambdaSourceAccessConfigurations(apiObjects []*lambda.SourceAccessConfiguration) []interface{} { + if len(apiObjects) == 0 { return nil } - settings := make([]map[string]interface{}, len(accesses)) - - for i, access := range accesses { - setting := make(map[string]interface{}) - setting["type"] = access.Type - setting["uri"] = access.URI - settings[i] = setting - } - // The result returned from AWS is sorted so try to order it like the original to prevent spurious diffs - if curCount, ok := d.Get("source_access_configuration.#").(int); ok { - if curCount == len(settings) { - for i := 0; i < curCount; i++ { - if curSetting, ok := d.Get("source_access_configuration." + strconv.Itoa(i)).(map[string]interface{}); ok { - for j := 0; j < len(settings); j++ { - if curSetting["type"] == *settings[j]["type"].(*string) && curSetting["uri"] == *settings[j]["uri"].(*string) { - settings[i], settings[j] = settings[j], settings[i] - } - } - } - } + + var tfList []interface{} + + for _, apiObject := range apiObjects { + if apiObject == nil { + continue } + + tfList = append(tfList, flattenLambdaSourceAccessConfiguration(apiObject)) } - return settings + + return tfList } diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 94367b53e60..51854bbef46 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -677,16 +677,14 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, Steps: []resource.TestStep{ { - Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "100"), + Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "100", "test1:9092,test2:9092"), Check: resource.ComposeTestCheckFunc( testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), + resource.TestCheckResourceAttr(resourceName, "enabled", "false"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), - resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test2:9092,test1:9092"), + resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), - resource.TestCheckResourceAttr(resourceName, "source_access_configuration.0.type", "VPC_SUBNET"), - resource.TestCheckResourceAttr(resourceName, "source_access_configuration.1.type", "VPC_SUBNET"), - resource.TestCheckResourceAttr(resourceName, "source_access_configuration.2.type", "VPC_SECURITY_GROUP"), testAccCheckResourceAttrRfc3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), @@ -695,9 +693,10 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { // batch_size became optional. Ensure that if the user supplies the default // value, but then moves to not providing the value, that we don't consider this // a diff. + // Verify also that bootstrap broker order does not matter. { PlanOnly: true, - Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "null"), + Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "null", "test2:9092,test1:9092"), }, }, }) @@ -865,6 +864,214 @@ resource "aws_lambda_function" "test" { `, rName) } +func testAccAWSLambdaEventSourceMappingConfigSQSBase(rName string) string { + return fmt.Sprintf(` +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = < 0 { - if config, ok := vDest[0].(map[string]interface{}); ok { - if vOnFailure, ok := config["on_failure"].([]interface{}); ok && len(vOnFailure) > 0 && vOnFailure[0] != nil { - mOnFailure := vOnFailure[0].(map[string]interface{}) - onFailure.SetDestination(mOnFailure["destination_arn"].(string)) - } - } - } - dest.SetOnFailure(onFailure) - return dest -} - -func flattenLambdaEventSourceMappingDestinationConfig(dest *lambda.DestinationConfig) []interface{} { - mDest := map[string]interface{}{} - mOnFailure := map[string]interface{}{} - if dest != nil { - if dest.OnFailure != nil { - if dest.OnFailure.Destination != nil { - mOnFailure["destination_arn"] = *dest.OnFailure.Destination - mDest["on_failure"] = []interface{}{mOnFailure} - } - } - } - - if len(mDest) == 0 { - return nil - } - - return []interface{}{mDest} -} - func flattenLambdaLayers(layers []*lambda.Layer) []interface{} { arns := make([]*string, len(layers)) for i, layer := range layers {