Skip to content

Commit

Permalink
feat: use status resources for controller reconciliation. Closes #1029 (
Browse files Browse the repository at this point in the history
#1030)

* feat: use status resources for controller reconciliation

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy authored Jan 22, 2021
1 parent f9d2cc0 commit 74b9fc2
Show file tree
Hide file tree
Showing 23 changed files with 189 additions and 114 deletions.
49 changes: 33 additions & 16 deletions controllers/eventbus/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package main
import (
"flag"
"os"
"reflect"

"go.uber.org/zap"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

argoevents "github.com/argoproj/argo-events"
Expand Down Expand Up @@ -55,61 +58,75 @@ func main() {
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), opts)
if err != nil {
logger.Desugar().Fatal("unable to get a controller-runtime manager", zap.Error(err))
logger.Fatalw("unable to get a controller-runtime manager", zap.Error(err))
}

// Readyness probe
err = mgr.AddReadyzCheck("readiness", healthz.Ping)
if err != nil {
logger.Desugar().Fatal("unable add a readiness check", zap.Error(err))
logger.Fatalw("unable add a readiness check", zap.Error(err))
}

// Liveness probe
err = mgr.AddHealthzCheck("liveness", healthz.Ping)
if err != nil {
logger.Desugar().Fatal("unable add a health check", zap.Error(err))
logger.Fatalw("unable add a health check", zap.Error(err))
}

err = v1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
logger.Desugar().Fatal("unable to add scheme", zap.Error(err))
logger.Fatalw("unable to add scheme", zap.Error(err))
}

// A controller with DefaultControllerRateLimiter
c, err := controller.New(eventbus.ControllerName, mgr, controller.Options{
Reconciler: eventbus.NewReconciler(mgr.GetClient(), mgr.GetScheme(), natsStreamingImage, natsMetricsImage, logger),
})
if err != nil {
logger.Desugar().Fatal("unable to set up individual controller", zap.Error(err))
logger.Fatalw("unable to set up individual controller", zap.Error(err))
}

// Watch EventBus and enqueue EventBus object key
if err := c.Watch(&source.Kind{Type: &v1alpha1.EventBus{}}, &handler.EnqueueRequestForObject{}); err != nil {
logger.Desugar().Fatal("unable to watch EventBus", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &v1alpha1.EventBus{}}, &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
// TODO: change to use LabelChangedPredicate with controller-runtime v0.8
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld == nil {
return false
}
if e.ObjectNew == nil {
return false
}
return !reflect.DeepEqual(e.ObjectNew.GetLabels(), e.ObjectOld.GetLabels())
}},
)); err != nil {
logger.Fatalw("unable to watch EventBus", zap.Error(err))
}

// Watch ConfigMaps and enqueue owning EventBus key
if err := c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}); err != nil {
logger.Desugar().Fatal("unable to watch ConfigMaps", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch ConfigMaps", zap.Error(err))
}

// Watch Secrets and enqueue owning EventBus key
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}); err != nil {
logger.Desugar().Fatal("unable to watch Secrets", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Secrets", zap.Error(err))
}

// Watch StatefulSets and enqueue owning EventBus key
if err := c.Watch(&source.Kind{Type: &appv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}); err != nil {
logger.Desugar().Fatal("unable to watch StatefulSets", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &appv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch StatefulSets", zap.Error(err))
}

// Watch Services and enqueue owning EventBus key
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}); err != nil {
logger.Desugar().Fatal("unable to watch Services", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Services", zap.Error(err))
}

logger.Infow("starting eventbus controller", "version", argoevents.GetVersion())
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
logger.Desugar().Fatal("unable to run eventbus controller", zap.Error(err))
logger.Fatalw("unable to run eventbus controller", zap.Error(err))
}
}
39 changes: 14 additions & 25 deletions controllers/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/argoproj/argo-events/controllers/eventbus/installer"
Expand Down Expand Up @@ -44,20 +44,23 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
r.logger.Warnw("WARNING: eventbus not found", "request", req)
return reconcile.Result{}, nil
}
r.logger.Errorw("unable to get eventbus ctl", "request", req, "error", err)
r.logger.Errorw("unable to get eventbus ctl", zap.Any("request", req), zap.Error(err))
return ctrl.Result{}, err
}
log := r.logger.With("namespace", eventBus.Namespace).With("eventbus", eventBus.Name)
busCopy := eventBus.DeepCopy()
reconcileErr := r.reconcile(ctx, busCopy)
if reconcileErr != nil {
log.Desugar().Error("reconcile error", zap.Error(reconcileErr))
log.Errorw("reconcile error", zap.Error(reconcileErr))
}
if r.needsUpdate(eventBus, busCopy) {
if err := r.client.Update(ctx, busCopy); err != nil {
return reconcile.Result{}, err
}
}
if err := r.client.Status().Update(ctx, busCopy); err != nil {
return reconcile.Result{}, err
}
return ctrl.Result{}, reconcileErr
}

