Skip to content

Commit

Permalink
feat(warehouse): added support for bigquery custom partition for work…
Browse files Browse the repository at this point in the history
…spaceIDs (#2679)
  • Loading branch information
achettyiitr authored Nov 11, 2022
1 parent d673640 commit af2b5a7
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 62 deletions.
19 changes: 19 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"golang.org/x/exp/slices"

"github.com/fsnotify/fsnotify"
"github.com/joho/godotenv"
"github.com/spf13/viper"
Expand Down Expand Up @@ -161,6 +163,23 @@ func (c *Config) checkAndHotReloadConfig(configMap map[string][]*configValue) {
fmt.Printf("The value of key:%s & variable:%p changed from %v to %v\n", key, configVal, *value, _value)
*value = _value
}
case *[]string:
var _value []string
var isSet bool
for _, key := range configVal.keys {
if c.IsSet(key) {
isSet = true
_value = c.GetStringSlice(key, configVal.defaultValue.([]string))
break
}
}
if !isSet {
_value = configVal.defaultValue.([]string)
}
if slices.Compare(_value, *value) != 0 {
fmt.Printf("The value of key:%s & variable:%p changed from %v to %v\n", key, configVal, *value, _value)
*value = _value
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
golang.org/x/exp v0.0.0-20221109205753-fc8884afc316
golang.org/x/sys v0.1.0 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/exp v0.0.0-20221109205753-fc8884afc316 h1:FedCSp0+vayF11p3wAQndIgu+JTcW2nLp5M+HSefjlM=
golang.org/x/exp v0.0.0-20221109205753-fc8884afc316/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand Down Expand Up @@ -1275,8 +1275,8 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI=
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
Expand Down
10 changes: 7 additions & 3 deletions warehouse/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"time"

"cloud.google.com/go/bigquery"

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/utils/googleutils"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"golang.org/x/exp/slices"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
Expand All @@ -26,6 +28,7 @@ var (
isUsersTableDedupEnabled bool
isDedupEnabled bool
enableDeleteByJobs bool
customPartitionsEnabledWorkspaceIDs []string
)

type HandleT struct {
Expand Down Expand Up @@ -300,12 +303,12 @@ func (bq *HandleT) loadTable(tableName string, _, getLoadFileLocFromTableUploads

loadTableByAppend := func() (err error) {
stagingLoadTable.partitionDate = time.Now().Format("2006-01-02")
outputTable := tableName
outputTable := partitionedTable(tableName, stagingLoadTable.partitionDate)
// Tables created by RudderStack are ingestion-time partitioned table with pseudo column named _PARTITIONTIME. BigQuery automatically assigns rows to partitions based
// on the time when BigQuery ingests the data. To support custom field partitions, omitting loading into partitioned table like tableName$20191221
// TODO: Support custom field partition on users & identifies tables
if !customPartitionsEnabled {
outputTable = partitionedTable(tableName, stagingLoadTable.partitionDate)
if customPartitionsEnabled || slices.Contains(customPartitionsEnabledWorkspaceIDs, bq.warehouse.WorkspaceID) {
outputTable = tableName
}

loader := bq.db.Dataset(bq.namespace).Table(outputTable).LoaderFrom(gcsRef)
Expand Down Expand Up @@ -670,6 +673,7 @@ func loadConfig() {
config.RegisterBoolConfigVariable(false, &isUsersTableDedupEnabled, true, "Warehouse.bigquery.isUsersTableDedupEnabled") // TODO: Deprecate with respect to isDedupEnabled
config.RegisterBoolConfigVariable(false, &isDedupEnabled, true, "Warehouse.bigquery.isDedupEnabled")
config.RegisterBoolConfigVariable(false, &enableDeleteByJobs, true, "Warehouse.bigquery.enableDeleteByJobs")
config.RegisterStringSliceConfigVariable(nil, &customPartitionsEnabledWorkspaceIDs, true, "Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs")
}

func Init() {
Expand Down
156 changes: 103 additions & 53 deletions warehouse/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,15 @@ import (
"github.com/stretchr/testify/assert"
)

type TestHandle struct {
DB *bigquery.Client
WriteKey string
Schema string
Tables []string
}

var handle *TestHandle
type TestHandle struct{}

func (*TestHandle) VerifyConnection() error {
credentials, err := testhelper.BigqueryCredentials()
if err != nil {
return err
}
return testhelper.WithConstantBackoff(func() (err error) {
handle.DB, err = bigquery2.Connect(context.TODO(), &credentials)
_, err = bigquery2.Connect(context.TODO(), &credentials)
if err != nil {
err = fmt.Errorf("could not connect to warehouse bigquery with error: %s", err.Error())
return
Expand All @@ -50,10 +43,23 @@ func (*TestHandle) VerifyConnection() error {
}

func TestBigQueryIntegration(t *testing.T) {
credentials, err := testhelper.BigqueryCredentials()
require.NoError(t, err)

var (
schema = testhelper.Schema(warehouseutils.BQ, testhelper.BigqueryIntegrationTestSchema)
writeKey = "J77aX7tLFJ84qYU6UrN8ctecwZt"
tables = []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "_groups", "groups"}
db *bigquery.Client
)

db, err = bigquery2.Connect(context.TODO(), &credentials)
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, testhelper.WithConstantBackoff(func() (err error) {
return handle.DB.Dataset(handle.Schema).DeleteWithContents(context.TODO())
}), fmt.Sprintf("Failed dropping dataset %s for BigQuery", handle.Schema))
return db.Dataset(schema).DeleteWithContents(context.TODO())
}), fmt.Sprintf("Failed dropping dataset %s for BigQuery", schema))
})

t.Run("Merge Mode", func(t *testing.T) {
Expand All @@ -66,12 +72,12 @@ func TestBigQueryIntegration(t *testing.T) {

warehouseTest := &testhelper.WareHouseTest{
Client: &client.Client{
BQ: handle.DB,
BQ: db,
Type: client.BQClient,
},
WriteKey: handle.WriteKey,
Schema: handle.Schema,
Tables: handle.Tables,
WriteKey: writeKey,
Schema: schema,
Tables: tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
Expand Down Expand Up @@ -112,43 +118,92 @@ func TestBigQueryIntegration(t *testing.T) {
})

t.Run("Append Mode", func(t *testing.T) {
require.NoError(t, testhelper.SetConfig([]warehouseutils.KeyValue{
testCases := []struct {
name string
customPartitionsEnabledWorkspaceIDs []string
prerequisite func(t *testing.T)
}{
{
Key: "Warehouse.bigquery.isDedupEnabled",
Value: false,
name: "Append mode without custom partitions",
},
}))

warehouseTest := &testhelper.WareHouseTest{
Client: &client.Client{
BQ: handle.DB,
Type: client.BQClient,
{
name: "Append mode with custom partitions",
customPartitionsEnabledWorkspaceIDs: []string{"BpLnfgDsc2WD8F2qNfHK5a84jjJ"},
prerequisite: func(t *testing.T) {
err = db.Dataset(schema).Create(context.Background(), &bigquery.DatasetMetadata{
Location: "US",
})
require.NoError(t, err)

err = db.Dataset(schema).Table("tracks").Create(
context.Background(),
&bigquery.TableMetadata{
Schema: []*bigquery.FieldSchema{{
Name: "timestamp",
Type: bigquery.TimestampFieldType,
}},
TimePartitioning: &bigquery.TimePartitioning{
Field: "timestamp",
},
})
require.NoError(t, err)
},
},
WriteKey: handle.WriteKey,
Schema: handle.Schema,
Tables: handle.Tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn",
}

// Scenario 1
warehouseTest.TimestampBeforeSendingEvents = timeutil.Now()
warehouseTest.UserId = testhelper.GetUserId(warehouseutils.BQ)

sendEventsMap := testhelper.SendEventsMap()
testhelper.SendEvents(t, warehouseTest, sendEventsMap)
testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)

testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
for _, tc := range testCases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
_ = db.Dataset(schema).DeleteWithContents(context.TODO())

if tc.prerequisite != nil {
tc.prerequisite(t)
}

require.NoError(t, testhelper.SetConfig([]warehouseutils.KeyValue{
{
Key: "Warehouse.bigquery.isDedupEnabled",
Value: false,
},
{
Key: "Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs",
Value: tc.customPartitionsEnabledWorkspaceIDs,
},
}))

warehouseTest := &testhelper.WareHouseTest{
Client: &client.Client{
BQ: db,
Type: client.BQClient,
},
WriteKey: writeKey,
Schema: schema,
Tables: tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn",
}

// Scenario 1
warehouseTest.TimestampBeforeSendingEvents = timeutil.Now()
warehouseTest.UserId = testhelper.GetUserId(warehouseutils.BQ)

sendEventsMap := testhelper.SendEventsMap()
testhelper.SendEvents(t, warehouseTest, sendEventsMap)
testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)

testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
})
}
})
}

Expand Down Expand Up @@ -239,10 +294,5 @@ func TestMain(m *testing.M) {
return
}

handle = &TestHandle{
WriteKey: "J77aX7tLFJ84qYU6UrN8ctecwZt",
Schema: testhelper.Schema(warehouseutils.BQ, testhelper.BigqueryIntegrationTestSchema),
Tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "_groups", "groups"},
}
os.Exit(testhelper.Run(m, handle))
os.Exit(testhelper.Run(m, &TestHandle{}))
}

0 comments on commit af2b5a7

Please sign in to comment.