chore: simplify deprovisioning controller and remove dead code #179

Merged 1 commit on Jan 31, 2023
12 changes: 0 additions & 12 deletions pkg/controllers/deprovisioning/consolidation.go
@@ -68,18 +68,6 @@ func (c *consolidation) String() string {
return metrics.ConsolidationReason
}

// RecordLastState is used to record the last state that the consolidation implementation failed to work in to allow
// skipping future consolidation attempts until the state changes.
func (c *consolidation) RecordLastState(currentState int64) {
c.lastConsolidationState = currentState
}

func (c *consolidation) ShouldAttemptConsolidation() bool {
// the last cluster consolidation wasn't able to improve things and nothing has changed regarding
// the cluster that makes us think we would be successful now
return c.lastConsolidationState != c.cluster.ClusterConsolidationState()
}

// sortAndFilterCandidates orders deprovisionable nodes by the disruptionCost, removing any that we already know won't
// be viable consolidation options.
func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []CandidateNode) ([]CandidateNode, error) {
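The per-deprovisioner state removed above is replaced by a single flag on the shared cluster state, which the consolidation deprovisioners query via Consolidated() and which the controller sets via SetConsolidated(true) further down. A minimal sketch of that flag, assuming hypothetical field names and locking (the real state.Cluster defines its own):

```go
package sketch

import "sync"

// Cluster stands in for state.Cluster; only the consolidation flag is shown.
type Cluster struct {
	mu           sync.RWMutex
	consolidated bool // true once a full deprovisioning pass found nothing to do
}

// SetConsolidated records the outcome of the last pass. Any cluster change that
// could make consolidation viable again would be expected to reset this to false.
func (c *Cluster) SetConsolidated(consolidated bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.consolidated = consolidated
}

// Consolidated reports whether the expensive consolidation computations can be
// skipped because nothing relevant has changed since the last attempt.
func (c *Cluster) Consolidated() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.consolidated
}
```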
173 changes: 50 additions & 123 deletions pkg/controllers/deprovisioning/controller.go
@@ -45,19 +45,14 @@ import (

// Controller is the deprovisioning controller.
type Controller struct {
kubeClient client.Client
cluster *state.Cluster
provisioner *provisioning.Provisioner
recorder events.Recorder
clock clock.Clock
cloudProvider cloudprovider.CloudProvider
emptiness *Emptiness
expiration *Expiration
drift *Drift
singleNodeConsolidation *SingleNodeConsolidation
multiNodeConsolidation *MultiNodeConsolidation
emptyNodeConsolidation *EmptyNodeConsolidation
reporter *Reporter
kubeClient client.Client
cluster *state.Cluster
provisioner *provisioning.Provisioner
recorder events.Recorder
clock clock.Clock
cloudProvider cloudprovider.CloudProvider
deprovisioners []Deprovisioner
reporter *Reporter
}

// pollingPeriod that we inspect cluster to look for opportunities to deprovision
@@ -80,19 +75,27 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi

reporter := NewReporter(recorder)
return &Controller{
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
provisioner: provisioner,
recorder: recorder,
reporter: reporter,
cloudProvider: cp,
expiration: NewExpiration(clk, kubeClient, cluster, provisioner),
emptiness: NewEmptiness(clk, kubeClient, cluster),
drift: NewDrift(kubeClient, cluster, provisioner),
emptyNodeConsolidation: NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
multiNodeConsolidation: NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
singleNodeConsolidation: NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
provisioner: provisioner,
recorder: recorder,
reporter: reporter,
cloudProvider: cp,
deprovisioners: []Deprovisioner{
// Expire any nodes that must be deleted, allowing their pods to potentially land on currently
// empty nodes
NewExpiration(clk, kubeClient, cluster, provisioner),
// Terminate any nodes that have drifted from provisioning specifications, allowing the pods to reschedule.
NewDrift(kubeClient, cluster, provisioner),
// Delete any remaining empty nodes as there is zero cost in terms of disruption. Emptiness and
// emptyNodeConsolidation are mutually exclusive, only one of these will operate
NewEmptiness(clk),
NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
// Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn
NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
// And finally fall back our single node consolidation to further reduce cluster cost.
NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
},
}
}

@@ -105,126 +108,50 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Bu
}

func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
// capture the state of the cluster before we do any analysis
currentState := c.cluster.ClusterConsolidationState()
result, err := c.ProcessCluster(ctx)

switch result {
case ResultFailed:
return reconcile.Result{}, fmt.Errorf("processing cluster, %w", err)
case ResultRetry:
return reconcile.Result{Requeue: true}, nil
case ResultNothingToDo:
// we record the cluster state for consolidation methods as they are expensive to compute and this allows
// them to defer calculations until something about the cluster has changed that may allow them to
// succeed
c.emptyNodeConsolidation.RecordLastState(currentState)
c.singleNodeConsolidation.RecordLastState(currentState)
c.multiNodeConsolidation.RecordLastState(currentState)
}
return reconcile.Result{RequeueAfter: pollingPeriod}, nil
}

// CandidateNode is a node that we are considering for deprovisioning along with extra information to be used in
// making that determination
type CandidateNode struct {
*v1.Node
instanceType *cloudprovider.InstanceType
capacityType string
zone string
provisioner *v1alpha5.Provisioner
disruptionCost float64
pods []*v1.Pod
}

// ProcessCluster is exposed for unit testing purposes
// ProcessCluster loops through implemented deprovisioners
func (c *Controller) ProcessCluster(ctx context.Context) (Result, error) {
// range over the different deprovisioning methods. We'll only let one method perform an action
for _, d := range []Deprovisioner{
// Expire any nodes that must be deleted, allowing their pods to potentially land on currently
// empty nodes
c.expiration,

// Terminate any nodes that have drifted from provisioning specifications, allowing the pods to reschedule.
c.drift,

// Delete any remaining empty nodes as there is zero cost in terms of disruption. Emptiness and
// emptyNodeConsolidation are mutually exclusive, only one of these will operate
c.emptiness,
c.emptyNodeConsolidation,

// Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn
c.multiNodeConsolidation,

// And finally fall back our single node consolidation to further reduce cluster cost.
c.singleNodeConsolidation,
} {
// Attempt different deprovisioning methods. We'll only let one method perform an action
for _, d := range c.deprovisioners {
candidates, err := candidateNodes(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision)
if err != nil {
return ResultFailed, fmt.Errorf("determining candidate nodes, %w", err)
return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err)
}
// If there are no candidate nodes, move to the next deprovisioner
if len(candidates) == 0 {
continue
}

result, err := c.executeDeprovisioning(ctx, d, candidates...)
// Determine the deprovisioning action
cmd, err := d.ComputeCommand(ctx, candidates...)
if err != nil {
return ResultFailed, fmt.Errorf("deprovisioning nodes, %w", err)
return reconcile.Result{}, fmt.Errorf("computing deprovisioning decision, %w", err)
}

switch result {
case ResultFailed:
return ResultFailed, err
case ResultRetry, ResultSuccess:
// the controller wants to retry, or was successful in deprovisioning
return result, nil
case ResultNothingToDo:
// found nothing to do, so try the next deprovisioner
if cmd.action == actionDoNothing {
continue
default:
logging.FromContext(ctx).Errorf("unexpected result %s", result)
}
if cmd.action == actionRetry {
return reconcile.Result{Requeue: true}, nil
}

// Attempt to deprovision
if err := c.executeCommand(ctx, d, cmd); err != nil {
return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err)
}
return reconcile.Result{Requeue: true}, nil
}

// All deprovisioners did nothing, so return nothing to do
return ResultNothingToDo, nil
}

// Given candidate nodes, compute best deprovisioning action
func (c *Controller) executeDeprovisioning(ctx context.Context, d Deprovisioner, nodes ...CandidateNode) (Result, error) {
// Each attempt will try at least one node, limit to that many attempts.
cmd, err := d.ComputeCommand(ctx, nodes...)
if err != nil {
return ResultFailed, err
}
// Convert action to result
switch cmd.action {
case actionFailed:
return ResultFailed, err
case actionDoNothing:
return ResultNothingToDo, nil
case actionRetry:
return ResultRetry, nil
}
// If delete or replace, execute command
result, err := c.executeCommand(ctx, cmd, d)
if err != nil {
return ResultFailed, err
}
return result, nil
c.cluster.SetConsolidated(true) // Mark cluster as consolidated
return reconcile.Result{RequeueAfter: pollingPeriod}, nil
}

func (c *Controller) executeCommand(ctx context.Context, command Command, d Deprovisioner) (Result, error) {
func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, command Command) error {
deprovisioningActionsPerformedCounter.With(prometheus.Labels{"action": fmt.Sprintf("%s/%s", d, command.action)}).Add(1)
logging.FromContext(ctx).Infof("deprovisioning via %s %s", d, command)

if command.action == actionReplace {
if err := c.launchReplacementNodes(ctx, command); err != nil {
// If we failed to launch the replacement, don't deprovision. If this is some permanent failure,
// we don't want to disrupt workloads with no way to provision new nodes for them.
return ResultFailed, fmt.Errorf("launching replacement node, %w", err)
return fmt.Errorf("launching replacement node, %w", err)
}
}

@@ -242,7 +169,7 @@ func (c *Controller) executeCommand(ctx context.Context, command Command, d Depr
for _, oldnode := range command.nodesToRemove {
c.waitForDeletion(ctx, oldnode)
}
return ResultSuccess, nil
return nil
}

// waitForDeletion waits for the specified node to be removed from the API server. This deletion can take some period
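The refactored Reconcile above walks the ordered deprovisioners slice and lets at most one method act per pass. A self-contained sketch of that "first method to act wins" pattern; the Method and Decision names are hypothetical stand-ins, not the package's real Deprovisioner and Command types:

```go
package sketch

import (
	"context"
	"fmt"
)

// Decision is a hypothetical stand-in for the Command.action values.
type Decision int

const (
	DoNothing Decision = iota // nothing useful for this method to do
	Retry                     // requeue and try again shortly
	Act                       // delete or replace nodes
)

// Method mirrors the role a Deprovisioner plays in the loop: pick candidate
// nodes, then decide what to do with them.
type Method interface {
	fmt.Stringer
	Candidates(ctx context.Context) ([]string, error)
	Decide(ctx context.Context, candidates []string) (Decision, error)
}

// runOnce walks the ordered methods and stops at the first one that acts or
// asks for a retry; it reports whether any method made progress.
func runOnce(ctx context.Context, methods []Method) (bool, error) {
	for _, m := range methods {
		candidates, err := m.Candidates(ctx)
		if err != nil {
			return false, fmt.Errorf("determining candidates for %s, %w", m, err)
		}
		if len(candidates) == 0 {
			continue // nothing this method can operate on
		}
		decision, err := m.Decide(ctx, candidates)
		if err != nil {
			return false, fmt.Errorf("computing decision for %s, %w", m, err)
		}
		if decision == DoNothing {
			continue // fall through to the next, lower-priority method
		}
		return true, nil // at most one method acts per reconcile pass
	}
	// Every method declined; the caller can mark the cluster consolidated and
	// back off to the slower polling period.
	return false, nil
}
```

Building the slice once in the constructor keeps the priority order (expiration, drift, emptiness, then the consolidation variants) in a single place instead of restating it inside the loop.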
11 changes: 3 additions & 8 deletions pkg/controllers/deprovisioning/emptiness.go
@@ -23,7 +23,6 @@ import (
v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/samber/lo"

@@ -35,16 +34,12 @@ import (
// Emptiness is a subreconciler that deletes empty nodes.
// Emptiness will respect TTLSecondsAfterEmpty
type Emptiness struct {
clock clock.Clock
kubeClient client.Client
cluster *state.Cluster
clock clock.Clock
}

func NewEmptiness(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster) *Emptiness {
func NewEmptiness(clk clock.Clock) *Emptiness {
return &Emptiness{
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
clock: clk,
}
}

2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/emptynodeconsolidation.go
@@ -42,7 +42,7 @@ func NewEmptyNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClie

// ComputeCommand generates a deprovisioning command given deprovisionable nodes
func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) {
if !c.ShouldAttemptConsolidation() {
if c.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
}
candidates, err := c.sortAndFilterCandidates(ctx, candidates)
6 changes: 3 additions & 3 deletions pkg/controllers/deprovisioning/helpers.go
@@ -42,9 +42,9 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
candidateNodes ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) {

candidateNodeNames := sets.NewString(lo.Map(candidateNodes, func(t CandidateNode, i int) string { return t.Name })...)
allNodes := cluster.Nodes()
deletingNodes := allNodes.DeletingNodes()
stateNodes := lo.Filter(allNodes, func(n *state.Node, _ int) bool {
nodes := cluster.Nodes()
deletingNodes := nodes.Deleting()
stateNodes := lo.Filter(nodes.Active(), func(n *state.Node, _ int) bool {
return !candidateNodeNames.Has(n.Name())
})

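This helpers change relies on cluster.Nodes() returning a collection with Active() and Deleting() filters rather than a separate DeletingNodes() call. A rough sketch of that slice-with-filter-methods shape, using an illustrative deleting marker (the real state package tracks deletion with its own types):

```go
package sketch

// Node stands in for state.Node; the deleting field is only an illustration of
// whatever marker the state package actually keeps for nodes being removed.
type Node struct {
	Name     string
	deleting bool
}

// Nodes is a slice type so filters read fluently at the call site:
// cluster.Nodes().Active(), cluster.Nodes().Deleting().
type Nodes []*Node

// Active returns nodes that are not being deleted and can still host pods in
// the scheduling simulation.
func (n Nodes) Active() Nodes {
	var out Nodes
	for _, node := range n {
		if !node.deleting {
			out = append(out, node)
		}
	}
	return out
}

// Deleting returns nodes already marked for deletion; their pods still need to
// be accounted for when simulating where workloads would land.
func (n Nodes) Deleting() Nodes {
	var out Nodes
	for _, node := range n {
		if node.deleting {
			out = append(out, node)
		}
	}
	return out
}
```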
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/multinodeconsolidation.go
@@ -39,7 +39,7 @@ func NewMultiNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClie
}

func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) {
if !m.ShouldAttemptConsolidation() {
if m.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
}
candidates, err := m.sortAndFilterCandidates(ctx, candidates)
4 changes: 2 additions & 2 deletions pkg/controllers/deprovisioning/singlenodeconsolidation.go
@@ -41,7 +41,7 @@ func NewSingleNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeCli
//
//nolint:gocyclo
func (c *SingleNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) {
if !c.ShouldAttemptConsolidation() {
if c.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
}
candidates, err := c.sortAndFilterCandidates(ctx, candidates)
Expand All @@ -58,7 +58,7 @@ func (c *SingleNodeConsolidation) ComputeCommand(ctx context.Context, candidates
logging.FromContext(ctx).Errorf("computing consolidation %s", err)
continue
}
if cmd.action == actionDoNothing || cmd.action == actionRetry || cmd.action == actionFailed {
if cmd.action == actionDoNothing || cmd.action == actionRetry {
continue
}

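The check above no longer mentions actionFailed: with ComputeCommand returning an error alongside the Command, a sentinel failure action appears to be redundant. A hedged sketch of the resulting action set; only actionDoNothing, actionRetry, actionReplace, and nodesToRemove are visible in this diff, the rest are assumptions:

```go
package sketch

import (
	v1 "k8s.io/api/core/v1"
)

// action enumerates what a deprovisioner asks the controller to do.
type action byte

const (
	actionDoNothing action = iota // skip to the next deprovisioner
	actionRetry                   // requeue the reconcile without acting
	actionDelete                  // assumed: remove nodes without replacements
	actionReplace                 // remove nodes after launching replacements
)

// Command pairs the chosen action with the nodes it applies to; replacement
// node details are elided here.
type Command struct {
	action        action
	nodesToRemove []*v1.Node
}
```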