Skip to content

Commit

Permalink
Adding auth trigger to pubsub (kedacore#1291)
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Thakur <harshthakur9030@gmail.com>
Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>
  • Loading branch information
RealHarshThakur authored and arschles committed Nov 30, 2020
1 parent 9037672 commit 5a8b37b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
- Added ScaledObject Status Conditions to display status of scaling ([#750](https://github.com/kedacore/keda/pull/750))
- Added optional authentication parameters for the Redis Scaler ([#962](https://github.com/kedacore/keda/pull/962))
- Improved GCP PubSub Scaler performance by closing the client correctly ([#1087](https://github.com/kedacore/keda/pull/1087))
- Added support for Trigger Authentication for GCP PubSub scaler ([#1291](https://github.com/kedacore/keda/pull/1291))

### Breaking Changes

Expand Down
39 changes: 30 additions & 9 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ const (
pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
)

type gcpAuthorizationMetadata struct {
GoogleApplicationCredentials string
podIdentityOwner bool
}

type pubsubScaler struct {
client *StackDriverClient
metadata *pubsubMetadata
Expand All @@ -28,7 +33,7 @@ type pubsubScaler struct {
type pubsubMetadata struct {
targetSubscriptionSize int
subscriptionName string
credentials string
gcpAuthorization gcpAuthorizationMetadata
}

var gcpPubSubLog = logf.Log.WithName("gcp_pub_sub_scaler")
Expand Down Expand Up @@ -68,14 +73,11 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
return nil, fmt.Errorf("no subscription name given")
}

if config.TriggerMetadata["credentialsFromEnv"] != "" {
meta.credentials = config.ResolvedEnv[config.TriggerMetadata["credentialsFromEnv"]]
}

if len(meta.credentials) == 0 {
return nil, fmt.Errorf("no credentials given. Need GCP service account credentials in json format")
auth, err := getGcpAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
}

meta.gcpAuthorization = *auth
return &meta, nil
}

Expand Down Expand Up @@ -149,7 +151,7 @@ func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metric
// Stackdriver api
func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
if s.client == nil {
client, err := NewStackDriverClient(ctx, s.metadata.credentials)
client, err := NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
if err != nil {
return -1, err
}
Expand All @@ -160,3 +162,22 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {

return s.client.GetMetrics(ctx, filter)
}

func getGcpAuthorization(authParams, metadata, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
meta := gcpAuthorizationMetadata{}
if metadata["identityOwner"] == "operator" {
meta.podIdentityOwner = false
} else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" {
meta.podIdentityOwner = true
if authParams["GoogleApplicationCredentials"] != "" {
meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"]
} else {
if metadata["credentialsFromEnv"] != "" {
meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]]
} else {
return nil, fmt.Errorf("GoogleApplicationCredentials not found")
}
}
}
return &meta, nil
}
23 changes: 13 additions & 10 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ var testPubSubResolvedEnv = map[string]string{
}

type parsePubSubMetadataTestData struct {
metadata map[string]string
isError bool
authParams map[string]string
metadata map[string]string
isError bool
}

type gcpPubSubMetricIdentifier struct {
Expand All @@ -19,17 +20,19 @@ type gcpPubSubMetricIdentifier struct {
}

var testPubSubMetadata = []parsePubSubMetadataTestData{
{map[string]string{}, true},
{map[string]string{}, map[string]string{}, true},
// all properly formed
{map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing subscriptionName
{map[string]string{"subscriptionName": "", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": ""}, true},
// incorrect credentials
{map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "WRONG_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
Expand All @@ -38,7 +41,7 @@ var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{

func TestPubSubParseMetadata(t *testing.T) {
for _, testData := range testPubSubMetadata {
_, err := parsePubSubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testPubSubResolvedEnv})
_, err := parsePubSubMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testPubSubResolvedEnv})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
Expand Down

0 comments on commit 5a8b37b

Please sign in to comment.