Skip to content

Commit

Permalink
Merge #84619 #84982
Browse files Browse the repository at this point in the history
84619: ccl/changefeedccl: add assume role auth to GCP pubsub r=rhu713 a=rhu713

The URI for GCP pubsub now accept an "ASSUME_ROLE" parameter, which specifies
a comma-separated list of service accounts to be chain assumed by the service
account authenticated by the implicit or specified credentials. For example,
the URI

```
gcpubsub://project?AUTH=implicit&topic_name=topic&ASSUME_ROLE=roleA,roleB
```

will inform the implicit service account to roleB through the delegate roleA
in order to access the pubsub resource.

Release note (enterprise change): The URI for GCP pubsub now accept an
"ASSUME_ROLE" parameter, which specifies a comma-separated list of service
accounts to be chain assumed by the service account authenticated by the
implicit or specified credentials.

84982: acceptance: skip `TestComposeGSS` r=knz a=erikgrinaker

Touches #84981.

Release note: None

Co-authored-by: Rui Hu <rui@cockroachlabs.com>
Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
  • Loading branch information
3 people committed Jul 25, 2022
3 parents eabdc49 + 45c8efe + 8580bbc commit 48ffa80
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 11 deletions.
2 changes: 2 additions & 0 deletions pkg/acceptance/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (

"github.com/cockroachdb/cockroach/pkg/acceptance/cluster"
"github.com/cockroachdb/cockroach/pkg/build/bazel"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
)

const composeDir = "compose"

func TestComposeGSS(t *testing.T) {
skip.WithIssue(t, 84978)
testCompose(t, filepath.Join("gss", "docker-compose.yml"), "psql")
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ go_library(
"@com_github_shopify_sarama//:sarama",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@org_golang_google_api//impersonate",
"@org_golang_google_api//option",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
39 changes: 33 additions & 6 deletions pkg/ccl/changefeedccl/sink_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
"cloud.google.com/go/pubsub"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"golang.org/x/oauth2/google"
"google.golang.org/api/impersonate"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -33,6 +35,7 @@ const credentialsParam = "CREDENTIALS"
// GcpScheme to be used in testfeed and sink.go
const GcpScheme = "gcpubsub"
const gcpScope = "https://www.googleapis.com/auth/pubsub"
const cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform"

// TODO: make numOfWorkers configurable
const numOfWorkers = 128
Expand Down Expand Up @@ -110,8 +113,9 @@ type pubsubSink struct {

// TODO: unify gcp credentials code with gcp cloud storage credentials code
// getGCPCredentials returns gcp credentials parsed out from url
func getGCPCredentials(ctx context.Context, u sinkURL) (*google.Credentials, error) {
func getGCPCredentials(ctx context.Context, u sinkURL) (option.ClientOption, error) {
const authParam = "AUTH"
const assumeRoleParam = "ASSUME_ROLE"
const authSpecified = "specified"
const authImplicit = "implicit"
const authDefault = "default"
Expand All @@ -120,15 +124,21 @@ func getGCPCredentials(ctx context.Context, u sinkURL) (*google.Credentials, err
var creds *google.Credentials
var err error
authOption := u.consumeParam(authParam)
assumeRoleOption := u.consumeParam(assumeRoleParam)
authScope := gcpScope
if assumeRoleOption != "" {
// If we need to assume a role, the credentials need to have the scope to
// impersonate instead.
authScope = cloudPlatformScope
}

// implemented according to https://github.com/cockroachdb/cockroach/pull/64737
switch authOption {
case authImplicit:
creds, err = google.FindDefaultCredentials(ctx, gcpScope)
creds, err = google.FindDefaultCredentials(ctx, authScope)
if err != nil {
return nil, err
}
return creds, nil
case authSpecified:
fallthrough
case authDefault:
Expand All @@ -138,12 +148,29 @@ func getGCPCredentials(ctx context.Context, u sinkURL) (*google.Credentials, err
if err != nil {
return nil, errors.Wrap(err, "decoding credentials json")
}
creds, err = google.CredentialsFromJSON(ctx, credsJSON, gcpScope)
creds, err = google.CredentialsFromJSON(ctx, credsJSON, authScope)
if err != nil {
return nil, errors.Wrap(err, "creating credentials")
}
return creds, nil
}

credsOpt := option.WithCredentials(creds)
if assumeRoleOption != "" {
assumeRole, delegateRoles := cloud.ParseRoleString(assumeRoleOption)
cfg := impersonate.CredentialsConfig{
TargetPrincipal: assumeRole,
Scopes: []string{gcpScope},
Delegates: delegateRoles,
}

ts, err := impersonate.CredentialsTokenSource(ctx, cfg, credsOpt)
if err != nil {
return nil, errors.Wrap(err, "creating impersonate credentials")
}
return option.WithTokenSource(ts), nil
}

return credsOpt, nil
}

// MakePubsubSink returns the corresponding pubsub sink based on the url given
Expand Down Expand Up @@ -459,7 +486,7 @@ func (p *gcpPubsubClient) init() error {
client, err = pubsub.NewClient(
p.ctx,
p.projectID,
option.WithCredentials(creds),
creds,
option.WithEndpoint(p.region),
)

Expand Down
65 changes: 61 additions & 4 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type cdcTestArgs struct {
crdbChaos bool
whichSink sinkType
sinkURI string
assumeRole string

// preStartStatements are executed after the workload is initialized but before the
// changefeed is created.
Expand Down Expand Up @@ -147,6 +148,10 @@ func cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcT
sinkURI = kafka.sinkURL(ctx)
}

if args.assumeRole != "" {
sinkURI = sinkURI + "&ASSUME_ROLE=" + args.assumeRole
}

m := c.NewMonitor(ctx, crdbNodes)
workloadCompleteCh := make(chan struct{}, 1)

Expand Down Expand Up @@ -792,9 +797,7 @@ func registerCDC(r registry.Registry) {
})
},
})
// TODO(zinger): uncomment once manual acceptance testing passes
// and whatever connectivity/provisioning issue happening here is fixed.
/* r.Add(registry.TestSpec{
r.Add(registry.TestSpec{
Name: "cdc/pubsub-sink",
Owner: `cdc`,
Cluster: r.MakeClusterSpec(4, spec.CPU(16)),
Expand All @@ -810,7 +813,61 @@ func registerCDC(r registry.Registry) {
targetSteadyLatency: time.Minute,
})
},
}) */
})

// In order to run this test, the service account corresponding to the
// implicit credentials must have the Service Account Token Creator role on
// the first account on the assume-role chain:
// cdc-roachtest-intermediate@cockroach-ephemeral.iam.gserviceaccount.com. See
// https://cloud.google.com/iam/docs/create-short-lived-credentials-direct.
//
// TODO(rui): Change to a shorter test as it just needs to validate
// permissions and shouldn't need to run a full 30m workload.
r.Add(registry.TestSpec{
Name: "cdc/pubsub-sink/assume-role",
Owner: `cdc`,
Cluster: r.MakeClusterSpec(4, spec.CPU(16)),
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
cdcBasicTest(ctx, t, c, cdcTestArgs{
workloadType: tpccWorkloadType,
tpccWarehouseCount: 1,
workloadDuration: "30m",
initialScan: true,
whichSink: pubsubSink,
assumeRole: "cdc-roachtest-intermediate@cockroach-ephemeral.iam.gserviceaccount.com,cdc-roachtest@cockroach-ephemeral.iam.gserviceaccount.com",
targetInitialScanLatency: 30 * time.Minute,
targetSteadyLatency: time.Minute,
})
},
})

