Skip to content

Commit

Permalink
feat: redshift serverless (#5144)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 30, 2024
1 parent 5bda381 commit e37b0e9
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 148 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ jobs:
BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }}
DATABRICKS_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.DATABRICKS_INTEGRATION_TEST_CREDENTIALS }}
REDSHIFT_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_INTEGRATION_TEST_CREDENTIALS }}
REDSHIFT_IAM_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_IAM_INTEGRATION_TEST_CREDENTIALS }}
REDSHIFT_SERVERLESS_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_SERVERLESS_INTEGRATION_TEST_CREDENTIALS }}
REDSHIFT_SERVERLESS_IAM_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_SERVERLESS_IAM_INTEGRATION_TEST_CREDENTIALS }}
SNOWFLAKE_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_INTEGRATION_TEST_CREDENTIALS }}
SNOWFLAKE_RBAC_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_RBAC_INTEGRATION_TEST_CREDENTIALS }}
SNOWFLAKE_KEYPAIR_ENCRYPTED_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_KEYPAIR_ENCRYPTED_INTEGRATION_TEST_CREDENTIALS }}
Expand Down
22 changes: 14 additions & 8 deletions warehouse/integrations/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,19 +961,25 @@ func (rs *Redshift) connectUsingIAMRole() (*sql.DB, error) {
user = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.UserSetting)
iamRoleARNForAuth = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.IAMRoleARNForAuthSetting)
clusterID = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.ClusterIDSetting)
useServerless = rs.Warehouse.GetBoolDestinationConfig(model.UseServerlessSetting)
workgroupName = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.WorkgroupNameSetting)
clusterRegion = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.ClusterRegionSetting)
timeout = rs.connectTimeout
)

data := sqlconnectconfig.RedshiftData{
ClusterIdentifier: clusterID,
Database: database,
User: user,
Region: clusterRegion,
RoleARN: iamRoleARNForAuth,
ExternalID: rs.Warehouse.WorkspaceID,
RoleARNExpiry: time.Hour,
Timeout: timeout,
Database: database,
Region: clusterRegion,
RoleARN: iamRoleARNForAuth,
ExternalID: rs.Warehouse.WorkspaceID,
RoleARNExpiry: time.Hour,
Timeout: timeout,
}
if useServerless {
data.WorkgroupName = workgroupName
} else {
data.User = user
data.ClusterIdentifier = clusterID
}

credentialsJSON, err := data.MarshalJSON()
Expand Down
260 changes: 202 additions & 58 deletions warehouse/integrations/redshift/redshift_test.go

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions warehouse/integrations/testhelper/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
)

const (
WaitFor2Minute = 2 * time.Minute
WaitFor10Minute = 10 * time.Minute
DefaultQueryFrequency = 100 * time.Millisecond
SourceJobQueryFrequency = 1000 * time.Millisecond
WaitFor2Minute = 2 * time.Minute
WaitFor10Minute = 10 * time.Minute
DefaultQueryFrequency = 100 * time.Millisecond
DefaultWarehouseQueryFrequency = 500 * time.Millisecond
SourceJobQueryFrequency = 1000 * time.Millisecond
)

