Skip to content

Commit

Permalink
gitrepo: Use summarize helper for success events
Browse files Browse the repository at this point in the history
When source fetch failure takes happens, a warning event is emitted but
when resolved, there's no resolved event emitted.

This change introduces a new ResultProcessor, NotifySuccess, for
SummarizeAndPatch helper which consolidates all the messages to be
emitted as a single event. It checks for any recovery from failure
condition and adds that to the success event message.

The gitrepo reconciler is modified to use the use NotifySuccess
ResultProcessor. A change in artifact is checked in reconcile() and a
Notification object is returned, populated with the success information.

When the Notification is empty/zero, and no failure recovery, no event
is emitted. Failure recovery without any Notification also results in an
event with only the recovery information.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Mar 8, 2022
1 parent 2410766 commit b18a7d2
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 13 deletions.
39 changes: 30 additions & 9 deletions controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

oldObj := obj.DeepCopy()

// Initialize the patch helper with the current version of the object.
patchHelper, err := patch.NewHelper(obj, r.Client)
if err != nil {
Expand All @@ -152,6 +154,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// recResult stores the abstracted reconcile result.
var recResult sreconcile.Result
// successEvent stores the notification event to be emitted on success.
var successEvent summarize.Notification

// Always attempt to patch the object and status after each reconciliation
// NOTE: The final runtime result and error are set in this block.
Expand All @@ -165,6 +169,10 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
summarize.WithProcessors(
summarize.RecordContextualError,
summarize.RecordReconcileReq,
summarize.NotifySuccess(oldObj, successEvent, []string{
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
}),
),
summarize.WithResultBuilder(sreconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.GetInterval().Duration}),
summarize.WithPatchFieldOwner(r.ControllerName),
Expand Down Expand Up @@ -197,25 +205,32 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
r.reconcileInclude,
r.reconcileArtifact,
}
recResult, retErr = r.reconcile(ctx, obj, reconcilers)
successEvent, recResult, retErr = r.reconcile(ctx, obj, reconcilers)
return
}

// reconcile steps iterates through the actual reconciliation tasks for objec,
// it returns early on the first step that returns ResultRequeue or produces an
// error.
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepoReconcilerFunc) (sreconcile.Result, error) {
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepoReconcilerFunc) (summarize.Notification, sreconcile.Result, error) {
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
}

var successEvent summarize.Notification
// Record old checksum to help determine a change after reconciling.
var oldChecksum string
if obj.GetArtifact() != nil {
oldChecksum = obj.GetArtifact().Checksum
}

var commit git.Commit
var includes artifactSet

// Create temp dir for Git clone
tmpDir, err := util.TempDirForObj("", obj)
if err != nil {
return sreconcile.ResultEmpty, &serror.Event{
return successEvent, sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to create temporary directory: %w", err),
Reason: sourcev1.StorageOperationFailedReason,
}
Expand All @@ -229,7 +244,7 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
recResult, err := rec(ctx, obj, &commit, &includes, tmpDir)
// Exit immediately on ResultRequeue.
if recResult == sreconcile.ResultRequeue {
return sreconcile.ResultRequeue, nil
return successEvent, sreconcile.ResultRequeue, nil
}
// If an error is received, prioritize the returned results because an
// error also means immediate requeue.
Expand All @@ -241,7 +256,16 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
// Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult)
}
return res, resErr
// Construct success event on successful reconciliation with new artifact.
if resErr == nil && res == sreconcile.ResultSuccess && oldChecksum != obj.GetArtifact().Checksum {
successEvent.Reason = "NewArtifact"
successEvent.Message = fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage())
successEvent.Annotations = map[string]string{
"revision": obj.Status.Artifact.Revision,
"checksum": obj.Status.Artifact.Checksum,
}
}
return successEvent, res, resErr
}

// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
Expand Down Expand Up @@ -373,6 +397,7 @@ func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
conditions.MarkReconciling(obj, "NewRevision", message)
}

