Skip to content

Commit

Permalink
chore: simplify deprovisioning controller and remove dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn committed Jan 31, 2023
1 parent 08b676a commit bb90655
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 254 deletions.
12 changes: 0 additions & 12 deletions pkg/controllers/deprovisioning/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,6 @@ func (c *consolidation) String() string {
return metrics.ConsolidationReason
}

// RecordLastState is used to record the last state that the consolidation implementation failed to work in to allow
// skipping future consolidation attempts until the state changes.
func (c *consolidation) RecordLastState(currentState int64) {
c.lastConsolidationState = currentState
}

func (c *consolidation) ShouldAttemptConsolidation() bool {
// the last cluster consolidation wasn't able to improve things and nothing has changed regarding
// the cluster that makes us think we would be successful now
return c.lastConsolidationState != c.cluster.ClusterConsolidationState()
}

// sortAndFilterCandidates orders deprovisionable nodes by the disruptionCost, removing any that we already know won't
// be viable consolidation options.
func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []CandidateNode) ([]CandidateNode, error) {
Expand Down
173 changes: 50 additions & 123 deletions pkg/controllers/deprovisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,14 @@ import (

// Controller is the deprovisioning controller.
type Controller struct {
kubeClient client.Client
cluster *state.Cluster
provisioner *provisioning.Provisioner
recorder events.Recorder
clock clock.Clock
cloudProvider cloudprovider.CloudProvider
emptiness *Emptiness
expiration *Expiration
drift *Drift
singleNodeConsolidation *SingleNodeConsolidation
multiNodeConsolidation *MultiNodeConsolidation
emptyNodeConsolidation *EmptyNodeConsolidation
reporter *Reporter
kubeClient client.Client
cluster *state.Cluster
provisioner *provisioning.Provisioner
recorder events.Recorder
clock clock.Clock
cloudProvider cloudprovider.CloudProvider
deprovisioners []Deprovisioner
reporter *Reporter
}

// pollingPeriod that we inspect cluster to look for opportunities to deprovision
Expand All @@ -80,19 +75,27 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi

reporter := NewReporter(recorder)
return &Controller{
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
provisioner: provisioner,
recorder: recorder,
reporter: reporter,
cloudProvider: cp,
expiration: NewExpiration(clk, kubeClient, cluster, provisioner),
emptiness: NewEmptiness(clk, kubeClient, cluster),
drift: NewDrift(kubeClient, cluster, provisioner),
emptyNodeConsolidation: NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
multiNodeConsolidation: NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
singleNodeConsolidation: NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
provisioner: provisioner,
recorder: recorder,
reporter: reporter,
cloudProvider: cp,
deprovisioners: []Deprovisioner{
// Expire any nodes that must be deleted, allowing their pods to potentially land on currently
NewExpiration(clk, kubeClient, cluster, provisioner),
// Terminate any nodes that have drifted from provisioning specifications, allowing the pods to reschedule.
NewDrift(kubeClient, cluster, provisioner),
// Delete any remaining empty nodes as there is zero cost in terms of dirsuption. Emptiness and
// emptyNodeConsolidation are mutually exclusive, only one of these will operate
NewEmptiness(clk),
NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
// Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn
NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
// And finally fall back our single node consolidation to further reduce cluster cost.
NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
},
}
}

Expand All @@ -105,126 +108,50 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Bu
}

func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
// capture the state of the cluster before we do any analysis
currentState := c.cluster.ClusterConsolidationState()
result, err := c.ProcessCluster(ctx)

switch result {
case ResultFailed:
return reconcile.Result{}, fmt.Errorf("processing cluster, %w", err)
case ResultRetry:
return reconcile.Result{Requeue: true}, nil
case ResultNothingToDo:
// we record the cluster state for consolidation methods as they are expensive to compute and this allows
// them to defer calculations until something about the cluster has changed that may allow them to
// succeed
c.emptyNodeConsolidation.RecordLastState(currentState)
c.singleNodeConsolidation.RecordLastState(currentState)
c.multiNodeConsolidation.RecordLastState(currentState)
}
return reconcile.Result{RequeueAfter: pollingPeriod}, nil
}

// CandidateNode is a node that we are considering for deprovisioning along with extra information to be used in
// making that determination
type CandidateNode struct {
*v1.Node
instanceType *cloudprovider.InstanceType
capacityType string
zone string
provisioner *v1alpha5.Provisioner
disruptionCost float64
pods []*v1.Pod
}

