Skip to content

Commit

Permalink
fix: problem where pods blocking eviction were not respected for consolidation (#247)
Browse files Browse the repository at this point in the history

* fix: problem where pods blocking eviction were not respected for consolidation

* remove ctx

* fix ci checks

* reorder blocking condition

* dedupe filter code

* remove stale function

* fix

* change events back to blocked and pull out filter logic for expiration
  • Loading branch information
njtran authored Mar 22, 2023
1 parent 0d99636 commit cb03ff7
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 137 deletions.
92 changes: 5 additions & 87 deletions pkg/controllers/deprovisioning/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sort"
"time"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/utils/clock"
"knative.dev/pkg/ptr"
Expand Down Expand Up @@ -73,32 +72,15 @@ func (c *consolidation) String() string {
// sortAndFilterCandidates orders deprovisionable nodes by the disruptionCost, removing any that we already know won't
// be viable consolidation options.
func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Candidate) ([]*Candidate, error) {
pdbs, err := NewPDBLimits(ctx, c.kubeClient)
candidates, err := filterCandidates(ctx, c.kubeClient, c.recorder, nodes)
if err != nil {
return nil, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
return nil, fmt.Errorf("filtering candidates, %w", err)
}

// filter out nodes that can't be terminated
nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool {
if !cn.Node.DeletionTimestamp.IsZero() {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "in the process of deletion")...)
return false
}
if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
return false
}
if p, ok := hasDoNotEvictPod(cn); ok {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
return false
}
return true
sort.Slice(candidates, func(i int, j int) bool {
return candidates[i].disruptionCost < candidates[j].disruptionCost
})

sort.Slice(nodes, func(i int, j int) bool {
return nodes[i].disruptionCost < nodes[j].disruptionCost
})
return nodes, nil
return candidates, nil
}

// ShouldDeprovision is a predicate used to filter deprovisionable nodes
Expand All @@ -118,70 +100,6 @@ func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool
return true
}

// ValidateCommand validates a command for a deprovisioner. It re-runs the
// scheduling simulation against the current cluster state and returns true
// only if deleting the command's candidates would still produce the outcome
// the command was computed for (at most one replacement node, whose instance
// types are a subset of what scheduling now says we should create).
func (c *consolidation) ValidateCommand(ctx context.Context, cmd Command, candidateNodes []*Candidate) (bool, error) {
	// map from nodes we are about to remove back into candidate nodes with cluster state
	nodesToDelete := mapCandidates(cmd.candidates, candidateNodes)
	// None of the chosen candidate nodes are valid for execution, so retry
	if len(nodesToDelete) == 0 {
		return false, nil
	}

	newNodes, allPodsScheduled, err := simulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, nodesToDelete...)
	if err != nil {
		// fix: error message previously read "simluating scheduling"
		return false, fmt.Errorf("simulating scheduling, %w", err)
	}
	if !allPodsScheduled {
		return false, nil
	}

	// We want to ensure that the re-simulated scheduling using the current cluster state produces the same result.
	// There are three possible options for the number of new nodesToDelete that we need to handle:
	// len(newNodes) == 0, as long as we weren't expecting a new node, this is valid
	// len(newNodes) > 1, something in the cluster changed so that the nodesToDelete we were going to delete can no longer
	// be deleted without producing more than one node
	// len(newNodes) == 1, as long as the node looks like what we were expecting, this is valid
	if len(newNodes) == 0 {
		if len(cmd.replacements) == 0 {
			// scheduling produced zero new nodes and we weren't expecting any, so this is valid.
			return true, nil
		}
		// if it produced no new nodes, but we were expecting one we should re-simulate as there is likely a better
		// consolidation option now
		return false, nil
	}

	// we need more than one replacement node which is never valid currently (all of our node replacement is m->1, never m->n)
	if len(newNodes) > 1 {
		return false, nil
	}

	// we now know that scheduling simulation wants to create one new node
	if len(cmd.replacements) == 0 {
		// but we weren't expecting any new nodes, so this is invalid
		return false, nil
	}

	// We know that the scheduling simulation wants to create a new node and that the command we are verifying wants
	// to create a new node. The scheduling simulation doesn't apply any filtering to instance types, so it may include
	// instance types that we don't want to launch which were filtered out when the lifecycleCommand was created. To
	// check if our lifecycleCommand is valid, we just want to ensure that the list of instance types we are considering
	// creating are a subset of what scheduling says we should create.
	//
	// This is necessary since consolidation only wants cheaper nodes. Suppose consolidation determined we should delete
	// a 4xlarge and replace it with a 2xlarge. If things have changed and the scheduling simulation we just performed
	// now says that we need to launch a 4xlarge. It's still launching the correct number of nodes, but it's just
	// as expensive or possibly more so we shouldn't validate.
	if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, newNodes[0].InstanceTypeOptions) {
		return false, nil
	}

	// Now we know:
	// - current scheduling simulation says to create a new node with types T = {T_0, T_1, ..., T_n}
	// - our lifecycle command says to create a node with types {U_0, U_1, ..., U_n} where U is a subset of T
	return true, nil
}

