chore(warehouse): warehouse formatting changes (#2568)
achettyiitr authored Oct 21, 2022
1 parent 854e359 commit 888f6f8
Showing 32 changed files with 605 additions and 585 deletions.
64 changes: 32 additions & 32 deletions warehouse/api.go
@@ -424,11 +424,11 @@ func (tableUploadReq TableUploadReqT) GetWhTableUploads() ([]*proto.WHTable, err

func (tableUploadReq TableUploadReqT) generateQuery(selectFields string) string {
query := fmt.Sprintf(`
SELECT
%s
FROM
%s
WHERE
wh_upload_id = %d
`,
selectFields,
@@ -453,11 +453,11 @@ func (tableUploadReq TableUploadReqT) validateReq() error {

func (uploadReq UploadReqT) generateQuery(selectedFields string) string {
return fmt.Sprintf(`
SELECT
%s
FROM
%s
WHERE
id = %d
`,
selectedFields,
@@ -500,7 +500,7 @@ func (uploadsReq UploadsReqT) authorizedSources() (sourceIDs []string) {
return sourceIDs
}

func (uploadsReq *UploadsReqT) getUploadsFromDb(isMultiWorkspace bool, query string) ([]*proto.WHUploadResponse, int32, error) {
func (uploadsReq *UploadsReqT) getUploadsFromDB(isMultiWorkspace bool, query string) ([]*proto.WHUploadResponse, int32, error) {
var totalUploadCount int32
var err error
uploads := make([]*proto.WHUploadResponse, 0)
@@ -605,9 +605,9 @@ func (uploadsReq *UploadsReqT) getUploadsFromDb(isMultiWorkspace bool, query str
func (uploadsReq *UploadsReqT) getTotalUploadCount(whereClause string) (int32, error) {
var totalUploadCount int32
query := fmt.Sprintf(`
select
count(*)
from
%s
`,
warehouseutils.WarehouseUploadsTable,
@@ -627,11 +627,11 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s

// create query
subQuery := fmt.Sprintf(`
SELECT
%s,
COUNT(*) OVER() AS total_uploads
FROM
%s
WHERE
`,
selectFields,
@@ -655,13 +655,13 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s

subQuery = subQuery + strings.Join(whereClauses, " AND ")
query := fmt.Sprintf(`
SELECT
*
FROM
(%s) p
ORDER BY
id DESC
LIMIT
%d OFFSET %d
`,
subQuery,
@@ -671,7 +671,7 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s
uploadsReq.API.log.Info(query)

// get uploads from db
uploads, totalUploadCount, err = uploadsReq.getUploadsFromDb(true, query)
uploads, totalUploadCount, err = uploadsReq.getUploadsFromDB(true, query)
if err != nil {
uploadsReq.API.log.Errorf(err.Error())
return &proto.WHUploadsResponse{}, err
@@ -696,9 +696,9 @@ func (uploadsReq *UploadsReqT) warehouseUploads(selectFields string) (uploadsRes

// create query
query := fmt.Sprintf(`
select
%s
from
%s
`,
selectFields,
@@ -728,7 +728,7 @@ func (uploadsReq *UploadsReqT) warehouseUploads(selectFields string) (uploadsRes
// we get uploads for non hosted workspaces in two steps
// this is because getting this info via 2 queries is faster than getting it via one query(using the 'count(*) OVER()' clause)
// step1 - get all uploads
uploads, _, err = uploadsReq.getUploadsFromDb(false, query)
uploads, _, err = uploadsReq.getUploadsFromDB(false, query)
if err != nil {
uploadsReq.API.log.Errorf(err.Error())
return &proto.WHUploadsResponse{}, err
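
The comment in the hunk above carries the design reasoning: COUNT(*) OVER() computes the total across the full result set before LIMIT applies, so two cheap queries can beat one heavy one. Below is a minimal sketch of both patterns, assuming a hypothetical wh_uploads table with an id column (not this repo's actual code):

package pagination

import (
	"database/sql"
	"fmt"
)

// pageWithWindow fetches one page plus the total in a single query. The
// window function is evaluated over the whole result set before LIMIT
// applies, which is the cost the comment above calls out.
func pageWithWindow(db *sql.DB, limit, offset int) (ids []int64, total int64, err error) {
	query := fmt.Sprintf(`SELECT id, COUNT(*) OVER() AS total_uploads FROM wh_uploads ORDER BY id DESC LIMIT %d OFFSET %d`, limit, offset)
	rows, err := db.Query(query)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()
	for rows.Next() {
		var id int64
		if err = rows.Scan(&id, &total); err != nil {
			return nil, 0, err
		}
		ids = append(ids, id)
	}
	return ids, total, rows.Err()
}

// pageTwoStep issues two simpler queries, one for the page and one for the
// count, which is the approach warehouseUploads takes for non-hosted
// workspaces above.
func pageTwoStep(db *sql.DB, limit, offset int) (ids []int64, total int64, err error) {
	rows, err := db.Query(fmt.Sprintf(`SELECT id FROM wh_uploads ORDER BY id DESC LIMIT %d OFFSET %d`, limit, offset))
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()
	for rows.Next() {
		var id int64
		if err = rows.Scan(&id); err != nil {
			return nil, 0, err
		}
		ids = append(ids, id)
	}
	if err = rows.Err(); err != nil {
		return nil, 0, err
	}
	err = db.QueryRow(`SELECT count(*) FROM wh_uploads`).Scan(&total)
	return ids, total, err
}
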
@@ -830,7 +830,7 @@ func getFileManagerSettings(provider string, inputConfig map[string]interface{})
return settings
}

// overrideWithEnv overrides the config keys in the filemanager settings
// overrideWithEnv overrides the config keys in the fileManager settings
// with fallback values pulled from env. Only supported for S3 for now.
func overrideWithEnv(settings *filemanager.SettingsT) {
envConfig := filemanager.GetProviderConfigFromEnv(context.TODO(), settings.Provider)
14 changes: 7 additions & 7 deletions warehouse/azure-synapse/azure-synapse.go
@@ -112,7 +112,7 @@ func connect(cred credentialsT) (*sql.DB, error) {
// url := fmt.Sprintf("server=%s;user id=%s;password=%s;port=%s;database=%s;encrypt=%s;TrustServerCertificate=true", cred.host, cred.user, cred.password, cred.port, cred.dbName, cred.sslMode)
// Encryption options : disable, false, true. https://github.com/denisenkom/go-mssqldb
// TrustServerCertificate=true ; all options(disable, false, true) work with this
// if rds.forcessl=1; disable option doesnt work. true, false works alongside TrustServerCertificate=true
// if rds.forcessl=1; disable option doesn't work. true, false works alongside TrustServerCertificate=true
// https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/SQLServer.Concepts.General.SSL.Using.html
// more combination explanations here: https://docs.microsoft.com/en-us/sql/connect/odbc/linux-mac/connection-string-keywords-and-data-source-names-dsns?view=sql-server-ver15
query := url.Values{}
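
The block of comments above enumerates how encrypt and TrustServerCertificate interact in go-mssqldb. As a hedged sketch, this is what assembling such a connection URL with url.Values can look like; host, port, and credentials are placeholders, not values from this repo:

package synapseconn

import (
	"database/sql"
	"fmt"
	"net/url"

	_ "github.com/denisenkom/go-mssqldb" // registers the "sqlserver" driver
)

// connect uses encrypt=true with TrustServerCertificate=true, the pairing
// the comments above note keeps working even when the server forces SSL.
func connect(host, port, user, password, dbName string) (*sql.DB, error) {
	query := url.Values{}
	query.Add("database", dbName)
	query.Add("encrypt", "true")
	query.Add("TrustServerCertificate", "true")

	connURL := &url.URL{
		Scheme:   "sqlserver",
		User:     url.UserPassword(user, password),
		Host:     fmt.Sprintf("%s:%s", host, port),
		RawQuery: query.Encode(),
	}
	return sql.Open("sqlserver", connURL.String())
}
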
@@ -182,7 +182,7 @@ func (as *HandleT) DownloadLoadFiles(tableName string) ([]string, error) {
}),
})
if err != nil {
pkgLogger.Errorf("AZ: Error in setting up a downloader for destionationID : %s Error : %v", as.Warehouse.Destination.ID, err)
pkgLogger.Errorf("AZ: Error in setting up a downloader for destinationID : %s Error : %v", as.Warehouse.Destination.ID, err)
return nil, err
}
var fileNames []string
@@ -232,7 +232,7 @@ func (as *HandleT) loadTable(tableName string, tableSchemaInUpload warehouseutil
sortedColumnKeys := warehouseutils.SortColumnKeysFromColumnMap(tableSchemaInUpload)
sortedColumnString := strings.Join(sortedColumnKeys, ", ")

extraColumns := []string{}
var extraColumns []string
for _, column := range previousColumnKeys {
if !misc.Contains(sortedColumnKeys, column) {
extraColumns = append(extraColumns, column)
@@ -305,7 +305,7 @@ func (as *HandleT) loadTable(tableName string, tableSchemaInUpload warehouseutil
return
}
if len(sortedColumnKeys) != len(record) {
err = fmt.Errorf(`Load file CSV columns for a row mismatch number found in upload schema. Columns in CSV row: %d, Columns in upload schema of table-%s: %d. Processed rows in csv file until mismatch: %d`, len(record), tableName, len(sortedColumnKeys), csvRowsProcessedCount)
err = fmt.Errorf(`load file CSV columns for a row mismatch number found in upload schema. Columns in CSV row: %d, Columns in upload schema of table-%s: %d. Processed rows in csv file until mismatch: %d`, len(record), tableName, len(sortedColumnKeys), csvRowsProcessedCount)
pkgLogger.Error(err)
txn.Rollback()
return
@@ -517,7 +517,7 @@ func (as *HandleT) loadUserTables() (errorMap map[string]error) {
order by X.received_at desc
)
end as %[1]s`, colName, as.Namespace+"."+unionStagingTableName)
// IGNORE NULLS only supported in Azure SQL edge, in which case the query can be shortedened to below
// IGNORE NULLS only supported in Azure SQL edge, in which case the query can be shortened to below
// https://docs.microsoft.com/en-us/sql/t-sql/functions/first-value-transact-sql?view=sql-server-ver15
// caseSubQuery := fmt.Sprintf(`FIRST_VALUE(%[1]s) IGNORE NULLS OVER (PARTITION BY id ORDER BY received_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS "%[1]s"`, colName)
firstValProps = append(firstValProps, caseSubQuery)
@@ -641,7 +641,7 @@ func (as *HandleT) addColumn(tableName, columnName, columnType string) (err erro
}

func (as *HandleT) CreateTable(tableName string, columnMap map[string]string) (err error) {
// Search paths doesnt exist unlike Postgres, default is dbo. Hence use namespace wherever possible
// Search paths doesn't exist unlike Postgres, default is dbo. Hence, use namespace wherever possible
err = as.createTable(as.Namespace+"."+tableName, columnMap)
return err
}
@@ -731,7 +731,7 @@ func (as *HandleT) dropDanglingStagingTables() bool {
var tableName string
err := rows.Scan(&tableName)
if err != nil {
panic(fmt.Errorf("Failed to scan result from query: %s\nwith Error : %w", sqlStatement, err))
panic(fmt.Errorf("failed to scan result from query: %s\nwith Error : %w", sqlStatement, err))
}
stagingTableNames = append(stagingTableNames, tableName)
}
11 changes: 5 additions & 6 deletions warehouse/bigquery/bigquery.go
@@ -252,7 +252,7 @@ func (bq *HandleT) dropStagingTable(stagingTableName string) {
}

func (bq *HandleT) DeleteBy(tableNames []string, params warehouseutils.DeleteByParams) error {
pkgLogger.Infof("BQ: Cleaning up the followng tables in bigquery for BQ:%s : %v", tableNames)
pkgLogger.Infof("BQ: Cleaning up the following tables in bigquery for BQ:%s : %v", tableNames)
for _, tb := range tableNames {
sqlStatement := fmt.Sprintf(`DELETE FROM "%[1]s"."%[2]s" WHERE
context_sources_job_run_id <> @jobrunid AND
@@ -318,7 +318,7 @@ func (bq *HandleT) loadTable(tableName string, _, getLoadFileLocFromTableUploads
loadTableByAppend := func() (err error) {
stagingLoadTable.partitionDate = time.Now().Format("2006-01-02")
outputTable := tableName
// Tables created by Rudderstack are ingestion-time partitioned table with pseudocolumn named _PARTITIONTIME. BigQuery automatically assigns rows to partitions based
// Tables created by RudderStack are ingestion-time partitioned table with pseudo column named _PARTITIONTIME. BigQuery automatically assigns rows to partitions based
// on the time when BigQuery ingests the data. To support custom field partitions, omitting loading into partitioned table like tableName$20191221
// TODO: Support custom field partition on users & identifies tables
if !customPartitionsEnabled {
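
The comment above pins down the loading model: RudderStack's tables are ingestion-time partitioned, and a load can target one partition through a $YYYYMMDD decorator appended to the table name. A sketch of such a load with the cloud.google.com/go/bigquery client follows; the dataset, table, and GCS URI are placeholders, and the JSON source format is an assumption:

package bqload

import (
	"context"
	"time"

	"cloud.google.com/go/bigquery"
)

// loadIntoPartition appends a GCS file into today's ingestion-time partition
// by addressing the table with a $YYYYMMDD partition decorator.
func loadIntoPartition(ctx context.Context, client *bigquery.Client, dataset, table, gcsURI string) error {
	partition := time.Now().Format("20060102") // e.g. 20191221
	gcsRef := bigquery.NewGCSReference(gcsURI)
	gcsRef.SourceFormat = bigquery.JSON // assumed format for this sketch

	loader := client.Dataset(dataset).Table(table + "$" + partition).LoaderFrom(gcsRef)
	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	return status.Err()
}
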
@@ -648,7 +648,7 @@ type BQCredentialsT struct {
}

func Connect(context context.Context, cred *BQCredentialsT) (*bigquery.Client, error) {
opts := []option.ClientOption{}
var opts []option.ClientOption
if !googleutils.ShouldSkipCredentialsInit(cred.Credentials) {
credBytes := []byte(cred.Credentials)
if err := googleutils.CompatibleGoogleCredentialsJSON(credBytes); err != nil {
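
The opts change in this hunk, from an empty composite literal to a plain var declaration, recurs across the commit (extraColumns in azure-synapse, fileNames in clickhouse). It is behavior-preserving because a nil slice supports append, len, and range exactly like an empty one. A minimal illustration:

package nilslice

import (
	"encoding/json"
	"fmt"
)

func Demo() {
	var a []string  // nil slice: no backing array yet
	b := []string{} // non-nil, zero-length slice

	a = append(a, "x") // append allocates on first use
	b = append(b, "x")
	fmt.Println(len(a), len(b)) // 1 1

	// The one visible difference is in encodings: a nil slice marshals to
	// JSON null, an empty one to []. For local accumulators it never shows.
	var c []string
	outC, _ := json.Marshal(c)          // null
	outD, _ := json.Marshal([]string{}) // []
	fmt.Println(string(outC), string(outD))
}
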
@@ -670,7 +670,7 @@ func (bq *HandleT) connect(cred BQCredentialsT) (*bigquery.Client, error) {
func loadConfig() {
config.RegisterBoolConfigVariable(true, &setUsersLoadPartitionFirstEventFilter, true, "Warehouse.bigquery.setUsersLoadPartitionFirstEventFilter")
config.RegisterBoolConfigVariable(false, &customPartitionsEnabled, true, "Warehouse.bigquery.customPartitionsEnabled")
config.RegisterBoolConfigVariable(false, &isUsersTableDedupEnabled, true, "Warehouse.bigquery.isUsersTableDedupEnabled") // TODO: Depricate with respect to isDedupEnabled
config.RegisterBoolConfigVariable(false, &isUsersTableDedupEnabled, true, "Warehouse.bigquery.isUsersTableDedupEnabled") // TODO: Deprecate with respect to isDedupEnabled
config.RegisterBoolConfigVariable(false, &isDedupEnabled, true, "Warehouse.bigquery.isDedupEnabled")
config.RegisterBoolConfigVariable(false, &enableDeleteByJobs, true, "Warehouse.bigquery.enableDeleteByJobs")
}
@@ -841,15 +841,14 @@ func (*HandleT) AlterColumn(_, _, _ string) (err error) {
return
}

// FetchSchema queries bigquery and returns the schema assoiciated with provided namespace
// FetchSchema queries bigquery and returns the schema associated with provided namespace
func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema warehouseutils.SchemaT, err error) {
bq.warehouse = warehouse
bq.namespace = warehouse.Namespace
bq.projectID = strings.TrimSpace(warehouseutils.GetConfigValue(GCPProjectID, bq.warehouse))
dbClient, err := bq.connect(BQCredentialsT{
ProjectID: bq.projectID,
Credentials: warehouseutils.GetConfigValue(GCPCredentials, bq.warehouse),
// location: warehouseutils.GetConfigValue(GCPLocation, bq.warehouse),
})
if err != nil {
return
14 changes: 9 additions & 5 deletions warehouse/clickhouse/clickhouse.go
@@ -227,8 +227,10 @@ func Connect(cred CredentialsT, includeDBInConn bool) (*sql.DB, error) {
url += fmt.Sprintf("&timeout=%d", cred.timeout/time.Second)
}

var err error
var db *sql.DB
var (
err error
db *sql.DB
)

if db, err = sql.Open("clickhouse", url); err != nil {
return nil, fmt.Errorf("clickhouse connection error : (%v)", err)
@@ -352,9 +354,11 @@ func (ch *HandleT) DownloadLoadFiles(tableName string) ([]string, error) {
pkgLogger.Errorf("%s Error in setting up a downloader with Error: %v", ch.GetLogIdentifier(tableName, storageProvider), err)
return nil, err
}
var fileNames []string
var dErr error
var fileNamesLock sync.RWMutex
var (
fileNames []string
dErr error
fileNamesLock sync.RWMutex
)

jobs := make([]misc.RWCJob, 0)
for _, object := range objects {
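
The ClickHouse hunks fold runs of consecutive var statements into one grouped declaration, the form gofumpt prefers for adjacent declarations. A minimal before/after sketch reusing the hunk's own variable names:

package decls

import "sync"

func before() {
	var fileNames []string
	var dErr error
	var fileNamesLock sync.RWMutex
	_, _, _ = fileNames, dErr, &fileNamesLock // keep the example compiling
}

func after() {
	// Same declarations, folded into one block as in the hunks above.
	var (
		fileNames     []string
		dErr          error
		fileNamesLock sync.RWMutex
	)
	_, _, _ = fileNames, dErr, &fileNamesLock
}
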
7 changes: 3 additions & 4 deletions warehouse/datalake/schema-repository/glue.go
@@ -9,11 +9,10 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
// config
UseGlueConfig = "useGlue"
var UseGlueConfig = "useGlue"

// glue
// glue specific config
var (
glueSerdeName = "ParquetHiveSerDe"
glueSerdeSerializationLib = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
glueParquetInputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
8 changes: 4 additions & 4 deletions warehouse/datalake/schema-repository/local.go
@@ -40,7 +40,7 @@ func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[str
}

if _, ok := schema[tableName]; ok {
return fmt.Errorf("Failed to create table: table %s already exists", tableName)
return fmt.Errorf("failed to create table: table %s already exists", tableName)
}

// add table to schema
@@ -59,7 +59,7 @@ func (ls *LocalSchemaRepository) AddColumn(tableName, columnName, columnType str

// check if table exists
if _, ok := schema[tableName]; !ok {
return fmt.Errorf("Failed to add column: table %s does not exist", tableName)
return fmt.Errorf("failed to add column: table %s does not exist", tableName)
}

schema[tableName][columnName] = columnType
@@ -77,12 +77,12 @@ func (ls *LocalSchemaRepository) AlterColumn(tableName, columnName, columnType s

// check if table exists
if _, ok := schema[tableName]; !ok {
return fmt.Errorf("Failed to add column: table %s does not exist", tableName)
return fmt.Errorf("failed to add column: table %s does not exist", tableName)
}

// check if column exists
if _, ok := schema[tableName][columnName]; !ok {
return fmt.Errorf("Failed to alter column: column %s does not exist in table %s", columnName, tableName)
return fmt.Errorf("failed to alter column: column %s does not exist in table %s", columnName, tableName)
}

schema[tableName][columnName] = columnType
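
Every change in local.go lowercases the first word of a fmt.Errorf message. That follows the Go convention (staticcheck ST1005) that error strings start lowercase and carry no trailing punctuation, because callers usually wrap them into longer messages. A small illustration:

package errs

import "fmt"

func addColumn(table string, exists bool) error {
	if !exists {
		// lowercase start, no trailing period: composes cleanly when wrapped
		return fmt.Errorf("failed to add column: table %s does not exist", table)
	}
	return nil
}

func caller() error {
	if err := addColumn("users", false); err != nil {
		// reads as one sentence:
		// "sync schema: failed to add column: table users does not exist"
		return fmt.Errorf("sync schema: %w", err)
	}
	return nil
}
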
2 changes: 1 addition & 1 deletion warehouse/deltalake/query_builder.go
@@ -3,7 +3,7 @@ package deltalake
import (
"fmt"

"github.com/rudderlabs/rudder-server/warehouse/utils"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func primaryKey(tableName string) string {
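
The deltalake change only makes the warehouseutils import alias explicit. The package under warehouse/utils declares package warehouseutils, a name the directory path does not reveal, so the alias restates what the compiler would infer but keeps the import self-documenting. In sketch form (the import path and the WarehouseUploadsTable identifier are the repo's own; the rest is illustrative):

package deltalake

// Without the alias, readers must open warehouse/utils to learn that its
// package clause says "package warehouseutils". With it, the import line
// is self-documenting; the compiled result is identical.
import (
	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var _ = warehouseutils.WarehouseUploadsTable // keep the import used
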

0 comments on commit 888f6f8