
Commit a486057

Merge pull request #31324 from hashicorp/remove-aws_msk_cluster.ebs_volume_size

r/aws_msk_cluster: Remove `broker_node_group_info.ebs_volume_size` attribute
ewbankkit committed May 11, 2023
2 parents 440ebd5 + 1d3ae40 commit a486057
Showing 12 changed files with 323 additions and 463 deletions.
3 changes: 3 additions & 0 deletions .changelog/31324.txt
@@ -0,0 +1,3 @@
+```release-note:note
+resource/aws_msk_cluster: The `broker_node_group_info.ebs_volume_size` attribute has been removed
+```
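For operators, the removal is a one-for-one move of the volume size into the nested storage_info block. A minimal sketch of the replacement configuration, written in the repository's own test-helper style (function and resource names here are hypothetical, not part of this commit):

// exampleClusterConfig shows where volume_size now lives; the removed
// top-level ebs_volume_size attribute maps to
// broker_node_group_info.storage_info.ebs_storage_info.volume_size.
func exampleClusterConfig(rName string) string {
	return fmt.Sprintf(`
resource "aws_msk_cluster" "example" {
  cluster_name           = %[1]q
  kafka_version          = "2.2.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    client_subnets  = aws_subnet.example[*].id
    instance_type   = "kafka.m5.large"
    security_groups = [aws_security_group.example.id]

    # Previously: ebs_volume_size = 10
    storage_info {
      ebs_storage_info {
        volume_size = 10
      }
    }
  }
}
`, rName)
}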
9 changes: 0 additions & 9 deletions internal/service/eks/addon_test.go
@@ -6,7 +6,6 @@ import (
 	"regexp"
 	"testing"

-	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/eks"
 	sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
 	"github.com/hashicorp/terraform-plugin-testing/helper/resource"
@@ -473,14 +472,6 @@ func testAccPreCheckAddon(ctx context.Context, t *testing.T) {
 	}
 }

-func testAccCheckAddonUpdateTags(ctx context.Context, addon *eks.Addon, oldTags, newTags map[string]string) resource.TestCheckFunc {
-	return func(s *terraform.State) error {
-		conn := acctest.Provider.Meta().(*conns.AWSClient).EKSConn()
-
-		return tfeks.UpdateTags(ctx, conn, aws.StringValue(addon.AddonArn), oldTags, newTags)
-	}
-}
-
 func testAccAddonConfig_base(rName string) string {
 	return acctest.ConfigCompose(acctest.ConfigAvailableAZsNoOptIn(), fmt.Sprintf(`
 data "aws_partition" "current" {}
13 changes: 9 additions & 4 deletions internal/service/kafka/broker_nodes_data_source_test.go
@@ -37,17 +37,22 @@ func TestAccKafkaBrokerNodesDataSource_basic(t *testing.T) {
 }

 func testAccBrokerNodesDataSourceConfig_basic(rName string) string {
-	return acctest.ConfigCompose(testAccClusterBaseConfig(rName), fmt.Sprintf(`
+	return acctest.ConfigCompose(testAccClusterConfig_base(rName), fmt.Sprintf(`
 resource "aws_msk_cluster" "test" {
   cluster_name           = %[1]q
   kafka_version          = "2.2.1"
   number_of_broker_nodes = 3

   broker_node_group_info {
-    client_subnets  = [aws_subnet.example_subnet_az1.id, aws_subnet.example_subnet_az2.id, aws_subnet.example_subnet_az3.id]
-    ebs_volume_size = 10
+    client_subnets  = aws_subnet.test[*].id
     instance_type   = "kafka.t3.small"
-    security_groups = [aws_security_group.example_sg.id]
+    security_groups = [aws_security_group.test.id]
+
+    storage_info {
+      ebs_storage_info {
+        volume_size = 10
+      }
+    }
   }

   tags = {
81 changes: 15 additions & 66 deletions internal/service/kafka/cluster.go
@@ -44,12 +44,6 @@ func ResourceCluster() *schema.Resource {
 		customdiff.ForceNewIfChange("kafka_version", func(_ context.Context, old, new, meta interface{}) bool {
 			return verify.SemVerLessThan(new.(string), old.(string))
 		}),
-		customdiff.ComputedIf("broker_node_group_info.0.storage_info", func(_ context.Context, diff *schema.ResourceDiff, meta interface{}) bool {
-			return diff.HasChange("broker_node_group_info.0.ebs_volume_size")
-		}),
-		customdiff.ComputedIf("broker_node_group_info.0.ebs_volume_size", func(_ context.Context, diff *schema.ResourceDiff, meta interface{}) bool {
-			return diff.HasChange("broker_node_group_info.0.storage_info")
-		}),
 		verify.SetTagsDiff,
 	),

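The two deleted ComputedIf entries existed only to reconcile the deprecated ebs_volume_size with its storage_info replacement at plan time; with a single storage attribute left there is nothing to reconcile. For reference, the general pattern they implemented (a sketch with hypothetical attribute names):

// Mark "mirror_attr" as recomputed whenever "source_attr" changes, so plans
// show it as "known after apply" rather than a spurious in-place diff
// between the two attributes.
customdiff.ComputedIf("mirror_attr", func(_ context.Context, diff *schema.ResourceDiff, meta interface{}) bool {
	return diff.HasChange("source_attr")
}),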
@@ -134,14 +128,6 @@ func ResourceCluster() *schema.Resource {
 						},
 					},
 				},
-				"ebs_volume_size": {
-					Type:          schema.TypeInt,
-					Optional:      true,
-					Computed:      true,
-					Deprecated:    "use 'storage_info' argument instead",
-					ValidateFunc:  validation.IntBetween(1, 16384),
-					ConflictsWith: []string{"broker_node_group_info.0.storage_info"},
-				},
 				"instance_type": {
 					Type:     schema.TypeString,
 					Required: true,
@@ -155,11 +141,10 @@
 					},
 				},
 				"storage_info": {
-					Type:          schema.TypeList,
-					Optional:      true,
-					Computed:      true,
-					MaxItems:      1,
-					ConflictsWith: []string{"broker_node_group_info.0.ebs_volume_size"},
+					Type:     schema.TypeList,
+					Optional: true,
+					Computed: true,
+					MaxItems: 1,
 					Elem: &schema.Resource{
 						Schema: map[string]*schema.Schema{
 							"ebs_storage_info": {
@@ -671,52 +656,32 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
 		}
 	}

-	if d.HasChanges("broker_node_group_info.0.ebs_volume_size", "broker_node_group_info.0.storage_info") {
+	if d.HasChanges("broker_node_group_info.0.storage_info") {
 		input := &kafka.UpdateBrokerStorageInput{
 			ClusterArn:     aws.String(d.Id()),
 			CurrentVersion: aws.String(d.Get("current_version").(string)),
-		}
-		if d.HasChange("broker_node_group_info.0.storage_info") {
-			// case 1: deprecated ebs_volume_size replaced with storage_info
-			// case 2: regular update of storage_info
-			ebsVolumeInfo := &kafka.BrokerEBSVolumeInfo{
+			TargetBrokerEBSVolumeInfo: []*kafka.BrokerEBSVolumeInfo{{
 				KafkaBrokerNodeId: aws.String("All"),
 				VolumeSizeGB:      aws.Int64(int64(d.Get("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size").(int))),
-			}
-			if v, ok := d.GetOk("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
-				ebsVolumeInfo.ProvisionedThroughput = expandProvisionedThroughput(v.([]interface{})[0].(map[string]interface{}))
-			}
-			input.TargetBrokerEBSVolumeInfo = []*kafka.BrokerEBSVolumeInfo{ebsVolumeInfo}
-		} else {
-			// case 3: regular update of deprecated ebs_volume_size
-			input.TargetBrokerEBSVolumeInfo = []*kafka.BrokerEBSVolumeInfo{
-				{
-					KafkaBrokerNodeId: aws.String("All"),
-					VolumeSizeGB:      aws.Int64(int64(d.Get("broker_node_group_info.0.ebs_volume_size").(int))),
-				},
-			}
-		}
+			}},
+		}
+
+		if v, ok := d.GetOk("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
+			input.TargetBrokerEBSVolumeInfo[0].ProvisionedThroughput = expandProvisionedThroughput(v.([]interface{})[0].(map[string]interface{}))
+		}

 		output, err := conn.UpdateBrokerStorageWithContext(ctx, input)

-		// the following error is thrown if previous ebs_volume_size and new storage_info.ebs_storage_info.volume_size have the same value:
-		// BadRequestException: The request does not include any updates to the EBS volumes of the cluster. Verify the request, then try again
-		// ignore this error to allow users to replace deprecated ebs_volume_size with storage_info - Address case 1
-		if err != nil && !tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "The request does not include any updates to the EBS volumes of the cluster") {
+		if err != nil {
 			return diag.Errorf("updating MSK Cluster (%s) broker storage: %s", d.Id(), err)
 		}

 		clusterOperationARN := aws.StringValue(output.ClusterOperationArn)

-		// when there are no changes, output.ClusterOperationArn is not returned leading to
-		// InvalidParameter: 1 validation error(s) found. - minimum field size of 1, DescribeClusterOperationInput.ClusterOperationArn.
-		// skip the wait if the EBS volume size is unchanged
-		if !tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "The request does not include any updates to the EBS volumes of the cluster") {
-			_, err = waitClusterOperationCompleted(ctx, conn, clusterOperationARN, d.Timeout(schema.TimeoutUpdate))
+		_, err = waitClusterOperationCompleted(ctx, conn, clusterOperationARN, d.Timeout(schema.TimeoutUpdate))

-			if err != nil {
-				return diag.Errorf("waiting for MSK Cluster (%s) operation (%s): %s", d.Id(), clusterOperationARN, err)
-			}
-		}
+		if err != nil {
+			return diag.Errorf("waiting for MSK Cluster (%s) operation (%s): %s", d.Id(), clusterOperationARN, err)
+		}

 		// refresh the current_version attribute after each update
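With the deprecated attribute gone, the update path collapses to one unconditional UpdateBrokerStorage call, so the BadRequestException special-casing for the migration scenario is no longer needed. A self-contained sketch of the underlying aws-sdk-go call, assuming illustrative ARN and cluster-version values:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kafka"
)

func main() {
	conn := kafka.New(session.Must(session.NewSession()))

	input := &kafka.UpdateBrokerStorageInput{
		ClusterArn:     aws.String("arn:aws:kafka:us-east-1:123456789012:cluster/example/abc123"), // hypothetical ARN
		CurrentVersion: aws.String("K3AEGXETSR30VB"),                                              // hypothetical current cluster version
		TargetBrokerEBSVolumeInfo: []*kafka.BrokerEBSVolumeInfo{{
			KafkaBrokerNodeId: aws.String("All"), // apply the new size to every broker
			VolumeSizeGB:      aws.Int64(100),
		}},
	}

	// Kick off the storage update; MSK returns an operation ARN to poll.
	output, err := conn.UpdateBrokerStorage(input)
	if err != nil {
		fmt.Println("updating broker storage:", err)
		return
	}

	fmt.Println("cluster operation:", aws.StringValue(output.ClusterOperationArn))
}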
@@ -979,14 +944,6 @@ func expandBrokerNodeGroupInfo(tfMap map[string]interface{}) *kafka.BrokerNodeGroupInfo {
 		apiObject.SecurityGroups = flex.ExpandStringSet(v)
 	}

-	if v, ok := tfMap["ebs_volume_size"].(int); ok && v != 0 {
-		apiObject.StorageInfo = &kafka.StorageInfo{
-			EbsStorageInfo: &kafka.EBSStorageInfo{
-				VolumeSize: aws.Int64(int64(v)),
-			},
-		}
-	}
-
 	if v, ok := tfMap["storage_info"].([]interface{}); ok && len(v) > 0 && v[0] != nil {
 		apiObject.StorageInfo = expandStorageInfo(v[0].(map[string]interface{}))
 	}
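expandBrokerNodeGroupInfo now has a single source for StorageInfo. The expandStorageInfo helper it delegates to is defined elsewhere in this file; a plausible reconstruction of its shape (hypothetical, and it omits the provisioned_throughput handling the real helper presumably carries):

func expandStorageInfo(tfMap map[string]interface{}) *kafka.StorageInfo {
	if tfMap == nil {
		return nil
	}

	apiObject := &kafka.StorageInfo{}

	// Descend into the single-element ebs_storage_info list.
	if v, ok := tfMap["ebs_storage_info"].([]interface{}); ok && len(v) > 0 && v[0] != nil {
		ebsMap := v[0].(map[string]interface{})
		ebs := &kafka.EBSStorageInfo{}
		if size, ok := ebsMap["volume_size"].(int); ok && size != 0 {
			ebs.VolumeSize = aws.Int64(int64(size))
		}
		apiObject.EbsStorageInfo = ebs
	}

	return apiObject
}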
@@ -1376,14 +1333,6 @@ func flattenBrokerNodeGroupInfo(apiObject *kafka.BrokerNodeGroupInfo) map[string]interface{} {
 		tfMap["storage_info"] = flattenStorageInfo(v)
 	}

-	if v := apiObject.StorageInfo; v != nil {
-		if v := v.EbsStorageInfo; v != nil {
-			if v := v.VolumeSize; v != nil {
-				tfMap["ebs_volume_size"] = aws.Int64Value(v)
-			}
-		}
-	}
-
 	return tfMap
 }
13 changes: 9 additions & 4 deletions internal/service/kafka/cluster_data_source_test.go
@@ -46,17 +46,22 @@ func TestAccKafkaClusterDataSource_basic(t *testing.T) {
 }

 func testAccClusterDataSourceConfig_basic(rName string) string {
-	return acctest.ConfigCompose(testAccClusterBaseConfig(rName), fmt.Sprintf(`
+	return acctest.ConfigCompose(testAccClusterConfig_base(rName), fmt.Sprintf(`
 resource "aws_msk_cluster" "test" {
   cluster_name           = %[1]q
   kafka_version          = "2.2.1"
   number_of_broker_nodes = 3

   broker_node_group_info {
-    client_subnets  = [aws_subnet.example_subnet_az1.id, aws_subnet.example_subnet_az2.id, aws_subnet.example_subnet_az3.id]
-    ebs_volume_size = 10
+    client_subnets  = aws_subnet.test[*].id
     instance_type   = "kafka.m5.large"
-    security_groups = [aws_security_group.example_sg.id]
+    security_groups = [aws_security_group.test.id]
+
+    storage_info {
+      ebs_storage_info {
+        volume_size = 10
+      }
+    }
   }

   tags = {
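Both data-source fixtures now reference aws_subnet.test[*] and aws_security_group.test, supplied by the shared testAccClusterConfig_base helper that replaces testAccClusterBaseConfig. A plausible sketch of that base configuration (hypothetical; the real helper lives with the cluster tests and is not part of this diff):

func testAccClusterConfig_base(rName string) string {
	// Compose a VPC with three subnets plus a security group, all named "test"
	// so cluster configs can reference aws_subnet.test[*] and aws_security_group.test.
	return acctest.ConfigCompose(acctest.ConfigVPCWithSubnets(rName, 3), fmt.Sprintf(`
resource "aws_security_group" "test" {
  name   = %[1]q
  vpc_id = aws_vpc.test.id
}
`, rName))
}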
(7 more changed files not shown)