// In order to run this test, the service account corresponding to the
// implicit credentials must have the Service Account Token Creator role on
// the first account on the assume-role chain:
// cdc-roachtest-intermediate@cockroach-ephemeral.iam.gserviceaccount.com. See
// https://cloud.google.com/iam/docs/create-short-lived-credentials-direct.
//
// TODO(rui): Change to a shorter test as it just needs to validate
// permissions and shouldn't need to run a full 30m workload.
r.Add(registry.TestSpec{
Name: "cdc/cloud-sink-gcs/assume-role",
Owner: `cdc`,
Cluster: r.MakeClusterSpec(4, spec.CPU(16)),
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
cdcBasicTest(ctx, t, c, cdcTestArgs{
tpccWarehouseCount: 50,
workloadDuration: "30m",
initialScan: true,
whichSink: cloudStorageSink,
assumeRole: "cdc-roachtest-intermediate@cockroach-ephemeral.iam.gserviceaccount.com,cdc-roachtest@cockroach-ephemeral.iam.gserviceaccount.com",
targetInitialScanLatency: 30 * time.Minute,
targetSteadyLatency: time.Minute,
})
},
})

// TODO(zinger): uncomment once connectivity issue is fixed,
// currently fails with "initial scan did not complete" because sink
// URI is set as localhost, need to expose it to the other nodes via IP
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachprod/vm/gce/gcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (p *Provider) Create(
args := []string{
"compute", "instances", "create",
"--subnet", "default",
"--scopes", "default,storage-rw",
"--scopes", "cloud-platform",
"--image", providerOpts.Image,
"--image-project", "ubuntu-os-cloud",
"--boot-disk-type", "pd-ssd",
Expand Down

0 comments on commit 48ffa80

Please sign in to comment.