fix: problem where pods blocking eviction were not respected for cons… #247

Merged: 8 commits, Mar 22, 2023

Changes from 3 commits
@@ -58,7 +58,7 @@ func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidat
return cmd, nil
}

v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider)
v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder)
isValid, err := v.IsValid(ctx, cmd)
if err != nil {
return Command{}, fmt.Errorf("validating, %w", err)
@@ -49,7 +49,7 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
return Command{}, fmt.Errorf("sorting candidates, %w", err)
}

v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider)
v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder)
var failedValidation bool
for _, candidate := range candidates {
// compute a possible consolidation option
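Both consolidation paths now pass their event recorder into NewValidation, so that when the post-TTL re-validation rejects a command the reason can be published as an event on the affected node rather than dropped silently. The snippet below is only a self-contained sketch of that pattern under assumed shapes: the Recorder interface, Event struct, and unconsolidatable helper are illustrative stand-ins, not the actual karpenter-core events API.

```go
package main

import "fmt"

// Node stands in for the Kubernetes node a deprovisioning candidate wraps.
type Node struct{ Name string }

// Event stands in for the event type a recorder publishes.
type Event struct {
	Node    *Node
	Reason  string
	Message string
}

// Recorder stands in for the events.Recorder now threaded into Validation.
type Recorder interface {
	Publish(evts ...Event)
}

// logRecorder is a trivial Recorder that just prints events.
type logRecorder struct{}

func (logRecorder) Publish(evts ...Event) {
	for _, e := range evts {
		fmt.Printf("event on %s: %s (%s)\n", e.Node.Name, e.Reason, e.Message)
	}
}

// unconsolidatable mirrors the shape of deprovisioningevents.Unconsolidatable:
// it builds the event(s) emitted when a node can no longer be consolidated.
func unconsolidatable(n *Node, reason string) []Event {
	return []Event{{Node: n, Reason: "Unconsolidatable", Message: reason}}
}

func main() {
	var rec Recorder = logRecorder{}
	node := &Node{Name: "node-1"}
	// During re-validation a blocking pod is found, so the command is rejected
	// and the reason is surfaced as an event.
	rec.Publish(unconsolidatable(node, "pod default/blocking has do-not-evict annotation")...)
}
```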
83 changes: 83 additions & 0 deletions pkg/controllers/deprovisioning/suite_test.go
@@ -1546,6 +1546,89 @@ var _ = Describe("Consolidation TTL", func() {
},
})
})
It("should not deprovision nodes that receive blocking pods during the TTL", func() {
labels := map[string]string{
"app": "test",
}
// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

pod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: ptr.Bool(true),
BlockOwnerDeletion: ptr.Bool(true),
},
},
},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
}})
noEvictPod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
Annotations: map[string]string{v1alpha5.DoNotEvictPodAnnotationKey: "true"},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: ptr.Bool(true),
BlockOwnerDeletion: ptr.Bool(true),
},
},
},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
}})
ExpectApplied(ctx, env.Client, machine1, node1, prov, pod, noEvictPod)
ExpectManualBinding(ctx, env.Client, pod, node1)

// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1})

var wg sync.WaitGroup
wg.Add(1)
finished := atomic.Bool{}
go func() {
defer GinkgoRecover()
defer wg.Done()
defer finished.Store(true)
ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
}()

// wait for the deprovisioningController to block on the validation timeout
Eventually(fakeClock.HasWaiters, time.Second*10).Should(BeTrue())
// controller should be blocking during the timeout
Expect(finished.Load()).To(BeFalse())
// and the node should not be deleted yet
ExpectExists(ctx, env.Client, node1)

// make the node non-empty by binding it
ExpectManualBinding(ctx, env.Client, noEvictPod, node1)
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1))

// advance the clock so that the timeout expires
fakeClock.Step(31 * time.Second)
// controller should finish
Eventually(finished.Load, 10*time.Second).Should(BeTrue())
wg.Wait()

// nothing should be removed since the node is no longer empty
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node1)
})
It("should wait for the node TTL for empty nodes before consolidating", func() {
ExpectApplied(ctx, env.Client, machine1, node1, prov)

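The new test makes a previously empty node block consolidation during the validation TTL by binding a pod that carries Karpenter's do-not-evict annotation. For reference, a minimal, hypothetical pod definition with that annotation might look like the sketch below; it assumes v1alpha5.DoNotEvictPodAnnotationKey expands to karpenter.sh/do-not-evict, and the pod name and namespace are placeholders.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Illustrative pod carrying the annotation the new test exercises. A node
	// running a pod like this should fail the consolidation re-validation.
	pod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "blocking-pod", // hypothetical name
			Namespace: "default",
			Annotations: map[string]string{
				// Assumed expansion of v1alpha5.DoNotEvictPodAnnotationKey.
				"karpenter.sh/do-not-evict": "true",
			},
		},
	}
	fmt.Println(pod.Name, pod.Annotations)
}
```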
44 changes: 36 additions & 8 deletions pkg/controllers/deprovisioning/validation.go
@@ -27,8 +27,10 @@ import (

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
)

// Validation is used to perform validation on a consolidation command. It makes an assumption that when re-used, all
@@ -43,19 +45,21 @@ type Validation struct {
cloudProvider cloudprovider.CloudProvider
provisioner *provisioning.Provisioner
once sync.Once
recorder events.Recorder
// validationCandidates are the cached validation candidates. We capture these when validating the first command and reuse them for
// validating subsequent commands.
validationCandidates []*Candidate
}

func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider) *Validation {
func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, recorder events.Recorder) *Validation {
return &Validation{
validationPeriod: validationPeriod,
clock: clk,
cluster: cluster,
kubeClient: kubeClient,
provisioner: provisioner,
cloudProvider: cp,
recorder: recorder,
}
}

Expand All @@ -73,20 +77,20 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
case <-v.clock.After(waitDuration):
}
}

if len(v.validationCandidates) == 0 {
v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.clock, v.cloudProvider, v.ShouldDeprovision)
if err != nil {
return false, fmt.Errorf("constructing validation candidates, %w", err)
}
}
// Compute PDBs and check if all the nodes are still deprovisionable.
pdbs, err := NewPDBLimits(ctx, v.kubeClient)
if err != nil {
return false, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
}

// a candidate we are about to delete is a target of a currently pending pod, wait for that to settle
// before continuing consolidation
for _, n := range cmd.candidates {
if v.cluster.IsNodeNominated(n.Name()) {
return false, nil
}
if valid := v.validateCandidates(cmd, pdbs); !valid {
return false, nil
}

isValid, err := v.ValidateCommand(ctx, cmd, v.validationCandidates)
@@ -97,6 +101,30 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
return isValid, nil
}

// validateCandidates returns true if the chosen candidates can still be terminated once the validation TTL has elapsed
func (v *Validation) validateCandidates(cmd Command, pdbs *PDBLimits) bool {
for _, n := range cmd.candidates {
if !n.Node.DeletionTimestamp.IsZero() {
v.recorder.Publish(deprovisioningevents.Unconsolidatable(n.Node, "in the process of deletion")...)
return false
}
if p, ok := hasDoNotEvictPod(n); ok {
v.recorder.Publish(deprovisioningevents.Unconsolidatable(n.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
return false
}
if pdb, ok := pdbs.CanEvictPods(n.pods); !ok {
v.recorder.Publish(deprovisioningevents.Unconsolidatable(n.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
return false
}
// a candidate we are about to delete is a target of a currently pending pod, wait for that to settle
// before continuing consolidation
if v.cluster.IsNodeNominated(n.Name()) {
return false
}
}
return true
}

// ShouldDeprovision is a predicate used to filter deprovisionable nodes
func (v *Validation) ShouldDeprovision(_ context.Context, c *Candidate) bool {
if val, ok := c.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
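validateCandidates relies on a hasDoNotEvictPod helper that is defined elsewhere in the deprovisioning package and does not appear in this diff. Judging from the call site (p, ok := hasDoNotEvictPod(n), with p.Namespace and p.Name used in the event message), it likely returns the first pod on the candidate that carries the do-not-evict annotation. The sketch below is an assumption of that shape, not the actual implementation.

```go
// Assumed sketch of the helper used by validateCandidates; the real
// implementation may differ. It scans a candidate's pods for Karpenter's
// do-not-evict annotation and returns the first match, if any. Here v1 is
// k8s.io/api/core/v1, and Candidate.pods is assumed to be []*v1.Pod,
// matching the pdbs.CanEvictPods(n.pods) call above.
func hasDoNotEvictPod(c *Candidate) (*v1.Pod, bool) {
	for _, p := range c.pods {
		if p.Annotations[v1alpha5.DoNotEvictPodAnnotationKey] == "true" {
			return p, true
		}
	}
	return nil, false
}
```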