diff --git a/pkg/scalers/temporal.go b/pkg/scalers/temporal.go index f1b5eb476c2..c6b42dbd1aa 100644 --- a/pkg/scalers/temporal.go +++ b/pkg/scalers/temporal.go @@ -12,6 +12,7 @@ import ( sdk "go.temporal.io/sdk/client" sdklog "go.temporal.io/sdk/log" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" v2 "k8s.io/api/autoscaling/v2" "k8s.io/metrics/pkg/apis/external_metrics" ) @@ -32,16 +33,16 @@ type temporalScaler struct { } type temporalMetadata struct { - ActivationLagThreshold int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"` - Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"` - Namespace string `keda:"name=namespace, order=triggerMetadata, default=default"` - TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"` - QueueName string `keda:"name=queueName, order=triggerMetadata"` - QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"` - BuildIDs []string `keda:"name=buildIds, order=triggerMetadata, optional"` - AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=true"` - Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=true"` - ApiKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"` + ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"` + Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"` + Namespace string `keda:"name=namespace, order=triggerMetadata, default=default"` + TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"` + QueueName string `keda:"name=queueName, order=triggerMetadata"` + QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"` + BuildIDs []string `keda:"name=buildIds, order=triggerMetadata, optional"` + AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=true"` + Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=true"` + APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"` triggerIndex int } @@ -50,7 +51,7 @@ func (a *temporalMetadata) Validate() error { if a.TargetQueueSize <= 0 { return fmt.Errorf("targetQueueSize must be a positive number") } - if a.ActivationLagThreshold < 0 { + if a.ActivationTargetQueueSize < 0 { return fmt.Errorf("activationTargetQueueSize must be a positive number") } @@ -115,7 +116,7 @@ func (s *temporalScaler) GetMetricsAndActivity(ctx context.Context, metricName s metric := GenerateMetricInMili(metricName, float64(queueSize)) - return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationLagThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil } func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) { @@ -177,18 +178,40 @@ func getCombinedBacklogCount(description sdk.TaskQueueDescription) int64 { } func getTemporalClient(meta *temporalMetadata) (sdk.Client, error) { - return sdk.Dial(sdk.Options{ + options := sdk.Options{ HostPort: meta.Endpoint, Namespace: meta.Namespace, Logger: sdklog.NewStructuredLogger(slog.Default()), - ConnectionOptions: sdk.ConnectionOptions{ - DialOptions: []grpc.DialOption{ - grpc.WithConnectParams(grpc.ConnectParams{ - MinConnectTimeout: 5 * time.Second, - }), + } + + dialOptions := []grpc.DialOption{ + grpc.WithConnectParams(grpc.ConnectParams{ + MinConnectTimeout: 5 * time.Second, + }), + } + + if meta.APIKey != "" { + dialOptions = append(dialOptions, grpc.WithUnaryInterceptor( + func(ctx context.Context, method string, req any, reply any, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker( + metadata.AppendToOutgoingContext(ctx, "temporal-namespace", meta.Namespace), + method, + req, + reply, + cc, + opts..., + ) }, - }, - }) + )) + options.Credentials = sdk.NewAPIKeyStaticCredentials(meta.APIKey) + } + + options.ConnectionOptions = sdk.ConnectionOptions{ + DialOptions: dialOptions, + } + + return sdk.Dial(options) } func parseTemporalMetadata(config *scalersconfig.ScalerConfig, _ logr.Logger) (*temporalMetadata, error) { diff --git a/pkg/scalers/temporal_test.go b/pkg/scalers/temporal_test.go index 0b4cef2cbe3..e97c6c57cc6 100644 --- a/pkg/scalers/temporal_test.go +++ b/pkg/scalers/temporal_test.go @@ -99,13 +99,13 @@ func TestParseTemporalMetadata(t *testing.T) { "namespace": "default", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - QueueName: "", - TargetQueueSize: 5, - ActivationLagThreshold: 0, - AllActive: true, - Unversioned: true, + Endpoint: "test:7233", + Namespace: "default", + QueueName: "", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: true, + Unversioned: true, }, wantErr: true, }, @@ -116,13 +116,13 @@ func TestParseTemporalMetadata(t *testing.T) { "queueName": "testxx", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - QueueName: "testxx", - TargetQueueSize: 5, - ActivationLagThreshold: 0, - AllActive: true, - Unversioned: true, + Endpoint: "test:7233", + Namespace: "default", + QueueName: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: true, + Unversioned: true, }, wantErr: false, }, @@ -135,13 +135,13 @@ func TestParseTemporalMetadata(t *testing.T) { "activationTargetQueueSize": "12", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - QueueName: "testxx", - TargetQueueSize: 5, - ActivationLagThreshold: 12, - AllActive: true, - Unversioned: true, + Endpoint: "test:7233", + Namespace: "default", + QueueName: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 12, + AllActive: true, + Unversioned: true, }, wantErr: false, }, @@ -154,14 +154,14 @@ func TestParseTemporalMetadata(t *testing.T) { "apiKey": "test01", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - QueueName: "testxx", - TargetQueueSize: 5, - ActivationLagThreshold: 0, - AllActive: true, - Unversioned: true, - ApiKey: "test01", + Endpoint: "test:7233", + Namespace: "default", + QueueName: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: true, + Unversioned: true, + APIKey: "test01", }, authParams: map[string]string{ "apiKey": "test01", @@ -177,14 +177,14 @@ func TestParseTemporalMetadata(t *testing.T) { "queueTypes": "workflow,activity", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - QueueName: "testxx", - TargetQueueSize: 5, - ActivationLagThreshold: 0, - AllActive: true, - Unversioned: true, - QueueTypes: []string{"workflow", "activity"}, + Endpoint: "test:7233", + Namespace: "default", + QueueName: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: true, + Unversioned: true, + QueueTypes: []string{"workflow", "activity"}, }, wantErr: false, },