Skip to content

Commit

Permalink
Initial implementation of self_managed_event_source for aws_lambda_ev…
Browse files Browse the repository at this point in the history
…ent_source_mapping.
  • Loading branch information
delbaeth authored and ewbankkit committed May 26, 2021
1 parent 87a4ff2 commit f69c0d7
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 8 deletions.
104 changes: 96 additions & 8 deletions aws/resource_aws_lambda_event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource {
Schema: map[string]*schema.Schema{
"event_source_arn": {
Type: schema.TypeString,
Required: true,
Optional: true,
ForceNew: true,
},
"function_name": {
Expand Down Expand Up @@ -80,12 +80,19 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource {
return false
}

eventSourceARN, err := arn.Parse(d.Get("event_source_arn").(string))
if err != nil {
return false
serviceName := ""
if v, ok := d.GetOk("event_source_arn"); ok {
eventSourceARN, err := arn.Parse(v.(string))
if err != nil {
return false
}
serviceName = eventSourceARN.Service
} else {
// self managed kafka does not have an event_source_arn
serviceName = "kafka"
}
switch eventSourceARN.Service {
// kafka.ServiceName is "Kafka".
switch serviceName {
// kafka.ServiceName is "kafka".
case dynamodb.ServiceName, kinesis.ServiceName, "kafka":
if old == "100" {
return true
Expand Down Expand Up @@ -156,6 +163,66 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource {
},
},
},
/*
"self_managed_event_source": {
Type: schema.TypeList,
Optional: true,
MinItems: 1,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"endpoints": {
Type: schema.TypeMap,
Required: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
},
},
},
*/
"self_managed_event_source": {
Type: schema.TypeList,
Optional: true,
MinItems: 1,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"endpoints": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
MinItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"kafka_bootstrap_servers": {
Type: schema.TypeList,
Required: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
},
},
},
},
},
},
"source_access_configuration": {
Type: schema.TypeList,
Optional: true,
MinItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"type": {
Type: schema.TypeString,
Required: true,
},
"uri": {
Type: schema.TypeString,
Required: true,
},
},
},
},
"function_arn": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -190,8 +257,11 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte
conn := meta.(*AWSClient).lambdaconn

input := &lambda.CreateEventSourceMappingInput{
Enabled: aws.Bool(d.Get("enabled").(bool)),
FunctionName: aws.String(d.Get("function_name").(string)),
Enabled: aws.Bool(d.Get("enabled").(bool)),
}

if v, ok := d.GetOk("function_name"); ok {
input.FunctionName = aws.String(v.(string))
}

if v, ok := d.GetOk("batch_size"); ok {
Expand Down Expand Up @@ -236,6 +306,14 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte
input.StartingPositionTimestamp = aws.Time(t)
}

if v, ok := d.GetOk("self_managed_event_source"); ok {
input.SelfManagedEventSource = expandLambdaEventSourceMappingSelfManagedEventSource(v.([]interface{}))
}

if v, ok := d.GetOk("source_access_configuration"); ok {
input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(v.([]interface{}))
}

if v, ok := d.GetOk("topics"); ok && v.(*schema.Set).Len() > 0 {
input.Topics = expandStringSet(v.(*schema.Set))
}
Expand Down Expand Up @@ -322,6 +400,12 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf
if err := d.Set("topics", flattenStringSet(eventSourceMappingConfiguration.Topics)); err != nil {
return fmt.Errorf("error setting topics: %w", err)
}
if err := d.Set("self_managed_event_source", flattenLambdaEventSourceMappingSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)); err != nil {
return fmt.Errorf("error setting self_managed_event_source: %w", err)
}
if err := d.Set("source_access_configuration", flattenLambdaEventSourceMappingSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations, d)); err != nil {
return fmt.Errorf("error setting source_access_configuration: %w", err)
}

