Skip to content

Commit

Permalink
Add GCP stackdriver aggregation parameters (#3070)
Browse files Browse the repository at this point in the history
Signed-off-by: Ram Cohen <ram.cohen@gmail.com>
  • Loading branch information
RamCohen authored Jun 29, 2022
1 parent 20cacec commit b810307
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issues/3135))
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issues/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
- **GCP Stackdriver Scaler:** Added aggregation parameters ([#3008](https://github.com/kedacore/keda/issues/3008))
- **Prometheus Scaler:** Add ignoreNullValues to return error when prometheus return null in values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
- **Selenium Grid Scaler:** Max Sessions implementation issue ([#3061](https://github.com/kedacore/keda/issues/3061))
Expand Down
4 changes: 3 additions & 1 deletion pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64
subscriptionID, projectID := getSubscriptionData(s)
filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + subscriptionID + `"`

return s.client.GetMetrics(ctx, filter, projectID)
// Pubsub metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-pubsub
return s.client.GetMetrics(ctx, filter, projectID, nil)
}

func getSubscriptionData(s *pubsubScaler) (string, string) {
Expand Down
38 changes: 36 additions & 2 deletions pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"

monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand All @@ -30,6 +31,7 @@ type stackdriverMetadata struct {
metricName string

gcpAuthorization *gcpAuthorizationMetadata
aggregation *monitoringpb.Aggregation
}

var gcpStackdriverLog = logf.Log.WithName("gcp_stackdriver_scaler")
Expand Down Expand Up @@ -101,9 +103,37 @@ func parseStackdriverMetadata(config *ScalerConfig) (*stackdriverMetadata, error
return nil, err
}
meta.gcpAuthorization = auth

meta.aggregation, err = parseAggregation(config)
if err != nil {
return nil, err
}

return &meta, nil
}

// parseAggregation builds the optional Stackdriver aggregation settings from
// the trigger metadata. It returns (nil, nil) when no aggregation was
// requested: the "alignmentPeriodSeconds" key is absent or empty. Otherwise
// the period must parse as an integer of at least 60 (Stackdriver's minimum
// alignment period); "alignmentAligner" and "alignmentReducer" are passed
// through to NewStackdriverAggregator, which validates them.
func parseAggregation(config *ScalerConfig) (*monitoringpb.Aggregation, error) {
	if period, ok := config.TriggerMetadata["alignmentPeriodSeconds"]; ok {
		if period == "" {
			return nil, nil
		}

		val, err := strconv.ParseInt(period, 10, 64)
		// Check the parse error BEFORE the range: on failure val is 0, and the
		// original order reported a misleading "must be at least 60" error for
		// non-numeric input (and logged a nil error in the range branch).
		if err != nil {
			gcpStackdriverLog.Error(err, "Error parsing alignmentPeriodSeconds")
			return nil, fmt.Errorf("error parsing alignmentPeriodSeconds: %s", err.Error())
		}
		if val < 60 {
			gcpStackdriverLog.Error(err, "Error parsing alignmentPeriodSeconds - must be at least 60")
			return nil, fmt.Errorf("error parsing alignmentPeriodSeconds - must be at least 60")
		}

		return NewStackdriverAggregator(val, config.TriggerMetadata["alignmentAligner"], config.TriggerMetadata["alignmentReducer"])
	}

	return nil, nil
}

func initializeStackdriverClient(ctx context.Context, gcpAuthorization *gcpAuthorizationMetadata) (*StackDriverClient, error) {
var client *StackDriverClient
var err error
Expand Down Expand Up @@ -174,10 +204,14 @@ func (s *stackdriverScaler) GetMetrics(ctx context.Context, metricName string, m

// getMetrics gets metric type value from stackdriver api
func (s *stackdriverScaler) getMetrics(ctx context.Context) (int64, error) {
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID)
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID, s.metadata.aggregation)
if err == nil {
gcpStackdriverLog.V(1).Info(
fmt.Sprintf("Getting metrics for project %s and filter %s. Result: %d", s.metadata.projectID, s.metadata.filter, val))
fmt.Sprintf("Getting metrics for project %s, filter %s and aggregation %v. Result: %d",
s.metadata.projectID,
s.metadata.filter,
s.metadata.aggregation,
val))
}

return val, err
Expand Down
8 changes: 8 additions & 0 deletions pkg/scalers/gcp_stackdriver_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ var testStackdriverMetadata = []parseStackdriverMetadataTestData{
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, true},
// With aggregation info
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "alignmentPeriodSeconds": "120", "alignmentAligner": "sum", "alignmentReducer": "percentile_99"}, false},
// With minimal aggregation info
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "alignmentPeriodSeconds": "120"}, false},
// With too short alignment period
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "alignmentPeriodSeconds": "30"}, true},
// With bad alignment period
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "alignmentPeriodSeconds": "a"}, true},
}

