Skip to content

Commit

Permalink
add kinesis stream scaler
Browse files Browse the repository at this point in the history
  • Loading branch information
msfuko authored and chloel committed Dec 23, 2019
1 parent 5ab3776 commit 6644c18
Show file tree
Hide file tree
Showing 8 changed files with 495 additions and 8 deletions.
93 changes: 93 additions & 0 deletions examples/awskinesisstream_scaledobject.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  namespace: keda-test
  labels:
    app: nginx
spec:
  replicas: 0
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Secret
metadata:
  name: keda-aws-secrets
  namespace: keda-test
type: Opaque
data:
  AWS_ACCESS_KEY_ID: bm9uZQ== # "none"
  AWS_SECRET_ACCESS_KEY: bm9uZQ== # "none"
  AWS_ROLE_ARN: bm9uZQ== # "none"
---
apiVersion: keda.k8s.io/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-aws-credential
  namespace: keda-test
spec:
  secretTargetRef:
  - parameter: awsAccessKeyID # Required.
    name: keda-aws-secrets # Required.
    key: AWS_ACCESS_KEY_ID # Required.
  - parameter: awsSecretAccessKey # Required.
    name: keda-aws-secrets # Required.
    key: AWS_SECRET_ACCESS_KEY # Required.
---
apiVersion: keda.k8s.io/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-aws-role
  namespace: keda-test
spec:
  secretTargetRef:
  - parameter: awsRoleArn # Required.
    name: keda-aws-secrets # Required.
    key: AWS_ROLE_ARN # Required.
---
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: aws-kinesis-stream-scaledobject
  namespace: keda-test
  labels:
    deploymentName: nginx-deployment
    test: nginx-deployment
spec:
  scaleTargetRef:
    deploymentName: nginx-deployment
  triggers:
  - type: aws-kinesis-stream
    authenticationRef:
      name: keda-trigger-auth-aws-role
    metadata:
      # Required: streamName
      streamName: myKinesisStream
      # Required: awsRegion
      awsRegion: "eu-west-1"
      # Optional: how many shards that one consumer handles.
      # Default: 2
      shardCount: "2"
  - type: aws-kinesis-stream
    authenticationRef:
      name: keda-trigger-auth-aws-credential
    metadata:
      # Required: streamName
      streamName: myKinesisStream
      # Required: awsRegion
      awsRegion: "eu-west-1"
      # Optional: how many shards that one consumer handles.
      # Default: 2
      shardCount: "2"
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewAwsSqsQueueScaler(resolvedEnv, triggerMetadata, authParams)
case "aws-cloudwatch":
return scalers.NewAwsCloudwatchScaler(resolvedEnv, triggerMetadata, authParams)
case "aws-kinesis-stream":
return scalers.NewAwsKinesisStreamScaler(resolvedEnv, triggerMetadata, authParams)
case "kafka":
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams)
case "rabbitmq":
Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/aws_cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var testAWSCloudwatchResolvedEnv = map[string]string{
}

var testAWSAuthentication = map[string]string{
"awsAccessKeyId": testAWSCloudwatchAccessKeyID,
"awsAccessKeyID": testAWSCloudwatchAccessKeyID,
"awsSecretAccessKey": testAWSCloudwatchSecretAccessKey,
}