d.Set("starting_position", eventSourceMappingConfiguration.StartingPosition)
if eventSourceMappingConfiguration.StartingPositionTimestamp != nil {
Expand Down Expand Up @@ -439,6 +523,10 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte
input.ParallelizationFactor = aws.Int64(int64(d.Get("parallelization_factor").(int)))
}

if d.HasChange("source_access_configuration") {
input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(d.Get("source_access_configuration").([]interface{}))
}

err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError {
_, err := conn.UpdateEventSourceMapping(input)

Expand Down
167 changes: 167 additions & 0 deletions aws/resource_aws_lambda_event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,52 @@ func TestAccAWSLambdaEventSourceMapping_MSK(t *testing.T) {
})
}

func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) {
var v lambda.EventSourceMappingConfiguration
resourceName := "aws_lambda_event_source_mapping.test"
rName := acctest.RandomWithPrefix("tf-acc-test")

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "kafka"), //using kafka.EndpointsID will import kafka and make linters sad
Providers: testAccProviders,
CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "100"),
Check: resource.ComposeTestCheckFunc(
testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"),

resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.0.kafka_bootstrap_servers.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.0.kafka_bootstrap_servers.0", "test:9092"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.0.type", "VPC_SUBNET"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.1.type", "VPC_SUBNET"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.2.type", "VPC_SECURITY_GROUP"),
testAccCheckResourceAttrRfc3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
},
// batch_size became optional. Ensure that if the user supplies the default
// value, but then moves to not providing the value, that we don't consider this
// a diff.
{
PlanOnly: true,
Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "null"),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func testAccCheckAWSLambdaEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).lambdaconn
Expand Down Expand Up @@ -1185,6 +1231,127 @@ resource "aws_lambda_event_source_mapping" "test" {
`, rName, batchSize))
}

func testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, batchSize string) string {
if batchSize == "" {
batchSize = "null"
}

return composeConfig(testAccAvailableAZsNoOptInConfig(), fmt.Sprintf(`
resource "aws_iam_role" "test" {
name = %[1]q
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}
resource "aws_iam_policy" "test" {
name = %[1]q
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DeleteNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs"
],
"Resource": "*"
}
]
}
EOF
}
resource "aws_iam_policy_attachment" "test" {
name = %[1]q
roles = [aws_iam_role.test.name]
policy_arn = aws_iam_policy.test.arn
}
resource "aws_vpc" "test" {
cidr_block = "192.168.0.0/22"
tags = {
Name = %[1]q
}
}
resource "aws_subnet" "test" {
count = 2
vpc_id = aws_vpc.test.id
cidr_block = cidrsubnet(aws_vpc.test.cidr_block, 2, count.index)
availability_zone = data.aws_availability_zones.available.names[count.index]
tags = {
Name = %[1]q
}
}
resource "aws_security_group" "test" {
name = %[1]q
vpc_id = aws_vpc.test.id
tags = {
Name = %[1]q
}
}
resource "aws_lambda_function" "test" {
filename = "test-fixtures/lambdatest.zip"
function_name = %[1]q
role = aws_iam_role.test.arn
handler = "exports.example"
runtime = "nodejs12.x"
}
resource "aws_lambda_event_source_mapping" "test" {
batch_size = %[2]s
enabled = true
function_name = aws_lambda_function.test.arn
topics = ["test"]
starting_position = "TRIM_HORIZON"
self_managed_event_source {
endpoints {
kafka_bootstrap_servers = [ "test:9092" ]
}
}
dynamic "source_access_configuration" {
for_each = aws_subnet.test.*.id
content {
type = "VPC_SUBNET"
uri = "subnet:${source_access_configuration.value}"
}
}
source_access_configuration {
type = "VPC_SECURITY_GROUP"
uri = aws_security_group.test.id
}
}
`, rName, batchSize))
}

func testAccAWSLambdaEventSourceMappingConfigDynamoDBBase(rName string) string {
return fmt.Sprintf(`
resource "aws_iam_role" "test" {
Expand Down
Loading

0 comments on commit f69c0d7

Please sign in to comment.