Skip to content

Commit

Permalink
Remove controller.Events
Browse files Browse the repository at this point in the history
Since events.Recorder implements the k8s EventRecorder interface, there
is no need of the controller.Events any more. The reconcilers can embed
the k8s EventRecorder and use a events.Recorder recorder.

Update the events.Recorder to embed a k8s event recorder to pass events
to k8s along with an external recorder.

The trace events are passed to k8s recorder as a normal event since it
only accepts normal and warning event types.

Update tests to use testenv with suite_test.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Oct 18, 2021
1 parent b732fd5 commit a26ceb2
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 116 deletions.
97 changes: 0 additions & 97 deletions runtime/controller/events.go

This file was deleted.

41 changes: 34 additions & 7 deletions runtime/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,27 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
)

// Recorder posts events to the webhook address.
// Recorder posts events to the Kubernetes API and any other event recorder webhook address, like the GitOps Toolkit
// notification-controller.
//
// Use it by embedding EventRecorder in reconciler struct:
//
// import (
// ...
// kuberecorder "k8s.io/client-go/tools/record"
// ...
// )
//
// type MyTypeReconciler {
// client.Client
// // ... etc.
// kuberecorder.EventRecorder
// }
//
// Use NewRecorder to create a working Recorder.
type Recorder struct {
// URL address of the events endpoint.
Webhook string
Expand All @@ -45,7 +63,10 @@ type Recorder struct {
// Retryable HTTP client.
Client *retryablehttp.Client

// Scheme of the recorded objects.
// EventRecorder is the Kubernetes event recorder.
EventRecorder kuberecorder.EventRecorder

// Scheme to look up the recorded objects.
Scheme *runtime.Scheme

// Log is the recorder logger.
Expand All @@ -54,9 +75,10 @@ type Recorder struct {

var _ kuberecorder.EventRecorder = &Recorder{}

// NewRecorder creates an event Recorder with default settings.
// The recorder performs automatic retries for connection errors and 500-range response codes.
func NewRecorder(scheme *runtime.Scheme, log logr.Logger, webhook, reportingController string) (*Recorder, error) {
// NewRecorder creates an event Recorder with a Kubernetes event recorder and an external event recorder based on the
// given webhook. The recorder performs automatic retries for connection errors and 500-range response codes from the
// external recorder.
func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController string) (*Recorder, error) {
if _, err := url.Parse(webhook); err != nil {
return nil, err
}
Expand All @@ -67,10 +89,11 @@ func NewRecorder(scheme *runtime.Scheme, log logr.Logger, webhook, reportingCont
httpClient.Logger = nil

return &Recorder{
Scheme: scheme,
Scheme: mgr.GetScheme(),
Webhook: webhook,
ReportingController: reportingController,
Client: httpClient,
EventRecorder: mgr.GetEventRecorderFor(reportingController),
Log: log,
}, nil
}
Expand Down Expand Up @@ -101,11 +124,15 @@ func (r *Recorder) AnnotatedEventf(
severity := eventTypeToSeverity(eventtype)

// Do not send trace events to notification controller,
// traces are persisted as Kubernetes events only.
// traces are persisted as Kubernetes events only as normal events.
if severity == EventSeverityTrace {
r.EventRecorder.AnnotatedEventf(object, annotations, corev1.EventTypeNormal, reason, messageFmt, args...)
return
}

// Forward the event to the Kubernetes recorder.
r.EventRecorder.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)

if r.Client == nil {
err := fmt.Errorf("retryable HTTP client has not been initialized")
r.Log.Error(err, "unable to record event")
Expand Down
16 changes: 4 additions & 12 deletions runtime/events/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand All @@ -50,9 +48,7 @@ func TestEventRecorder_AnnotatedEventf(t *testing.T) {
}))
defer ts.Close()

scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)

obj := &corev1.ConfigMap{}
Expand Down Expand Up @@ -86,9 +82,7 @@ func TestEventRecorder_AnnotatedEventf_Retry(t *testing.T) {
}))
defer ts.Close()

scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)
eventRecorder.Client.RetryMax = 2

Expand All @@ -115,16 +109,14 @@ func TestEventRecorder_AnnotatedEventf_RateLimited(t *testing.T) {
}))
defer ts.Close()

scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)
eventRecorder.Client.RetryMax = 2

obj := &corev1.ConfigMap{}
obj.Namespace = "gitops-system"
obj.Name = "webapp"

eventRecorder.AnnotatedEventf(obj, nil, "sync", "sync %s", obj.Name)
eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name)
require.Equal(t, 1, requestCount)
}
76 changes: 76 additions & 0 deletions runtime/events/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
"fmt"
"os"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/fluxcd/pkg/runtime/testenv"
)

var (
env *testenv.Environment
ctx = ctrl.SetupSignalHandler()
)

func TestMain(m *testing.M) {
scheme := runtime.NewScheme()
utilruntime.Must(corev1.AddToScheme(scheme))

env = testenv.New(
testenv.WithScheme(scheme),
)

go func() {
fmt.Println("Starting the test environment")
if err := env.Start(ctx); err != nil {
panic(fmt.Sprintf("Failed to start the test environment manager: %v", err))
}
}()
<-env.Manager.Elected()

// Create test namespace.
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "gitops-system",
},
}
if err := env.Client.Create(ctx, ns); err != nil {
panic(fmt.Sprintf("Failed to create gitops-system namespace: %v", err))
}

code := m.Run()

if err := env.Client.Delete(ctx, ns); err != nil {
panic(fmt.Sprintf("Failed to delete gitops-system namespace: %v", err))
}

fmt.Println("Stopping the test environment")
if err := env.Stop(); err != nil {
panic(fmt.Sprintf("Failed to stop the test environment: %v", err))
}

os.Exit(code)
}

0 comments on commit a26ceb2

Please sign in to comment.