Skip to content

Commit

Permalink
Add notify() in all the reconcilers
Browse files Browse the repository at this point in the history
notify() is used to emit events for new artifact and failure recovery
scenarios. It's implemented in all the reconcilers.
Previously, when there used to be a failure due to any reason, on a
subsequent successful reconciliation, no notification was sent to
indicate that the failure has been resolved.
With notify(), the old version of the object is compared with the new
version of the object to determine if all, if any, of the failures have
been resolved and a notification is sent. The notification message is
the same that's sent in usual successful source reconciliation message
about stored artifact.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Apr 7, 2022
1 parent 62604a2 commit f81a617
Show file tree
Hide file tree
Showing 10 changed files with 721 additions and 35 deletions.
47 changes: 42 additions & 5 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{
},
}

// bucketFailConditions contains the conditions that represent a failure.
var bucketFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.StorageOperationFailedCondition,
}

// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
Expand Down Expand Up @@ -307,10 +313,12 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return
}

// reconcile iterates through the gitRepositoryReconcileFunc tasks for the
// reconcile iterates through the bucketReconcileFunc tasks for the
// object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error.
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()

if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
}
Expand Down Expand Up @@ -355,9 +363,42 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
// Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult)
}

r.notify(oldObj, obj, index, res, resErr)

return res, resErr
}

// notify emits notification related to the reconciliation.
func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
"revision": newObj.Status.Artifact.Revision,
"checksum": newObj.Status.Artifact.Checksum,
}

var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}

message := fmt.Sprintf("stored artifact with %d fetched files from '%s' bucket", index.Len(), newObj.Spec.BucketName)

// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}

// reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state.
//
Expand Down Expand Up @@ -574,10 +615,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
r.annotatedEventLogf(ctx, obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)

// Record it on the object
obj.Status.Artifact = artifact.DeepCopy()
Expand Down
115 changes: 115 additions & 0 deletions controllers/bucket_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -1163,3 +1164,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
})
}
}

func TestBucketReconciler_notify(t *testing.T) {
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.Bucket)
newObjBeforeFunc func(obj *sourcev1.Bucket)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
},
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal Succeeded stored artifact with 2 fetched files from",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

recorder := record.NewFakeRecorder(32)

oldObj := &sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
BucketName: "test-bucket",
},
}
newObj := oldObj.DeepCopy()

if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}

reconciler := &BucketReconciler{
EventRecorder: recorder,
}
index := &etagIndex{
index: map[string]string{
"zzz": "qqq",
"bbb": "ddd",
},
}
reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr)

select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}
46 changes: 42 additions & 4 deletions controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{
},
}

// gitRepositoryFailConditions contains the conditions that represent a failure.
var gitRepositoryFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.StorageOperationFailedCondition,
}

// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete
Expand Down Expand Up @@ -212,6 +219,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error.
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepositoryReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()

// Mark as reconciling if generation differs
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
Expand Down Expand Up @@ -258,9 +267,42 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
// Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult)
}

r.notify(oldObj, obj, commit, res, resErr)

return res, resErr
}

// notify emits notification related to the reconciliation.
func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
"revision": newObj.Status.Artifact.Revision,
"checksum": newObj.Status.Artifact.Checksum,
}

var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}

message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage())

// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}

// reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state.
//
Expand Down Expand Up @@ -523,10 +565,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context,
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
r.AnnotatedEventf(obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage())

// Record it on the object
obj.Status.Artifact = artifact.DeepCopy()
Expand Down
Loading

0 comments on commit f81a617

Please sign in to comment.