Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource/aws_dynamodb_table: Improve update handling #7453

Merged
merged 1 commit into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 124 additions & 100 deletions aws/resource_aws_dynamodb_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func resourceAwsDynamoDbTable() *schema.Resource {
Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(10 * time.Minute),
Delete: schema.DefaultTimeout(10 * time.Minute),
Update: schema.DefaultTimeout(10 * time.Minute),
Update: schema.DefaultTimeout(60 * time.Minute),
},

CustomizeDiff: customdiff.Sequence(
Expand Down Expand Up @@ -384,18 +384,78 @@ func resourceAwsDynamoDbTableCreate(d *schema.ResourceData, meta interface{}) er
return err
}

return resourceAwsDynamoDbTableUpdate(d, meta)
if err := updateDynamoDbTimeToLive(d, conn); err != nil {
return fmt.Errorf("error enabling DynamoDB Table (%s) time to live: %s", d.Id(), err)
}

if err := setTagsDynamoDb(conn, d); err != nil {
return fmt.Errorf("error adding DynamoDB Table (%s) tags: %s", d.Id(), err)
}

if d.Get("point_in_time_recovery.0.enabled").(bool) {
if err := updateDynamoDbPITR(d, conn); err != nil {
return fmt.Errorf("error enabling DynamoDB Table (%s) point in time recovery: %s", d.Id(), err)
}
}

return resourceAwsDynamoDbTableRead(d, meta)
}

func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).dynamodbconn
billingMode := d.Get("billing_mode").(string)

if d.HasChange("billing_mode") && !d.IsNewResource() {
req := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
BillingMode: aws.String(billingMode),
// Global Secondary Index operations must occur in multiple phases
// to prevent various error scenarios. If there are no detected required
// updates in the Terraform configuration, later validation or API errors
// will signal the problems.
var gsiUpdates []*dynamodb.GlobalSecondaryIndexUpdate

if d.HasChange("global_secondary_index") {
var err error
o, n := d.GetChange("global_secondary_index")
gsiUpdates, err = diffDynamoDbGSI(o.(*schema.Set).List(), n.(*schema.Set).List(), billingMode)

if err != nil {
return fmt.Errorf("computing difference for DynamoDB Table (%s) Global Secondary Index updates failed: %s", d.Id(), err)
}

log.Printf("[DEBUG] Computed DynamoDB Table (%s) Global Secondary Index updates: %s", d.Id(), gsiUpdates)
}

// Phase 1 of Global Secondary Index Operations: Delete Only
// * Delete indexes first to prevent error when simultaneously updating
// BillingMode to PROVISIONED, which requires updating index
// ProvisionedThroughput first, but we have no definition
// * Only 1 online index can be deleted simultaneously per table
for _, gsiUpdate := range gsiUpdates {
if gsiUpdate.Delete == nil {
continue
}

idxName := aws.StringValue(gsiUpdate.Delete.IndexName)
input := &dynamodb.UpdateTableInput{
GlobalSecondaryIndexUpdates: []*dynamodb.GlobalSecondaryIndexUpdate{gsiUpdate},
TableName: aws.String(d.Id()),
}

if _, err := conn.UpdateTable(input); err != nil {
return fmt.Errorf("error deleting DynamoDB Table (%s) Global Secondary Index (%s): %s", d.Id(), idxName, err)
}

if err := waitForDynamoDbGSIToBeDeleted(d.Id(), idxName, d.Timeout(schema.TimeoutUpdate), conn); err != nil {
return fmt.Errorf("error waiting for DynamoDB Table (%s) Global Secondary Index (%s) deletion: %s", d.Id(), idxName, err)
}
}

hasTableUpdate := false
input := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
}

if d.HasChange("billing_mode") || d.HasChange("read_capacity") || d.HasChange("write_capacity") {
hasTableUpdate = true

capacityMap := map[string]interface{}{
"write_capacity": d.Get("write_capacity"),
"read_capacity": d.Get("read_capacity"),
Expand All @@ -405,133 +465,97 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
return err
}

req.ProvisionedThroughput = expandDynamoDbProvisionedThroughput(capacityMap, billingMode)
input.BillingMode = aws.String(billingMode)
input.ProvisionedThroughput = expandDynamoDbProvisionedThroughput(capacityMap, billingMode)
}