return sreconcile.ResultSuccess, nil
}

Expand Down Expand Up @@ -456,10 +481,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context,
Reason: sourcev1.StorageOperationFailedReason,
}
}
r.AnnotatedEventf(obj, map[string]string{
"revision": artifact.Revision,
"checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage())

// Record it on the object
obj.Status.Artifact = artifact.DeepCopy()
Expand Down
44 changes: 40 additions & 4 deletions internal/reconcile/summarize/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package summarize

import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/object"
"github.com/fluxcd/source-controller/internal/reconcile"
Expand All @@ -33,15 +35,15 @@ import (
// ResultProcessor processes the results of reconciliation (the object, result
// and error). Any errors during processing need not result in the
// reconciliation failure. The errors can be recorded as logs and events.
type ResultProcessor func(context.Context, kuberecorder.EventRecorder, client.Object, reconcile.Result, error)
type ResultProcessor func(context.Context, kuberecorder.EventRecorder, conditions.Setter, reconcile.Result, error)

// RecordContextualError is a ResultProcessor that records the contextual errors
// based on their types.
// An event is recorded for the errors that are returned to the runtime. The
// runtime handles the logging of the error.
// An event is recorded and an error is logged for errors that are known to be
// swallowed, not returned to the runtime.
func RecordContextualError(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, err error) {
func RecordContextualError(ctx context.Context, recorder kuberecorder.EventRecorder, obj conditions.Setter, _ reconcile.Result, err error) {
switch e := err.(type) {
case *serror.Event:
recorder.Eventf(obj, corev1.EventTypeWarning, e.Reason, e.Error())
Expand All @@ -59,8 +61,42 @@ func RecordContextualError(ctx context.Context, recorder kuberecorder.EventRecor
// RecordReconcileReq is a ResultProcessor that checks the reconcile
// annotation value and sets it in the object status as
// status.lastHandledReconcileAt.
func RecordReconcileReq(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, _ error) {
func RecordReconcileReq(ctx context.Context, recorder kuberecorder.EventRecorder, obj conditions.Setter, _ reconcile.Result, _ error) {
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
object.SetStatusLastHandledReconcileAt(obj, v)
}
}

// NotifySuccess takes an old object, a Notification and fail conditions to
// return a ResultProcessor that emits an event on successful result with any
// recovery from previous fail condition.
func NotifySuccess(oldObj conditions.Setter, n Notification, failConditions []string) ResultProcessor {
return func(ctx context.Context, recorder kuberecorder.EventRecorder, obj conditions.Setter, result reconcile.Result, err error) {
if err == nil && result == reconcile.ResultSuccess {
var annotations map[string]string
reason := meta.SucceededReason
messages := []string{}

// Check the old object status conditions to determine if there was
// a recovery from some failure.
for _, failCondition := range failConditions {
oldFailedCondition := conditions.Get(oldObj, failCondition)
if oldFailedCondition != nil && conditions.Get(obj, failCondition) == nil {
messages = append(messages, fmt.Sprintf("resolved '%s'", oldFailedCondition.Reason))
}
}

// Populate event metadata from the notification.
if !n.IsZero() {
annotations = n.Annotations
reason = n.Reason
messages = append(messages, n.Message)
}

// No event if there's no message.
if len(messages) > 0 {
recorder.AnnotatedEventf(obj, annotations, corev1.EventTypeNormal, reason, strings.Join(messages, ", "))
}
}
}
}
146 changes: 146 additions & 0 deletions internal/reconcile/summarize/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package summarize

import (
"context"
"fmt"
"testing"

. "github.com/onsi/gomega"
Expand All @@ -26,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/object"
"github.com/fluxcd/source-controller/internal/reconcile"
Expand Down Expand Up @@ -89,3 +91,147 @@ func TestRecordReconcileReq(t *testing.T) {
})
}
}

func TestNotifySuccess(t *testing.T) {
tests := []struct {
name string
oldObjBeforeFunc func(obj conditions.Setter)
newObjBeforeFunc func(obj conditions.Setter)
notification Notification
failConditions []string
result reconcile.Result
resultErr error
wantEvent string
}{
{
name: "fetch failed recovery",
oldObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail msg foo")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "something failed")
},
newObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "artifact ready")
},
failConditions: []string{sourcev1.FetchFailedCondition, sourcev1.IncludeUnavailableCondition},
result: reconcile.ResultSuccess,
wantEvent: "resolved 'GitOperationFailed'",
},
{
name: "fetch failed recovery with notification",
oldObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail msg foo")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "something failed")
},
newObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "artifact ready")
},
notification: Notification{
Reason: "NewArtifact",
Message: "stored artifact for commit 'Foo'",
Annotations: map[string]string{
"revision": "some-rev",
"checksum": "some-checksum",
},
},
failConditions: []string{sourcev1.FetchFailedCondition, sourcev1.IncludeUnavailableCondition},
result: reconcile.ResultSuccess,
wantEvent: "resolved 'GitOperationFailed', stored artifact for commit 'Foo'",
},
{
name: "fetch failed, no recovery",
oldObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail msg foo")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "something failed")
},
newObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail msg foo")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "something failed")
},
failConditions: []string{sourcev1.FetchFailedCondition, sourcev1.IncludeUnavailableCondition},
result: reconcile.ResultSuccess,
},
{
name: "notification without failure",
oldObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "artifact ready")
},
newObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "artifact ready")
},
notification: Notification{
Reason: "NewArtifact",
Message: "stored artifact for commit 'Foo'",
Annotations: map[string]string{
"revision": "some-rev",
"checksum": "some-checksum",
},
},
failConditions: []string{sourcev1.FetchFailedCondition, sourcev1.IncludeUnavailableCondition},
wantEvent: "stored artifact for commit 'Foo'",
result: reconcile.ResultSuccess,
},
{
name: "no notification, no failure",
failConditions: []string{sourcev1.FetchFailedCondition, sourcev1.IncludeUnavailableCondition},
result: reconcile.ResultSuccess,
},
{
name: "empty result",
oldObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail msg foo")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "something failed")
},
newObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "artifact ready")
},
failConditions: []string{sourcev1.FetchFailedCondition, sourcev1.IncludeUnavailableCondition},
result: reconcile.ResultEmpty,
},
{
name: "error result",
oldObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail msg foo")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "something failed")
},
newObjBeforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "artifact ready")
},
failConditions: []string{sourcev1.FetchFailedCondition, sourcev1.IncludeUnavailableCondition},
result: reconcile.ResultSuccess,
resultErr: fmt.Errorf("some error"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)

oldObj := &sourcev1.GitRepository{}
newObj := oldObj.DeepCopy()

if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}

resultProcessor := NotifySuccess(oldObj, tt.notification, tt.failConditions)

resultProcessor(context.TODO(), recorder, newObj, tt.result, tt.resultErr)

select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}
20 changes: 20 additions & 0 deletions internal/reconcile/summarize/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,26 @@ type Conditions struct {
NegativePolarity []string
}

// Notification contains information for constructing an event to be emitted. It
// is used to define notifications from the SummarizeAndPatch helper.
// Notification in itself doesn't have any type, like warning or normal, based
// on the context, a notification can be used to construct different types of
// events.
type Notification struct {
Reason string
Message string
Annotations map[string]string
}

// IsZero evaluates if the Notification is empty based on the Reason and Message
// values.
func (n Notification) IsZero() bool {
if n.Reason == "" && n.Message == "" {
return true
}
return false
}

// Helper is SummarizeAndPatch helper.
type Helper struct {
recorder kuberecorder.EventRecorder
Expand Down

0 comments on commit b18a7d2

Please sign in to comment.