Skip to content

Commit

Permalink
Refined Queue Sizing: Incorporating Activity Names in Evaluation
Browse files Browse the repository at this point in the history
Signed-off-by: Prajith P <prajith.palakkuda@cleartax.in>
  • Loading branch information
prajithp13 committed Aug 10, 2023
1 parent bca8a3e commit 673d9f1
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 9 deletions.
94 changes: 87 additions & 7 deletions pkg/scalers/temporal_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
kedautil "github.com/kedacore/keda/v2/pkg/util"
tclfilter "go.temporal.io/api/filter/v1"
workflowservice "go.temporal.io/api/workflowservice/v1"
Expand All @@ -19,19 +22,27 @@ import (
const (
defaultTargetWorkflowLength = 5
defaultActivationTargetWorkflowLength = 0
temporalClientTimeOut = 30
)

type executionInfo struct {
workflowId string
runId string
}

type temporalWorkflowScaler struct {
metricType v2.MetricTargetType
metadata *temporalWorkflowMetadata
tcl sdk.Client
logger logr.Logger
}

type temporalWorkflowMetadata struct {
activationTargetWorkflowLength int64
endpoint string
namespace string
workflowName string
activities []string
scalerIndex int
targetQueueSize int64
metricName string
Expand All @@ -49,11 +60,13 @@ func NewTemporalWorkflowScaler(config *ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("failed to parse Temporal metadata: %w", err)
}

logger := InitializeLogger(config, "temporal_workflow_scaler")

c, err := sdk.Dial(sdk.Options{
HostPort: meta.endpoint,
ConnectionOptions: sdk.ConnectionOptions{
DialOptions: []grpc.DialOption{
grpc.WithTimeout(time.Duration(10) * time.Second),
grpc.WithTimeout(time.Duration(temporalClientTimeOut) * time.Second),
},
},
})
Expand All @@ -66,6 +79,7 @@ func NewTemporalWorkflowScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
tcl: c,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -107,9 +121,8 @@ func (s *temporalWorkflowScaler) GetMetricsAndActivity(ctx context.Context, metr
// getQueueSize returns the queue size of open workflows.
func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) {

var executionIds = make([]string, 0)
var executions []executionInfo
var nextPageToken []byte

for {
listOpenWorkflowExecutionsRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{
Namespace: s.metadata.namespace,
Expand All @@ -126,20 +139,83 @@ func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error
return 0, fmt.Errorf("failed to get workflows: %w", err)
}

for _, execution := range ws.Executions {
executionId := execution.Execution.WorkflowId + "__" + execution.Execution.RunId
executionIds = append(executionIds, executionId)
for _, exec := range ws.GetExecutions() {
execution := executionInfo{
workflowId: exec.Execution.GetWorkflowId(),
runId: exec.Execution.RunId,
}
executions = append(executions, execution)
}

if nextPageToken = ws.NextPageToken; len(nextPageToken) == 0 {
break
}
}

queueLength := int64(len(executionIds))
pendingCh := make(chan string, len(executions))
var wg sync.WaitGroup

for _, execInfo := range executions {
wg.Add(1)
go func(e executionInfo) {
defer wg.Done()

workflowId := e.workflowId
runId := e.runId

if !s.isActivityRunning(ctx, workflowId, runId) {
executionId := workflowId + "__" + runId
pendingCh <- executionId
}

}(execInfo)
}
wg.Wait()
close(pendingCh)

var queueLength int64
for range pendingCh {
queueLength++
}
return queueLength, nil
}

// isActivityRunning checks if there are running activities associated with a specific workflow execution.
func (s *temporalWorkflowScaler) isActivityRunning(ctx context.Context, workflowId, runId string) bool {
resp, err := s.tcl.DescribeWorkflowExecution(ctx, workflowId, runId)
if err != nil {
s.logger.Error(err, "error describing workflow execution", "workflowId", workflowId, "runId", runId)
return false
}

// If there is no activityName and there are running activities, return true.
if len(s.metadata.activities) == 0 && len(resp.GetPendingActivities()) > 0 {
return true
}

// Store the IDs of running activities. Make sure no duplicates incase of anything.
runningActivities := make(map[string]struct{})
for _, pendingActivity := range resp.GetPendingActivities() {
activityName := pendingActivity.ActivityType.GetName()
if s.hasMatchingActivityName(activityName) {
runningActivities[pendingActivity.ActivityId] = struct{}{}
}
}

// Return true if there are any running activities, otherwise false.
return len(runningActivities) > 0
}

// hasMatchingActivityName checks if the provided activity name matches any of the defined activity names in the metadata.
func (s *temporalWorkflowScaler) hasMatchingActivityName(activityName string) bool {
for _, activity := range s.metadata.activities {
if activityName == activity {
return true
}
}
return false
}

// parseTemporalMetadata parses the Temporal metadata from the ScalerConfig.
func parseTemporalMetadata(config *ScalerConfig) (*temporalWorkflowMetadata, error) {
meta := &temporalWorkflowMetadata{}
Expand All @@ -162,6 +238,10 @@ func parseTemporalMetadata(config *ScalerConfig) (*temporalWorkflowMetadata, err
}
meta.workflowName = config.TriggerMetadata["workflowName"]

if activities := config.TriggerMetadata["activityName"]; activities != "" {
meta.activities = strings.Split(activities, ",")
}

if size, ok := config.TriggerMetadata["targetQueueSize"]; ok {
queueSize, err := strconv.ParseInt(size, 10, 64)
if err != nil {
Expand Down
102 changes: 100 additions & 2 deletions pkg/scalers/temporal_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package scalers
import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

var (
temporalEndpoint = "localhost:7233"
temporalNamespace = "v2"
temporalWorkflowName = "SayHello"
activityName = "say_hello"
)

type parseTemporalMetadataTestData struct {
Expand All @@ -32,9 +35,9 @@ var testTemporalMetadata = []parseTemporalMetadataTestData{
// Missing endpoint, should fail
{map[string]string{"workflowName": temporalWorkflowName, "namespace": temporalNamespace}, true},
// All good.
{map[string]string{"endpoint": temporalEndpoint, "workflowName": temporalWorkflowName, "namespace": temporalNamespace}, false},
{map[string]string{"endpoint": temporalEndpoint, "activityName": activityName, "workflowName": temporalWorkflowName, "namespace": temporalNamespace}, false},
// All good + activationLagThreshold
{map[string]string{"endpoint": temporalEndpoint, "workflowName": temporalWorkflowName, "namespace": temporalNamespace, "activationTargetQueueSize": "10"}, false},
{map[string]string{"endpoint": temporalEndpoint, "activityName": activityName, "workflowName": temporalWorkflowName, "namespace": temporalNamespace, "activationTargetQueueSize": "10"}, false},
}

var temporalMetricIdentifiers = []temporalMetricIdentifier{
Expand Down Expand Up @@ -71,3 +74,98 @@ func TestTemporalGetMetricSpecForScaling(t *testing.T) {
}
}
}

func TestParseTemporalMetadata(t *testing.T) {
cases := []struct {
name string
metadata map[string]string
wantMeta *temporalWorkflowMetadata
wantErr bool
}{
{
name: "empty metadata",
wantMeta: nil,
wantErr: true,
},
{
name: "empty workflowName",
metadata: map[string]string{
"endpoint": "test:7233",
"namespace": "default",
"activityName": "test123",
},
wantMeta: nil,
wantErr: true,
},
{
name: "multiple activityName",
metadata: map[string]string{
"endpoint": "test:7233",
"namespace": "default",
"activityName": "test123,test",
"workflowName": "testxx",
},
wantMeta: &temporalWorkflowMetadata{
endpoint: "test:7233",
namespace: "default",
activities: []string{"test123", "test"},
workflowName: "testxx",
targetQueueSize: 5,
metricName: "s0-temporal-default-testxx",
},
wantErr: false,
},
{
name: "empty activityName",
metadata: map[string]string{
"endpoint": "test:7233",
"namespace": "default",
"workflowName": "testxx",
},
wantMeta: &temporalWorkflowMetadata{
endpoint: "test:7233",
namespace: "default",
activities: nil,
workflowName: "testxx",
targetQueueSize: 5,
metricName: "s0-temporal-default-testxx",
},
wantErr: false,
},
{
name: "activationTargetQueueSize should not be 0",
metadata: map[string]string{
"endpoint": "test:7233",
"namespace": "default",
"workflowName": "testxx",
"activationTargetQueueSize": "12",
},
wantMeta: &temporalWorkflowMetadata{
endpoint: "test:7233",
namespace: "default",
activities: nil,
workflowName: "testxx",
targetQueueSize: 5,
metricName: "s0-temporal-default-testxx",
activationTargetWorkflowLength: 12,
},
wantErr: false,
},
}

for _, testCase := range cases {
c := testCase
t.Run(c.name, func(t *testing.T) {
config := &ScalerConfig{
TriggerMetadata: c.metadata,
}
meta, err := parseTemporalMetadata(config)
if c.wantErr == true && err != nil {
t.Log("Expected error, got err")
} else {
assert.NoError(t, err)
}
assert.Equal(t, c.wantMeta, meta)
})
}
}
1 change: 1 addition & 0 deletions tests/scalers/temporal/temporal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ spec:
metadata:
namespace: default
workflowName: SayHello
activityName: say_hello
targetQueueSize: "2"
activationTargetQueueSize: "3"
endpoint: {{.TemporalDeploymentName}}.{{.TestNamespace}}.svc.cluster.local:7233
Expand Down

0 comments on commit 673d9f1

Please sign in to comment.