Skip to content

Commit

Permalink
feat: Warehouse redesign part4 (#2893)
Browse files Browse the repository at this point in the history
Warehouses redesign part4 (continuation of
#2890):
- handle snowflake parameters using computed values
- handle waiting for suspension

Common:
- move retry to utils (to avoid dependencies cycle)

Still missing (next PRs):
- encapsulate 3-value boolean logic
- extract and generalize part of read logic
- Move GetPropertyAsPointerWithPossibleZeroValues and use for database
parameters too
  • Loading branch information
sfc-gh-asawicki authored Jun 28, 2024
1 parent 7f6c657 commit d525fd9
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ test-client: ## runs test that checks sdk.Client without instrumentedsql
SF_TF_NO_INSTRUMENTED_SQL=1 SF_TF_GOSNOWFLAKE_LOG_LEVEL=debug go test ./pkg/sdk/internal/client/... -v

test-acceptance-%: ## run acceptance tests for the given resource only, e.g. test-acceptance-Warehouse
TF_ACC=1 SF_TF_ACC_TEST_CONFIGURE_CLIENT_ONCE=true go test -run ^TestAcc_$*_ -v -timeout=20m ./...
TF_ACC=1 TF_LOG=DEBUG SF_TF_ACC_TEST_CONFIGURE_CLIENT_ONCE=true go test -run ^TestAcc_$*_ -v -timeout=20m ./pkg/resources

build-local: ## build the binary locally
go build -o $(BASE_BINARY_NAME) .
Expand Down
17 changes: 0 additions & 17 deletions pkg/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
)
Expand Down Expand Up @@ -138,19 +137,3 @@ func DecodeSnowflakeAccountIdentifier(identifier string) (sdk.AccountIdentifier,
return sdk.AccountIdentifier{}, fmt.Errorf("unable to classify account identifier: %s, expected format: <organization_name>.<account_name>", identifier)
}
}

func Retry(attempts int, sleepDuration time.Duration, f func() (error, bool)) error {
for i := 0; i < attempts; i++ {
err, done := f()
if err != nil {
return err
}
if done {
return nil
} else {
log.Printf("[INFO] operation not finished yet, retrying in %v seconds\n", sleepDuration.Seconds())
time.Sleep(sleepDuration)
}
}
return fmt.Errorf("giving up after %v attempts", attempts)
}
23 changes: 23 additions & 0 deletions pkg/internal/util/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package util

import (
"fmt"
"log"
"time"
)

func Retry(attempts int, sleepDuration time.Duration, f func() (error, bool)) error {
for i := 0; i < attempts; i++ {
err, done := f()
if err != nil {
return err
}
if done {
return nil
} else {
log.Printf("[INFO] operation not finished yet, retrying in %v seconds\n", sleepDuration.Seconds())
time.Sleep(sleepDuration)
}
}
return fmt.Errorf("giving up after %v attempts", attempts)
}
8 changes: 4 additions & 4 deletions pkg/resources/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"strings"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
Expand Down Expand Up @@ -289,7 +289,7 @@ func CreateAccount(d *schema.ResourceData, meta interface{}) error {
}