Expand Down Expand Up @@ -122,7 +122,7 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
"metricStatPeriod": "300",
"awsRegion": "eu-west-1"},
map[string]string{
"awsAccessKeyId": testAWSCloudwatchAccessKeyID,
"awsAccessKeyID": testAWSCloudwatchAccessKeyID,
"awsSecretAccessKey": testAWSCloudwatchSecretAccessKey,
},
false,
Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/aws_iam_authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (a

if authParams["awsRoleArn"] != "" {
meta.awsRoleArn = authParams["awsRoleArn"]
} else if authParams["awsAccessKeyId"] != "" && authParams["awsSecretAccessKey"] != "" {
meta.awsAccessKeyID = authParams["awsAccessKeyId"]
} else if authParams["awsAccessKeyID"] != "" && authParams["awsSecretAccessKey"] != "" {
meta.awsAccessKeyID = authParams["awsAccessKeyID"]
meta.awsSecretAccessKey = authParams["awsSecretAccessKey"]
} else {
var keyName string
Expand Down
155 changes: 155 additions & 0 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package scalers

import (
"context"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
	// awsKinesisStreamMetricName is the metric-name fragment used when
	// composing the external metric name exposed to the HPA.
	awsKinesisStreamMetricName = "ShardCount"
	// targetShardCountDefault is the fallback number of shards one replica
	// handles when the trigger metadata omits or mis-specifies "shardCount".
	targetShardCountDefault = 2
)

// awsKinesisStreamScaler scales a target based on the open-shard count of
// an AWS Kinesis stream.
type awsKinesisStreamScaler struct {
	metadata *awsKinesisStreamMetadata
}

// awsKinesisStreamMetadata holds the parsed trigger metadata and the
// resolved AWS authorization settings for one Kinesis stream trigger.
type awsKinesisStreamMetadata struct {
	targetShardCount int    // desired average number of shards per replica
	streamName       string // name of the Kinesis stream to query
	awsRegion        string // AWS region the stream lives in
	awsAuthorization awsAuthorizationMetadata
}

// kinesisStreamLog is the named logger for this scaler.
var kinesisStreamLog = logf.Log.WithName("aws_kinesis_stream_scaler")

// NewAwsKinesisStreamScaler creates a new awsKinesisStreamScaler from the
// trigger metadata, the resolved environment, and the authentication
// parameters supplied by the TriggerAuthentication.
func NewAwsKinesisStreamScaler(resolvedEnv, metadata map[string]string, authParams map[string]string) (Scaler, error) {
	meta, err := parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams)
	if err != nil {
		// Error strings are lowercase per Go convention (staticcheck ST1005).
		return nil, fmt.Errorf("error parsing Kinesis stream metadata: %s", err)
	}

	return &awsKinesisStreamScaler{
		metadata: meta,
	}, nil
}

// parseAwsKinesisStreamMetadata validates the trigger metadata and builds the
// scaler's configuration. streamName and awsRegion are required; shardCount is
// optional and falls back to targetShardCountDefault when absent or invalid.
func parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams map[string]string) (*awsKinesisStreamMetadata, error) {
	meta := awsKinesisStreamMetadata{}
	meta.targetShardCount = targetShardCountDefault

	if val, ok := metadata["shardCount"]; ok && val != "" {
		shardCount, err := strconv.Atoi(val)
		if err != nil {
			// Non-numeric shardCount is tolerated: log it and keep the default.
			// logr does not do printf formatting, so the default is passed as a
			// key/value pair rather than embedded in the message.
			meta.targetShardCount = targetShardCountDefault
			kinesisStreamLog.Error(err, "Error parsing Kinesis stream metadata shardCount, using default", "default", targetShardCountDefault)
		} else {
			meta.targetShardCount = shardCount
		}
	}

	if val, ok := metadata["streamName"]; ok && val != "" {
		meta.streamName = val
	} else {
		return nil, fmt.Errorf("no streamName given")
	}

	if val, ok := metadata["awsRegion"]; ok && val != "" {
		meta.awsRegion = val
	} else {
		return nil, fmt.Errorf("no awsRegion given")
	}

	auth, err := getAwsAuthorization(authParams, metadata, resolvedEnv)
	if err != nil {
		return nil, err
	}

	meta.awsAuthorization = auth

	return &meta, nil
}

// IsActive determines if we need to scale from zero
func (s *awsKinesisStreamScaler) IsActive(ctx context.Context) (bool, error) {
count, err := s.GetAwsKinesisOpenShardCount()

if err != nil {
return false, err
}

return count > 0, nil
}

// Close releases resources held by the scaler. The Kinesis client is created
// per request in GetAwsKinesisOpenShardCount, so there is nothing to clean up.
func (s *awsKinesisStreamScaler) Close() error {
	return nil
}

// GetMetricSpecForScaling returns the external metric spec the HPA uses,
// targeting the configured average shard count per replica.
func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
	metricName := fmt.Sprintf("%s-%s-%s", "AWS-Kinesis-Stream", awsKinesisStreamMetricName, s.metadata.streamName)
	target := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI)

	external := &v2beta1.ExternalMetricSource{
		MetricName:         metricName,
		TargetAverageValue: target,
	}

	return []v2beta1.MetricSpec{
		{External: external, Type: externalMetricType},
	}
}

// GetMetrics returns the stream's current open-shard count as an external
// metric value, or an error if the AWS call fails.
func (s *awsKinesisStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
	shardCount, err := s.GetAwsKinesisOpenShardCount()
	if err != nil {
		kinesisStreamLog.Error(err, "Error getting shard count")
		return []external_metrics.ExternalMetricValue{}, err
	}

	// shardCount is already an int64, so no conversion is needed.
	metric := external_metrics.ExternalMetricValue{
		MetricName: metricName,
		Value:      *resource.NewQuantity(shardCount, resource.DecimalSI),
		Timestamp:  metav1.Now(),
	}

	return []external_metrics.ExternalMetricValue{metric}, nil
}

// GetAwsKinesisOpenShardCount returns the number of open shards in the
// configured Kinesis stream, via the DescribeStreamSummary API. It returns
// -1 together with a non-nil error when the call fails.
func (s *awsKinesisStreamScaler) GetAwsKinesisOpenShardCount() (int64, error) {
	input := &kinesis.DescribeStreamSummaryInput{
		StreamName: &s.metadata.streamName,
	}

	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String(s.metadata.awsRegion),
	}))

	// Static credentials by default; switch to STS assume-role credentials
	// when a role ARN was supplied through the TriggerAuthentication.
	creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "")
	if s.metadata.awsAuthorization.awsRoleArn != "" {
		creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn)
	}

	// Fix: local was misspelled "kinesisClinent" in the original.
	kinesisClient := kinesis.New(sess, &aws.Config{
		Region:      aws.String(s.metadata.awsRegion),
		Credentials: creds,
	})

	output, err := kinesisClient.DescribeStreamSummary(input)
	if err != nil {
		return -1, err
	}

	return *output.StreamDescriptionSummary.OpenShardCount, nil
}
Loading

0 comments on commit 6644c18

Please sign in to comment.