// ProcessCluster is exposed for unit testing purposes
// ProcessCluster loops through implemented deprovisioners
func (c *Controller) ProcessCluster(ctx context.Context) (Result, error) {
// range over the different deprovisioning methods. We'll only let one method perform an action
for _, d := range []Deprovisioner{
// Expire any nodes that must be deleted, allowing their pods to potentially land on currently
// empty nodes
c.expiration,

// Terminate any nodes that have drifted from provisioning specifications, allowing the pods to reschedule.
c.drift,

// Delete any remaining empty nodes as there is zero cost in terms of dirsuption. Emptiness and
// emptyNodeConsolidation are mutually exclusive, only one of these will operate
c.emptiness,
c.emptyNodeConsolidation,

// Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn
c.multiNodeConsolidation,

// And finally fall back our single node consolidation to further reduce cluster cost.
c.singleNodeConsolidation,
} {
// Attempt different deprovisioning methods. We'll only let one method perform an action
for _, d := range c.deprovisioners {
candidates, err := candidateNodes(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision)
if err != nil {
return ResultFailed, fmt.Errorf("determining candidate nodes, %w", err)
return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err)
}
// If there are no candidate nodes, move to the next deprovisioner
if len(candidates) == 0 {
continue
}

result, err := c.executeDeprovisioning(ctx, d, candidates...)
// Determine the deprovisioning action
cmd, err := d.ComputeCommand(ctx, candidates...)
if err != nil {
return ResultFailed, fmt.Errorf("deprovisioning nodes, %w", err)
return reconcile.Result{}, fmt.Errorf("computing deprovisioning decision, %w", err)
}

switch result {
case ResultFailed:
return ResultFailed, err
case ResultRetry, ResultSuccess:
// the controller wants to retry, or was successful in deprovisioning
return result, nil
case ResultNothingToDo:
// found nothing to do, so try the next deprovisioner
if cmd.action == actionDoNothing {
continue
default:
logging.FromContext(ctx).Errorf("unexpected result %s", result)
}
if cmd.action == actionRetry {
return reconcile.Result{Requeue: true}, nil
}

// Attempt to deprovision
if err := c.executeCommand(ctx, d, cmd); err != nil {
return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err)
}
return reconcile.Result{Requeue: true}, nil
}

// All deprovisioners did nothing, so return nothing to do
return ResultNothingToDo, nil
}

// Given candidate nodes, compute best deprovisioning action
func (c *Controller) executeDeprovisioning(ctx context.Context, d Deprovisioner, nodes ...CandidateNode) (Result, error) {
// Each attempt will try at least one node, limit to that many attempts.
cmd, err := d.ComputeCommand(ctx, nodes...)
if err != nil {
return ResultFailed, err
}
// Convert action to result
switch cmd.action {
case actionFailed:
return ResultFailed, err
case actionDoNothing:
return ResultNothingToDo, nil
case actionRetry:
return ResultRetry, nil
}
// If delete or replace, execute command
result, err := c.executeCommand(ctx, cmd, d)
if err != nil {
return ResultFailed, err
}
return result, nil
c.cluster.SetConsolidated(true) // Mark cluster as consolidated
return reconcile.Result{RequeueAfter: pollingPeriod}, nil
}

func (c *Controller) executeCommand(ctx context.Context, command Command, d Deprovisioner) (Result, error) {
func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, command Command) error {
deprovisioningActionsPerformedCounter.With(prometheus.Labels{"action": fmt.Sprintf("%s/%s", d, command.action)}).Add(1)
logging.FromContext(ctx).Infof("deprovisioning via %s %s", d, command)

if command.action == actionReplace {
if err := c.launchReplacementNodes(ctx, command); err != nil {
// If we failed to launch the replacement, don't deprovision. If this is some permanent failure,
// we don't want to disrupt workloads with no way to provision new nodes for them.
return ResultFailed, fmt.Errorf("launching replacement node, %w", err)
return fmt.Errorf("launching replacement node, %w", err)
}
}

Expand All @@ -242,7 +169,7 @@ func (c *Controller) executeCommand(ctx context.Context, command Command, d Depr
for _, oldnode := range command.nodesToRemove {
c.waitForDeletion(ctx, oldnode)
}
return ResultSuccess, nil
return nil
}

// waitForDeletion waits for the specified node to be removed from the API server. This deletion can take some period
Expand Down
11 changes: 3 additions & 8 deletions pkg/controllers/deprovisioning/emptiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/samber/lo"

Expand All @@ -35,16 +34,12 @@ import (
// Emptiness is a subreconciler that deletes empty nodes.
// Emptiness will respect TTLSecondsAfterEmpty
type Emptiness struct {
clock clock.Clock
kubeClient client.Client
cluster *state.Cluster
clock clock.Clock
}

func NewEmptiness(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster) *Emptiness {
func NewEmptiness(clk clock.Clock) *Emptiness {
return &Emptiness{
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
clock: clk,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/emptynodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewEmptyNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClie

// ComputeCommand generates a deprovisioning command given deprovisionable nodes
func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) {
if !c.ShouldAttemptConsolidation() {
if c.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
}
candidates, err := c.sortAndFilterCandidates(ctx, candidates)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/deprovisioning/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
candidateNodes ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) {

candidateNodeNames := sets.NewString(lo.Map(candidateNodes, func(t CandidateNode, i int) string { return t.Name })...)
allNodes := cluster.Nodes()
deletingNodes := allNodes.DeletingNodes()
stateNodes := lo.Filter(allNodes, func(n *state.Node, _ int) bool {
nodes := cluster.Nodes()
deletingNodes := nodes.Deleting()
stateNodes := lo.Filter(nodes.Active(), func(n *state.Node, _ int) bool {
return !candidateNodeNames.Has(n.Name())
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/multinodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewMultiNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClie
}

func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) {
if !m.ShouldAttemptConsolidation() {
if m.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
}
candidates, err := m.sortAndFilterCandidates(ctx, candidates)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/deprovisioning/singlenodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewSingleNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeCli
//
//nolint:gocyclo
func (c *SingleNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) {
if !c.ShouldAttemptConsolidation() {
if c.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
}
candidates, err := c.sortAndFilterCandidates(ctx, candidates)
Expand All @@ -58,7 +58,7 @@ func (c *SingleNodeConsolidation) ComputeCommand(ctx context.Context, candidates
logging.FromContext(ctx).Errorf("computing consolidation %s", err)
continue
}
if cmd.action == actionDoNothing || cmd.action == actionRetry || cmd.action == actionFailed {
if cmd.action == actionDoNothing || cmd.action == actionRetry {
continue
}

Expand Down
Loading

0 comments on commit bb90655

Please sign in to comment.