var account *sdk.Account
err = helpers.Retry(5, 3*time.Second, func() (error, bool) {
err = util.Retry(5, 3*time.Second, func() (error, bool) {
account, err = client.Accounts.ShowByID(ctx, objectIdentifier)
if err != nil {
return nil, false
Expand All @@ -313,7 +313,7 @@ func ReadAccount(d *schema.ResourceData, meta interface{}) error {

var acc *sdk.Account
var err error
err = helpers.Retry(5, 3*time.Second, func() (error, bool) {
err = util.Retry(5, 3*time.Second, func() (error, bool) {
acc, err = client.Accounts.ShowByID(ctx, id)
if err != nil {
return nil, false
Expand Down
8 changes: 4 additions & 4 deletions pkg/resources/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"strings"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -383,7 +383,7 @@ func waitResumeAlert(ctx context.Context, client *sdk.Client, id sdk.SchemaObjec
}
return nil, alert.State == sdk.AlertStateStarted
}
return helpers.Retry(5, 10*time.Second, resumeAlert)
return util.Retry(5, 10*time.Second, resumeAlert)
}

func waitSuspendAlert(ctx context.Context, client *sdk.Client, id sdk.SchemaObjectIdentifier) error {
Expand All @@ -399,5 +399,5 @@ func waitSuspendAlert(ctx context.Context, client *sdk.Client, id sdk.SchemaObje
}
return nil, alert.State == sdk.AlertStateSuspended
}
return helpers.Retry(5, 10*time.Second, suspendAlert)
return util.Retry(5, 10*time.Second, suspendAlert)
}
3 changes: 2 additions & 1 deletion pkg/resources/managed_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
Expand Down Expand Up @@ -131,7 +132,7 @@ func ReadManagedAccount(d *schema.ResourceData, meta interface{}) error {
// TODO [SNOW-1003380]: discuss it as a provider-wide topic during resources redesign.
var managedAccount *sdk.ManagedAccount
var err error
err = helpers.Retry(5, 3*time.Second, func() (error, bool) {
err = util.Retry(5, 3*time.Second, func() (error, bool) {
managedAccount, err = client.ManagedAccounts.ShowByID(ctx, objectIdentifier)
if err != nil {
return nil, false
Expand Down
6 changes: 3 additions & 3 deletions pkg/resources/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"strconv"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -388,7 +388,7 @@ func waitForTaskStart(ctx context.Context, client *sdk.Client, id sdk.SchemaObje
if err != nil {
return fmt.Errorf("error starting task %s err = %w", id.FullyQualifiedName(), err)
}
return helpers.Retry(5, 5*time.Second, func() (error, bool) {
return util.Retry(5, 5*time.Second, func() (error, bool) {
task, err := client.Tasks.ShowByID(ctx, id)
if err != nil {
return fmt.Errorf("error starting task %s err = %w", id.FullyQualifiedName(), err), false
Expand Down
113 changes: 88 additions & 25 deletions pkg/resources/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,23 @@ var warehouseSchema = map[string]*schema.Schema{
strings.ToLower(string(sdk.ObjectParameterMaxConcurrencyLevel)): {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntAtLeast(1),
Description: "Object parameter that specifies the concurrency level for SQL statements (i.e. queries and DML) executed by a warehouse.",
Default: -1,
},
strings.ToLower(string(sdk.ObjectParameterStatementQueuedTimeoutInSeconds)): {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntAtLeast(0),
Description: "Object parameter that specifies the time, in seconds, a SQL statement (query, DDL, DML, etc.) can be queued on a warehouse before it is canceled by the system.",
Default: -1,
},
strings.ToLower(string(sdk.ObjectParameterStatementTimeoutInSeconds)): {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntBetween(0, 604800),
Description: "Specifies the time, in seconds, after which a running SQL statement (query, DDL, DML, etc.) is canceled by the system",
Default: -1,
},
showOutputAttributeName: {
Type: schema.TypeList,
Expand All @@ -150,6 +150,29 @@ var warehouseSchema = map[string]*schema.Schema{
},
}

// TODO: merge with DatabaseParametersCustomDiff and extract common
var warehouseParametersCustomDiff = func(ctx context.Context, d *schema.ResourceDiff, meta any) error {
if d.Id() == "" {
return nil
}

client := meta.(*provider.Context).Client
params, err := client.Parameters.ShowParameters(context.Background(), &sdk.ShowParametersOptions{
In: &sdk.ParametersIn{
Warehouse: helpers.DecodeSnowflakeID(d.Id()).(sdk.AccountObjectIdentifier),
},
})
if err != nil {
return err
}

return customdiff.All(
IntParameterValueComputedIf("max_concurrency_level", params, sdk.ParameterTypeWarehouse, sdk.AccountParameterMaxConcurrencyLevel),
IntParameterValueComputedIf("statement_queued_timeout_in_seconds", params, sdk.ParameterTypeWarehouse, sdk.AccountParameterStatementQueuedTimeoutInSeconds),
IntParameterValueComputedIf("statement_timeout_in_seconds", params, sdk.ParameterTypeWarehouse, sdk.AccountParameterStatementTimeoutInSeconds),
)(ctx, d, meta)
}

// Warehouse returns a pointer to the resource representing a warehouse.
func Warehouse() *schema.Resource {
return &schema.Resource{
Expand All @@ -172,6 +195,7 @@ func Warehouse() *schema.Resource {
customdiff.ForceNewIfChange("warehouse_size", func(ctx context.Context, old, new, meta any) bool {
return old.(string) != "" && new.(string) == ""
}),
warehouseParametersCustomDiff,
),

StateUpgraders: []schema.StateUpgrader{
Expand Down Expand Up @@ -299,14 +323,14 @@ func CreateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
if v := d.Get("query_acceleration_max_scale_factor").(int); v != -1 {
createOptions.QueryAccelerationMaxScaleFactor = sdk.Int(v)
}
if v := d.Get("max_concurrency_level").(int); v != -1 {
createOptions.MaxConcurrencyLevel = sdk.Int(v)
if v := GetPropertyAsPointerWithPossibleZeroValues[int](d, "max_concurrency_level"); v != nil {
createOptions.MaxConcurrencyLevel = v
}
if v := d.Get("statement_queued_timeout_in_seconds").(int); v != -1 {
createOptions.StatementQueuedTimeoutInSeconds = sdk.Int(v)
if v := GetPropertyAsPointerWithPossibleZeroValues[int](d, "statement_queued_timeout_in_seconds"); v != nil {
createOptions.StatementQueuedTimeoutInSeconds = v
}
if v := d.Get("statement_timeout_in_seconds").(int); v != -1 {
createOptions.StatementTimeoutInSeconds = sdk.Int(v)
if v := GetPropertyAsPointerWithPossibleZeroValues[int](d, "statement_timeout_in_seconds"); v != nil {
createOptions.StatementTimeoutInSeconds = v
}

err := client.Warehouses.Create(ctx, id, createOptions)
Expand All @@ -318,6 +342,19 @@ func CreateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
return GetReadWarehouseFunc(false)(ctx, d, meta)
}

// TODO: move
func GetPropertyAsPointerWithPossibleZeroValues[T any](d *schema.ResourceData, property string) *T {
if d.GetRawConfig().AsValueMap()[property].IsNull() {
return nil
}
value := d.Get(property)
typedValue, ok := value.(T)
if !ok {
return nil
}
return &typedValue
}

func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFunc {
return func(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
client := meta.(*provider.Context).Client
Expand Down Expand Up @@ -357,16 +394,11 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
showMapping{"auto_suspend", "auto_suspend", w.AutoSuspend, w.AutoSuspend, nil},
showMapping{"auto_resume", "auto_resume", w.AutoResume, fmt.Sprintf("%t", w.AutoResume), nil},
showMapping{"resource_monitor", "resource_monitor", w.ResourceMonitor.Name(), w.ResourceMonitor.Name(), nil},
showMapping{"comment", "comment", w.Comment, w.Comment, nil},
showMapping{"enable_query_acceleration", "enable_query_acceleration", w.EnableQueryAcceleration, fmt.Sprintf("%t", w.EnableQueryAcceleration), nil},
showMapping{"query_acceleration_max_scale_factor", "query_acceleration_max_scale_factor", w.QueryAccelerationMaxScaleFactor, w.QueryAccelerationMaxScaleFactor, nil},
); err != nil {
return diag.FromErr(err)
}

if err = markChangedParameters(sdk.WarehouseParameters, warehouseParameters, d, sdk.ParameterTypeWarehouse); err != nil {
return diag.FromErr(err)
}
}

// These are all identity sets, needed for the case where:
Expand Down Expand Up @@ -416,11 +448,6 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
return diag.FromErr(err)
}
}
if v := d.GetRawConfig().AsValueMap()["comment"]; !v.IsNull() {
if err = d.Set("comment", v.AsString()); err != nil {
return diag.FromErr(err)
}
}
if v := d.GetRawConfig().AsValueMap()["enable_query_acceleration"]; !v.IsNull() {
if err = d.Set("enable_query_acceleration", v.AsString()); err != nil {
return diag.FromErr(err)
Expand All @@ -434,6 +461,17 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
}
}

if err = d.Set("name", w.Name); err != nil {
return diag.FromErr(err)
}
if err = d.Set("comment", w.Comment); err != nil {
return diag.FromErr(err)
}

if diags := handleWarehouseParameterRead(d, warehouseParameters); diags != nil {
return diags
}

if err = d.Set(showOutputAttributeName, []map[string]any{schemas.WarehouseToSchema(w)}); err != nil {
return diag.FromErr(err)
}
Expand All @@ -446,6 +484,26 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
}
}

func handleWarehouseParameterRead(d *schema.ResourceData, warehouseParameters []*sdk.Parameter) diag.Diagnostics {
for _, parameter := range warehouseParameters {
switch parameter.Key {
case
string(sdk.ObjectParameterMaxConcurrencyLevel),
string(sdk.ObjectParameterStatementQueuedTimeoutInSeconds),
string(sdk.ObjectParameterStatementTimeoutInSeconds):
value, err := strconv.Atoi(parameter.Value)
if err != nil {
return diag.FromErr(err)
}
if err := d.Set(strings.ToLower(parameter.Key), value); err != nil {
return diag.FromErr(err)
}
}
}

return nil
}

// UpdateWarehouse implements schema.UpdateFunc.
func UpdateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
client := meta.(*provider.Context).Client
Expand Down Expand Up @@ -587,12 +645,9 @@ func UpdateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
unset.StatementQueuedTimeoutInSeconds = sdk.Bool(true)
}
}
if d.HasChange("statement_timeout_in_seconds") {
if v := d.Get("statement_timeout_in_seconds").(int); v != -1 {
set.StatementTimeoutInSeconds = sdk.Int(v)
} else {
unset.StatementTimeoutInSeconds = sdk.Bool(true)
}

if updateParamDiags := handleWarehouseParametersChanges(d, &set, &unset); len(updateParamDiags) > 0 {
return updateParamDiags
}

// Apply SET and UNSET changes
Expand All @@ -616,6 +671,14 @@ func UpdateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
return GetReadWarehouseFunc(false)(ctx, d, meta)
}

func handleWarehouseParametersChanges(d *schema.ResourceData, set *sdk.WarehouseSet, unset *sdk.WarehouseUnset) diag.Diagnostics {
return JoinDiags(
handleValuePropertyChange[int](d, "max_concurrency_level", &set.MaxConcurrencyLevel, &unset.MaxConcurrencyLevel),
handleValuePropertyChange[int](d, "statement_queued_timeout_in_seconds", &set.StatementQueuedTimeoutInSeconds, &unset.StatementQueuedTimeoutInSeconds),
handleValuePropertyChange[int](d, "statement_timeout_in_seconds", &set.StatementTimeoutInSeconds, &unset.StatementTimeoutInSeconds),
)
}

// DeleteWarehouse implements schema.DeleteFunc.
func DeleteWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
client := meta.(*provider.Context).Client
Expand Down
Loading

0 comments on commit d525fd9

Please sign in to comment.