Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Warehouse redesign part4 #2893

Merged
merged 3 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ test-client: ## runs test that checks sdk.Client without instrumentedsql
SF_TF_NO_INSTRUMENTED_SQL=1 SF_TF_GOSNOWFLAKE_LOG_LEVEL=debug go test ./pkg/sdk/internal/client/... -v

test-acceptance-%: ## run acceptance tests for the given resource only, e.g. test-acceptance-Warehouse
TF_ACC=1 SF_TF_ACC_TEST_CONFIGURE_CLIENT_ONCE=true go test -run ^TestAcc_$*_ -v -timeout=20m ./...
TF_ACC=1 TF_LOG=DEBUG SF_TF_ACC_TEST_CONFIGURE_CLIENT_ONCE=true go test -run ^TestAcc_$*_ -v -timeout=20m ./pkg/resources

build-local: ## build the binary locally
go build -o $(BASE_BINARY_NAME) .
Expand Down
17 changes: 0 additions & 17 deletions pkg/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
)
Expand Down Expand Up @@ -138,19 +137,3 @@ func DecodeSnowflakeAccountIdentifier(identifier string) (sdk.AccountIdentifier,
return sdk.AccountIdentifier{}, fmt.Errorf("unable to classify account identifier: %s, expected format: <organization_name>.<account_name>", identifier)
}
}

func Retry(attempts int, sleepDuration time.Duration, f func() (error, bool)) error {
for i := 0; i < attempts; i++ {
err, done := f()
if err != nil {
return err
}
if done {
return nil
} else {
log.Printf("[INFO] operation not finished yet, retrying in %v seconds\n", sleepDuration.Seconds())
time.Sleep(sleepDuration)
}
}
return fmt.Errorf("giving up after %v attempts", attempts)
}
23 changes: 23 additions & 0 deletions pkg/internal/util/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package util

import (
"fmt"
"log"
"time"
)

func Retry(attempts int, sleepDuration time.Duration, f func() (error, bool)) error {
for i := 0; i < attempts; i++ {
err, done := f()
if err != nil {
return err
}
if done {
return nil
} else {
log.Printf("[INFO] operation not finished yet, retrying in %v seconds\n", sleepDuration.Seconds())
time.Sleep(sleepDuration)
}
}
return fmt.Errorf("giving up after %v attempts", attempts)
}
8 changes: 4 additions & 4 deletions pkg/resources/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"strings"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
Expand Down Expand Up @@ -289,7 +289,7 @@ func CreateAccount(d *schema.ResourceData, meta interface{}) error {
}