Expand All @@ -66,40 +69,26 @@ func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus)
log := r.logger.With("namespace", eventBus.Namespace).With("eventbus", eventBus.Name)
if !eventBus.DeletionTimestamp.IsZero() {
log.Info("deleting eventbus")
// Finalizer logic should be added here.
err := installer.Uninstall(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
if err != nil {
log.Errorw("failed to uninstall", "error", err)
return nil
if controllerutil.ContainsFinalizer(eventBus, finalizerName) {
// Finalizer logic should be added here.
if err := installer.Uninstall(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log); err != nil {
log.Errorw("failed to uninstall", zap.Error(err))
return nil
}
controllerutil.RemoveFinalizer(eventBus, finalizerName)
}
r.removeFinalizer(eventBus)
return nil
}
r.addFinalizer(eventBus)
controllerutil.AddFinalizer(eventBus, finalizerName)

eventBus.Status.InitConditions()
return installer.Install(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
}

func (r *reconciler) addFinalizer(s *v1alpha1.EventBus) {
finalizers := sets.NewString(s.Finalizers...)
finalizers.Insert(finalizerName)
s.Finalizers = finalizers.List()
}

func (r *reconciler) removeFinalizer(s *v1alpha1.EventBus) {
finalizers := sets.NewString(s.Finalizers...)
finalizers.Delete(finalizerName)
s.Finalizers = finalizers.List()
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.EventBus) bool {
if old == nil {
return true
}
if !equality.Semantic.DeepEqual(old.Status, new.Status) {
return true
}
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
return true
}
Expand Down
7 changes: 4 additions & 3 deletions controllers/eventbus/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
Expand Down Expand Up @@ -122,14 +123,14 @@ func TestNeedsUpdate(t *testing.T) {
logger: logging.NewArgoEventsLogger(),
}
assert.False(t, r.needsUpdate(nativeBus, testBus))
r.addFinalizer(testBus)
controllerutil.AddFinalizer(testBus, finalizerName)
assert.True(t, contains(testBus.Finalizers, finalizerName))
assert.True(t, r.needsUpdate(nativeBus, testBus))
r.removeFinalizer(testBus)
controllerutil.RemoveFinalizer(testBus, finalizerName)
assert.False(t, contains(testBus.Finalizers, finalizerName))
assert.False(t, r.needsUpdate(nativeBus, testBus))
testBus.Status.MarkConfigured()
assert.True(t, r.needsUpdate(nativeBus, testBus))
assert.False(t, r.needsUpdate(nativeBus, testBus))
})
}

Expand Down
43 changes: 30 additions & 13 deletions controllers/eventsource/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ package main
import (
"flag"
"os"
"reflect"

"github.com/pkg/errors"
"go.uber.org/zap"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

argoevents "github.com/argoproj/argo-events"
Expand Down Expand Up @@ -59,54 +62,68 @@ func main() {
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), opts)
if err != nil {
logger.Desugar().Fatal("unable to get a controller-runtime manager", zap.Error(err))
logger.Fatalw("unable to get a controller-runtime manager", zap.Error(err))
}
err = v1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
logger.Desugar().Fatal("unable to add EventSource scheme", zap.Error(err))
logger.Fatalw("unable to add EventSource scheme", zap.Error(err))
}

// Readyness probe
err = mgr.AddReadyzCheck("readiness", healthz.Ping)
if err != nil {
logger.Desugar().Fatal("unable add a readiness check", zap.Error(err))
logger.Fatalw("unable add a readiness check", zap.Error(err))
}

// Liveness probe
err = mgr.AddHealthzCheck("liveness", healthz.Ping)
if err != nil {
logger.Desugar().Fatal("unable add a health check", zap.Error(err))
logger.Fatalw("unable add a health check", zap.Error(err))
}

err = eventbusv1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
logger.Desugar().Fatal("unable to add EventBus scheme", zap.Error(err))
logger.Fatalw("unable to add EventBus scheme", zap.Error(err))
}
// A controller with DefaultControllerRateLimiter
c, err := controller.New(eventsource.ControllerName, mgr, controller.Options{
Reconciler: eventsource.NewReconciler(mgr.GetClient(), mgr.GetScheme(), eventSourceImage, logger),
})
if err != nil {
logger.Desugar().Fatal("unable to set up individual controller", zap.Error(err))
logger.Fatalw("unable to set up individual controller", zap.Error(err))
}

// Watch EventSource and enqueue EventSource object key
if err := c.Watch(&source.Kind{Type: &v1alpha1.EventSource{}}, &handler.EnqueueRequestForObject{}); err != nil {
logger.Desugar().Fatal("unable to watch EventSources", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &v1alpha1.EventSource{}}, &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
// TODO: change to use LabelChangedPredicate with controller-runtime v0.8
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld == nil {
return false
}
if e.ObjectNew == nil {
return false
}
return !reflect.DeepEqual(e.ObjectNew.GetLabels(), e.ObjectOld.GetLabels())
}},
)); err != nil {
logger.Fatalw("unable to watch EventSources", zap.Error(err))
}

// Watch Deployments and enqueue owning EventSource key
if err := c.Watch(&source.Kind{Type: &appv1.Deployment{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventSource{}, IsController: true}); err != nil {
logger.Desugar().Fatal("unable to watch Deployments", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &appv1.Deployment{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventSource{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Deployments", zap.Error(err))
}

// Watch Services and enqueue owning EventSource key
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventSource{}, IsController: true}); err != nil {
logger.Desugar().Fatal("unable to watch Services", zap.Error(err))
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.EventSource{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Services", zap.Error(err))
}

logger.Infow("starting eventsource controller", "version", argoevents.GetVersion())
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
logger.Desugar().Fatal("unable to run eventsource controller", zap.Error(err))
logger.Fatalw("unable to run eventsource controller", zap.Error(err))
}
}
Loading

0 comments on commit 74b9fc2

Please sign in to comment.