var gcpStackdriverMetricIdentifiers = []gcpStackdriverMetricIdentifier{
Expand Down
107 changes: 106 additions & 1 deletion pkg/scalers/stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"

"cloud.google.com/go/compute/metadata"
Expand All @@ -13,6 +14,7 @@ import (
"google.golang.org/api/iterator"
option "google.golang.org/api/option"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
durationpb "google.golang.org/protobuf/types/known/durationpb"
)

// StackDriverClient is a generic client to fetch metrics from Stackdriver. Can be used
Expand Down Expand Up @@ -61,8 +63,110 @@ func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, e
}, nil
}

// NewStackdriverAggregator assembles a monitoringpb.Aggregation with the given
// alignment period (in seconds) and the named per-series aligner and
// cross-series reducer. An unrecognized aligner or reducer name yields a nil
// aggregation and an error.
func NewStackdriverAggregator(period int64, aligner string, reducer string) (*monitoringpb.Aggregation, error) {
	// Resolve the aligner first, then the reducer, failing fast on either.
	perSeriesAligner, err := alignerFromString(aligner)
	if err != nil {
		return nil, err
	}

	crossSeriesReducer, err := reducerFromString(reducer)
	if err != nil {
		return nil, err
	}

	return &monitoringpb.Aggregation{
		AlignmentPeriod: &durationpb.Duration{
			Seconds: period,
			Nanos:   0,
		},
		PerSeriesAligner:   perSeriesAligner,
		CrossSeriesReducer: crossSeriesReducer,
	}, nil
}

// alignerFromString resolves a case-insensitive aligner name to the
// corresponding monitoringpb per-series aligner. The empty string and "none"
// both map to ALIGN_NONE; any other unrecognized name returns ALIGN_NONE
// together with an error.
func alignerFromString(aligner string) (monitoringpb.Aggregation_Aligner, error) {
	// Table-driven lookup keyed by the lowercased name.
	aligners := map[string]monitoringpb.Aggregation_Aligner{
		"":               monitoringpb.Aggregation_ALIGN_NONE,
		"none":           monitoringpb.Aggregation_ALIGN_NONE,
		"delta":          monitoringpb.Aggregation_ALIGN_DELTA,
		"interpolate":    monitoringpb.Aggregation_ALIGN_INTERPOLATE,
		"next_older":     monitoringpb.Aggregation_ALIGN_NEXT_OLDER,
		"min":            monitoringpb.Aggregation_ALIGN_MIN,
		"max":            monitoringpb.Aggregation_ALIGN_MAX,
		"mean":           monitoringpb.Aggregation_ALIGN_MEAN,
		"count":          monitoringpb.Aggregation_ALIGN_COUNT,
		"sum":            monitoringpb.Aggregation_ALIGN_SUM,
		"stddev":         monitoringpb.Aggregation_ALIGN_STDDEV,
		"count_true":     monitoringpb.Aggregation_ALIGN_COUNT_TRUE,
		"count_false":    monitoringpb.Aggregation_ALIGN_COUNT_FALSE,
		"fraction_true":  monitoringpb.Aggregation_ALIGN_FRACTION_TRUE,
		"percentile_99":  monitoringpb.Aggregation_ALIGN_PERCENTILE_99,
		"percentile_95":  monitoringpb.Aggregation_ALIGN_PERCENTILE_95,
		"percentile_50":  monitoringpb.Aggregation_ALIGN_PERCENTILE_50,
		"percentile_05":  monitoringpb.Aggregation_ALIGN_PERCENTILE_05,
		"percent_change": monitoringpb.Aggregation_ALIGN_PERCENT_CHANGE,
	}

	if a, ok := aligners[strings.ToLower(aligner)]; ok {
		return a, nil
	}
	return monitoringpb.Aggregation_ALIGN_NONE, fmt.Errorf("unknown aligner: %s", aligner)
}

