Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Remove auto-patching from typed controllers #159

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/controllers/counter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (c *Controller) Name() string {

// Reconcile a control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner) (reconcile.Result, error) {
stored := provisioner.DeepCopy()
nodes := v1.NodeList{}
if err := c.kubeClient.List(ctx, &nodes, client.MatchingLabels{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}); err != nil {
return reconcile.Result{}, err
Expand All @@ -68,8 +70,12 @@ func (c *Controller) Reconcile(ctx context.Context, provisioner *v1alpha5.Provis
return reconcile.Result{RequeueAfter: 250 * time.Millisecond}, nil
}
// Determine resource usage and update provisioner.status.resources
resourceCounts := c.resourceCountsFor(provisioner.Name)
provisioner.Status.Resources = resourceCounts
provisioner.Status.Resources = c.resourceCountsFor(provisioner.Name)
if !equality.Semantic.DeepEqual(stored, provisioner) {
if err := c.kubeClient.Status().Patch(ctx, provisioner, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

Expand Down
19 changes: 12 additions & 7 deletions pkg/controllers/node/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package node

import (
"context"
"time"

"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -69,17 +69,19 @@ func (c *Controller) Name() string {

// Reconcile executes a reallocation control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Result, error) {
provisioner := &v1alpha5.Provisioner{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
stored := node.DeepCopy()
if _, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; !ok {
return reconcile.Result{}, nil
}
if !node.DeletionTimestamp.IsZero() {
return reconcile.Result{}, nil
}

provisioner := &v1alpha5.Provisioner{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// Execute Reconcilers
var results []reconcile.Result
var errs error
Expand All @@ -95,8 +97,11 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re
errs = multierr.Append(errs, err)
results = append(results, res)
}
// Append a short requeue result so that we requeue to check for emptiness when the node nomination time ends
results = append(results, reconcile.Result{RequeueAfter: time.Minute})
if !equality.Semantic.DeepEqual(stored, node) {
if err := c.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, err
}
}
return result.Min(results...), errs
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/node/emptiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi
logging.FromContext(ctx).Infof("added TTL to empty node")
}

return reconcile.Result{}, nil
// Short requeue result so that we requeue to check for emptiness when the node nomination time ends
return reconcile.Result{RequeueAfter: time.Minute}, nil
}

func (r *Emptiness) isEmpty(ctx context.Context, n *v1.Node) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *Controller) Name() string {
}

// Reconcile the resource
func (c *Controller) Reconcile(ctx context.Context, p *v1.Pod) (reconcile.Result, error) {
func (c *Controller) Reconcile(_ context.Context, p *v1.Pod) (reconcile.Result, error) {
if !pod.IsProvisionable(p) {
return reconcile.Result{}, nil
}
Expand Down
25 changes: 9 additions & 16 deletions pkg/controllers/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -35,12 +34,10 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/injection"

"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller"
)

var (
Expand All @@ -59,6 +56,8 @@ func init() {
crmetrics.Registry.MustRegister(terminationSummary)
}

var _ corecontroller.FinalizingTypedController[*v1.Node] = (*Controller)(nil)

// Controller for the resource
type Controller struct {
Terminator *Terminator
Expand All @@ -70,7 +69,7 @@ type Controller struct {
func NewController(clk clock.Clock, kubeClient client.Client, evictionQueue *EvictionQueue,
recorder events.Recorder, cloudProvider cloudprovider.CloudProvider) corecontroller.Controller {

return &Controller{
return corecontroller.Typed[*v1.Node](kubeClient, &Controller{
KubeClient: kubeClient,
Terminator: &Terminator{
KubeClient: kubeClient,
Expand All @@ -79,24 +78,18 @@ func NewController(clk clock.Clock, kubeClient client.Client, evictionQueue *Evi
Clock: clk,
},
Recorder: recorder,
}
})
}

func (c *Controller) Name() string {
return "termination"
}

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(c.Name()).With("node", req.Name))
ctx = injection.WithControllerName(ctx, c.Name())
func (c *Controller) Reconcile(_ context.Context, _ *v1.Node) (reconcile.Result, error) {
return reconcile.Result{}, nil
}

node := &v1.Node{}
if err := c.KubeClient.Get(ctx, req.NamespacedName, node); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if node.DeletionTimestamp.IsZero() {
return reconcile.Result{}, nil
}
func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Result, error) {
if !controllerutil.ContainsFinalizer(node, v1alpha5.TerminationFinalizer) {
return reconcile.Result{}, nil
}
Expand Down
58 changes: 4 additions & 54 deletions pkg/operator/controller/typed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
"strings"

"github.com/samber/lo"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -50,10 +47,10 @@ type typedDecorator[T client.Object] struct {
typedController TypedController[T]
}

func Typed[T client.Object](kubeClient client.Client, typedReconciler TypedController[T]) Controller {
func Typed[T client.Object](kubeClient client.Client, typedController TypedController[T]) Controller {
return &typedDecorator[T]{
kubeClient: kubeClient,
typedController: typedReconciler,
typedController: typedController,
}
}

Expand All @@ -75,60 +72,13 @@ func (t *typedDecorator[T]) Reconcile(ctx context.Context, req reconcile.Request
if err := t.kubeClient.Get(ctx, req.NamespacedName, obj); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
stored := obj.DeepCopyObject().(client.Object)

var result reconcile.Result
var err error

finalizingTypedController, ok := t.typedController.(FinalizingTypedController[T])
if !obj.GetDeletionTimestamp().IsZero() && ok {
result, err = finalizingTypedController.Finalize(ctx, obj)
} else {
result, err = t.typedController.Reconcile(ctx, obj)
}

if e := t.patch(ctx, stored, obj); e != nil {
return reconcile.Result{}, multierr.Combine(e, err)
return finalizingTypedController.Finalize(ctx, obj)
}
return result, err
return t.typedController.Reconcile(ctx, obj)
}

func (t *typedDecorator[T]) Builder(ctx context.Context, m manager.Manager) Builder {
return t.typedController.Builder(ctx, m)
}

func (t *typedDecorator[T]) patch(ctx context.Context, obj, updated client.Object) error {
// Patch Body if changed
if !bodyEqual(obj, updated) {
if err := t.kubeClient.Patch(ctx, updated.DeepCopyObject().(client.Object), client.MergeFrom(obj)); err != nil {
return client.IgnoreNotFound(err)
}
}
// Patch Status if changed
if !statusEqual(obj, updated) {
if err := t.kubeClient.Status().Patch(ctx, updated.DeepCopyObject().(client.Object), client.MergeFrom(obj)); err != nil {
return client.IgnoreNotFound(err) // Status subresource may not exist
}
}
return nil
}

// bodyEqual compares two objects, ignoring their status and determines if they are deeply-equal
func bodyEqual(a, b client.Object) bool {
unstructuredA := lo.Must(runtime.DefaultUnstructuredConverter.ToUnstructured(a))
unstructuredB := lo.Must(runtime.DefaultUnstructuredConverter.ToUnstructured(b))

// Remove the status fields, so we are only left with non-status info
delete(unstructuredA, "status")
delete(unstructuredB, "status")

return equality.Semantic.DeepEqual(unstructuredA, unstructuredB)
}

// statusEqual compares two object statuses and determines if they are deeply-equal
func statusEqual(a, b client.Object) bool {
unstructuredA := lo.Must(runtime.DefaultUnstructuredConverter.ToUnstructured(a))
unstructuredB := lo.Must(runtime.DefaultUnstructuredConverter.ToUnstructured(b))

return equality.Semantic.DeepEqual(unstructuredA["status"], unstructuredB["status"])
}
77 changes: 0 additions & 77 deletions pkg/operator/controller/typed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ package controller_test
import (
"context"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -62,55 +59,6 @@ var _ = Describe("Typed", func() {
typedController := controller.Typed[*v1.Node](env.Client, fakeController)
ExpectReconcileSucceeded(ctx, typedController, client.ObjectKeyFromObject(node))
})
It("should modify the node when updated in the reconcile", func() {
node := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: "default",
},
},
})
ExpectApplied(ctx, env.Client, node)
fakeController := &FakeTypedController[*v1.Node]{
ReconcileAssertions: []TypedReconcileAssertion[*v1.Node]{
func(ctx context.Context, n *v1.Node) {
n.Labels = lo.Assign(n.Labels, map[string]string{
"custom-key": "custom-value",
})
},
},
}
typedController := controller.Typed[*v1.Node](env.Client, fakeController)
ExpectReconcileSucceeded(ctx, typedController, client.ObjectKeyFromObject(node))
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Labels).To(HaveKeyWithValue("custom-key", "custom-value"))
})
It("should modify the provisioner and the provisioner status in the reconcile", func() {
provisioner := test.Provisioner()
ExpectApplied(ctx, env.Client, provisioner)
fakeController := &FakeTypedController[*v1alpha5.Provisioner]{
ReconcileAssertions: []TypedReconcileAssertion[*v1alpha5.Provisioner]{
func(ctx context.Context, p *v1alpha5.Provisioner) {
p.Labels = lo.Assign(p.Labels, map[string]string{
"custom-key": "custom-value",
})
p.Status.Resources = v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Mi"),
v1.ResourceEphemeralStorage: resource.MustParse("1Gi"),
}
},
},
}
typedController := controller.Typed[*v1alpha5.Provisioner](env.Client, fakeController)
ExpectReconcileSucceeded(ctx, typedController, client.ObjectKeyFromObject(provisioner))
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(provisioner), provisioner)).To(Succeed())
Expect(provisioner.Labels).To(HaveKeyWithValue("custom-key", "custom-value"))
Expect(provisioner.Status.Resources).To(HaveLen(3))
Expect(provisioner.Status.Resources).To(HaveKeyWithValue(v1.ResourceCPU, resource.MustParse("1")))
Expect(provisioner.Status.Resources).To(HaveKeyWithValue(v1.ResourceMemory, resource.MustParse("1Mi")))
Expect(provisioner.Status.Resources).To(HaveKeyWithValue(v1.ResourceEphemeralStorage, resource.MustParse("1Gi")))
})
It("should call finalizer func when finalizing", func() {
node := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -137,31 +85,6 @@ var _ = Describe("Typed", func() {
ExpectReconcileSucceeded(ctx, typedController, client.ObjectKeyFromObject(node))
Expect(called).To(BeTrue())
})
It("should update and remove the finalizer when finalizing", func() {
node := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: "default",
},
Finalizers: []string{
v1alpha5.TestingGroup + "/finalizer",
},
},
})
ExpectApplied(ctx, env.Client, node)
Expect(env.Client.Delete(ctx, node)).To(Succeed())
fakeController := &FakeTypedController[*v1.Node]{
FinalizeAssertions: []TypedReconcileAssertion[*v1.Node]{
func(ctx context.Context, node *v1.Node) {
controllerutil.RemoveFinalizer(node, v1alpha5.TestingGroup+"/finalizer")
},
},
}
typedController := controller.Typed[*v1.Node](env.Client, fakeController)
ExpectExists(ctx, env.Client, node)
ExpectReconcileSucceeded(ctx, typedController, client.ObjectKeyFromObject(node))
ExpectNotFound(ctx, env.Client, node)
})
})

type TypedReconcileAssertion[T client.Object] func(context.Context, T)
Expand Down