Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure cleanup of Stack in foreground deletion #760

Merged
merged 7 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CHANGELOG

- Stack Controller: watch for delete events. [#756](https://github.com/pulumi/pulumi-kubernetes-operator/pull/756)
- Stack Controller: fix an issue where new commits weren't detected when using git sources. [#762](https://github.com/pulumi/pulumi-kubernetes-operator/issues/762)
- Ensure cleanup of Stack in foreground deletion. [#760](https://github.com/pulumi/pulumi-kubernetes-operator/pull/760)

## 2.0.0-beta.2 (2024-11-11)

Expand Down
2 changes: 1 addition & 1 deletion operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified
.PHONY: deploy
deploy: manifests kustomize ## Deploy controller manager to the K8s cluster specified in ~/.kube/config.
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | $(KUBECTL) apply --server-side=true -f -
$(KUSTOMIZE) build config/default | $(KUBECTL) apply --server-side=true --force-conflicts -f -

.PHONY: undeploy
undeploy: ## Undeploy controller manager from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion.
Expand Down
90 changes: 42 additions & 48 deletions operator/internal/controller/auto/update_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controller
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"regexp"
Expand All @@ -31,8 +30,6 @@ import (
autov1alpha1 "github.com/pulumi/pulumi-kubernetes-operator/v2/operator/api/auto/v1alpha1"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -44,6 +41,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -62,6 +60,11 @@ const (
UpdateConditionReasonComplete = "Complete"
UpdateConditionReasonUpdated = "Updated"
UpdateConditionReasonProgressing = "Progressing"

UpdateConditionReasonAborted = "Aborted"
UpdateConditionReasonCanceled = "Canceled"
UpdateConditionReasonUpdateFailed = "UpdateFailed"
UpdateConditionReasonUpdateSucceeded = "UpdateSucceeded"
)

// UpdateReconciler reconciles an Update object
Expand Down Expand Up @@ -98,15 +101,33 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, nil
}

// guard against retrying an incomplete update
if rs.progressing.Status == metav1.ConditionTrue {
l.Info("was progressing; marking as failed")
markFailed := func(reason string) {
rs.progressing.Status = metav1.ConditionFalse
rs.progressing.Reason = "Failed"
rs.failed.Status = metav1.ConditionTrue
rs.failed.Reason = "unknown"
rs.failed.Reason = reason
rs.complete.Status = metav1.ConditionTrue
rs.complete.Reason = "Aborted"
rs.complete.Reason = "Failed"
}

// guard against retrying an incomplete update
if rs.progressing.Status == metav1.ConditionTrue {
l.Info("was progressing; marking as failed")
markFailed(UpdateConditionReasonAborted)
return ctrl.Result{}, rs.updateStatus(ctx, obj)
}

// cancel if the update is being deleted
if !obj.DeletionTimestamp.IsZero() {
l.Info("deleting; marking as failed")
markFailed(UpdateConditionReasonCanceled)
return ctrl.Result{}, rs.updateStatus(ctx, obj)
}

// cancel if the update was orphaned from its workspace
if isOrphaned(obj) {
l.Info("orphaned; marking as failed")
markFailed(UpdateConditionReasonCanceled)
return ctrl.Result{}, rs.updateStatus(ctx, obj)
}

Expand Down Expand Up @@ -195,8 +216,13 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}

// isOrphaned reports whether the given Update has no managing controller,
// i.e. controllerutil.HasControllerReference finds no controller owner
// reference on it.
func isOrphaned(obj *autov1alpha1.Update) bool {
	managed := controllerutil.HasControllerReference(obj)
	return !managed
}

func isWorkspaceReady(ws *autov1alpha1.Workspace) bool {
if ws == nil || ws.Generation != ws.Status.ObservedGeneration {
if ws == nil || !ws.DeletionTimestamp.IsZero() || ws.Generation != ws.Status.ObservedGeneration {
return false
}
return meta.IsStatusConditionTrue(ws.Status.Conditions, autov1alpha1.WorkspaceReady)
Expand All @@ -211,7 +237,7 @@ func (workspaceReadyPredicate) Create(e event.CreateEvent) bool {
}

func (workspaceReadyPredicate) Delete(_ event.DeleteEvent) bool {
return false
return true
}

func (workspaceReadyPredicate) Update(e event.UpdateEvent) bool {
Expand Down Expand Up @@ -438,7 +464,8 @@ func (r *UpdateReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
Named("update-controller").
For(&autov1alpha1.Update{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
For(&autov1alpha1.Update{}, builder.WithPredicates(predicate.Or(
predicate.GenerationChangedPredicate{}, OwnerReferencesChangedPredicate{}))).
Watches(&autov1alpha1.Workspace{},
handler.EnqueueRequestsFromMapFunc(r.mapWorkspaceToUpdate),
builder.WithPredicates(&workspaceReadyPredicate{})).
Expand Down Expand Up @@ -547,22 +574,9 @@ func (s streamReader[T]) Result() (result, error) {
if err == io.EOF {
break
}
if transient(err) {
// Surface transient errors to trigger another reconcile.
return nil, err
}
if err != nil {
// For all other errors treat the operation as failed.
s.l.Error(err, "Update failed")
s.obj.Status.Message = status.Convert(err).Message()
s.u.progressing.Status = metav1.ConditionFalse
s.u.progressing.Reason = UpdateConditionReasonComplete
s.u.complete.Status = metav1.ConditionTrue
s.u.complete.Reason = UpdateConditionReasonComplete
s.u.failed.Status = metav1.ConditionTrue
s.u.failed.Reason = status.Code(err).String()
s.u.failed.Message = s.obj.Status.Message
return res, nil
s.l.Error(err, "Unexpected error from response stream")
return nil, err
}

res = stream.GetResult()
Expand All @@ -585,11 +599,11 @@ func (s streamReader[T]) Result() (result, error) {
switch res.GetSummary().Result {
case string(apitype.StatusSucceeded):
s.u.failed.Status = metav1.ConditionFalse
s.u.failed.Reason = res.GetSummary().Result
s.u.failed.Reason = UpdateConditionReasonUpdateSucceeded
s.u.failed.Message = res.GetSummary().Message
default:
s.u.failed.Status = metav1.ConditionTrue
s.u.failed.Reason = res.GetSummary().Result
s.u.failed.Reason = UpdateConditionReasonUpdateFailed
s.u.failed.Message = res.GetSummary().Message
}
return res, nil
Expand All @@ -598,26 +612,6 @@ func (s streamReader[T]) Result() (result, error) {
return res, fmt.Errorf("didn't receive a result")
}

// transient reports whether err is worth retrying right away.
//
// It returns false for a nil error and for gRPC status codes that are not
// likely to resolve quickly (Unknown, Unauthenticated, PermissionDenied,
// InvalidArgument). Context deadline/cancellation errors and every other
// status code are treated as transient. Callers use the result to decide
// between an immediate retry and a much slower one.
func transient(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
		// The context check takes precedence over the gRPC status code.
		return true
	}

	switch status.Code(err) {
	case codes.Unknown, codes.Unauthenticated, codes.PermissionDenied, codes.InvalidArgument:
		return false
	}
	return true
}

// getResulter glues our various result types to a common interface.
type getResulter[T stream] struct {
stream *T
Expand Down
22 changes: 4 additions & 18 deletions operator/internal/controller/auto/update_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestUpdate(t *testing.T) {
EndTime: metav1.NewTime(time.Unix(0, 0).UTC()),
Conditions: []metav1.Condition{
{Type: "Progressing", Status: "False", Reason: "Complete"},
{Type: "Failed", Status: "False", Reason: "succeeded"},
{Type: "Failed", Status: "False", Reason: "UpdateSucceeded"},
{Type: "Complete", Status: "True", Reason: "Updated"},
},
},
Expand Down Expand Up @@ -206,17 +206,15 @@ func TestUpdate(t *testing.T) {
{
Type: "Failed",
Status: "True",
Reason: "failed",
Reason: "UpdateFailed",
Message: "something went wrong",
},
{Type: "Complete", Status: "True", Reason: "Updated"},
},
},
},
{
// Auto API failures are currently returned as non-nil errors,
// which we translate into "failed" Results.
name: "update failed error",
name: "update grpc error",
obj: autov1alpha1.Update{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "uid"}},
client: func(ctrl *gomock.Controller) upper {
upper := NewMockupper(ctrl)
Expand All @@ -234,19 +232,7 @@ func TestUpdate(t *testing.T) {
return upper
},
kclient: func(*gomock.Controller) creater { return nil },
want: autov1alpha1.UpdateStatus{
Message: "failed to run update: exit status 255",
Conditions: []metav1.Condition{
{Type: "Progressing", Status: "False", Reason: "Complete"},
{
Type: "Failed",
Status: "True",
Reason: "Unknown",
Message: "failed to run update: exit status 255",
},
{Type: "Complete", Status: "True", Reason: "Complete"},
},
},
wantErr: "failed to run update: exit status 255",
},
{
name: "workspace grpc failure",
Expand Down
24 changes: 24 additions & 0 deletions operator/internal/controller/auto/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"reflect"

agentclient "github.com/pulumi/pulumi-kubernetes-operator/v2/agent/pkg/client"
agentpb "github.com/pulumi/pulumi-kubernetes-operator/v2/agent/pkg/proto"
Expand Down Expand Up @@ -118,3 +119,26 @@ func (p *DebugPredicate) Generic(e event.GenericEvent) bool {
l.V(1).Info("Generic", "controller", p.Controller, "type", fmt.Sprintf("%T", e.Object), "name", e.Object.GetName(), "revision", e.Object.GetResourceVersion())
return true
}

// OwnerReferencesChangedPredicate is an event filter that admits update
// events only when the object's owner references differ between the old and
// new versions. Create, delete, and generic events are always rejected.
type OwnerReferencesChangedPredicate struct{}

var _ predicate.Predicate = &OwnerReferencesChangedPredicate{}

// Create always returns false: a newly created object has no previous owner
// references to compare against.
func (OwnerReferencesChangedPredicate) Create(_ event.CreateEvent) bool {
	return false
}

// Delete always returns false.
func (OwnerReferencesChangedPredicate) Delete(_ event.DeleteEvent) bool {
	return false
}

// Update returns true when both the old and new objects are present and
// their owner references are not deeply equal; otherwise it returns false.
func (OwnerReferencesChangedPredicate) Update(e event.UpdateEvent) bool {
	if e.ObjectOld == nil || e.ObjectNew == nil {
		return false
	}
	// The predicate must fire on a *change*, so the equality check is negated.
	return !reflect.DeepEqual(e.ObjectOld.GetOwnerReferences(), e.ObjectNew.GetOwnerReferences())
}

// Generic always returns false.
func (OwnerReferencesChangedPredicate) Generic(_ event.GenericEvent) bool {
	return false
}
43 changes: 41 additions & 2 deletions operator/internal/controller/pulumi/stack_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ func (r *StackReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
return reconcile.Result{}, fmt.Errorf("unable to define workspace for stack: %w", err)
}

var toBeFinalized *autov1alpha1.Update
saveStatus := func() error {
oldRevision := instance.ResourceVersion
if err := r.Status().Update(ctx, instance); err != nil {
Expand All @@ -563,6 +564,15 @@ func (r *StackReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
"currentUpdate", instance.Status.CurrentUpdate,
"conditions", instance.Status.Conditions)
}
if toBeFinalized != nil {
// remove the finalizer from the Update object that was being watched,
// after the status update is persisted.
if controllerutil.RemoveFinalizer(toBeFinalized, pulumiFinalizer) {
if err := r.Update(ctx, toBeFinalized, client.FieldOwner(FieldManager)); err != nil {
log.Error(err, "unable to remove finalizer from current update; update object will be orphaned")
}
}
}
return nil
}

Expand Down Expand Up @@ -601,11 +611,16 @@ func (r *StackReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
}
}

toBeFinalized = sess.update
instance.Status.CurrentUpdate = nil
}

// We can exit early if there is no clean-up to do.
if isStackMarkedToBeDeleted && !stack.DestroyOnFinalize {
instance.Status.MarkReadyCondition()
if err = saveStatus(); err != nil {
return reconcile.Result{}, err
}
if controllerutil.RemoveFinalizer(instance, pulumiFinalizer) {
return reconcile.Result{}, r.Update(ctx, instance, client.FieldOwner(FieldManager))
}
Expand Down Expand Up @@ -886,6 +901,10 @@ func (r *StackReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
return reconcile.Result{}, fmt.Errorf("unable to prepare update (up) for stack: %w", err)
}
}
// apply a finalizer to the update object to ensure it doesn't disappear entirely
// while we're waiting for it to complete.
update.Finalizers = append(update.Finalizers, pulumiFinalizer)

instance.Status.CurrentUpdate = &shared.CurrentStackUpdate{
Generation: instance.Generation,
ReconcileRequest: syncRequest,
Expand Down Expand Up @@ -1471,7 +1490,7 @@ func (sess *stackReconcilerSession) newUp(ctx context.Context, o *pulumiv1.Stack
},
}

if err := controllerutil.SetControllerReference(o, update, sess.scheme); err != nil {
if err := sess.setOwnerReferences(o, update); err != nil {
return nil, err
}

Expand All @@ -1497,13 +1516,33 @@ func (sess *stackReconcilerSession) newDestroy(ctx context.Context, o *pulumiv1.
},
}

if err := controllerutil.SetControllerReference(o, update, sess.scheme); err != nil {
if err := sess.setOwnerReferences(o, update); err != nil {
return nil, err
}

return update, nil
}

// setOwnerReferences attaches two owner references to update: the session's
// workspace as the managing controller, and the stack o as an additional
// owner with block-owner-deletion enabled. The first error encountered is
// returned unchanged.
//
// See "The Three Laws of Controllers":
// https://github.com/kubernetes/design-proposals-archive/blob/acc25e14ca83dfda4f66d8cb1f1b491f26e78ffe/api-machinery/controller-ref.md#behavior
func (sess *stackReconcilerSession) setOwnerReferences(o *pulumiv1.Stack, update *autov1alpha1.Update) error {
	// The workspace is the managing controller of the update. Should the
	// workspace be deleted, the update becomes a candidate for adoption by a
	// replacement workspace.
	ctrlErr := controllerutil.SetControllerReference(sess.ws, update, sess.scheme)
	if ctrlErr != nil {
		return ctrlErr
	}

	// The stack is a plain owner of the update, so that updates survive
	// deletion of the Workspace and some history is retained even if the
	// workspace is deleted as an optimization.
	// WithBlockOwnerDeletion ensures that the owner reference is not removed
	// eagerly in background deletion of the stack, which would break the
	// EnqueueRequestForOwner logic that triggers a new reconcile loop.
	blockDeletion := controllerutil.WithBlockOwnerDeletion(true)
	return controllerutil.SetOwnerReference(o, update, sess.scheme, blockDeletion)
}

// makeUpdateName builds a name for a new Update object from the stack's name
// followed by a dash and the current Unix time in milliseconds, rendered in
// lowercase hexadecimal.
func makeUpdateName(o *pulumiv1.Stack) string {
	nowMillis := time.Now().UnixMilli()
	return fmt.Sprintf("%s-%x", o.Name, nowMillis)
}
Expand Down
Loading