_, err := conn.UpdateTable(req)
if err != nil {
return fmt.Errorf("Error updating DynamoDB Table (%s) billing mode: %s", d.Id(), err)
if d.HasChange("stream_enabled") || d.HasChange("stream_view_type") {
hasTableUpdate = true

input.StreamSpecification = &dynamodb.StreamSpecification{
StreamEnabled: aws.Bool(d.Get("stream_enabled").(bool)),
}
if err := waitForDynamoDbTableToBeActive(d.Id(), d.Timeout(schema.TimeoutUpdate), conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB Table update: %s", err)
if d.Get("stream_enabled").(bool) {
input.StreamSpecification.StreamViewType = aws.String(d.Get("stream_view_type").(string))
}
}
// Cannot create or delete index while updating table IOPS
// so we update IOPS separately
if !d.HasChange("billing_mode") && d.Get("billing_mode").(string) == dynamodb.BillingModeProvisioned && (d.HasChange("read_capacity") || d.HasChange("write_capacity")) && !d.IsNewResource() {
_, err := conn.UpdateTable(&dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
ProvisionedThroughput: expandDynamoDbProvisionedThroughput(map[string]interface{}{
"read_capacity": d.Get("read_capacity"),
"write_capacity": d.Get("write_capacity"),
}, billingMode),
})
if err != nil {
return err
}
if err := waitForDynamoDbTableToBeActive(d.Id(), d.Timeout(schema.TimeoutUpdate), conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB Table update: %s", err)

// Phase 2 of Global Secondary Index Operations: Update Only
// Cannot create or delete index while updating table ProvisionedThroughput
// Must skip all index updates when switching BillingMode from PROVISIONED to PAY_PER_REQUEST
// Must update all indexes when switching BillingMode from PAY_PER_REQUEST to PROVISIONED
if billingMode == dynamodb.BillingModeProvisioned {
for _, gsiUpdate := range gsiUpdates {
if gsiUpdate.Update == nil {
continue
}

input.GlobalSecondaryIndexUpdates = append(input.GlobalSecondaryIndexUpdates, gsiUpdate)
}
}

if (d.HasChange("stream_enabled") || d.HasChange("stream_view_type")) && !d.IsNewResource() {
toEnable := d.Get("stream_enabled").(bool)
streamSpec := dynamodb.StreamSpecification{
StreamEnabled: aws.Bool(toEnable),
}
if toEnable {
streamSpec.StreamViewType = aws.String(d.Get("stream_view_type").(string))
}
input := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
StreamSpecification: &streamSpec,
}
if hasTableUpdate {
log.Printf("[DEBUG] Updating DynamoDB Table: %s", input)
_, err := conn.UpdateTable(input)

if err != nil {
return err
return fmt.Errorf("error updating DynamoDB Table (%s): %s", d.Id(), err)
}

if err := waitForDynamoDbTableToBeActive(d.Id(), d.Timeout(schema.TimeoutUpdate), conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB Table update: %s", err)
return fmt.Errorf("error waiting for DynamoDB Table (%s) update: %s", d.Id(), err)
}
}

// Indexes and attributes are tightly coupled (DynamoDB requires attribute definitions
// for all indexed attributes) so it's necessary to update these together.
if (d.HasChange("global_secondary_index") || d.HasChange("attribute")) && !d.IsNewResource() {
attributes := d.Get("attribute").(*schema.Set).List()
for _, gsiUpdate := range gsiUpdates {
if gsiUpdate.Update == nil {
continue
}

o, n := d.GetChange("global_secondary_index")
ops, err := diffDynamoDbGSI(o.(*schema.Set).List(), n.(*schema.Set).List(), billingMode)
if err != nil {
return fmt.Errorf("Computing difference for global_secondary_index failed: %s", err)
idxName := aws.StringValue(gsiUpdate.Update.IndexName)
if err := waitForDynamoDbGSIToBeActive(d.Id(), idxName, d.Timeout(schema.TimeoutUpdate), conn); err != nil {
return fmt.Errorf("error waiting for DynamoDB Table (%s) Global Secondary Index (%s) update: %s", d.Id(), idxName, err)
}
}
log.Printf("[DEBUG] Updating global secondary indexes:\n%s", ops)
}

input := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
AttributeDefinitions: expandDynamoDbAttributes(attributes),
// Phase 3 of Global Secondary Index Operations: Create Only
// Only 1 online index can be created simultaneously per table
for _, gsiUpdate := range gsiUpdates {
if gsiUpdate.Create == nil {
continue
}

// Only 1 online index can be created or deleted simultaneously per table
for _, op := range ops {
input.GlobalSecondaryIndexUpdates = []*dynamodb.GlobalSecondaryIndexUpdate{op}
log.Printf("[DEBUG] Updating DynamoDB Table: %s", input)
_, err := conn.UpdateTable(input)
if err != nil {
return err
}
if op.Create != nil {
idxName := *op.Create.IndexName
if err := waitForDynamoDbGSIToBeActive(d.Id(), idxName, conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB GSI %q to be created: %s", idxName, err)
}
}
if op.Update != nil {
idxName := *op.Update.IndexName
if err := waitForDynamoDbGSIToBeActive(d.Id(), idxName, conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB GSI %q to be updated: %s", idxName, err)
}
}
if op.Delete != nil {
idxName := *op.Delete.IndexName
if err := waitForDynamoDbGSIToBeDeleted(d.Id(), idxName, conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB GSI %q to be deleted: %s", idxName, err)
}
}
idxName := aws.StringValue(gsiUpdate.Create.IndexName)
input := &dynamodb.UpdateTableInput{
AttributeDefinitions: expandDynamoDbAttributes(d.Get("attribute").(*schema.Set).List()),
GlobalSecondaryIndexUpdates: []*dynamodb.GlobalSecondaryIndexUpdate{gsiUpdate},
TableName: aws.String(d.Id()),
}

// We may only be changing the attribute type
if len(ops) == 0 {
_, err := conn.UpdateTable(input)
if err != nil {
return err
}
if _, err := conn.UpdateTable(input); err != nil {
return fmt.Errorf("error creating DynamoDB Table (%s) Global Secondary Index (%s): %s", d.Id(), idxName, err)
}

if err := waitForDynamoDbTableToBeActive(d.Id(), d.Timeout(schema.TimeoutUpdate), conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB Table op: %s", err)
if err := waitForDynamoDbGSIToBeActive(d.Id(), idxName, d.Timeout(schema.TimeoutUpdate), conn); err != nil {
return fmt.Errorf("error waiting for DynamoDB Table (%s) Global Secondary Index (%s) creation: %s", d.Id(), idxName, err)
}
}

if d.HasChange("ttl") {
if err := updateDynamoDbTimeToLive(d, conn); err != nil {
log.Printf("[DEBUG] Error updating table TimeToLive: %s", err)
return err
return fmt.Errorf("error updating DynamoDB Table (%s) time to live: %s", d.Id(), err)
}
}

if d.HasChange("tags") {
if err := setTagsDynamoDb(conn, d); err != nil {
return err
return fmt.Errorf("error updating DynamoDB Table (%s) tags: %s", d.Id(), err)
}
}

if d.HasChange("point_in_time_recovery") {
_, enabled := d.GetChange("point_in_time_recovery.0.enabled")
if !d.IsNewResource() || enabled.(bool) {
if err := updateDynamoDbPITR(d, conn); err != nil {
return err
}
if err := updateDynamoDbPITR(d, conn); err != nil {
return fmt.Errorf("error updating DynamoDB Table (%s) point in time recovery: %s", d.Id(), err)
}
}

Expand Down Expand Up @@ -773,14 +797,14 @@ func readDynamoDbTableTags(arn string, conn *dynamodb.DynamoDB) (map[string]stri

// Waiters

func waitForDynamoDbGSIToBeActive(tableName string, gsiName string, conn *dynamodb.DynamoDB) error {
func waitForDynamoDbGSIToBeActive(tableName string, gsiName string, timeout time.Duration, conn *dynamodb.DynamoDB) error {
stateConf := &resource.StateChangeConf{
Pending: []string{
dynamodb.IndexStatusCreating,
dynamodb.IndexStatusUpdating,
},
Target: []string{dynamodb.IndexStatusActive},
Timeout: 10 * time.Minute,
Timeout: timeout,
Refresh: func() (interface{}, string, error) {
result, err := conn.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
Expand Down Expand Up @@ -810,14 +834,14 @@ func waitForDynamoDbGSIToBeActive(tableName string, gsiName string, conn *dynamo
return err
}

func waitForDynamoDbGSIToBeDeleted(tableName string, gsiName string, conn *dynamodb.DynamoDB) error {
func waitForDynamoDbGSIToBeDeleted(tableName string, gsiName string, timeout time.Duration, conn *dynamodb.DynamoDB) error {
stateConf := &resource.StateChangeConf{
Pending: []string{
dynamodb.IndexStatusActive,
dynamodb.IndexStatusDeleting,
},
Target: []string{},
Timeout: 10 * time.Minute,
Timeout: timeout,
Refresh: func() (interface{}, string, error) {
result, err := conn.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
Expand Down
Loading