Skip to content

Commit

Permalink
fix: Requeue immediately after deprovisioning (#279)
Browse files Browse the repository at this point in the history
* fix: Requeue immediately after deprovisioning

* Continue after deprovisioning errors

* Shortcircuit if any deprovisioner fails
  • Loading branch information
ellistarn authored and tzneal committed Apr 11, 2023
1 parent ec51fba commit 913d0a2
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 41 deletions.
55 changes: 32 additions & 23 deletions pkg/controllers/deprovisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Controller struct {

// pollingPeriod that we inspect cluster to look for opportunities to deprovision
const pollingPeriod = 10 * time.Second
const immediately = time.Millisecond

var errCandidateDeleting = fmt.Errorf("candidate is deleting")

Expand Down Expand Up @@ -115,32 +116,13 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
// Attempt different deprovisioning methods. We'll only let one method perform an action
isConsolidated := c.cluster.Consolidated()
for _, d := range c.deprovisioners {
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision)
success, err := c.deprovision(ctx, d)
if err != nil {
return reconcile.Result{}, fmt.Errorf("determining candidates, %w", err)
return reconcile.Result{}, fmt.Errorf("deprovisioning via %q, %w", d, err)
}
// If there are no candidate nodes, move to the next deprovisioner
if len(candidates) == 0 {
continue
}

// Determine the deprovisioning action
cmd, err := d.ComputeCommand(ctx, candidates...)
if err != nil {
return reconcile.Result{}, fmt.Errorf("computing deprovisioning decision, %w", err)
}
if cmd.action == actionDoNothing {
continue
if success {
return reconcile.Result{RequeueAfter: immediately}, nil
}
if cmd.action == actionRetry {
return reconcile.Result{Requeue: true}, nil
}

// Attempt to deprovision
if err := c.executeCommand(ctx, d, cmd); err != nil {
return reconcile.Result{}, fmt.Errorf("deprovisioning candidates, %w", err)
}
return reconcile.Result{Requeue: true}, nil
}

if !isConsolidated {
Expand All @@ -151,6 +133,33 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
return reconcile.Result{RequeueAfter: pollingPeriod}, nil
}

func (c *Controller) deprovision(ctx context.Context, deprovisioner Deprovisioner) (bool, error) {
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, deprovisioner.ShouldDeprovision)
if err != nil {
return false, fmt.Errorf("determining candidates, %w", err)
}
// If there are no candidate nodes, move to the next deprovisioner
if len(candidates) == 0 {
return false, nil
}

// Determine the deprovisioning action
cmd, err := deprovisioner.ComputeCommand(ctx, candidates...)
if err != nil {
return false, fmt.Errorf("computing deprovisioning decision, %w", err)
}
if cmd.action == actionDoNothing {
return false, nil
}

// Attempt to deprovision
if err := c.executeCommand(ctx, deprovisioner, cmd); err != nil {
return false, fmt.Errorf("deprovisioning candidates, %w", err)
}

return true, nil
}

func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, command Command) error {
deprovisioningActionsPerformedCounter.With(prometheus.Labels{"action": fmt.Sprintf("%s/%s", d, command.action)}).Add(1)
logging.FromContext(ctx).Infof("deprovisioning via %s %s", d, command)
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/deprovisioning/emptymachineconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ func (c *EmptyMachineConsolidation) ComputeCommand(ctx context.Context, candidat
// the machine isn't a target of a recent scheduling simulation
for _, n := range candidatesToDelete {
if len(n.pods) != 0 && !c.cluster.IsNodeNominated(n.Name()) {
return Command{action: actionRetry}, nil
return Command{}, fmt.Errorf("command is no longer valid, %s", cmd)
}
}

return cmd, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidat
}

if !isValid {
return Command{action: actionRetry}, nil
return Command{}, fmt.Errorf("command is no longer valid, %s", cmd)
}
return cmd, nil
}
Expand Down
11 changes: 2 additions & 9 deletions pkg/controllers/deprovisioning/singlemachineconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
}

v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder)
var failedValidation bool
for _, candidate := range candidates {
// compute a possible consolidation option
cmd, err := c.computeConsolidation(ctx, candidate)
if err != nil {
logging.FromContext(ctx).Errorf("computing consolidation %s", err)
continue
}
if cmd.action == actionDoNothing || cmd.action == actionRetry {
if cmd.action == actionDoNothing {
continue
}

Expand All @@ -68,18 +67,12 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
continue
}
if !isValid {
failedValidation = true
continue
return Command{}, fmt.Errorf("command is no longer valid, %s", cmd)
}

if cmd.action == actionReplace || cmd.action == actionDelete {
return cmd, nil
}
}

// we failed validation, so we need to retry
if failedValidation {
return Command{action: actionRetry}, nil
}
return Command{action: actionDoNothing}, nil
}
6 changes: 4 additions & 2 deletions pkg/controllers/deprovisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,8 @@ var _ = Describe("Consolidation TTL", func() {
defer GinkgoRecover()
defer wg.Done()
defer finished.Store(true)
ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).To(HaveOccurred())
}()

// wait for the deprovisioningController to block on the validation timeout
Expand Down Expand Up @@ -1757,7 +1758,8 @@ var _ = Describe("Consolidation TTL", func() {
defer GinkgoRecover()
defer wg.Done()
defer finished.Store(true)
ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).To(HaveOccurred())
}()

// wait for the deprovisioningController to block on the validation timeout
Expand Down
4 changes: 0 additions & 4 deletions pkg/controllers/deprovisioning/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ type action byte
const (
actionDelete action = iota
actionReplace
actionRetry
actionDoNothing
)

Expand All @@ -139,9 +138,6 @@ func (a action) String() string {
// Deprovisioning action with replacement machines
case actionReplace:
return "replace"
// Deprovisioning failed for a retryable reason
case actionRetry:
return "retry"
case actionDoNothing:
return "do nothing"
default:
Expand Down

0 comments on commit 913d0a2

Please sign in to comment.