Skip to content

Commit

Permalink
Provide CloudEvents around the management of ScaledJobs resources (#6072
Browse files Browse the repository at this point in the history
)

* Update

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

* Update ChangeLog

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

* Update

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>

---------

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>
  • Loading branch information
SpiritZhou committed Aug 19, 2024
1 parent fc002f0 commit 36cc226
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))
- **CloudEventSource**: Provide CloudEvents around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523))

#### Experimental

Expand Down
18 changes: 15 additions & 3 deletions apis/eventing/v1alpha1/cloudevent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package v1alpha1

// CloudEventType contains the list of cloudevent types
// +kubebuilder:validation:Enum=keda.scaledobject.ready.v1;keda.scaledobject.failed.v1
// +kubebuilder:validation:Enum=keda.scaledobject.ready.v1;keda.scaledobject.failed.v1;keda.scaledobject.removed.v1;keda.scaledjob.ready.v1;keda.scaledjob.failed.v1;keda.scaledjob.removed.v1
type CloudEventType string

const (
Expand All @@ -27,8 +27,20 @@ const (
// ScaledObjectFailedType is for event when creating ScaledObject failed
ScaledObjectFailedType CloudEventType = "keda.scaledobject.failed.v1"

// ScaledObjectFailedType is for event when removed ScaledObject
// ScaledObjectRemovedType is for event when removed ScaledObject
ScaledObjectRemovedType CloudEventType = "keda.scaledobject.removed.v1"

// ScaledJobReadyType is for event when a new ScaledJob is ready
ScaledJobReadyType CloudEventType = "keda.scaledjob.ready.v1"

// ScaledJobFailedType is for event when creating ScaledJob failed
ScaledJobFailedType CloudEventType = "keda.scaledjob.failed.v1"

// ScaledJobRemovedType is for event when removed ScaledJob
ScaledJobRemovedType CloudEventType = "keda.scaledjob.removed.v1"
)

var AllEventTypes = []CloudEventType{ScaledObjectFailedType, ScaledObjectReadyType}
var AllEventTypes = []CloudEventType{
ScaledObjectFailedType, ScaledObjectReadyType, ScaledObjectRemovedType,
ScaledJobFailedType, ScaledJobReadyType, ScaledJobRemovedType,
}
2 changes: 1 addition & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func main() {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
EventEmitter: eventEmitter,
SecretsLister: secretInformer.Lister(),
SecretsSynced: secretInformer.Informer().HasSynced,
}).SetupWithManager(mgr, controller.Options{
Expand Down
8 changes: 8 additions & 0 deletions config/crd/bases/eventing.keda.sh_cloudeventsources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ spec:
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
- keda.scaledobject.removed.v1
- keda.scaledjob.ready.v1
- keda.scaledjob.failed.v1
- keda.scaledjob.removed.v1
type: string
type: array
includedEventTypes:
Expand All @@ -97,6 +101,10 @@ spec:
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
- keda.scaledobject.removed.v1
- keda.scaledjob.ready.v1
- keda.scaledjob.failed.v1
- keda.scaledjob.removed.v1
type: string
type: array
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ spec:
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
- keda.scaledobject.removed.v1
- keda.scaledjob.ready.v1
- keda.scaledjob.failed.v1
- keda.scaledjob.removed.v1
type: string
type: array
includedEventTypes:
Expand All @@ -95,6 +99,10 @@ spec:
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
- keda.scaledobject.removed.v1
- keda.scaledjob.ready.v1
- keda.scaledjob.failed.v1
- keda.scaledjob.removed.v1
type: string
type: array
type: object
Expand Down
16 changes: 9 additions & 7 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,18 @@ import (
"k8s.io/apimachinery/pkg/runtime"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
Expand All @@ -56,7 +58,7 @@ type ScaledJobReconciler struct {
client.Client
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder
EventEmitter eventemitter.EventHandler

scaledJobGenerations *sync.Map
scaleHandler scaling.ScaleHandler
Expand Down Expand Up @@ -133,7 +135,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if !scaledJob.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil {
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobUpdateFailed, err.Error())
return ctrl.Result{}, err
}
}
Expand All @@ -143,7 +145,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
errMsg := "ScaledJob.spec.jobTargetRef not found"
err := fmt.Errorf(errMsg)
reqLogger.Error(err, errMsg)
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, errMsg)
r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobCheckFailed, errMsg)
return ctrl.Result{}, err
}
conditions := scaledJob.Status.Conditions.DeepCopy()
Expand All @@ -152,18 +154,18 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledJobReady, message.ScaledJobReadyMsg)
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil {
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobUpdateFailed, err.Error())
return ctrl.Result{}, err
}

Expand Down
4 changes: 3 additions & 1 deletion controllers/keda/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventreason"
)

Expand Down Expand Up @@ -57,7 +59,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr
}

logger.Info("Successfully finalized ScaledJob")
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobDeleted, "ScaledJob was deleted")
r.EventEmitter.Emit(scaledJob, namespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobRemovedType, eventreason.ScaledJobDeleted, message.ScaledJobRemoved)
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())

err = (&ScaledJobReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil),
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