// reducerFromString resolves a case-insensitive reducer name to the
// corresponding monitoringpb cross-series reducer. The empty string and
// "none" both map to REDUCE_NONE; any other unrecognized name returns
// REDUCE_NONE together with an error.
func reducerFromString(reducer string) (monitoringpb.Aggregation_Reducer, error) {
	// Table-driven lookup keyed by the lowercased name.
	reducers := map[string]monitoringpb.Aggregation_Reducer{
		"":              monitoringpb.Aggregation_REDUCE_NONE,
		"none":          monitoringpb.Aggregation_REDUCE_NONE,
		"mean":          monitoringpb.Aggregation_REDUCE_MEAN,
		"min":           monitoringpb.Aggregation_REDUCE_MIN,
		"max":           monitoringpb.Aggregation_REDUCE_MAX,
		"sum":           monitoringpb.Aggregation_REDUCE_SUM,
		"stddev":        monitoringpb.Aggregation_REDUCE_STDDEV,
		"count_true":    monitoringpb.Aggregation_REDUCE_COUNT_TRUE,
		"count_false":   monitoringpb.Aggregation_REDUCE_COUNT_FALSE,
		"fraction_true": monitoringpb.Aggregation_REDUCE_FRACTION_TRUE,
		"percentile_99": monitoringpb.Aggregation_REDUCE_PERCENTILE_99,
		"percentile_95": monitoringpb.Aggregation_REDUCE_PERCENTILE_95,
		"percentile_50": monitoringpb.Aggregation_REDUCE_PERCENTILE_50,
		"percentile_05": monitoringpb.Aggregation_REDUCE_PERCENTILE_05,
	}

	if r, ok := reducers[strings.ToLower(reducer)]; ok {
		return r, nil
	}
	return monitoringpb.Aggregation_REDUCE_NONE, fmt.Errorf("unknown reducer: %s", reducer)
}