const (
Expand Down
22 changes: 1 addition & 21 deletions warehouse/integrations/testhelper/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ func verifyEventsInStagingFiles(t testing.TB, testConfig *TestConfig) {
testConfig.WorkspaceID, testConfig.SourceID, testConfig.DestinationID,
testConfig.TimestampBeforeSendingEvents,
).Scan(&count)

if err == nil && count.Int64 != expectedCount {
t.Logf("Expected staging files events count is %d, got %d", expectedCount, count.Int64)
}

return err == nil && count.Int64 == expectedCount
}
require.Eventuallyf(t, operation, WaitFor2Minute, DefaultQueryFrequency,
Expand Down Expand Up @@ -106,13 +101,6 @@ func verifyEventsInTableUploads(t testing.TB, testConfig *TestConfig) {
testConfig.WorkspaceID, testConfig.SourceID, testConfig.DestinationID,
testConfig.TimestampBeforeSendingEvents, whutils.ToProviderCase(testConfig.DestinationType, table),
).Scan(&count)

if err == nil && count.Int64 != expectedCount {
t.Logf("Expected table uploads events count for table %q is %d, got %d",
table, expectedCount, count.Int64,
)
}

return err == nil && count.Int64 == expectedCount
}
require.Eventuallyf(t, operation, WaitFor10Minute, DefaultQueryFrequency,
Expand Down Expand Up @@ -161,16 +149,8 @@ func verifyEventsInWareHouse(t testing.TB, testConfig *TestConfig) {
require.Eventuallyf(t,
func() bool {
count, err = queryCount(testConfig.Client, sqlStatement)

if err == nil && count != expectedCount {
t.Logf("Expected %d events in WH (schema: %s, table: %s, userID: %s), got %d", expectedCount,
testConfig.Schema, whutils.ToProviderCase(testConfig.DestinationType, table), testConfig.UserID,
count,
)
}

return err == nil && count == expectedCount
}, WaitFor10Minute, DefaultQueryFrequency,
}, WaitFor10Minute, DefaultWarehouseQueryFrequency,
"Expected %d events in WH (schema: %s, table: %s, userID: %s), got %d: %v",
expectedCount,
testConfig.Schema, whutils.ToProviderCase(testConfig.DestinationType, table), testConfig.UserID,
Expand Down
60 changes: 60 additions & 0 deletions warehouse/internal/model/settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package model

type DestinationConfigSetting interface {
String() string
protected()
}

type destConfSetting string

func (destConfSetting) protected() {}
func (s destConfSetting) String() string { return string(s) }

var (
PreferAppendSetting DestinationConfigSetting = destConfSetting("preferAppend")
UseRudderStorageSetting DestinationConfigSetting = destConfSetting("useRudderStorage")
SecureSetting DestinationConfigSetting = destConfSetting("secure")
SkipVerifySetting DestinationConfigSetting = destConfSetting("skipVerify")
EnableExternalLocationSetting DestinationConfigSetting = destConfSetting("enableExternalLocation")
UseSTSTokensSetting DestinationConfigSetting = destConfSetting("useSTSTokens")
UseGlueSetting DestinationConfigSetting = destConfSetting("useGlue")
HostSetting DestinationConfigSetting = destConfSetting("host")
DatabaseSetting DestinationConfigSetting = destConfSetting("database")
UserSetting DestinationConfigSetting = destConfSetting("user")
PasswordSetting DestinationConfigSetting = destConfSetting("password")
PortSetting DestinationConfigSetting = destConfSetting("port")
SSLModeSetting DestinationConfigSetting = destConfSetting("sslMode")
ProjectSetting DestinationConfigSetting = destConfSetting("project")
CredentialsSetting DestinationConfigSetting = destConfSetting("credentials")
LocationSetting DestinationConfigSetting = destConfSetting("location")
CACertificateSetting DestinationConfigSetting = destConfSetting("caCertificate")
ClusterSetting DestinationConfigSetting = destConfSetting("cluster")
AWSAccessKeySetting DestinationConfigSetting = destConfSetting("accessKey")
AWSAccessSecretSetting DestinationConfigSetting = destConfSetting("accessKeyID")
AWSBucketNameSetting DestinationConfigSetting = destConfSetting("bucketName")
AWSPrefixSetting DestinationConfigSetting = destConfSetting("prefix")
MinioAccessKeyIDSetting DestinationConfigSetting = destConfSetting("accessKeyID")
MinioSecretAccessKeySetting DestinationConfigSetting = destConfSetting("secretAccessKey")
TimeWindowLayoutSetting DestinationConfigSetting = destConfSetting("timeWindowLayout")
PathSetting DestinationConfigSetting = destConfSetting("path")
TokenSetting DestinationConfigSetting = destConfSetting("token")
CatalogSetting DestinationConfigSetting = destConfSetting("catalog")
ExternalLocationSetting DestinationConfigSetting = destConfSetting("externalLocation")
UseIAMForAuthSetting DestinationConfigSetting = destConfSetting("useIAMForAuth")
IAMRoleARNForAuthSetting DestinationConfigSetting = destConfSetting("iamRoleARNForAuth")
ClusterIDSetting DestinationConfigSetting = destConfSetting("clusterId")
UseServerlessSetting DestinationConfigSetting = destConfSetting("useServerless")
WorkgroupNameSetting DestinationConfigSetting = destConfSetting("workgroupName")
ClusterRegionSetting DestinationConfigSetting = destConfSetting("clusterRegion")
StorageIntegrationSetting DestinationConfigSetting = destConfSetting("storageIntegration")
AccountSetting DestinationConfigSetting = destConfSetting("account")
WarehouseSetting DestinationConfigSetting = destConfSetting("warehouse")
RoleSetting DestinationConfigSetting = destConfSetting("role")
UseKeyPairAuthSetting DestinationConfigSetting = destConfSetting("useKeyPairAuth")
PrivateKeySetting DestinationConfigSetting = destConfSetting("privateKey")
PrivateKeyPassphraseSetting DestinationConfigSetting = destConfSetting("privateKeyPassphrase")
TableSuffixSetting DestinationConfigSetting = destConfSetting("tableSuffix")
SyncFrequencySetting DestinationConfigSetting = destConfSetting("syncFrequency")
SyncStartAtSetting DestinationConfigSetting = destConfSetting("syncStartAt")
ExcludeWindowSetting DestinationConfigSetting = destConfSetting("excludeWindow")
)
57 changes: 0 additions & 57 deletions warehouse/internal/model/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,63 +8,6 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
)

type DestinationConfigSetting interface {
String() string
protected()
}

type destConfSetting string

func (destConfSetting) protected() {}
func (s destConfSetting) String() string { return string(s) }

var (
PreferAppendSetting DestinationConfigSetting = destConfSetting("preferAppend")
UseRudderStorageSetting DestinationConfigSetting = destConfSetting("useRudderStorage")
SecureSetting DestinationConfigSetting = destConfSetting("secure")
SkipVerifySetting DestinationConfigSetting = destConfSetting("skipVerify")
EnableExternalLocationSetting DestinationConfigSetting = destConfSetting("enableExternalLocation")
UseSTSTokensSetting DestinationConfigSetting = destConfSetting("useSTSTokens")
UseGlueSetting DestinationConfigSetting = destConfSetting("useGlue")
HostSetting DestinationConfigSetting = destConfSetting("host")
DatabaseSetting DestinationConfigSetting = destConfSetting("database")
UserSetting DestinationConfigSetting = destConfSetting("user")
PasswordSetting DestinationConfigSetting = destConfSetting("password")
PortSetting DestinationConfigSetting = destConfSetting("port")
SSLModeSetting DestinationConfigSetting = destConfSetting("sslMode")
ProjectSetting DestinationConfigSetting = destConfSetting("project")
CredentialsSetting DestinationConfigSetting = destConfSetting("credentials")
LocationSetting DestinationConfigSetting = destConfSetting("location")
CACertificateSetting DestinationConfigSetting = destConfSetting("caCertificate")
ClusterSetting DestinationConfigSetting = destConfSetting("cluster")
AWSAccessKeySetting DestinationConfigSetting = destConfSetting("accessKey")
AWSAccessSecretSetting DestinationConfigSetting = destConfSetting("accessKeyID")
AWSBucketNameSetting DestinationConfigSetting = destConfSetting("bucketName")
AWSPrefixSetting DestinationConfigSetting = destConfSetting("prefix")
MinioAccessKeyIDSetting DestinationConfigSetting = destConfSetting("accessKeyID")
MinioSecretAccessKeySetting DestinationConfigSetting = destConfSetting("secretAccessKey")
TimeWindowLayoutSetting DestinationConfigSetting = destConfSetting("timeWindowLayout")
PathSetting DestinationConfigSetting = destConfSetting("path")
TokenSetting DestinationConfigSetting = destConfSetting("token")
CatalogSetting DestinationConfigSetting = destConfSetting("catalog")
ExternalLocationSetting DestinationConfigSetting = destConfSetting("externalLocation")
UseIAMForAuthSetting DestinationConfigSetting = destConfSetting("useIAMForAuth")
IAMRoleARNForAuthSetting DestinationConfigSetting = destConfSetting("iamRoleARNForAuth")
ClusterIDSetting DestinationConfigSetting = destConfSetting("clusterId")
ClusterRegionSetting DestinationConfigSetting = destConfSetting("clusterRegion")
StorageIntegrationSetting DestinationConfigSetting = destConfSetting("storageIntegration")
AccountSetting DestinationConfigSetting = destConfSetting("account")
WarehouseSetting DestinationConfigSetting = destConfSetting("warehouse")
RoleSetting DestinationConfigSetting = destConfSetting("role")
UseKeyPairAuthSetting DestinationConfigSetting = destConfSetting("useKeyPairAuth")
PrivateKeySetting DestinationConfigSetting = destConfSetting("privateKey")
PrivateKeyPassphraseSetting DestinationConfigSetting = destConfSetting("privateKeyPassphrase")
TableSuffixSetting DestinationConfigSetting = destConfSetting("tableSuffix")
SyncFrequencySetting DestinationConfigSetting = destConfSetting("syncFrequency")
SyncStartAtSetting DestinationConfigSetting = destConfSetting("syncStartAt")
ExcludeWindowSetting DestinationConfigSetting = destConfSetting("excludeWindow")
)

type Warehouse struct {
WorkspaceID string
Source backendconfig.SourceT
Expand Down

0 comments on commit e37b0e9

Please sign in to comment.