Skip to content

Commit

Permalink
chore: fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 30, 2024
1 parent dc3b123 commit e19da86
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 289 deletions.
40 changes: 16 additions & 24 deletions warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,20 @@ func (bq *BigQuery) DeleteTable(ctx context.Context, tableName string) (err erro
// only if the partition column exists in the schema.
// Otherwise, it creates a table with ingestion-time partitioning
func (bq *BigQuery) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) error {
sampleSchema := getTableSchema(columnMap)
partitionColumn, partitionType := bq.partitionColumn(), bq.partitionType()

log := bq.logger.Withn(
logger.NewStringField(logfield.ProjectID, bq.projectID),
obskit.Namespace(bq.namespace),
logger.NewStringField(logfield.TableName, tableName),
logger.NewStringField("partitionColumn", partitionColumn),
logger.NewStringField("partitionType", partitionType),
)
log.Infon("Creating table")

sampleSchema := getTableSchema(columnMap)

partitionColumn := bq.partitionColumn()
partitionType := bq.partitionType()

var timePartitioning *bigquery.TimePartitioning
if partitionColumn == "" || partitionType == "" {
log.Infon("Creating table: Partition column or partition type not provided, using ingestion-time partitioning")
timePartitioning = &bigquery.TimePartitioning{
Type: bigquery.DayPartitioningType,
}
Expand All @@ -180,6 +180,7 @@ func (bq *BigQuery) CreateTable(ctx context.Context, tableName string, columnMap

// If partition column is _PARTITIONTIME and partition type is not empty, then we only set the partition type
if partitionColumn == "_PARTITIONTIME" {
log.Infon("Creating table: Partition column is _PARTITIONTIME")
timePartitioning = &bigquery.TimePartitioning{
Type: bqPartitionType,
}
Expand All @@ -190,19 +191,13 @@ func (bq *BigQuery) CreateTable(ctx context.Context, tableName string, columnMap
// 3. rudder_identity_mappings: we don't have any column.
_, ok := columnMap[partitionColumn]
if ok {
log.Infon("Creating table: Partition column found in schema",
logger.NewStringField("partitionColumn", partitionColumn),
logger.NewStringField("partitionType", partitionType),
)
log.Infon("Creating table: Partition column found in schema")
timePartitioning = &bigquery.TimePartitioning{
Field: partitionColumn,
Type: bqPartitionType,
}
} else {
log.Warnn("Creating table: Partition column not found in schema",
logger.NewStringField("partitionColumn", partitionColumn),
logger.NewStringField("partitionType", partitionType),
)
log.Infon("Creating table: Partition column not found in schema, using ingestion-time partitioning")
timePartitioning = &bigquery.TimePartitioning{
Type: bigquery.DayPartitioningType,
}
Expand Down Expand Up @@ -244,16 +239,13 @@ func (bq *BigQuery) createTableView(ctx context.Context, tableName string, colum
}

var (
columnToQuery string
granularity string
granularity, partitionFilter string
partitionColumn, partitionType = bq.partitionColumn(), bq.partitionType()
)

partitionColumn := bq.partitionColumn()
partitionType := bq.partitionType()

if partitionColumn == "" || partitionType == "" {
columnToQuery = "_PARTITIONTIME"
granularity = "DAY"
partitionFilter = "_PARTITIONTIME"
} else {
if err := bq.checkValidPartitionColumn(partitionColumn); err != nil {
return fmt.Errorf("check valid partition column: %w", err)

Check warning on line 251 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L251

Added line #L251 was not covered by tests
Expand All @@ -264,22 +256,22 @@ func (bq *BigQuery) createTableView(ctx context.Context, tableName string, colum
}

if partitionColumn == "_PARTITIONTIME" {
columnToQuery = "_PARTITIONTIME"
granularity = string(bqPartitionType)
partitionFilter = "_PARTITIONTIME"
} else {
_, ok := columnMap[partitionColumn]
if ok {
bq.logger.Infon("Creating view: Partition column found in schema",
logger.NewStringField("partitionColumn", partitionColumn),
)
columnToQuery = partitionColumn
granularity = string(bqPartitionType)
partitionFilter = `TIMESTAMP_TRUNC(` + partitionColumn + `, ` + granularity + `, 'UTC')`
} else {
bq.logger.Warnn("Creating view: Partition column not found in schema",
logger.NewStringField("partitionColumn", partitionColumn),
)
columnToQuery = "_PARTITIONTIME"
granularity = "DAY"
partitionFilter = "_PARTITIONTIME"
}
}
}
Expand All @@ -296,7 +288,7 @@ func (bq *BigQuery) createTableView(ctx context.Context, tableName string, colum
SELECT *, ROW_NUMBER() OVER (PARTITION BY ` + partitionKey + viewOrderByStmt + `) AS __row_number
FROM ` + "`" + bq.projectID + "." + bq.namespace + "." + tableName + "`" + `
WHERE
` + columnToQuery + ` BETWEEN TIMESTAMP_TRUNC(
` + partitionFilter + ` BETWEEN TIMESTAMP_TRUNC(
TIMESTAMP_MICROS(UNIX_MICROS(CURRENT_TIMESTAMP()) - 60 * 60 * 24 * 60 * 1000000),
` + granularity + `,
'UTC'
Expand Down
Loading

0 comments on commit e19da86

Please sign in to comment.