// GetMetrics fetches metrics from stackdriver for a specific filter for the last minute
func (s StackDriverClient) GetMetrics(ctx context.Context, filter string, projectID string) (int64, error) {
func (s StackDriverClient) GetMetrics(
ctx context.Context,
filter string,
projectID string,
aggregation *monitoringpb.Aggregation) (int64, error) {
// Set the start time to 1 minute ago
startTime := time.Now().UTC().Add(time.Minute * -2)

Expand All @@ -76,6 +180,7 @@ func (s StackDriverClient) GetMetrics(ctx context.Context, filter string, projec
StartTime: &timestamp.Timestamp{Seconds: startTime.Unix()},
EndTime: &timestamp.Timestamp{Seconds: endTime.Unix()},
},
Aggregation: aggregation,
}

switch projectID {
Expand Down
20 changes: 11 additions & 9 deletions tests/scalers/gcp-pubsub.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import * as tmp from 'tmp'
import test from 'ava'
import { createNamespace, sleep, waitForDeploymentReplicaCount } from './helpers';

const gcpKey = process.env['GCP_SP_KEY']
const gcpKey = process.env['GCP_SP_KEY'] || ''
const projectId = JSON.parse(gcpKey).project_id
const gcpAccount = JSON.parse(gcpKey).client_email
const testNamespace = 'gcp-pubsub-test'
const topicId = `projects/${projectId}/topics/keda-test-topic-` + crypto.randomBytes(6).toString('hex')
const subscriptionName = `keda-test-topic-sub-` + crypto.randomBytes(6).toString('hex')
const subscriptionId = `projects/${projectId}/subscriptions/${subscriptionName}`
const deploymentName = 'dummy-consumer'
const deploymentName = 'dummy-pubsub-consumer'
const maxReplicaCount = '4'
const gsPrefix = `kubectl exec --namespace ${testNamespace} deployment.apps/gcp-sdk -- `

Expand Down Expand Up @@ -74,25 +74,27 @@ test.serial(`Publishing to pubsub`, t => {
// Publish 30 messages
var cmd = gsPrefix + ' /bin/bash -c -- "cd .'
for (let i = 0; i < 30; i++) {
cmd += ` && gcloud pubsub topics publish ${topicId} --message=AAAAAAAAAA`
cmd += ` && gcloud pubsub topics publish ${topicId} --message=AAAAAAAAAA && sleep 2s`
}
cmd += '"'
t.is(0,sh.exec(cmd).code,'Publishing messages to pub/sub should work..')
})

test.serial(`Deployment should scale to ${maxReplicaCount} (the max) then back to 0`, async t => {
// Wait for the number of replicas to be scaled up to maxReplicaCount
t.true(
await waitForDeploymentReplicaCount(parseInt(maxReplicaCount, 10), deploymentName, testNamespace, 150, 2000),
`Replica count should be ${maxReplicaCount} after 120 seconds`)
test.serial(`Deployment should scale to ${maxReplicaCount} (the max)`, async t => {
// Wait for the number of replicas to be scaled up to maxReplicaCount
t.true(
await waitForDeploymentReplicaCount(parseInt(maxReplicaCount, 10), deploymentName, testNamespace, 150, 2000),
`Replica count should be ${maxReplicaCount} after 5 minutes`)
})

test.serial(`Deployment should scale back to 0`, async t => {
// Purge all messages
sh.exec(gsPrefix + `gcloud pubsub subscriptions seek ${subscriptionId} --time=p0s`)

// Wait for the number of replicas to be scaled down to 0
t.true(
await waitForDeploymentReplicaCount(0, deploymentName, testNamespace, 30, 10000),
`Replica count should be 0 after 3 minutes`)
`Replica count should be 0 after 5 minutes`)
})

test.after.always.cb('clean up', t => {
Expand Down
10 changes: 6 additions & 4 deletions tests/scalers/gcp-stackdriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import * as tmp from 'tmp'
import test from 'ava'
import { createNamespace, waitForDeploymentReplicaCount } from './helpers';

const gcpKey = process.env['GCP_SP_KEY']
const gcpKey = process.env['GCP_SP_KEY'] || ''
const projectId = JSON.parse(gcpKey).project_id
const testNamespace = 'gcp-stackdriver-test'
const bucketName = 'keda-test-stackdriver-bucket'
const deploymentName = 'dummy-consumer'
const deploymentName = 'dummy-stackdriver-consumer'
const maxReplicaCount = '3'
const gsPrefix = `kubectl exec --namespace ${testNamespace} deploy/gcp-sdk -- `

Expand Down Expand Up @@ -70,12 +70,12 @@ test.serial(`Deployment should scale to ${maxReplicaCount} (the max) then back t
sh.exec(gsPrefix + `gsutil cp /usr/lib/google-cloud-sdk/bin/gsutil gs://${bucketName}`).code,
'Copying an object should work..'
)
if (await waitForDeploymentReplicaCount(parseInt(maxReplicaCount, 10), deploymentName, testNamespace, 1, 2000)) {
if (await waitForDeploymentReplicaCount(parseInt(maxReplicaCount, 10), deploymentName, testNamespace, 1, 3000)) {
haveAllReplicas = true
}
}

t.true(haveAllReplicas, `Replica count should be ${maxReplicaCount} after 120 seconds`)
t.true(haveAllReplicas, `Replica count should be ${maxReplicaCount} after 180 seconds`)

// Wait for the number of replicas to be scaled down to 0
t.true(
Expand Down Expand Up @@ -148,6 +148,8 @@ spec:
filter: 'metric.type="storage.googleapis.com/network/received_bytes_count" AND resource.type="gcs_bucket" AND metric.label.method="WriteObject" AND resource.label.bucket_name="${bucketName}"'
metricName: ${bucketName}
targetValue: '5'
alignmentPeriodSeconds: '60'
alignmentAligner: max
credentialsFromEnv: GOOGLE_APPLICATION_CREDENTIALS_JSON
`

Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/gcp-storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import * as tmp from 'tmp'
import test from 'ava'
import { createNamespace, waitForDeploymentReplicaCount } from './helpers';

const gcpKey = process.env['GCP_SP_KEY']
const gcpKey = process.env['GCP_SP_KEY'] || ''
const testNamespace = 'gcp-storage-test'
const bucketName = 'keda-test-storage-bucket'
const deploymentName = 'dummy-consumer'
const deploymentName = 'dummy-storage-consumer'
const maxReplicaCount = '3'
const gsPrefix = `kubectl exec --namespace ${testNamespace} deploy/gcp-sdk -- `

Expand Down

0 comments on commit b810307

Please sign in to comment.