// computeConsolidation computes a consolidation action to take
//
// nolint:gocyclo
Expand Down
24 changes: 3 additions & 21 deletions pkg/controllers/deprovisioning/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (
"errors"
"fmt"

"github.com/samber/lo"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter-core/pkg/apis/settings"
"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
Expand Down Expand Up @@ -59,27 +57,11 @@ func (d *Drift) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
}

// ComputeCommand generates a deprovisioning command given deprovisionable machines
func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (Command, error) {
pdbs, err := NewPDBLimits(ctx, d.kubeClient)
func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Command, error) {
candidates, err := filterCandidates(ctx, d.kubeClient, d.recorder, nodes)
if err != nil {
return Command{}, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
return Command{}, fmt.Errorf("filtering candidates, %w", err)
}
// filter out machines that can't be terminated
candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool {
if !cn.Node.DeletionTimestamp.IsZero() {
return false
}
if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
return false
}
if p, ok := hasDoNotEvictPod(cn); ok {
d.recorder.Publish(deprovisioningevents.Blocked(cn.Node,
fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
return false
}
return true
})

for _, candidate := range candidates {
// Check if we need to create any machines.
Expand Down
33 changes: 9 additions & 24 deletions pkg/controllers/deprovisioning/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sort"
"time"

"github.com/samber/lo"
"k8s.io/utils/clock"

v1 "k8s.io/api/core/v1"
Expand All @@ -30,7 +29,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
Expand Down Expand Up @@ -63,36 +61,23 @@ func (e *Expiration) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
}

// SortCandidates orders expired nodes by when they've expired
func (e *Expiration) SortCandidates(candidates []*Candidate) []*Candidate {
func (e *Expiration) filterAndSortCandidates(ctx context.Context, nodes []*Candidate) ([]*Candidate, error) {
candidates, err := filterCandidates(ctx, e.kubeClient, e.recorder, nodes)
if err != nil {
return nil, fmt.Errorf("filtering candidates, %w", err)
}
sort.Slice(candidates, func(i int, j int) bool {
return getExpirationTime(candidates[i].Node, candidates[i].provisioner).Before(getExpirationTime(candidates[j].Node, candidates[j].provisioner))
})
return candidates
return candidates, nil
}

// ComputeCommand generates a deprovisioning command given deprovisionable nodes
func (e *Expiration) ComputeCommand(ctx context.Context, candidates ...*Candidate) (Command, error) {
candidates = e.SortCandidates(candidates)
pdbs, err := NewPDBLimits(ctx, e.kubeClient)
func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Command, error) {
candidates, err := e.filterAndSortCandidates(ctx, nodes)
if err != nil {
return Command{}, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
return Command{}, fmt.Errorf("filtering candidates, %w", err)
}
// filter out nodes that can't be terminated
candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool {
if !cn.Node.DeletionTimestamp.IsZero() {
return false
}
if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
return false
}
if p, ok := hasDoNotEvictPod(cn); ok {
e.recorder.Publish(deprovisioningevents.Blocked(cn.Node,
fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
return false
}
return true
})

for _, candidate := range candidates {
// Check if we need to create any nodes.
Expand Down
27 changes: 27 additions & 0 deletions pkg/controllers/deprovisioning/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
pscheduling "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/scheduling"
"github.com/aws/karpenter-core/pkg/utils/pod"

Expand All @@ -37,6 +39,31 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// filterCandidates returns the subset of candidates that may be deprovisioned
// right now. Any candidate that is excluded — because its node is already being
// deleted, a PodDisruptionBudget forbids evicting its pods, or one of its pods
// carries the do-not-evict annotation — has a Blocked event published so the
// user can see why it was skipped.
func filterCandidates(ctx context.Context, kubeClient client.Client, recorder events.Recorder, nodes []*Candidate) ([]*Candidate, error) {
	pdbs, err := NewPDBLimits(ctx, kubeClient)
	if err != nil {
		return nil, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
	}

	// filter out nodes that can't be terminated
	viable := make([]*Candidate, 0, len(nodes))
	for _, cn := range nodes {
		if !cn.Node.DeletionTimestamp.IsZero() {
			recorder.Publish(deprovisioningevents.Blocked(cn.Node, "in the process of deletion")...)
			continue
		}
		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
			recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
			continue
		}
		if p, ok := hasDoNotEvictPod(cn); ok {
			recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
			continue
		}
		viable = append(viable, cn)
	}
	return viable, nil
}

//nolint:gocyclo
func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
candidates ...*Candidate) (newMachines []*pscheduling.Machine, allPodsScheduled bool, err error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidat
return cmd, nil
}

v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider)
v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder)
isValid, err := v.IsValid(ctx, cmd)
if err != nil {
return Command{}, fmt.Errorf("validating, %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
return Command{}, fmt.Errorf("sorting candidates, %w", err)
}

v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider)
v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder)
var failedValidation bool
for _, candidate := range candidates {
// compute a possible consolidation option
Expand Down
83 changes: 83 additions & 0 deletions pkg/controllers/deprovisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,89 @@ var _ = Describe("Consolidation TTL", func() {
},
})
})
// Regression test for consolidation TTL: a node that receives an eviction-blocking
// pod (do-not-evict annotation) while the controller is waiting out the validation
// TTL must not be deprovisioned once the TTL expires.
It("should not deprovision nodes that receive blocking pods during the TTL", func() {
	labels := map[string]string{
		"app": "test",
	}
	// create our RS so we can link a pod to it
	rs := test.ReplicaSet()
	ExpectApplied(ctx, env.Client, rs)
	Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

	pod := test.Pod(test.PodOptions{
		ObjectMeta: metav1.ObjectMeta{Labels: labels,
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion:         "apps/v1",
					Kind:               "ReplicaSet",
					Name:               rs.Name,
					UID:                rs.UID,
					Controller:         ptr.Bool(true),
					BlockOwnerDeletion: ptr.Bool(true),
				},
			},
		},
		ResourceRequirements: v1.ResourceRequirements{
			Requests: v1.ResourceList{
				v1.ResourceCPU: resource.MustParse("1"),
			},
		}})
	// noEvictPod carries the do-not-evict annotation; binding it to the node mid-TTL
	// is what should block the deprovisioning below.
	noEvictPod := test.Pod(test.PodOptions{
		ObjectMeta: metav1.ObjectMeta{Labels: labels,
			Annotations: map[string]string{v1alpha5.DoNotEvictPodAnnotationKey: "true"},
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion:         "apps/v1",
					Kind:               "ReplicaSet",
					Name:               rs.Name,
					UID:                rs.UID,
					Controller:         ptr.Bool(true),
					BlockOwnerDeletion: ptr.Bool(true),
				},
			},
		},
		ResourceRequirements: v1.ResourceRequirements{
			Requests: v1.ResourceList{
				v1.ResourceCPU: resource.MustParse("1"),
			},
		}})
	ExpectApplied(ctx, env.Client, machine1, node1, prov, pod, noEvictPod)
	ExpectManualBinding(ctx, env.Client, pod, node1)

	// inform cluster state about nodes and machines
	ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1})

	// run the reconcile in a goroutine so the test can interleave work while the
	// controller waits on the fake clock
	var wg sync.WaitGroup
	wg.Add(1)
	finished := atomic.Bool{}
	go func() {
		defer GinkgoRecover()
		defer wg.Done()
		defer finished.Store(true)
		ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
	}()

	// wait for the deprovisioningController to block on the validation timeout
	Eventually(fakeClock.HasWaiters, time.Second*10).Should(BeTrue())
	// controller should be blocking during the timeout
	Expect(finished.Load()).To(BeFalse())
	// and the node should not be deleted yet
	ExpectExists(ctx, env.Client, node1)

	// bind the do-not-evict pod to the node while the controller is still waiting
	ExpectManualBinding(ctx, env.Client, noEvictPod, node1)
	ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1))

	// advance the clock so that the timeout expires
	fakeClock.Step(31 * time.Second)
	// controller should finish
	Eventually(finished.Load, 10*time.Second).Should(BeTrue())
	wg.Wait()

	// nothing should be removed since the node now hosts a pod that blocks eviction
	Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
	ExpectExists(ctx, env.Client, node1)
})
It("should wait for the node TTL for empty nodes before consolidating", func() {
ExpectApplied(ctx, env.Client, machine1, node1, prov)

Expand Down
Loading

0 comments on commit cb03ff7

Please sign in to comment.