Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kinesis stream scaler #526

Merged
merged 1 commit into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewAwsSqsQueueScaler(resolvedEnv, triggerMetadata, authParams)
case "aws-cloudwatch":
return scalers.NewAwsCloudwatchScaler(resolvedEnv, triggerMetadata, authParams)
case "aws-kinesis-stream":
return scalers.NewAwsKinesisStreamScaler(resolvedEnv, triggerMetadata, authParams)
case "kafka":
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams)
case "rabbitmq":
Expand Down
7 changes: 5 additions & 2 deletions pkg/scalers/aws_iam_authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (a

if authParams["awsRoleArn"] != "" {
meta.awsRoleArn = authParams["awsRoleArn"]
} else if authParams["awsAccessKeyId"] != "" && authParams["awsSecretAccessKey"] != "" {
meta.awsAccessKeyID = authParams["awsAccessKeyId"]
} else if (authParams["awsAccessKeyID"] != "" || authParams["awsAccessKeyId"] != "") && authParams["awsSecretAccessKey"] != "" {
meta.awsAccessKeyID = authParams["awsAccessKeyID"]
if meta.awsAccessKeyID == "" {
meta.awsAccessKeyID = authParams["awsAccessKeyId"]
}
meta.awsSecretAccessKey = authParams["awsSecretAccessKey"]
} else {
var keyName string
Expand Down
155 changes: 155 additions & 0 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package scalers

import (
"context"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
awsKinesisStreamMetricName = "ShardCount"
targetShardCountDefault = 2
)

type awsKinesisStreamScaler struct {
metadata *awsKinesisStreamMetadata
}

type awsKinesisStreamMetadata struct {
targetShardCount int
streamName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
}

var kinesisStreamLog = logf.Log.WithName("aws_kinesis_stream_scaler")

// NewAwsKinesisStreamScaler creates a new awsKinesisStreamScaler
func NewAwsKinesisStreamScaler(resolvedEnv, metadata map[string]string, authParams map[string]string) (Scaler, error) {
meta, err := parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams)
if err != nil {
return nil, fmt.Errorf("Error parsing Kinesis stream metadata: %s", err)
}

return &awsKinesisStreamScaler{
metadata: meta,
}, nil
}

func parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams map[string]string) (*awsKinesisStreamMetadata, error) {
meta := awsKinesisStreamMetadata{}
meta.targetShardCount = targetShardCountDefault

if val, ok := metadata["shardCount"]; ok && val != "" {
shardCount, err := strconv.Atoi(val)
if err != nil {
meta.targetShardCount = targetShardCountDefault
kinesisStreamLog.Error(err, "Error parsing Kinesis stream metadata shardCount, using default %n", targetShardCountDefault)
} else {
meta.targetShardCount = shardCount
}
}

if val, ok := metadata["streamName"]; ok && val != "" {
meta.streamName = val
} else {
return nil, fmt.Errorf("no streamName given")
}

if val, ok := metadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, fmt.Errorf("no awsRegion given")
}

auth, err := getAwsAuthorization(authParams, metadata, resolvedEnv)
if err != nil {
return nil, err
}

meta.awsAuthorization = auth

return &meta, nil
}

// IsActive determines if we need to scale from zero
func (s *awsKinesisStreamScaler) IsActive(ctx context.Context) (bool, error) {
count, err := s.GetAwsKinesisOpenShardCount()

if err != nil {
return false, err
}

return count > 0, nil
}

func (s *awsKinesisStreamScaler) Close() error {
return nil
}

func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: fmt.Sprintf("%s-%s-%s", "AWS-Kinesis-Stream", awsKinesisStreamMetricName, s.metadata.streamName),
TargetAverageValue: targetShardCountQty}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *awsKinesisStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
shardCount, err := s.GetAwsKinesisOpenShardCount()

if err != nil {
kinesisStreamLog.Error(err, "Error getting shard count")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(shardCount), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Get Kinesis open shard count
func (s *awsKinesisStreamScaler) GetAwsKinesisOpenShardCount() (int64, error) {
input := &kinesis.DescribeStreamSummaryInput{
StreamName: &s.metadata.streamName,
}

sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(s.metadata.awsRegion),
}))
creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "")

if s.metadata.awsAuthorization.awsRoleArn != "" {
creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn)
}

kinesisClinent := kinesis.New(sess, &aws.Config{
Region: aws.String(s.metadata.awsRegion),
Credentials: creds,
})

output, err := kinesisClinent.DescribeStreamSummary(input)
if err != nil {
return -1, err
}

return *output.StreamDescriptionSummary.OpenShardCount, nil
}
167 changes: 167 additions & 0 deletions pkg/scalers/aws_kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package scalers

import (
"reflect"
"testing"
)

const (
testAWSKinesisRoleArn = "none"
testAWSKinesisAccessKeyID = "none"
testAWSKinesisSecretAccessKey = "none"
testAWSKinesisStreamName = "test"
testAWSRegion = "eu-west-1"
)

var testAWSKinesisResolvedEnv = map[string]string{
"AWS_ACCESS_KEY": "none",
"AWS_SECRET_ACCESS_KEY": "none",
}

var testAWSKinesisAuthentication = map[string]string{
"awsAccessKeyID": testAWSKinesisAccessKeyID,
"awsSecretAccessKey": testAWSKinesisSecretAccessKey,
}

type parseAWSKinesisMetadataTestData struct {
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
}

var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
{
metadata: map[string]string{},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "metadata empty"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSKinesisAccessKeyID,
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
},
},
isError: false,
comment: "properly formed stream name and region"},
{
metadata: map[string]string{
"streamName": "",
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": ""},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSKinesisAccessKeyID,
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
},
},
isError: false,
comment: "properly formed stream name and region, empty shard count"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "a",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSKinesisAccessKeyID,
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
},
},
isError: false,
comment: "properly formed stream name and region, wrong shard count"},

{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: map[string]string{
"awsAccessKeyID": "",
"awsSecretAccessKey": testAWSKinesisSecretAccessKey,
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id"},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: map[string]string{
"awsAccessKeyID": testAWSKinesisAccessKeyID,
"awsSecretAccessKey": "",
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key"},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: map[string]string{
"awsRoleArn": testAWSKinesisRoleArn,
},
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsRoleArn: testAWSKinesisRoleArn,
},
},
isError: false,
comment: "with AWS Role from TriggerAuthentication"},
}

func TestKinesisParseMetadata(t *testing.T) {
for _, testData := range testAWSKinesisMetadata {
result, err := parseAwsKinesisStreamMetadata(testData.metadata, testAWSKinesisAuthentication, testData.authParams)
if err != nil && !testData.isError {
t.Errorf("Expected success because %s got error, %s", testData.comment, err)
}
if testData.isError && err == nil {
t.Errorf("Expected error because %s but got success, %#v", testData.comment, testData)
}

if !testData.isError && !reflect.DeepEqual(testData.expected, result) {
t.Fatalf("Expected %#v but got %+#v", testData.expected, result)
}
}
}