From 943abc66e6e2f9670ca038ea5ecf4d2adb4136c6 Mon Sep 17 00:00:00 2001 From: Ellis Tarn Date: Fri, 27 Jan 2023 13:20:47 -0800 Subject: [PATCH] chore: simplify deprovisioning controller and remove dead code --- .../deprovisioning/consolidation.go | 17 +- pkg/controllers/deprovisioning/controller.go | 181 ++++++------------ pkg/controllers/deprovisioning/emptiness.go | 11 +- .../deprovisioning/emptynodeconsolidation.go | 2 +- pkg/controllers/deprovisioning/helpers.go | 6 +- .../deprovisioning/multinodeconsolidation.go | 2 +- .../deprovisioning/singlenodeconsolidation.go | 4 +- pkg/controllers/deprovisioning/suite_test.go | 81 ++++---- pkg/controllers/deprovisioning/types.go | 32 +--- pkg/controllers/provisioning/provisioner.go | 11 +- pkg/controllers/state/node.go | 8 +- 11 files changed, 129 insertions(+), 226 deletions(-) diff --git a/pkg/controllers/deprovisioning/consolidation.go b/pkg/controllers/deprovisioning/consolidation.go index 83f5eddc76..9a613345bb 100644 --- a/pkg/controllers/deprovisioning/consolidation.go +++ b/pkg/controllers/deprovisioning/consolidation.go @@ -68,16 +68,15 @@ func (c *consolidation) String() string { return metrics.ConsolidationReason } -// RecordLastState is used to record the last state that the consolidation implementation failed to work in to allow -// skipping future consolidation attempts until the state changes. -func (c *consolidation) RecordLastState(currentState int64) { +func (c *consolidation) AttemptConsolidation() bool { + currentState := c.cluster.ClusterConsolidationState() + // the last consolidation wasn't able to improve things, so don't reattempt + if currentState == c.lastConsolidationState { + return false + } + // something has changed, so try to consolidate and track the latest change c.lastConsolidationState = currentState -} - -func (c *consolidation) ShouldAttemptConsolidation() bool { - // the last cluster consolidation wasn't able to improve things and nothing has changed regarding - // the cluster that makes us think we would be successful now - return c.lastConsolidationState != c.cluster.ClusterConsolidationState() + return true } // sortAndFilterCandidates orders deprovisionable nodes by the disruptionCost, removing any that we already know won't diff --git a/pkg/controllers/deprovisioning/controller.go b/pkg/controllers/deprovisioning/controller.go index 4369faef86..6d3d820bb5 100644 --- a/pkg/controllers/deprovisioning/controller.go +++ b/pkg/controllers/deprovisioning/controller.go @@ -45,19 +45,14 @@ import ( // Controller is the deprovisioning controller. 
 type Controller struct {
-	kubeClient              client.Client
-	cluster                 *state.Cluster
-	provisioner             *provisioning.Provisioner
-	recorder                events.Recorder
-	clock                   clock.Clock
-	cloudProvider           cloudprovider.CloudProvider
-	emptiness               *Emptiness
-	expiration              *Expiration
-	drift                   *Drift
-	singleNodeConsolidation *SingleNodeConsolidation
-	multiNodeConsolidation  *MultiNodeConsolidation
-	emptyNodeConsolidation  *EmptyNodeConsolidation
-	reporter                *Reporter
+	kubeClient     client.Client
+	cluster        *state.Cluster
+	provisioner    *provisioning.Provisioner
+	recorder       events.Recorder
+	clock          clock.Clock
+	cloudProvider  cloudprovider.CloudProvider
+	deprovisioners []Deprovisioner
+	reporter       *Reporter
 }
 
 // pollingPeriod that we inspect cluster to look for opportunities to deprovision
@@ -80,19 +75,27 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi
 	reporter := NewReporter(recorder)
 
 	return &Controller{
-		clock:                   clk,
-		kubeClient:              kubeClient,
-		cluster:                 cluster,
-		provisioner:             provisioner,
-		recorder:                recorder,
-		reporter:                reporter,
-		cloudProvider:           cp,
-		expiration:              NewExpiration(clk, kubeClient, cluster, provisioner),
-		emptiness:               NewEmptiness(clk, kubeClient, cluster),
-		drift:                   NewDrift(kubeClient, cluster, provisioner),
-		emptyNodeConsolidation:  NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
-		multiNodeConsolidation:  NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
-		singleNodeConsolidation: NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
+		clock:         clk,
+		kubeClient:    kubeClient,
+		cluster:       cluster,
+		provisioner:   provisioner,
+		recorder:      recorder,
+		reporter:      reporter,
+		cloudProvider: cp,
+		deprovisioners: []Deprovisioner{
+			// Expire any nodes that must be deleted, allowing their pods to potentially land on currently empty nodes
+			NewExpiration(clk, kubeClient, cluster, provisioner),
+			// Terminate any nodes that have drifted from provisioning specifications, allowing the pods to reschedule.
+			NewDrift(kubeClient, cluster, provisioner),
+			// Delete any remaining empty nodes as there is zero cost in terms of disruption. Emptiness and
+			// emptyNodeConsolidation are mutually exclusive; only one of these will operate.
+			NewEmptiness(clk),
+			NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
+			// Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn
+			NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
+			// And finally, fall back to our single node consolidation to further reduce cluster cost.
+ NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter), + }, } } @@ -105,118 +108,54 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Bu } func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { - // capture the state of the cluster before we do any analysis - currentState := c.cluster.ClusterConsolidationState() - result, err := c.ProcessCluster(ctx) - - switch result { - case ResultFailed: - return reconcile.Result{}, fmt.Errorf("processing cluster, %w", err) - case ResultRetry: - return reconcile.Result{Requeue: true}, nil - case ResultNothingToDo: - // we record the cluster state for consolidation methods as they are expensive to compute and this allows - // them to defer calculations until something about the cluster has changed that may allow them to - // succeed - c.emptyNodeConsolidation.RecordLastState(currentState) - c.singleNodeConsolidation.RecordLastState(currentState) - c.multiNodeConsolidation.RecordLastState(currentState) - } - return reconcile.Result{RequeueAfter: pollingPeriod}, nil -} - -// CandidateNode is a node that we are considering for deprovisioning along with extra information to be used in -// making that determination -type CandidateNode struct { - *v1.Node - instanceType *cloudprovider.InstanceType - capacityType string - zone string - provisioner *v1alpha5.Provisioner - disruptionCost float64 - pods []*v1.Pod -} - -// ProcessCluster is exposed for unit testing purposes -// ProcessCluster loops through implemented deprovisioners -func (c *Controller) ProcessCluster(ctx context.Context) (Result, error) { - // range over the different deprovisioning methods. We'll only let one method perform an action - for _, d := range []Deprovisioner{ - // Expire any nodes that must be deleted, allowing their pods to potentially land on currently - // empty nodes - c.expiration, - - // Terminate any nodes that have drifted from provisioning specifications, allowing the pods to reschedule. - c.drift, - - // Delete any remaining empty nodes as there is zero cost in terms of dirsuption. Emptiness and - // emptyNodeConsolidation are mutually exclusive, only one of these will operate - c.emptiness, - c.emptyNodeConsolidation, - - // Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn - c.multiNodeConsolidation, - - // And finally fall back our single node consolidation to further reduce cluster cost. - c.singleNodeConsolidation, - } { + // Attempt different deprovisioning methods. We'll only let one method perform an action + for _, d := range c.deprovisioners { candidates, err := candidateNodes(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision) if err != nil { - return ResultFailed, fmt.Errorf("determining candidate nodes, %w", err) + return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err) } // If there are no candidate nodes, move to the next deprovisioner if len(candidates) == 0 { continue } - result, err := c.executeDeprovisioning(ctx, d, candidates...) + // Determine the deprovisioning action + cmd, err := d.ComputeCommand(ctx, candidates...) 
if err != nil { - return ResultFailed, fmt.Errorf("deprovisioning nodes, %w", err) + return reconcile.Result{}, fmt.Errorf("computing deprovisioning decision, %w", err) } - - switch result { - case ResultFailed: - return ResultFailed, err - case ResultRetry, ResultSuccess: - // the controller wants to retry, or was successful in deprovisioning - return result, nil - case ResultNothingToDo: - // found nothing to do, so try the next deprovisioner + if cmd.action == actionDoNothing { continue - default: - logging.FromContext(ctx).Errorf("unexpected result %s", result) } + if cmd.action == actionRetry { + return reconcile.Result{Requeue: true}, nil + } + + // Attempt to deprovision + + if err := c.executeCommand(ctx, d, cmd); err != nil { + return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err) + } + return reconcile.Result{Requeue: true}, nil } // All deprovisioners did nothing, so return nothing to do - return ResultNothingToDo, nil + return reconcile.Result{RequeueAfter: pollingPeriod}, nil } -// Given candidate nodes, compute best deprovisioning action -func (c *Controller) executeDeprovisioning(ctx context.Context, d Deprovisioner, nodes ...CandidateNode) (Result, error) { - // Each attempt will try at least one node, limit to that many attempts. - cmd, err := d.ComputeCommand(ctx, nodes...) - if err != nil { - return ResultFailed, err - } - // Convert action to result - switch cmd.action { - case actionFailed: - return ResultFailed, err - case actionDoNothing: - return ResultNothingToDo, nil - case actionRetry: - return ResultRetry, nil - } - // If delete or replace, execute command - result, err := c.executeCommand(ctx, cmd, d) - if err != nil { - return ResultFailed, err - } - return result, nil +// CandidateNode is a node that we are considering for deprovisioning along with extra information to be used in +// making that determination +type CandidateNode struct { + *v1.Node + instanceType *cloudprovider.InstanceType + capacityType string + zone string + provisioner *v1alpha5.Provisioner + disruptionCost float64 + pods []*v1.Pod } -func (c *Controller) executeCommand(ctx context.Context, command Command, d Deprovisioner) (Result, error) { +func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, command Command) error { deprovisioningActionsPerformedCounter.With(prometheus.Labels{"action": fmt.Sprintf("%s/%s", d, command.action)}).Add(1) logging.FromContext(ctx).Infof("deprovisioning via %s %s", d, command) @@ -224,7 +163,7 @@ func (c *Controller) executeCommand(ctx context.Context, command Command, d Depr if err := c.launchReplacementNodes(ctx, command); err != nil { // If we failed to launch the replacement, don't deprovision. If this is some permanent failure, // we don't want to disrupt workloads with no way to provision new nodes for them. - return ResultFailed, fmt.Errorf("launching replacement node, %w", err) + return fmt.Errorf("launching replacement node, %w", err) } } @@ -242,7 +181,7 @@ func (c *Controller) executeCommand(ctx context.Context, command Command, d Depr for _, oldnode := range command.nodesToRemove { c.waitForDeletion(ctx, oldnode) } - return ResultSuccess, nil + return nil } // waitForDeletion waits for the specified node to be removed from the API server. 
This deletion can take some period diff --git a/pkg/controllers/deprovisioning/emptiness.go b/pkg/controllers/deprovisioning/emptiness.go index 70b2d1100f..cf0e2ce457 100644 --- a/pkg/controllers/deprovisioning/emptiness.go +++ b/pkg/controllers/deprovisioning/emptiness.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" "knative.dev/pkg/logging" "knative.dev/pkg/ptr" - "sigs.k8s.io/controller-runtime/pkg/client" "github.com/samber/lo" @@ -35,16 +34,12 @@ import ( // Emptiness is a subreconciler that deletes empty nodes. // Emptiness will respect TTLSecondsAfterEmpty type Emptiness struct { - clock clock.Clock - kubeClient client.Client - cluster *state.Cluster + clock clock.Clock } -func NewEmptiness(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster) *Emptiness { +func NewEmptiness(clk clock.Clock) *Emptiness { return &Emptiness{ - clock: clk, - kubeClient: kubeClient, - cluster: cluster, + clock: clk, } } diff --git a/pkg/controllers/deprovisioning/emptynodeconsolidation.go b/pkg/controllers/deprovisioning/emptynodeconsolidation.go index 5a65479a45..0b89fd1d3e 100644 --- a/pkg/controllers/deprovisioning/emptynodeconsolidation.go +++ b/pkg/controllers/deprovisioning/emptynodeconsolidation.go @@ -42,7 +42,7 @@ func NewEmptyNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClie // ComputeCommand generates a deprovisioning command given deprovisionable nodes func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) { - if !c.ShouldAttemptConsolidation() { + if !c.AttemptConsolidation() { return Command{action: actionDoNothing}, nil } candidates, err := c.sortAndFilterCandidates(ctx, candidates) diff --git a/pkg/controllers/deprovisioning/helpers.go b/pkg/controllers/deprovisioning/helpers.go index b3b5d8948d..74b7ee8da3 100644 --- a/pkg/controllers/deprovisioning/helpers.go +++ b/pkg/controllers/deprovisioning/helpers.go @@ -42,9 +42,9 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster * candidateNodes ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) { candidateNodeNames := sets.NewString(lo.Map(candidateNodes, func(t CandidateNode, i int) string { return t.Name })...) 
- allNodes := cluster.Nodes() - deletingNodes := allNodes.DeletingNodes() - stateNodes := lo.Filter(allNodes, func(n *state.Node, _ int) bool { + nodes := cluster.Nodes() + deletingNodes := nodes.Deleting() + stateNodes := lo.Filter(nodes, func(n *state.Node, _ int) bool { return !candidateNodeNames.Has(n.Name()) }) diff --git a/pkg/controllers/deprovisioning/multinodeconsolidation.go b/pkg/controllers/deprovisioning/multinodeconsolidation.go index 1665d10ab3..12bb389d8d 100644 --- a/pkg/controllers/deprovisioning/multinodeconsolidation.go +++ b/pkg/controllers/deprovisioning/multinodeconsolidation.go @@ -39,7 +39,7 @@ func NewMultiNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClie } func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) { - if !m.ShouldAttemptConsolidation() { + if !m.AttemptConsolidation() { return Command{action: actionDoNothing}, nil } candidates, err := m.sortAndFilterCandidates(ctx, candidates) diff --git a/pkg/controllers/deprovisioning/singlenodeconsolidation.go b/pkg/controllers/deprovisioning/singlenodeconsolidation.go index 992766024a..1a9bf3083a 100644 --- a/pkg/controllers/deprovisioning/singlenodeconsolidation.go +++ b/pkg/controllers/deprovisioning/singlenodeconsolidation.go @@ -41,7 +41,7 @@ func NewSingleNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeCli // //nolint:gocyclo func (c *SingleNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) { - if !c.ShouldAttemptConsolidation() { + if !c.AttemptConsolidation() { return Command{action: actionDoNothing}, nil } candidates, err := c.sortAndFilterCandidates(ctx, candidates) @@ -58,7 +58,7 @@ func (c *SingleNodeConsolidation) ComputeCommand(ctx context.Context, candidates logging.FromContext(ctx).Errorf("computing consolidation %s", err) continue } - if cmd.action == actionDoNothing || cmd.action == actionRetry || cmd.action == actionFailed { + if cmd.action == actionDoNothing || cmd.action == actionRetry { continue } diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go index 6e09460e63..5bfdef477f 100644 --- a/pkg/controllers/deprovisioning/suite_test.go +++ b/pkg/controllers/deprovisioning/suite_test.go @@ -36,6 +36,7 @@ import ( . 
"knative.dev/pkg/logging/testing" "knative.dev/pkg/ptr" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/aws/karpenter-core/pkg/apis" "github.com/aws/karpenter-core/pkg/apis/settings" @@ -173,7 +174,7 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -205,7 +206,7 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -232,7 +233,7 @@ var _ = Describe("Drift", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node @@ -266,7 +267,7 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node, but we should evict everything off one of node2 which only has a single pod @@ -321,7 +322,7 @@ var _ = Describe("Drift", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -413,7 +414,7 @@ var _ = Describe("Drift", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 3, node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -457,7 +458,7 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node, but we should evict everything off one of node2 which only has a single pod @@ -492,7 +493,7 @@ var _ = Describe("Expiration", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node @@ -525,7 +526,7 @@ var _ = Describe("Expiration", 
func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node, but we should evict everything off one of node2 which only has a single pod @@ -569,7 +570,7 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(nodeNotExpire)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node, but we should evict everything off one of node2 which only has a single pod @@ -623,7 +624,7 @@ var _ = Describe("Expiration", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -713,7 +714,7 @@ var _ = Describe("Expiration", func() { fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).To(HaveOccurred()) // Expiration should try to make 3 calls but fail for the third. @@ -805,7 +806,7 @@ var _ = Describe("Expiration", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 3, node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -918,7 +919,7 @@ var _ = Describe("Replace Nodes", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -993,7 +994,7 @@ var _ = Describe("Replace Nodes", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) fakeClock.Step(10 * time.Minute) - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node @@ -1063,7 +1064,7 @@ var _ = Describe("Replace Nodes", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -1145,7 +1146,7 @@ var _ = Describe("Replace Nodes", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(annotatedNode)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -1235,7 +1236,7 @@ var _ = Describe("Replace Nodes", func() { 
fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) Expect(cloudProvider.CreateCalls).To(HaveLen(0)) ExpectNodeExists(ctx, env.Client, node.Name) @@ -1338,7 +1339,7 @@ var _ = Describe("Replace Nodes", func() { fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) Expect(cloudProvider.CreateCalls).To(HaveLen(0)) ExpectNodeExists(ctx, env.Client, node.Name) @@ -1393,7 +1394,7 @@ var _ = Describe("Replace Nodes", func() { var consolidationFinished atomic.Bool go triggerVerifyAction() go func() { - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) consolidationFinished.Store(true) }() @@ -1486,7 +1487,7 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node, but we should evict everything off one of node2 which only has a single pod @@ -1578,7 +1579,7 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node @@ -1656,7 +1657,7 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node @@ -1731,7 +1732,7 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need a new node @@ -1814,7 +1815,7 @@ var _ = Describe("Node Lifetime Consideration", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.SetTime(time.Now()) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // the second node has more pods so it would normally not be picked for consolidation, except it very little @@ -1912,7 +1913,7 @@ var _ = Describe("Topology Consideration", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, zone1Node, zone2Node, zone3Node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) 
Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -2014,7 +2015,7 @@ var _ = Describe("Topology Consideration", func() { wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, zone1Node, zone2Node, zone3Node) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -2053,7 +2054,7 @@ var _ = Describe("Empty Nodes", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need any new nodes @@ -2097,7 +2098,7 @@ var _ = Describe("Empty Nodes", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need any new nodes @@ -2131,7 +2132,7 @@ var _ = Describe("Empty Nodes", func() { fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need any new nodes @@ -2180,7 +2181,7 @@ var _ = Describe("Empty Nodes", func() { fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // we don't need any new nodes and consolidation should notice the huge pending pod that needs the large @@ -2221,7 +2222,7 @@ var _ = Describe("consolidation TTL", func() { go func() { defer wg.Done() defer finished.Store(true) - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) }() @@ -2313,7 +2314,7 @@ var _ = Describe("consolidation TTL", func() { go func() { defer wg.Done() defer finished.Store(true) - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) }() @@ -2366,7 +2367,7 @@ var _ = Describe("consolidation TTL", func() { go func() { defer wg.Done() defer finished.Store(true) - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) }() @@ -2445,7 +2446,7 @@ var _ = Describe("Parallelization", func() { // Run the processing loop in parallel in the background with environment context go triggerVerifyAction() go func() { - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) }() @@ -2545,9 +2546,9 @@ var _ = Describe("Parallelization", func() { // Trigger a reconciliation run which should take into account the deleting node // cnsolidation shouldn't trigger additional actions fakeClock.Step(10 * time.Minute) - result, err := deprovisioningController.ProcessCluster(ctx) + result, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) 
Expect(err).ToNot(HaveOccurred()) - Expect(result).To(Equal(deprovisioning.ResultNothingToDo)) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) }) }) @@ -2630,7 +2631,7 @@ var _ = Describe("Multi-Node Consolidation", func() { fakeClock.Step(10 * time.Minute) wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node1, node2, node3) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -2704,7 +2705,7 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) fakeClock.Step(10 * time.Minute) go triggerVerifyAction() - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // We have [cheap-node, cheap-node] which multi-node consolidation could consolidate via @@ -2789,7 +2790,7 @@ var _ = Describe("Multi-Node Consolidation", func() { go func() { defer wg.Done() defer finished.Store(true) - _, err := deprovisioningController.ProcessCluster(ctx) + _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) }() diff --git a/pkg/controllers/deprovisioning/types.go b/pkg/controllers/deprovisioning/types.go index 391c1c2e4f..1f262c54d9 100644 --- a/pkg/controllers/deprovisioning/types.go +++ b/pkg/controllers/deprovisioning/types.go @@ -26,32 +26,6 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/state" ) -// Result is used to indicate the action of consolidating so we can optimize by not trying to consolidate if -// we were unable to consolidate the cluster and it hasn't changed state with respect to pods/nodes. 
-type Result byte - -const ( - ResultNothingToDo Result = iota // there are no actions that can be performed given the current cluster state - ResultRetry // we attempted an action, but its validation failed so retry soon - ResultFailed // the action failed entirely - ResultSuccess // the action was successful -) - -func (r Result) String() string { - switch r { - case ResultNothingToDo: - return "Nothing to do" - case ResultRetry: - return "Retry" - case ResultFailed: - return "Failed" - case ResultSuccess: - return "Success" - default: - return fmt.Sprintf("Unknown (%d)", r) - } -} - type Deprovisioner interface { ShouldDeprovision(context.Context, *state.Node, *v1alpha5.Provisioner, []*v1.Pod) bool ComputeCommand(context.Context, ...CandidateNode) (Command, error) @@ -61,8 +35,7 @@ type Deprovisioner interface { type action byte const ( - actionFailed action = iota - actionDelete + actionDelete action = iota actionReplace actionRetry actionDoNothing @@ -79,9 +52,6 @@ func (a action) String() string { // Deprovisioning failed for a retryable reason case actionRetry: return "retry" - // Deprovisioning computation unsuccessful - case actionFailed: - return "failed" case actionDoNothing: return "do nothing" default: diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 3c1f521408..7c85bb14a9 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -117,8 +117,7 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul // We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered // as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for // the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered. 
- allNodes := p.cluster.Nodes() - stateNodes := allNodes.ActiveNodes() + nodes := p.cluster.Nodes() // Get pods, exit if nothing to do pendingPods, err := p.GetPendingPods(ctx) @@ -129,7 +128,7 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul // We do this after getting the pending pods so that we undershoot if pods are // actively migrating from a node that is being deleted // NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them - deletingNodePods, err := allNodes.DeletingNodes().Pods(ctx, p.kubeClient) + deletingNodePods, err := nodes.Deleting().Pods(ctx, p.kubeClient) if err != nil { return reconcile.Result{}, err } @@ -139,15 +138,15 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul } // Schedule pods to potential nodes, exit if nothing to do - nodes, err := p.schedule(ctx, pods, stateNodes) + machines, err := p.schedule(ctx, pods, nodes.Active()) if err != nil { return reconcile.Result{}, err } - if len(nodes) == 0 { + if len(machines) == 0 { return reconcile.Result{}, nil } - nodeNames, err := p.LaunchMachines(ctx, nodes, RecordPodNomination) + nodeNames, err := p.LaunchMachines(ctx, machines, RecordPodNomination) // Any successfully created node is going to have the nodeName value filled in the slice successfullyCreatedNodeCount := lo.CountBy(nodeNames, func(name string) bool { return name != "" }) diff --git a/pkg/controllers/state/node.go b/pkg/controllers/state/node.go index c25fe1c828..09cadbd029 100644 --- a/pkg/controllers/state/node.go +++ b/pkg/controllers/state/node.go @@ -35,15 +35,15 @@ import ( // Nodes is a typed version of a list of *Node type Nodes []*Node -// ActiveNodes filters nodes that are not in a MarkedForDeletion state -func (n Nodes) ActiveNodes() Nodes { +// Active filters nodes that are not in a MarkedForDeletion state +func (n Nodes) Active() Nodes { return lo.Filter(n, func(node *Node, _ int) bool { return !node.MarkedForDeletion() }) } -// DeletingNodes filters nodes that are in a MarkedForDeletion state -func (n Nodes) DeletingNodes() Nodes { +// Deleting filters nodes that are in a MarkedForDeletion state +func (n Nodes) Deleting() Nodes { return lo.Filter(n, func(node *Node, _ int) bool { return node.MarkedForDeletion() })
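For quick reference, the control flow that the controller.go hunks above converge on is roughly the following. This is a condensed sketch paraphrased from the new Reconcile body (error wrapping, metrics, and logging trimmed), not a verbatim copy of the hunk; all identifiers come from the diff itself.

// Condensed sketch of the simplified Reconcile loop introduced by this patch.
func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
	// Try each deprovisioner in priority order; only one is allowed to act per reconcile.
	for _, d := range c.deprovisioners {
		candidates, err := candidateNodes(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision)
		if err != nil {
			return reconcile.Result{}, err
		}
		if len(candidates) == 0 {
			continue // nothing for this deprovisioner to consider
		}
		cmd, err := d.ComputeCommand(ctx, candidates...)
		if err != nil {
			return reconcile.Result{}, err
		}
		switch cmd.action {
		case actionDoNothing:
			continue // this deprovisioner found nothing useful; try the next one
		case actionRetry:
			return reconcile.Result{Requeue: true}, nil
		}
		// Delete or replace: execute the command, then immediately requeue to look for more work.
		if err := c.executeCommand(ctx, d, cmd); err != nil {
			return reconcile.Result{}, err
		}
		return reconcile.Result{Requeue: true}, nil
	}
	// No deprovisioner had work to do; poll again after the regular interval.
	return reconcile.Result{RequeueAfter: pollingPeriod}, nil
}

With this shape, each Deprovisioner only needs ShouldDeprovision and ComputeCommand; the intermediate Result type and actionFailed are no longer needed because errors now propagate directly through reconcile.Result.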