From e37b0e93c277500f81eeef99c2cd37349a2a2159 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Mon, 30 Sep 2024 10:19:49 +0530 Subject: [PATCH] feat: redshift serverless (#5144) --- .github/workflows/tests.yaml | 3 + warehouse/integrations/redshift/redshift.go | 22 +- .../integrations/redshift/redshift_test.go | 260 ++++++++++++++---- warehouse/integrations/testhelper/setup.go | 9 +- warehouse/integrations/testhelper/verify.go | 22 +- warehouse/internal/model/settings.go | 60 ++++ warehouse/internal/model/warehouse.go | 57 ---- 7 files changed, 285 insertions(+), 148 deletions(-) create mode 100644 warehouse/internal/model/settings.go diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index e6adf207f9..14b29b87e6 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -96,6 +96,9 @@ jobs: BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }} DATABRICKS_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.DATABRICKS_INTEGRATION_TEST_CREDENTIALS }} REDSHIFT_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_INTEGRATION_TEST_CREDENTIALS }} + REDSHIFT_IAM_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_IAM_INTEGRATION_TEST_CREDENTIALS }} + REDSHIFT_SERVERLESS_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_SERVERLESS_INTEGRATION_TEST_CREDENTIALS }} + REDSHIFT_SERVERLESS_IAM_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.REDSHIFT_SERVERLESS_IAM_INTEGRATION_TEST_CREDENTIALS }} SNOWFLAKE_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_INTEGRATION_TEST_CREDENTIALS }} SNOWFLAKE_RBAC_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_RBAC_INTEGRATION_TEST_CREDENTIALS }} SNOWFLAKE_KEYPAIR_ENCRYPTED_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_KEYPAIR_ENCRYPTED_INTEGRATION_TEST_CREDENTIALS }} diff --git a/warehouse/integrations/redshift/redshift.go b/warehouse/integrations/redshift/redshift.go index c733756eef..db3b7695d6 100644 --- a/warehouse/integrations/redshift/redshift.go +++ b/warehouse/integrations/redshift/redshift.go @@ -961,19 +961,25 @@ func (rs *Redshift) connectUsingIAMRole() (*sql.DB, error) { user = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.UserSetting) iamRoleARNForAuth = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.IAMRoleARNForAuthSetting) clusterID = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.ClusterIDSetting) + useServerless = rs.Warehouse.GetBoolDestinationConfig(model.UseServerlessSetting) + workgroupName = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.WorkgroupNameSetting) clusterRegion = rs.Warehouse.GetStringDestinationConfig(rs.conf, model.ClusterRegionSetting) timeout = rs.connectTimeout ) data := sqlconnectconfig.RedshiftData{ - ClusterIdentifier: clusterID, - Database: database, - User: user, - Region: clusterRegion, - RoleARN: iamRoleARNForAuth, - ExternalID: rs.Warehouse.WorkspaceID, - RoleARNExpiry: time.Hour, - Timeout: timeout, + Database: database, + Region: clusterRegion, + RoleARN: iamRoleARNForAuth, + ExternalID: rs.Warehouse.WorkspaceID, + RoleARNExpiry: time.Hour, + Timeout: timeout, + } + if useServerless { + data.WorkgroupName = workgroupName + } else { + data.User = user + data.ClusterIdentifier = clusterID } credentialsJSON, err := data.MarshalJSON() diff --git a/warehouse/integrations/redshift/redshift_test.go b/warehouse/integrations/redshift/redshift_test.go index 367e0a9f2a..be40d29042 100644 --- a/warehouse/integrations/redshift/redshift_test.go +++ b/warehouse/integrations/redshift/redshift_test.go @@ -14,6 +14,9 @@ import ( "go.uber.org/mock/gomock" + "github.com/rudderlabs/sqlconnect-go/sqlconnect" + sqlconnectconfig "github.com/rudderlabs/sqlconnect-go/sqlconnect/config" + "github.com/lib/pq" "github.com/ory/dockertest/v3" "github.com/stretchr/testify/require" @@ -45,29 +48,34 @@ type testCredentials struct { Host string `json:"host"` Port string `json:"port"` UserName string `json:"userName"` - IAMUserName string `json:"iamUserName"` Password string `json:"password"` - DbName string `json:"dbName"` + Database string `json:"dbName"` BucketName string `json:"bucketName"` AccessKeyID string `json:"accessKeyID"` AccessKey string `json:"accessKey"` IAMRoleARN string `json:"iamRoleARN"` ClusterID string `json:"clusterID"` ClusterRegion string `json:"clusterRegion"` + WorkgroupName string `json:"workgroupName"` } -const testKey = "REDSHIFT_INTEGRATION_TEST_CREDENTIALS" +const ( + testKey = "REDSHIFT_INTEGRATION_TEST_CREDENTIALS" + testIAMKey = "REDSHIFT_IAM_INTEGRATION_TEST_CREDENTIALS" + testServerlessKey = "REDSHIFT_SERVERLESS_INTEGRATION_TEST_CREDENTIALS" + testServerlessIAMKey = "REDSHIFT_SERVERLESS_IAM_INTEGRATION_TEST_CREDENTIALS" +) -func rsTestCredentials() (*testCredentials, error) { - cred, exists := os.LookupEnv(testKey) +func getRedshiftTestCredentials(key string) (*testCredentials, error) { + cred, exists := os.LookupEnv(key) if !exists { - return nil, fmt.Errorf("missing redshift test credentials") + return nil, errors.New("redshift test credentials not found") } var credentials testCredentials err := json.Unmarshal([]byte(cred), &credentials) if err != nil { - return nil, fmt.Errorf("failed to unmarshal redshift test credentials: %w", err) + return nil, fmt.Errorf("unable to marshall %s to redshift test credentials: %v", key, err) } return &credentials, nil } @@ -76,11 +84,16 @@ func TestIntegration(t *testing.T) { if os.Getenv("SLOW") != "1" { t.Skip("Skipping tests. Add 'SLOW=1' env var to run test.") } - if _, exists := os.LookupEnv(testKey); !exists { - if os.Getenv("FORCE_RUN_INTEGRATION_TESTS") == "true" { - t.Fatalf("%s environment variable not set", testKey) + for _, key := range []string{ + testKey, + testIAMKey, + } { + if _, exists := os.LookupEnv(key); !exists { + if os.Getenv("FORCE_RUN_INTEGRATION_TESTS") == "true" { + t.Fatalf("%s environment variable not set", key) + } + t.Skipf("Skipping %s as %s is not set", t.Name(), key) } - t.Skipf("Skipping %s as %s is not set", t.Name(), testKey) } misc.Init() @@ -89,10 +102,29 @@ func TestIntegration(t *testing.T) { destType := whutils.RS - credentials, err := rsTestCredentials() + credentials, err := getRedshiftTestCredentials(testKey) + require.NoError(t, err) + iamCredentials, err := getRedshiftTestCredentials(testIAMKey) require.NoError(t, err) t.Run("Events flow", func(t *testing.T) { + for _, key := range []string{ + testServerlessKey, + testServerlessIAMKey, + } { + if _, exists := os.LookupEnv(key); !exists { + if os.Getenv("FORCE_RUN_INTEGRATION_TESTS") == "true" { + t.Fatalf("%s environment variable not set", key) + } + t.Skipf("Skipping %s as %s is not set", t.Name(), key) + } + } + + serverlessCredentials, err := getRedshiftTestCredentials(testServerlessKey) + require.NoError(t, err) + serverlessIAMCredentials, err := getRedshiftTestCredentials(testServerlessIAMKey) + require.NoError(t, err) + httpPort, err := kithelper.GetFreePort() require.NoError(t, err) @@ -125,6 +157,7 @@ func TestIntegration(t *testing.T) { testcase := []struct { name string + credentials *testCredentials tables []string warehouseEventsMap2 whth.EventsCountMap sourceJob bool @@ -139,6 +172,7 @@ func TestIntegration(t *testing.T) { }{ { name: "Upload Job", + credentials: credentials, tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, eventFilePath1: "../testdata/upload-job.events-1.json", eventFilePath2: "../testdata/upload-job.events-2.json", @@ -175,15 +209,16 @@ func TestIntegration(t *testing.T) { }, { name: "IAM Upload Job", + credentials: iamCredentials, tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, eventFilePath1: "../testdata/upload-job.events-1.json", eventFilePath2: "../testdata/upload-job.events-2.json", configOverride: map[string]any{ "useIAMForAuth": true, - "user": credentials.IAMUserName, - "iamRoleARNForAuth": credentials.IAMRoleARN, - "clusterId": credentials.ClusterID, - "clusterRegion": credentials.ClusterRegion, + "user": iamCredentials.UserName, + "iamRoleARNForAuth": iamCredentials.IAMRoleARN, + "clusterId": iamCredentials.ClusterID, + "clusterRegion": iamCredentials.ClusterRegion, }, verifySchema: func(t *testing.T, db *sql.DB, namespace string) { t.Helper() @@ -211,8 +246,84 @@ func TestIntegration(t *testing.T) { }, }, { - name: "Append Mode", - tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, + name: "Serverless Upload Job", + credentials: serverlessCredentials, + tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, + eventFilePath1: "../testdata/upload-job.events-1.json", + eventFilePath2: "../testdata/upload-job.events-2.json", + configOverride: map[string]any{ + "host": serverlessCredentials.Host, + "port": serverlessCredentials.Port, + "user": serverlessCredentials.UserName, + "password": serverlessCredentials.Password, + }, + verifySchema: func(t *testing.T, db *sql.DB, namespace string) { + t.Helper() + schema := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace)) + require.Equal(t, expectedUploadJobSchema, whth.ConvertRecordsToSchema(schema)) + }, + verifyRecords: func(t *testing.T, db *sql.DB, sourceID, destinationID, namespace, jobRunID, taskRunID string) { + t.Helper() + identifiesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, %s, context_traits_logins, _as, name, logins, email, original_timestamp, context_ip, context_traits_as, "timestamp", received_at, context_destination_type, sent_at, context_source_type, context_traits_between, context_source_id, context_traits_name, context_request_ip, _between, context_traits_email, context_destination_id, id FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "identifies")) + require.ElementsMatch(t, identifiesRecords, whth.UploadJobIdentifiesRecords(userIDFormat, sourceID, destinationID, destType)) + usersRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_type, context_request_ip, context_traits_name, context_traits_between, _as, logins, sent_at, context_traits_logins, context_ip, _between, context_traits_email, "timestamp", context_destination_id, email, context_traits_as, context_source_type, substring(id from 1 for 9), %s, received_at, name, original_timestamp FROM %q.%q ORDER BY id;`, uuidTSSQL, namespace, "users")) + require.ElementsMatch(t, usersRecords, whth.UploadJobUsersRecords(userIDFormat, sourceID, destinationID, destType)) + tracksRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT original_timestamp, context_destination_id, context_destination_type, %s, context_source_type, "timestamp", id, event, sent_at, context_ip, event_text, context_source_id, context_request_ip, received_at, %s FROM %q.%q ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "tracks")) + require.ElementsMatch(t, tracksRecords, whth.UploadJobTracksRecords(userIDFormat, sourceID, destinationID, destType)) + productTrackRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT "timestamp", %s, product_id, received_at, context_source_id, sent_at, context_source_type, context_ip, context_destination_type, original_timestamp, context_request_ip, context_destination_id, %s, _as, review_body, _between, review_id, event_text, id, event, rating FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "product_track")) + require.ElementsMatch(t, productTrackRecords, whth.UploadJobProductTrackRecords(userIDFormat, sourceID, destinationID, destType)) + pagesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, context_source_id, id, title, "timestamp", context_source_type, _as, received_at, context_destination_id, context_ip, context_destination_type, name, original_timestamp, _between, context_request_ip, sent_at, url, %s FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "pages")) + require.ElementsMatch(t, pagesRecords, whth.UploadJobPagesRecords(userIDFormat, sourceID, destinationID, destType)) + screensRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, url, context_source_type, title, original_timestamp, %s, _between, context_ip, name, context_request_ip, %s, context_source_id, id, received_at, context_destination_id, "timestamp", sent_at, _as FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "screens")) + require.ElementsMatch(t, screensRecords, whth.UploadJobScreensRecords(userIDFormat, sourceID, destinationID, destType)) + aliasesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_id, context_ip, sent_at, id, %s, %s, previous_id, original_timestamp, context_source_type, received_at, context_destination_type, context_request_ip, "timestamp" FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "aliases")) + require.ElementsMatch(t, aliasesRecords, whth.UploadJobAliasesRecords(userIDFormat, sourceID, destinationID, destType)) + groupsRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, id, _between, plan, original_timestamp, %s, context_source_id, sent_at, %s, group_id, industry, context_request_ip, context_source_type, "timestamp", employees, _as, context_destination_id, received_at, name, context_ip FROM %q.%q ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "groups")) + require.ElementsMatch(t, groupsRecords, whth.UploadJobGroupsRecords(userIDFormat, sourceID, destinationID, destType)) + }, + }, + { + name: "Serverless IAM Upload Job", + credentials: serverlessIAMCredentials, + tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, + eventFilePath1: "../testdata/upload-job.events-1.json", + eventFilePath2: "../testdata/upload-job.events-2.json", + configOverride: map[string]any{ + "useIAMForAuth": true, + "useServerless": true, + "iamRoleARNForAuth": serverlessIAMCredentials.IAMRoleARN, + "workgroupName": serverlessIAMCredentials.WorkgroupName, + "clusterRegion": serverlessIAMCredentials.ClusterRegion, + }, + verifySchema: func(t *testing.T, db *sql.DB, namespace string) { + t.Helper() + schema := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace)) + require.Equal(t, expectedUploadJobSchema, whth.ConvertRecordsToSchema(schema)) + }, + verifyRecords: func(t *testing.T, db *sql.DB, sourceID, destinationID, namespace, jobRunID, taskRunID string) { + t.Helper() + identifiesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, %s, context_traits_logins, _as, name, logins, email, original_timestamp, context_ip, context_traits_as, "timestamp", received_at, context_destination_type, sent_at, context_source_type, context_traits_between, context_source_id, context_traits_name, context_request_ip, _between, context_traits_email, context_destination_id, id FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "identifies")) + require.ElementsMatch(t, identifiesRecords, whth.UploadJobIdentifiesRecords(userIDFormat, sourceID, destinationID, destType)) + usersRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_type, context_request_ip, context_traits_name, context_traits_between, _as, logins, sent_at, context_traits_logins, context_ip, _between, context_traits_email, "timestamp", context_destination_id, email, context_traits_as, context_source_type, substring(id from 1 for 9), %s, received_at, name, original_timestamp FROM %q.%q ORDER BY id;`, uuidTSSQL, namespace, "users")) + require.ElementsMatch(t, usersRecords, whth.UploadJobUsersRecords(userIDFormat, sourceID, destinationID, destType)) + tracksRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT original_timestamp, context_destination_id, context_destination_type, %s, context_source_type, "timestamp", id, event, sent_at, context_ip, event_text, context_source_id, context_request_ip, received_at, %s FROM %q.%q ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "tracks")) + require.ElementsMatch(t, tracksRecords, whth.UploadJobTracksRecords(userIDFormat, sourceID, destinationID, destType)) + productTrackRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT "timestamp", %s, product_id, received_at, context_source_id, sent_at, context_source_type, context_ip, context_destination_type, original_timestamp, context_request_ip, context_destination_id, %s, _as, review_body, _between, review_id, event_text, id, event, rating FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "product_track")) + require.ElementsMatch(t, productTrackRecords, whth.UploadJobProductTrackRecords(userIDFormat, sourceID, destinationID, destType)) + pagesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, context_source_id, id, title, "timestamp", context_source_type, _as, received_at, context_destination_id, context_ip, context_destination_type, name, original_timestamp, _between, context_request_ip, sent_at, url, %s FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "pages")) + require.ElementsMatch(t, pagesRecords, whth.UploadJobPagesRecords(userIDFormat, sourceID, destinationID, destType)) + screensRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, url, context_source_type, title, original_timestamp, %s, _between, context_ip, name, context_request_ip, %s, context_source_id, id, received_at, context_destination_id, "timestamp", sent_at, _as FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "screens")) + require.ElementsMatch(t, screensRecords, whth.UploadJobScreensRecords(userIDFormat, sourceID, destinationID, destType)) + aliasesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_id, context_ip, sent_at, id, %s, %s, previous_id, original_timestamp, context_source_type, received_at, context_destination_type, context_request_ip, "timestamp" FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "aliases")) + require.ElementsMatch(t, aliasesRecords, whth.UploadJobAliasesRecords(userIDFormat, sourceID, destinationID, destType)) + groupsRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, id, _between, plan, original_timestamp, %s, context_source_id, sent_at, %s, group_id, industry, context_request_ip, context_source_type, "timestamp", employees, _as, context_destination_id, received_at, name, context_ip FROM %q.%q ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "groups")) + require.ElementsMatch(t, groupsRecords, whth.UploadJobGroupsRecords(userIDFormat, sourceID, destinationID, destType)) + }, + }, + { + name: "Append Mode", + credentials: credentials, + tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, warehouseEventsMap2: whth.EventsCountMap{ // For all tables except users we will be appending because of: // * preferAppend @@ -257,8 +368,9 @@ func TestIntegration(t *testing.T) { }, }, { - name: "IAM Append Mode", - tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, + name: "IAM Append Mode", + credentials: iamCredentials, + tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, warehouseEventsMap2: whth.EventsCountMap{ // For all tables except users we will be appending because of: // * preferAppend @@ -273,10 +385,10 @@ func TestIntegration(t *testing.T) { configOverride: map[string]any{ "preferAppend": true, "useIAMForAuth": true, - "user": credentials.IAMUserName, - "iamRoleARNForAuth": credentials.IAMRoleARN, - "clusterId": credentials.ClusterID, - "clusterRegion": credentials.ClusterRegion, + "user": iamCredentials.UserName, + "iamRoleARNForAuth": iamCredentials.IAMRoleARN, + "clusterId": iamCredentials.ClusterID, + "clusterRegion": iamCredentials.ClusterRegion, }, verifySchema: func(t *testing.T, db *sql.DB, namespace string) { t.Helper() @@ -305,6 +417,7 @@ func TestIntegration(t *testing.T) { }, { name: "Undefined preferAppend", + credentials: credentials, tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, eventFilePath1: "../testdata/upload-job.events-1.json", eventFilePath2: "../testdata/upload-job.events-1.json", @@ -341,8 +454,9 @@ func TestIntegration(t *testing.T) { }, }, { - name: "Append Users", - tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, + name: "Append Users", + credentials: credentials, + tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}, warehouseEventsMap2: whth.EventsCountMap{ // For all tables except users we will be appending because of: // * preferAppend @@ -395,6 +509,7 @@ func TestIntegration(t *testing.T) { }, { name: "Source Job", + credentials: credentials, tables: []string{"tracks", "google_sheet"}, sourceJob: true, eventFilePath1: "../testdata/source-job.events-1.json", @@ -436,10 +551,10 @@ func TestIntegration(t *testing.T) { destinationBuilder := backendconfigtest.NewDestinationBuilder(destType). WithID(destinationID). WithRevisionID(destinationID). - WithConfigOption("database", credentials.DbName). - WithConfigOption("bucketName", credentials.BucketName). - WithConfigOption("accessKeyID", credentials.AccessKeyID). - WithConfigOption("accessKey", credentials.AccessKey). + WithConfigOption("database", tc.credentials.Database). + WithConfigOption("bucketName", tc.credentials.BucketName). + WithConfigOption("accessKeyID", tc.credentials.AccessKeyID). + WithConfigOption("accessKey", tc.credentials.AccessKey). WithConfigOption("namespace", namespace). WithConfigOption("enableSSE", false). WithConfigOption("useRudderStorage", false). @@ -474,11 +589,40 @@ func TestIntegration(t *testing.T) { whth.BootstrapSvc(t, workspaceConfig, httpPort, jobsDBPort) - dsn := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", - credentials.UserName, credentials.Password, credentials.Host, credentials.Port, credentials.DbName, + var ( + useIAMForAuth bool + + db *sql.DB ) - db, err := sql.Open("postgres", dsn) - require.NoError(t, err) + if k, ok := tc.configOverride["useIAMForAuth"]; ok { + useIAMForAuth = k.(bool) + } + if useIAMForAuth { + data := sqlconnectconfig.RedshiftData{ + Database: tc.credentials.Database, + Region: tc.credentials.ClusterRegion, + RoleARN: tc.credentials.IAMRoleARN, + ExternalID: workspaceID, + WorkgroupName: tc.credentials.WorkgroupName, + User: tc.credentials.UserName, + ClusterIdentifier: tc.credentials.ClusterID, + RoleARNExpiry: time.Hour, + } + + credentialsJSON, err := data.MarshalJSON() + require.NoError(t, err) + sqlConnectDB, err := sqlconnect.NewDB("redshift", credentialsJSON) + require.NoError(t, err) + + db = sqlConnectDB.SqlDB() + } else { + var err error + dsn := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", + tc.credentials.UserName, tc.credentials.Password, tc.credentials.Host, tc.credentials.Port, tc.credentials.Database, + ) + db, err = sql.Open("postgres", dsn) + require.NoError(t, err) + } require.NoError(t, db.Ping()) t.Cleanup(func() { _ = db.Close() }) t.Cleanup(func() { @@ -491,9 +635,9 @@ func TestIntegration(t *testing.T) { } conf := map[string]any{ - "bucketName": credentials.BucketName, - "accessKeyID": credentials.AccessKeyID, - "accessKey": credentials.AccessKey, + "bucketName": tc.credentials.BucketName, + "accessKeyID": tc.credentials.AccessKeyID, + "accessKey": tc.credentials.AccessKey, "enableSSE": false, "useRudderStorage": false, } @@ -562,7 +706,7 @@ func TestIntegration(t *testing.T) { iamNamespace := whth.RandSchema(destType) dsn := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", - credentials.UserName, credentials.Password, credentials.Host, credentials.Port, credentials.DbName, + credentials.UserName, credentials.Password, credentials.Host, credentials.Port, credentials.Database, ) db, err := sql.Open("postgres", dsn) require.NoError(t, err) @@ -582,7 +726,7 @@ func TestIntegration(t *testing.T) { "port": credentials.Port, "user": credentials.UserName, "password": credentials.Password, - "database": credentials.DbName, + "database": credentials.Database, "bucketName": credentials.BucketName, "accessKeyID": credentials.AccessKeyID, "accessKey": credentials.AccessKey, @@ -607,16 +751,16 @@ func TestIntegration(t *testing.T) { destination: backendconfig.DestinationT{ ID: "test_destination_id", Config: map[string]interface{}{ - "user": credentials.IAMUserName, - "database": credentials.DbName, - "bucketName": credentials.BucketName, - "accessKeyID": credentials.AccessKeyID, - "accessKey": credentials.AccessKey, + "user": iamCredentials.UserName, + "database": iamCredentials.Database, + "bucketName": iamCredentials.BucketName, + "accessKeyID": iamCredentials.AccessKeyID, + "accessKey": iamCredentials.AccessKey, "namespace": iamNamespace, "useIAMForAuth": true, - "iamRoleARNForAuth": credentials.IAMRoleARN, - "clusterId": credentials.ClusterID, - "clusterRegion": credentials.ClusterRegion, + "iamRoleARNForAuth": iamCredentials.IAMRoleARN, + "clusterId": iamCredentials.ClusterID, + "clusterRegion": iamCredentials.ClusterRegion, "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, @@ -650,7 +794,7 @@ func TestIntegration(t *testing.T) { iamNamespace := whth.RandSchema(destType) dsn := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", - credentials.UserName, credentials.Password, credentials.Host, credentials.Port, credentials.DbName, + credentials.UserName, credentials.Password, credentials.Host, credentials.Port, credentials.Database, ) db, err := sql.Open("postgres", dsn) require.NoError(t, err) @@ -674,7 +818,7 @@ func TestIntegration(t *testing.T) { "port": credentials.Port, "user": credentials.UserName, "password": credentials.Password, - "database": credentials.DbName, + "database": credentials.Database, "bucketName": credentials.BucketName, "accessKeyID": credentials.AccessKeyID, "accessKey": credentials.AccessKey, @@ -706,16 +850,16 @@ func TestIntegration(t *testing.T) { Destination: backendconfig.DestinationT{ ID: "test_destination_id", Config: map[string]interface{}{ - "user": credentials.IAMUserName, - "database": credentials.DbName, - "bucketName": credentials.BucketName, - "accessKeyID": credentials.AccessKeyID, - "accessKey": credentials.AccessKey, + "user": iamCredentials.UserName, + "database": iamCredentials.Database, + "bucketName": iamCredentials.BucketName, + "accessKeyID": iamCredentials.AccessKeyID, + "accessKey": iamCredentials.AccessKey, "namespace": iamNamespace, "useIAMForAuth": true, - "iamRoleARNForAuth": credentials.IAMRoleARN, - "clusterId": credentials.ClusterID, - "clusterRegion": credentials.ClusterRegion, + "iamRoleARNForAuth": iamCredentials.IAMRoleARN, + "clusterId": iamCredentials.ClusterID, + "clusterRegion": iamCredentials.ClusterRegion, "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, @@ -1320,7 +1464,7 @@ func TestIntegration(t *testing.T) { "port": credentials.Port, "user": credentials.UserName, "password": credentials.Password, - "database": credentials.DbName, + "database": credentials.Database, "bucketName": credentials.BucketName, "accessKeyID": credentials.AccessKeyID, "accessKey": credentials.AccessKey, diff --git a/warehouse/integrations/testhelper/setup.go b/warehouse/integrations/testhelper/setup.go index 47f9a34510..8a2452df13 100644 --- a/warehouse/integrations/testhelper/setup.go +++ b/warehouse/integrations/testhelper/setup.go @@ -32,10 +32,11 @@ import ( ) const ( - WaitFor2Minute = 2 * time.Minute - WaitFor10Minute = 10 * time.Minute - DefaultQueryFrequency = 100 * time.Millisecond - SourceJobQueryFrequency = 1000 * time.Millisecond + WaitFor2Minute = 2 * time.Minute + WaitFor10Minute = 10 * time.Minute + DefaultQueryFrequency = 100 * time.Millisecond + DefaultWarehouseQueryFrequency = 500 * time.Millisecond + SourceJobQueryFrequency = 1000 * time.Millisecond ) const ( diff --git a/warehouse/integrations/testhelper/verify.go b/warehouse/integrations/testhelper/verify.go index 1374afd96a..795e70be13 100644 --- a/warehouse/integrations/testhelper/verify.go +++ b/warehouse/integrations/testhelper/verify.go @@ -54,11 +54,6 @@ func verifyEventsInStagingFiles(t testing.TB, testConfig *TestConfig) { testConfig.WorkspaceID, testConfig.SourceID, testConfig.DestinationID, testConfig.TimestampBeforeSendingEvents, ).Scan(&count) - - if err == nil && count.Int64 != expectedCount { - t.Logf("Expected staging files events count is %d, got %d", expectedCount, count.Int64) - } - return err == nil && count.Int64 == expectedCount } require.Eventuallyf(t, operation, WaitFor2Minute, DefaultQueryFrequency, @@ -106,13 +101,6 @@ func verifyEventsInTableUploads(t testing.TB, testConfig *TestConfig) { testConfig.WorkspaceID, testConfig.SourceID, testConfig.DestinationID, testConfig.TimestampBeforeSendingEvents, whutils.ToProviderCase(testConfig.DestinationType, table), ).Scan(&count) - - if err == nil && count.Int64 != expectedCount { - t.Logf("Expected table uploads events count for table %q is %d, got %d", - table, expectedCount, count.Int64, - ) - } - return err == nil && count.Int64 == expectedCount } require.Eventuallyf(t, operation, WaitFor10Minute, DefaultQueryFrequency, @@ -161,16 +149,8 @@ func verifyEventsInWareHouse(t testing.TB, testConfig *TestConfig) { require.Eventuallyf(t, func() bool { count, err = queryCount(testConfig.Client, sqlStatement) - - if err == nil && count != expectedCount { - t.Logf("Expected %d events in WH (schema: %s, table: %s, userID: %s), got %d", expectedCount, - testConfig.Schema, whutils.ToProviderCase(testConfig.DestinationType, table), testConfig.UserID, - count, - ) - } - return err == nil && count == expectedCount - }, WaitFor10Minute, DefaultQueryFrequency, + }, WaitFor10Minute, DefaultWarehouseQueryFrequency, "Expected %d events in WH (schema: %s, table: %s, userID: %s), got %d: %v", expectedCount, testConfig.Schema, whutils.ToProviderCase(testConfig.DestinationType, table), testConfig.UserID, diff --git a/warehouse/internal/model/settings.go b/warehouse/internal/model/settings.go new file mode 100644 index 0000000000..837589c889 --- /dev/null +++ b/warehouse/internal/model/settings.go @@ -0,0 +1,60 @@ +package model + +type DestinationConfigSetting interface { + String() string + protected() +} + +type destConfSetting string + +func (destConfSetting) protected() {} +func (s destConfSetting) String() string { return string(s) } + +var ( + PreferAppendSetting DestinationConfigSetting = destConfSetting("preferAppend") + UseRudderStorageSetting DestinationConfigSetting = destConfSetting("useRudderStorage") + SecureSetting DestinationConfigSetting = destConfSetting("secure") + SkipVerifySetting DestinationConfigSetting = destConfSetting("skipVerify") + EnableExternalLocationSetting DestinationConfigSetting = destConfSetting("enableExternalLocation") + UseSTSTokensSetting DestinationConfigSetting = destConfSetting("useSTSTokens") + UseGlueSetting DestinationConfigSetting = destConfSetting("useGlue") + HostSetting DestinationConfigSetting = destConfSetting("host") + DatabaseSetting DestinationConfigSetting = destConfSetting("database") + UserSetting DestinationConfigSetting = destConfSetting("user") + PasswordSetting DestinationConfigSetting = destConfSetting("password") + PortSetting DestinationConfigSetting = destConfSetting("port") + SSLModeSetting DestinationConfigSetting = destConfSetting("sslMode") + ProjectSetting DestinationConfigSetting = destConfSetting("project") + CredentialsSetting DestinationConfigSetting = destConfSetting("credentials") + LocationSetting DestinationConfigSetting = destConfSetting("location") + CACertificateSetting DestinationConfigSetting = destConfSetting("caCertificate") + ClusterSetting DestinationConfigSetting = destConfSetting("cluster") + AWSAccessKeySetting DestinationConfigSetting = destConfSetting("accessKey") + AWSAccessSecretSetting DestinationConfigSetting = destConfSetting("accessKeyID") + AWSBucketNameSetting DestinationConfigSetting = destConfSetting("bucketName") + AWSPrefixSetting DestinationConfigSetting = destConfSetting("prefix") + MinioAccessKeyIDSetting DestinationConfigSetting = destConfSetting("accessKeyID") + MinioSecretAccessKeySetting DestinationConfigSetting = destConfSetting("secretAccessKey") + TimeWindowLayoutSetting DestinationConfigSetting = destConfSetting("timeWindowLayout") + PathSetting DestinationConfigSetting = destConfSetting("path") + TokenSetting DestinationConfigSetting = destConfSetting("token") + CatalogSetting DestinationConfigSetting = destConfSetting("catalog") + ExternalLocationSetting DestinationConfigSetting = destConfSetting("externalLocation") + UseIAMForAuthSetting DestinationConfigSetting = destConfSetting("useIAMForAuth") + IAMRoleARNForAuthSetting DestinationConfigSetting = destConfSetting("iamRoleARNForAuth") + ClusterIDSetting DestinationConfigSetting = destConfSetting("clusterId") + UseServerlessSetting DestinationConfigSetting = destConfSetting("useServerless") + WorkgroupNameSetting DestinationConfigSetting = destConfSetting("workgroupName") + ClusterRegionSetting DestinationConfigSetting = destConfSetting("clusterRegion") + StorageIntegrationSetting DestinationConfigSetting = destConfSetting("storageIntegration") + AccountSetting DestinationConfigSetting = destConfSetting("account") + WarehouseSetting DestinationConfigSetting = destConfSetting("warehouse") + RoleSetting DestinationConfigSetting = destConfSetting("role") + UseKeyPairAuthSetting DestinationConfigSetting = destConfSetting("useKeyPairAuth") + PrivateKeySetting DestinationConfigSetting = destConfSetting("privateKey") + PrivateKeyPassphraseSetting DestinationConfigSetting = destConfSetting("privateKeyPassphrase") + TableSuffixSetting DestinationConfigSetting = destConfSetting("tableSuffix") + SyncFrequencySetting DestinationConfigSetting = destConfSetting("syncFrequency") + SyncStartAtSetting DestinationConfigSetting = destConfSetting("syncStartAt") + ExcludeWindowSetting DestinationConfigSetting = destConfSetting("excludeWindow") +) diff --git a/warehouse/internal/model/warehouse.go b/warehouse/internal/model/warehouse.go index 07e3684e64..3cfe020c0e 100644 --- a/warehouse/internal/model/warehouse.go +++ b/warehouse/internal/model/warehouse.go @@ -8,63 +8,6 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" ) -type DestinationConfigSetting interface { - String() string - protected() -} - -type destConfSetting string - -func (destConfSetting) protected() {} -func (s destConfSetting) String() string { return string(s) } - -var ( - PreferAppendSetting DestinationConfigSetting = destConfSetting("preferAppend") - UseRudderStorageSetting DestinationConfigSetting = destConfSetting("useRudderStorage") - SecureSetting DestinationConfigSetting = destConfSetting("secure") - SkipVerifySetting DestinationConfigSetting = destConfSetting("skipVerify") - EnableExternalLocationSetting DestinationConfigSetting = destConfSetting("enableExternalLocation") - UseSTSTokensSetting DestinationConfigSetting = destConfSetting("useSTSTokens") - UseGlueSetting DestinationConfigSetting = destConfSetting("useGlue") - HostSetting DestinationConfigSetting = destConfSetting("host") - DatabaseSetting DestinationConfigSetting = destConfSetting("database") - UserSetting DestinationConfigSetting = destConfSetting("user") - PasswordSetting DestinationConfigSetting = destConfSetting("password") - PortSetting DestinationConfigSetting = destConfSetting("port") - SSLModeSetting DestinationConfigSetting = destConfSetting("sslMode") - ProjectSetting DestinationConfigSetting = destConfSetting("project") - CredentialsSetting DestinationConfigSetting = destConfSetting("credentials") - LocationSetting DestinationConfigSetting = destConfSetting("location") - CACertificateSetting DestinationConfigSetting = destConfSetting("caCertificate") - ClusterSetting DestinationConfigSetting = destConfSetting("cluster") - AWSAccessKeySetting DestinationConfigSetting = destConfSetting("accessKey") - AWSAccessSecretSetting DestinationConfigSetting = destConfSetting("accessKeyID") - AWSBucketNameSetting DestinationConfigSetting = destConfSetting("bucketName") - AWSPrefixSetting DestinationConfigSetting = destConfSetting("prefix") - MinioAccessKeyIDSetting DestinationConfigSetting = destConfSetting("accessKeyID") - MinioSecretAccessKeySetting DestinationConfigSetting = destConfSetting("secretAccessKey") - TimeWindowLayoutSetting DestinationConfigSetting = destConfSetting("timeWindowLayout") - PathSetting DestinationConfigSetting = destConfSetting("path") - TokenSetting DestinationConfigSetting = destConfSetting("token") - CatalogSetting DestinationConfigSetting = destConfSetting("catalog") - ExternalLocationSetting DestinationConfigSetting = destConfSetting("externalLocation") - UseIAMForAuthSetting DestinationConfigSetting = destConfSetting("useIAMForAuth") - IAMRoleARNForAuthSetting DestinationConfigSetting = destConfSetting("iamRoleARNForAuth") - ClusterIDSetting DestinationConfigSetting = destConfSetting("clusterId") - ClusterRegionSetting DestinationConfigSetting = destConfSetting("clusterRegion") - StorageIntegrationSetting DestinationConfigSetting = destConfSetting("storageIntegration") - AccountSetting DestinationConfigSetting = destConfSetting("account") - WarehouseSetting DestinationConfigSetting = destConfSetting("warehouse") - RoleSetting DestinationConfigSetting = destConfSetting("role") - UseKeyPairAuthSetting DestinationConfigSetting = destConfSetting("useKeyPairAuth") - PrivateKeySetting DestinationConfigSetting = destConfSetting("privateKey") - PrivateKeyPassphraseSetting DestinationConfigSetting = destConfSetting("privateKeyPassphrase") - TableSuffixSetting DestinationConfigSetting = destConfSetting("tableSuffix") - SyncFrequencySetting DestinationConfigSetting = destConfSetting("syncFrequency") - SyncStartAtSetting DestinationConfigSetting = destConfSetting("syncStartAt") - ExcludeWindowSetting DestinationConfigSetting = destConfSetting("excludeWindow") -) - type Warehouse struct { WorkspaceID string Source backendconfig.SourceT