Expand Down
4 changes: 4 additions & 0 deletions pkg/common/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ const (
ScaleTargetNoSubresourceMsg = "Target resource doesn't expose /scale subresource"

ScaledObjectRemoved = "ScaledObject was deleted"

ScaledJobReadyMsg = "ScaledJob is ready for scaling"

ScaledJobRemoved = "ScaledJob was deleted"
)
102 changes: 102 additions & 0 deletions tests/internals/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ var (
scaledObjectName = fmt.Sprintf("%s-so", testName)
scaledObjectTargetNotFoundName = fmt.Sprintf("%s-so-target-error", testName)
scaledObjectTargetNoSubresourceName = fmt.Sprintf("%s-so-target-no-subresource", testName)

scaledJobName = fmt.Sprintf("%s-sj", testName)
scaledJobErrName = fmt.Sprintf("%s-sj-target-error", testName)
)

type templateData struct {
Expand All @@ -38,6 +41,8 @@ type templateData struct {
DeploymentName string
MonitoredDeploymentName string
DaemonsetName string
ScaledJobName string
ScaledJobErrName string
}

const (
Expand Down Expand Up @@ -155,6 +160,69 @@ spec:
podSelector: 'app={{.DeploymentName}}'
value: '1'
`

scaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
spec:
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "30"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 5
minReplicaCount: 0
maxReplicaCount: 8
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
triggers:
- type: kubernetes-workload
metadata:
podSelector: 'app={{.MonitoredDeploymentName}}'
value: '1'
`

scaledJobErrTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobErrName}}
namespace: {{.TestNamespace}}
spec:
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "30"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 5
minReplicaCount: 0
maxReplicaCount: 8
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
triggers:
- type: cpu
name: x
metadata:
typex: Utilization
value: "50"
`
)

func TestEvents(t *testing.T) {
Expand All @@ -172,6 +240,8 @@ func TestEvents(t *testing.T) {
testTargetNotFoundErr(t, kc, data)
testTargetNotSupportEventErr(t, kc, data)

testScaledJobNormalEvent(t, kc, data)
testScaledJobTargetNotSupportEventErr(t, kc, data)
// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}
Expand All @@ -185,6 +255,8 @@ func getTemplateData() (templateData, []Template) {
ScaledObjectName: scaledObjectName,
ScaledObjectTargetNotFoundName: scaledObjectTargetNotFoundName,
ScaledObjectTargetNoSubresourceName: scaledObjectTargetNoSubresourceName,
ScaledJobName: scaledJobName,
ScaledJobErrName: scaledJobErrName,
}, []Template{}
}

Expand All @@ -210,6 +282,10 @@ func testNormalEvent(t *testing.T, kc *kubernetes.Clientset, data templateData)
checkingEvent(t, scaledObjectName, 0, eventreason.KEDAScalersStarted, fmt.Sprintf(message.ScalerIsBuiltMsg, "kubernetes-workload"))
checkingEvent(t, scaledObjectName, 1, eventreason.KEDAScalersStarted, message.ScalerStartMsg)
checkingEvent(t, scaledObjectName, 2, eventreason.ScaledObjectReady, message.ScalerReadyMsg)

KubectlDeleteWithTemplate(t, data, "deploymentTemplate", deploymentTemplate)
KubectlDeleteWithTemplate(t, data, "monitoredDeploymentName", monitoredDeploymentTemplate)
KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
}

func testTargetNotFoundErr(t *testing.T, _ *kubernetes.Clientset, data templateData) {
Expand All @@ -228,3 +304,29 @@ func testTargetNotSupportEventErr(t *testing.T, _ *kubernetes.Clientset, data te
checkingEvent(t, scaledObjectTargetNoSubresourceName, -2, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg)
checkingEvent(t, scaledObjectTargetNoSubresourceName, -1, eventreason.ScaledObjectCheckFailed, message.ScaleTargetErrMsg)
}

func testScaledJobNormalEvent(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing ScaledJob normal event ---")

KubectlApplyWithTemplate(t, data, "deploymentTemplate", deploymentTemplate)
KubectlApplyWithTemplate(t, data, "monitoredDeploymentName", monitoredDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)

KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 2, testNamespace)
assert.True(t, WaitForJobCount(t, kc, testNamespace, 2, 60, 1),
"replica count should be 2 after 1 minute")
checkingEvent(t, scaledJobName, 0, eventreason.KEDAScalersStarted, fmt.Sprintf(message.ScalerIsBuiltMsg, "kubernetes-workload"))
checkingEvent(t, scaledJobName, 1, eventreason.KEDAScalersStarted, message.ScalerStartMsg)
checkingEvent(t, scaledJobName, 2, eventreason.ScaledJobReady, message.ScaledJobReadyMsg)

KubectlDeleteWithTemplate(t, data, "deploymentTemplate", deploymentTemplate)
KubectlDeleteWithTemplate(t, data, "monitoredDeploymentName", monitoredDeploymentTemplate)
KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)
}

func testScaledJobTargetNotSupportEventErr(t *testing.T, _ *kubernetes.Clientset, data templateData) {
t.Log("--- testing target not support error event ---")

KubectlApplyWithTemplate(t, data, "scaledJobErrTemplate", scaledJobErrTemplate)
checkingEvent(t, scaledJobErrName, -1, eventreason.ScaledJobCheckFailed, "Failed to ensure ScaledJob is correctly created")
}

0 comments on commit 36cc226

Please sign in to comment.