var account *sdk.Account
err = helpers.Retry(5, 3*time.Second, func() (error, bool) {
err = util.Retry(5, 3*time.Second, func() (error, bool) {
account, err = client.Accounts.ShowByID(ctx, objectIdentifier)
if err != nil {
return nil, false
Expand All @@ -313,7 +313,7 @@ func ReadAccount(d *schema.ResourceData, meta interface{}) error {

var acc *sdk.Account
var err error
err = helpers.Retry(5, 3*time.Second, func() (error, bool) {
err = util.Retry(5, 3*time.Second, func() (error, bool) {
acc, err = client.Accounts.ShowByID(ctx, id)
if err != nil {
return nil, false
Expand Down
8 changes: 4 additions & 4 deletions pkg/resources/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"strings"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -383,7 +383,7 @@ func waitResumeAlert(ctx context.Context, client *sdk.Client, id sdk.SchemaObjec
}
return nil, alert.State == sdk.AlertStateStarted
}
return helpers.Retry(5, 10*time.Second, resumeAlert)
return util.Retry(5, 10*time.Second, resumeAlert)
}

func waitSuspendAlert(ctx context.Context, client *sdk.Client, id sdk.SchemaObjectIdentifier) error {
Expand All @@ -399,5 +399,5 @@ func waitSuspendAlert(ctx context.Context, client *sdk.Client, id sdk.SchemaObje
}
return nil, alert.State == sdk.AlertStateSuspended
}
return helpers.Retry(5, 10*time.Second, suspendAlert)
return util.Retry(5, 10*time.Second, suspendAlert)
}
3 changes: 2 additions & 1 deletion pkg/resources/managed_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
Expand Down Expand Up @@ -131,7 +132,7 @@ func ReadManagedAccount(d *schema.ResourceData, meta interface{}) error {
// TODO [SNOW-1003380]: discuss it as a provider-wide topic during resources redesign.
var managedAccount *sdk.ManagedAccount
var err error
err = helpers.Retry(5, 3*time.Second, func() (error, bool) {
err = util.Retry(5, 3*time.Second, func() (error, bool) {
managedAccount, err = client.ManagedAccounts.ShowByID(ctx, objectIdentifier)
if err != nil {
return nil, false
Expand Down
6 changes: 3 additions & 3 deletions pkg/resources/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"strconv"
"time"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"

"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/provider"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/util"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -388,7 +388,7 @@ func waitForTaskStart(ctx context.Context, client *sdk.Client, id sdk.SchemaObje
if err != nil {
return fmt.Errorf("error starting task %s err = %w", id.FullyQualifiedName(), err)
}
return helpers.Retry(5, 5*time.Second, func() (error, bool) {
return util.Retry(5, 5*time.Second, func() (error, bool) {
task, err := client.Tasks.ShowByID(ctx, id)
if err != nil {
return fmt.Errorf("error starting task %s err = %w", id.FullyQualifiedName(), err), false
Expand Down
113 changes: 88 additions & 25 deletions pkg/resources/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,23 @@ var warehouseSchema = map[string]*schema.Schema{
strings.ToLower(string(sdk.ObjectParameterMaxConcurrencyLevel)): {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntAtLeast(1),
Description: "Object parameter that specifies the concurrency level for SQL statements (i.e. queries and DML) executed by a warehouse.",
Default: -1,
},
strings.ToLower(string(sdk.ObjectParameterStatementQueuedTimeoutInSeconds)): {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntAtLeast(0),
Description: "Object parameter that specifies the time, in seconds, a SQL statement (query, DDL, DML, etc.) can be queued on a warehouse before it is canceled by the system.",
Default: -1,
},
strings.ToLower(string(sdk.ObjectParameterStatementTimeoutInSeconds)): {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntBetween(0, 604800),
Description: "Specifies the time, in seconds, after which a running SQL statement (query, DDL, DML, etc.) is canceled by the system",
Default: -1,
},
showOutputAttributeName: {
Type: schema.TypeList,
Expand All @@ -150,6 +150,29 @@ var warehouseSchema = map[string]*schema.Schema{
},
}

// TODO: merge with DatabaseParametersCustomDiff and extract common
var warehouseParametersCustomDiff = func(ctx context.Context, d *schema.ResourceDiff, meta any) error {
if d.Id() == "" {
return nil
}

client := meta.(*provider.Context).Client
params, err := client.Parameters.ShowParameters(context.Background(), &sdk.ShowParametersOptions{
In: &sdk.ParametersIn{
Warehouse: helpers.DecodeSnowflakeID(d.Id()).(sdk.AccountObjectIdentifier),
},
})
if err != nil {
return err
}

return customdiff.All(
IntParameterValueComputedIf("max_concurrency_level", params, sdk.ParameterTypeWarehouse, sdk.AccountParameterMaxConcurrencyLevel),
IntParameterValueComputedIf("statement_queued_timeout_in_seconds", params, sdk.ParameterTypeWarehouse, sdk.AccountParameterStatementQueuedTimeoutInSeconds),
IntParameterValueComputedIf("statement_timeout_in_seconds", params, sdk.ParameterTypeWarehouse, sdk.AccountParameterStatementTimeoutInSeconds),
)(ctx, d, meta)
}

// Warehouse returns a pointer to the resource representing a warehouse.
func Warehouse() *schema.Resource {
return &schema.Resource{
Expand All @@ -172,6 +195,7 @@ func Warehouse() *schema.Resource {
customdiff.ForceNewIfChange("warehouse_size", func(ctx context.Context, old, new, meta any) bool {
return old.(string) != "" && new.(string) == ""
}),
warehouseParametersCustomDiff,
),

StateUpgraders: []schema.StateUpgrader{
Expand Down Expand Up @@ -299,14 +323,14 @@ func CreateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
if v := d.Get("query_acceleration_max_scale_factor").(int); v != -1 {
createOptions.QueryAccelerationMaxScaleFactor = sdk.Int(v)
}
if v := d.Get("max_concurrency_level").(int); v != -1 {
createOptions.MaxConcurrencyLevel = sdk.Int(v)
if v := GetPropertyAsPointerWithPossibleZeroValues[int](d, "max_concurrency_level"); v != nil {
createOptions.MaxConcurrencyLevel = v
}
if v := d.Get("statement_queued_timeout_in_seconds").(int); v != -1 {
createOptions.StatementQueuedTimeoutInSeconds = sdk.Int(v)
if v := GetPropertyAsPointerWithPossibleZeroValues[int](d, "statement_queued_timeout_in_seconds"); v != nil {
createOptions.StatementQueuedTimeoutInSeconds = v
}
if v := d.Get("statement_timeout_in_seconds").(int); v != -1 {
createOptions.StatementTimeoutInSeconds = sdk.Int(v)
if v := GetPropertyAsPointerWithPossibleZeroValues[int](d, "statement_timeout_in_seconds"); v != nil {
createOptions.StatementTimeoutInSeconds = v
}

err := client.Warehouses.Create(ctx, id, createOptions)
Expand All @@ -318,6 +342,19 @@ func CreateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
return GetReadWarehouseFunc(false)(ctx, d, meta)
}

// TODO: move
func GetPropertyAsPointerWithPossibleZeroValues[T any](d *schema.ResourceData, property string) *T {
if d.GetRawConfig().AsValueMap()[property].IsNull() {
return nil
}
value := d.Get(property)
typedValue, ok := value.(T)
if !ok {
return nil
}
return &typedValue
}

func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFunc {
return func(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
client := meta.(*provider.Context).Client
Expand Down Expand Up @@ -357,16 +394,11 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
showMapping{"auto_suspend", "auto_suspend", w.AutoSuspend, w.AutoSuspend, nil},
showMapping{"auto_resume", "auto_resume", w.AutoResume, fmt.Sprintf("%t", w.AutoResume), nil},
showMapping{"resource_monitor", "resource_monitor", w.ResourceMonitor.Name(), w.ResourceMonitor.Name(), nil},
showMapping{"comment", "comment", w.Comment, w.Comment, nil},
showMapping{"enable_query_acceleration", "enable_query_acceleration", w.EnableQueryAcceleration, fmt.Sprintf("%t", w.EnableQueryAcceleration), nil},
showMapping{"query_acceleration_max_scale_factor", "query_acceleration_max_scale_factor", w.QueryAccelerationMaxScaleFactor, w.QueryAccelerationMaxScaleFactor, nil},
); err != nil {
return diag.FromErr(err)
}

if err = markChangedParameters(sdk.WarehouseParameters, warehouseParameters, d, sdk.ParameterTypeWarehouse); err != nil {
return diag.FromErr(err)
}
}

// These are all identity sets, needed for the case where:
Expand Down Expand Up @@ -416,11 +448,6 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
return diag.FromErr(err)
}
}
if v := d.GetRawConfig().AsValueMap()["comment"]; !v.IsNull() {
if err = d.Set("comment", v.AsString()); err != nil {
return diag.FromErr(err)
}
}
if v := d.GetRawConfig().AsValueMap()["enable_query_acceleration"]; !v.IsNull() {
if err = d.Set("enable_query_acceleration", v.AsString()); err != nil {
return diag.FromErr(err)
Expand All @@ -434,6 +461,17 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
}
}

if err = d.Set("name", w.Name); err != nil {
return diag.FromErr(err)
}
if err = d.Set("comment", w.Comment); err != nil {
return diag.FromErr(err)
}

if diags := handleWarehouseParameterRead(d, warehouseParameters); diags != nil {
return diags
}

if err = d.Set(showOutputAttributeName, []map[string]any{schemas.WarehouseToSchema(w)}); err != nil {
return diag.FromErr(err)
}
Expand All @@ -446,6 +484,26 @@ func GetReadWarehouseFunc(withExternalChangesMarking bool) schema.ReadContextFun
}
}

func handleWarehouseParameterRead(d *schema.ResourceData, warehouseParameters []*sdk.Parameter) diag.Diagnostics {
for _, parameter := range warehouseParameters {
switch parameter.Key {
case
string(sdk.ObjectParameterMaxConcurrencyLevel),
string(sdk.ObjectParameterStatementQueuedTimeoutInSeconds),
string(sdk.ObjectParameterStatementTimeoutInSeconds):
value, err := strconv.Atoi(parameter.Value)
if err != nil {
return diag.FromErr(err)
}
if err := d.Set(strings.ToLower(parameter.Key), value); err != nil {
return diag.FromErr(err)
}
}
}

return nil
}

// UpdateWarehouse implements schema.UpdateFunc.
func UpdateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
client := meta.(*provider.Context).Client
Expand Down Expand Up @@ -587,12 +645,9 @@ func UpdateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
unset.StatementQueuedTimeoutInSeconds = sdk.Bool(true)
}
}
if d.HasChange("statement_timeout_in_seconds") {
if v := d.Get("statement_timeout_in_seconds").(int); v != -1 {
set.StatementTimeoutInSeconds = sdk.Int(v)
} else {
unset.StatementTimeoutInSeconds = sdk.Bool(true)
}

if updateParamDiags := handleWarehouseParametersChanges(d, &set, &unset); len(updateParamDiags) > 0 {
return updateParamDiags
}

// Apply SET and UNSET changes
Expand All @@ -616,6 +671,14 @@ func UpdateWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag
return GetReadWarehouseFunc(false)(ctx, d, meta)
}

func handleWarehouseParametersChanges(d *schema.ResourceData, set *sdk.WarehouseSet, unset *sdk.WarehouseUnset) diag.Diagnostics {
return JoinDiags(
handleValuePropertyChange[int](d, "max_concurrency_level", &set.MaxConcurrencyLevel, &unset.MaxConcurrencyLevel),
handleValuePropertyChange[int](d, "statement_queued_timeout_in_seconds", &set.StatementQueuedTimeoutInSeconds, &unset.StatementQueuedTimeoutInSeconds),
handleValuePropertyChange[int](d, "statement_timeout_in_seconds", &set.StatementTimeoutInSeconds, &unset.StatementTimeoutInSeconds),
)
}

// DeleteWarehouse implements schema.DeleteFunc.
func DeleteWarehouse(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
client := meta.(*provider.Context).Client
Expand Down
Loading
Loading