Skip to content

Commit

Permalink
Improve scale out event with active trigger's name (#5637)
Browse files Browse the repository at this point in the history
* Update

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

* Update

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

* Update CHANGELOG.md

Co-authored-by: Tom Kerkhove <kerkhove.tom@gmail.com>
Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

* Fix

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

* Update pkg/scaling/executor/scale_scaledobjects.go

Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

---------

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>
Co-authored-by: Tom Kerkhove <kerkhove.tom@gmail.com>
Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
  • Loading branch information
3 people authored Apr 4, 2024
1 parent cc05dac commit 1bfb223
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Add active trigger name in ScaledObject's scale out event ([#5577](https://github.com/kedacore/keda/issues/5577))
- **General**: Add command-line flag in Adapter to allow override of gRPC Authority Header ([#5449](https://github.com/kedacore/keda/issues/5449))
- **General**: Add GRPC Healthchecks ([#5590](https://github.com/kedacore/keda/issues/5590))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
Expand Down
9 changes: 5 additions & 4 deletions pkg/mock/mock_scaling/mock_executor/mock_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ const (
// ScaleExecutor contains methods RequestJobScale and RequestScale
type ScaleExecutor interface {
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64)
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool)
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool, options *ScaleExecutorOptions)
}

// ScaleExecutorOptions contains the optional parameters for the RequestScale method.
type ScaleExecutorOptions struct {
ActiveTriggers []string
}

type scaleExecutor struct {
Expand Down
9 changes: 5 additions & 4 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package executor
import (
"context"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
Expand All @@ -33,7 +34,7 @@ import (
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)

func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) {
func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool, options *ScaleExecutorOptions) {
logger := e.logger.WithValues("scaledobject.Name", scaledObject.Name,
"scaledObject.Namespace", scaledObject.Namespace,
"scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)
Expand Down Expand Up @@ -134,7 +135,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
// replica count is equal to 0

// Scale the ScaleTarget up
e.scaleFromZeroOrIdle(ctx, logger, scaledObject, currentScale)
e.scaleFromZeroOrIdle(ctx, logger, scaledObject, currentScale, options.ActiveTriggers)
case isError:
// some triggers are active, but some responded with error

Expand Down Expand Up @@ -295,7 +296,7 @@ func (e *scaleExecutor) scaleToZeroOrIdle(ctx context.Context, logger logr.Logge
}
}

func (e *scaleExecutor) scaleFromZeroOrIdle(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) {
func (e *scaleExecutor) scaleFromZeroOrIdle(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale, activeTriggers []string) {
var replicas int32
if scaledObject.Spec.MinReplicaCount != nil && *scaledObject.Spec.MinReplicaCount > 0 {
replicas = *scaledObject.Spec.MinReplicaCount
Expand All @@ -309,7 +310,7 @@ func (e *scaleExecutor) scaleFromZeroOrIdle(ctx context.Context, logger logr.Log
logger.Info("Successfully updated ScaleTarget",
"Original Replicas Count", currentReplicas,
"New Replicas Count", replicas)
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetActivated, "Scaled %s %s/%s from %d to %d", scaledObject.Status.ScaleTargetKind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetActivated, "Scaled %s %s/%s from %d to %d, triggered by %s", scaledObject.Status.ScaleTargetKind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas, strings.Join(activeTriggers, ";"))

// Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject
if err := e.updateLastActiveTime(ctx, logger, scaledObject); err != nil {
Expand Down
75 changes: 68 additions & 7 deletions pkg/scaling/executor/scale_scaledobjects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestScaleToFallbackReplicasWhenNotActiveAndIsError(t *testing.T) {
client.EXPECT().Status().Times(2).Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, true)
scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, true, &ScaleExecutorOptions{})

assert.Equal(t, int32(5), scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetFallbackCondition()
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestScaleToMinReplicasWhenNotActive(t *testing.T) {
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)
scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false, &ScaleExecutorOptions{})

assert.Equal(t, minReplicas, scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetActiveCondition()
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestScaleToMinReplicasFromLowerInitialReplicaCount(t *testing.T) {
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)
scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false, &ScaleExecutorOptions{})

assert.Equal(t, minReplicas, scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetActiveCondition()
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestScaleFromMinReplicasWhenActive(t *testing.T) {
client.EXPECT().Status().Times(2).Return(statusWriter).Times(3)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(3)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)
scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false, &ScaleExecutorOptions{})

assert.Equal(t, int32(1), scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetActiveCondition()
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestScaleToIdleReplicasWhenNotActive(t *testing.T) {
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)
scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false, &ScaleExecutorOptions{})

assert.Equal(t, idleReplicas, scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetActiveCondition()
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestScaleFromIdleToMinReplicasWhenActive(t *testing.T) {
client.EXPECT().Status().Times(2).Return(statusWriter).Times(3)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(3)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)
scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false, &ScaleExecutorOptions{})

assert.Equal(t, minReplicas, scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetActiveCondition()
Expand Down Expand Up @@ -455,9 +455,70 @@ func TestScaleToPausedReplicasCount(t *testing.T) {
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)
scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false, &ScaleExecutorOptions{})

assert.Equal(t, pausedReplicaCount, scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetActiveCondition()
assert.Equal(t, false, condition.IsTrue())
}

func TestEventWitTriggerInfo(t *testing.T) {
ctrl := gomock.NewController(t)
client := mock_client.NewMockClient(ctrl)
recorder := record.NewFakeRecorder(1)
mockScaleClient := mock_scale.NewMockScalesGetter(ctrl)
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)

replicaCount := int32(2)
idleReplicas := int32(0)
minReplicas := int32(5)

scaledObject := v1alpha1.ScaledObject{
ObjectMeta: v1.ObjectMeta{
Name: "name",
Namespace: "namespace",
},
Spec: v1alpha1.ScaledObjectSpec{
ScaleTargetRef: &v1alpha1.ScaleTarget{
Name: "name",
},
IdleReplicaCount: &idleReplicas,
MinReplicaCount: &minReplicas,
},
Status: v1alpha1.ScaledObjectStatus{
ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{
Group: "apps",
Kind: "Deployment",
},
},
}

// scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions()

client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Replicas: &replicaCount,
},
})

scale := &autoscalingv1.Scale{
Spec: autoscalingv1.ScaleSpec{
Replicas: replicaCount,
},
}

mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2)
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter).AnyTimes()
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false, &ScaleExecutorOptions{ActiveTriggers: []string{"testTrigger"}})

eventstring := <-recorder.Events
assert.Equal(t, "Normal KEDAScaleTargetActivated Scaled namespace/name from 2 to 5, triggered by testTrigger", eventstring)
}
Loading

0 comments on commit 1bfb223

Please sign in to comment.