Skip to content

Commit

Permalink
support apiKey authentication
Browse files Browse the repository at this point in the history
Signed-off-by: Prajithp <prajithpalakkuda@gmail.com>
  • Loading branch information
Prajithp committed Sep 30, 2024
1 parent 6018463 commit 984d1de
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 57 deletions.
63 changes: 43 additions & 20 deletions pkg/scalers/temporal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
sdk "go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"
)
Expand All @@ -32,16 +33,16 @@ type temporalScaler struct {
}

type temporalMetadata struct {
ActivationLagThreshold int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"`
Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"`
Namespace string `keda:"name=namespace, order=triggerMetadata, default=default"`
TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"`
BuildIDs []string `keda:"name=buildIds, order=triggerMetadata, optional"`
AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=true"`
Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=true"`
ApiKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"`
ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"`
Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"`
Namespace string `keda:"name=namespace, order=triggerMetadata, default=default"`
TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"`
BuildIDs []string `keda:"name=buildIds, order=triggerMetadata, optional"`
AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=true"`
Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=true"`
APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"`

triggerIndex int
}
Expand All @@ -50,7 +51,7 @@ func (a *temporalMetadata) Validate() error {
if a.TargetQueueSize <= 0 {
return fmt.Errorf("targetQueueSize must be a positive number")
}
if a.ActivationLagThreshold < 0 {
if a.ActivationTargetQueueSize < 0 {
return fmt.Errorf("activationTargetQueueSize must be a positive number")
}

Expand Down Expand Up @@ -115,7 +116,7 @@ func (s *temporalScaler) GetMetricsAndActivity(ctx context.Context, metricName s

metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationLagThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil
}

func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) {
Expand Down Expand Up @@ -177,18 +178,40 @@ func getCombinedBacklogCount(description sdk.TaskQueueDescription) int64 {
}

func getTemporalClient(meta *temporalMetadata) (sdk.Client, error) {
return sdk.Dial(sdk.Options{
options := sdk.Options{
HostPort: meta.Endpoint,
Namespace: meta.Namespace,
Logger: sdklog.NewStructuredLogger(slog.Default()),
ConnectionOptions: sdk.ConnectionOptions{
DialOptions: []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: 5 * time.Second,
}),
}

dialOptions := []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: 5 * time.Second,
}),
}

if meta.APIKey != "" {
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(
func(ctx context.Context, method string, req any, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(
metadata.AppendToOutgoingContext(ctx, "temporal-namespace", meta.Namespace),
method,
req,
reply,
cc,
opts...,
)
},
},
})
))
options.Credentials = sdk.NewAPIKeyStaticCredentials(meta.APIKey)
}

options.ConnectionOptions = sdk.ConnectionOptions{
DialOptions: dialOptions,
}

return sdk.Dial(options)
}

func parseTemporalMetadata(config *scalersconfig.ScalerConfig, _ logr.Logger) (*temporalMetadata, error) {
Expand Down
74 changes: 37 additions & 37 deletions pkg/scalers/temporal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func TestParseTemporalMetadata(t *testing.T) {
"namespace": "default",
},
wantMeta: &temporalMetadata{
Endpoint: "test:7233",
Namespace: "default",
QueueName: "",
TargetQueueSize: 5,
ActivationLagThreshold: 0,
AllActive: true,
Unversioned: true,
Endpoint: "test:7233",
Namespace: "default",
QueueName: "",
TargetQueueSize: 5,
ActivationTargetQueueSize: 0,
AllActive: true,
Unversioned: true,
},
wantErr: true,
},
Expand All @@ -116,13 +116,13 @@ func TestParseTemporalMetadata(t *testing.T) {
"queueName": "testxx",
},
wantMeta: &temporalMetadata{
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationLagThreshold: 0,
AllActive: true,
Unversioned: true,
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationTargetQueueSize: 0,
AllActive: true,
Unversioned: true,
},
wantErr: false,
},
Expand All @@ -135,13 +135,13 @@ func TestParseTemporalMetadata(t *testing.T) {
"activationTargetQueueSize": "12",
},
wantMeta: &temporalMetadata{
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationLagThreshold: 12,
AllActive: true,
Unversioned: true,
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationTargetQueueSize: 12,
AllActive: true,
Unversioned: true,
},
wantErr: false,
},
Expand All @@ -154,14 +154,14 @@ func TestParseTemporalMetadata(t *testing.T) {
"apiKey": "test01",
},
wantMeta: &temporalMetadata{
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationLagThreshold: 0,
AllActive: true,
Unversioned: true,
ApiKey: "test01",
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationTargetQueueSize: 0,
AllActive: true,
Unversioned: true,
APIKey: "test01",
},
authParams: map[string]string{
"apiKey": "test01",
Expand All @@ -177,14 +177,14 @@ func TestParseTemporalMetadata(t *testing.T) {
"queueTypes": "workflow,activity",
},
wantMeta: &temporalMetadata{
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationLagThreshold: 0,
AllActive: true,
Unversioned: true,
QueueTypes: []string{"workflow", "activity"},
Endpoint: "test:7233",
Namespace: "default",
QueueName: "testxx",
TargetQueueSize: 5,
ActivationTargetQueueSize: 0,
AllActive: true,
Unversioned: true,
QueueTypes: []string{"workflow", "activity"},
},
wantErr: false,
},
Expand Down

0 comments on commit 984d1de

Please sign in to comment.