diff --git a/pkg/controllers/deprovisioning/consolidation.go b/pkg/controllers/deprovisioning/consolidation.go
index a3a93f3eb0..ecd2e0e1b0 100644
--- a/pkg/controllers/deprovisioning/consolidation.go
+++ b/pkg/controllers/deprovisioning/consolidation.go
@@ -21,7 +21,6 @@ import (
 	"sort"
 	"time"
 
-	"github.com/samber/lo"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/utils/clock"
 	"knative.dev/pkg/ptr"
@@ -73,32 +72,15 @@ func (c *consolidation) String() string {
 
 // sortAndFilterCandidates orders deprovisionable nodes by the disruptionCost, removing any that we already know won't
 // be viable consolidation options.
 func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Candidate) ([]*Candidate, error) {
-	pdbs, err := NewPDBLimits(ctx, c.kubeClient)
+	candidates, err := filterCandidates(ctx, c.kubeClient, c.recorder, nodes)
 	if err != nil {
-		return nil, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
+		return nil, fmt.Errorf("filtering candidates, %w", err)
 	}
-	// filter out nodes that can't be terminated
-	nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool {
-		if !cn.Node.DeletionTimestamp.IsZero() {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "in the process of deletion")...)
-			return false
-		}
-		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
-			return false
-		}
-		if p, ok := hasDoNotEvictPod(cn); ok {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
-			return false
-		}
-		return true
+	sort.Slice(candidates, func(i int, j int) bool {
+		return candidates[i].disruptionCost < candidates[j].disruptionCost
 	})
-
-	sort.Slice(nodes, func(i int, j int) bool {
-		return nodes[i].disruptionCost < nodes[j].disruptionCost
-	})
-	return nodes, nil
+	return candidates, nil
 }
 
 // ShouldDeprovision is a predicate used to filter deprovisionable nodes
@@ -118,70 +100,6 @@ func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool
 	return true
 }
 
-// ValidateCommand validates a command for a deprovisioner
-func (c *consolidation) ValidateCommand(ctx context.Context, cmd Command, candidateNodes []*Candidate) (bool, error) {
-	// map from nodes we are about to remove back into candidate nodes with cluster state
-	nodesToDelete := mapCandidates(cmd.candidates, candidateNodes)
-	// None of the chosen candidate nodes are valid for execution, so retry
-	if len(nodesToDelete) == 0 {
-		return false, nil
-	}
-
-	newNodes, allPodsScheduled, err := simulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, nodesToDelete...)
-	if err != nil {
-		return false, fmt.Errorf("simluating scheduling, %w", err)
-	}
-	if !allPodsScheduled {
-		return false, nil
-	}
-
-	// We want to ensure that the re-simulated scheduling using the current cluster state produces the same result.
-	// There are three possible options for the number of new nodesToDelete that we need to handle:
-	// len(newNodes) == 0, as long as we weren't expecting a new node, this is valid
-	// len(newNodes) > 1, something in the cluster changed so that the nodesToDelete we were going to delete can no longer
-	//                    be deleted without producing more than one node
-	// len(newNodes) == 1, as long as the node looks like what we were expecting, this is valid
-	if len(newNodes) == 0 {
-		if len(cmd.replacements) == 0 {
-			// scheduling produced zero new nodes and we weren't expecting any, so this is valid.
-			return true, nil
-		}
-		// if it produced no new nodes, but we were expecting one we should re-simulate as there is likely a better
-		// consolidation option now
-		return false, nil
-	}
-
-	// we need more than one replacement node which is never valid currently (all of our node replacement is m->1, never m->n)
-	if len(newNodes) > 1 {
-		return false, nil
-	}
-
-	// we now know that scheduling simulation wants to create one new node
-	if len(cmd.replacements) == 0 {
-		// but we weren't expecting any new nodes, so this is invalid
-		return false, nil
-	}
-
-	// We know that the scheduling simulation wants to create a new node and that the command we are verifying wants
-	// to create a new node. The scheduling simulation doesn't apply any filtering to instance types, so it may include
-	// instance types that we don't want to launch which were filtered out when the lifecycleCommand was created. To
-	// check if our lifecycleCommand is valid, we just want to ensure that the list of instance types we are considering
-	// creating are a subset of what scheduling says we should create.
-	//
-	// This is necessary since consolidation only wants cheaper nodes. Suppose consolidation determined we should delete
-	// a 4xlarge and replace it with a 2xlarge. If things have changed and the scheduling simulation we just performed
-	// now says that we need to launch a 4xlarge. It's still launching the correct number of nodes, but it's just
-	// as expensive or possibly more so we shouldn't validate.
-	if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, newNodes[0].InstanceTypeOptions) {
-		return false, nil
-	}
-
-	// Now we know:
-	// - current scheduling simulation says to create a new node with types T = {T_0, T_1, ..., T_n}
-	// - our lifecycle command says to create a node with types {U_0, U_1, ..., U_n} where U is a subset of T
-	return true, nil
-}
-
 // computeConsolidation computes a consolidation action to take
 //
 // nolint:gocyclo
diff --git a/pkg/controllers/deprovisioning/drift.go b/pkg/controllers/deprovisioning/drift.go
index 8754a9df55..bf52b81a5e 100644
--- a/pkg/controllers/deprovisioning/drift.go
+++ b/pkg/controllers/deprovisioning/drift.go
@@ -19,13 +19,11 @@ import (
 	"errors"
 	"fmt"
 
-	"github.com/samber/lo"
 	"knative.dev/pkg/logging"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/aws/karpenter-core/pkg/apis/settings"
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
-	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
 	"github.com/aws/karpenter-core/pkg/events"
@@ -59,27 +57,11 @@ func (d *Drift) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
 }
 
 // ComputeCommand generates a deprovisioning command given deprovisionable machines
-func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (Command, error) {
-	pdbs, err := NewPDBLimits(ctx, d.kubeClient)
+func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Command, error) {
+	candidates, err := filterCandidates(ctx, d.kubeClient, d.recorder, nodes)
 	if err != nil {
-		return Command{}, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
+		return Command{}, fmt.Errorf("filtering candidates, %w", err)
 	}
-	// filter out machines that can't be terminated
-	candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool {
-		if !cn.Node.DeletionTimestamp.IsZero() {
-			return false
-		}
-		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
-			d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
-			return false
-		}
-		if p, ok := hasDoNotEvictPod(cn); ok {
-			d.recorder.Publish(deprovisioningevents.Blocked(cn.Node,
-				fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
-			return false
-		}
-		return true
-	})
 
 	for _, candidate := range candidates {
 		// Check if we need to create any machines.
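Both consolidation and drift now lean on the shared filterCandidates helper (added in helpers.go below) instead of repeating the deletion / PDB / do-not-evict checks inline. For orientation, the do-not-evict gate it calls is roughly the following. This is a sketch only: hasDoNotEvictPodSketch is a hypothetical stand-in for the existing hasDoNotEvictPod helper (which is not part of this diff and may also skip already-terminating pods), and it assumes the package's existing v1 and v1alpha5 imports.

```go
// Sketch (not part of this diff): approximate shape of the do-not-evict check
// that filterCandidates relies on. It scans a candidate's pods for the
// annotation exposed as v1alpha5.DoNotEvictPodAnnotationKey.
func hasDoNotEvictPodSketch(cn *Candidate) (*v1.Pod, bool) {
	for _, p := range cn.pods {
		if p.Annotations[v1alpha5.DoNotEvictPodAnnotationKey] == "true" {
			return p, true
		}
	}
	return nil, false
}
```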
diff --git a/pkg/controllers/deprovisioning/expiration.go b/pkg/controllers/deprovisioning/expiration.go
index 9a445a6432..8ba8a096d0 100644
--- a/pkg/controllers/deprovisioning/expiration.go
+++ b/pkg/controllers/deprovisioning/expiration.go
@@ -21,7 +21,6 @@ import (
 	"sort"
 	"time"
 
-	"github.com/samber/lo"
 	"k8s.io/utils/clock"
 
 	v1 "k8s.io/api/core/v1"
@@ -30,7 +29,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
-	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
 	"github.com/aws/karpenter-core/pkg/events"
@@ -63,36 +61,23 @@ func (e *Expiration) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
 }
 
 // SortCandidates orders expired nodes by when they've expired
-func (e *Expiration) SortCandidates(candidates []*Candidate) []*Candidate {
+func (e *Expiration) filterAndSortCandidates(ctx context.Context, nodes []*Candidate) ([]*Candidate, error) {
+	candidates, err := filterCandidates(ctx, e.kubeClient, e.recorder, nodes)
+	if err != nil {
+		return nil, fmt.Errorf("filtering candidates, %w", err)
+	}
 	sort.Slice(candidates, func(i int, j int) bool {
 		return getExpirationTime(candidates[i].Node, candidates[i].provisioner).Before(getExpirationTime(candidates[j].Node, candidates[j].provisioner))
 	})
-	return candidates
+	return candidates, nil
 }
 
 // ComputeCommand generates a deprovisioning command given deprovisionable nodes
-func (e *Expiration) ComputeCommand(ctx context.Context, candidates ...*Candidate) (Command, error) {
-	candidates = e.SortCandidates(candidates)
-	pdbs, err := NewPDBLimits(ctx, e.kubeClient)
+func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Command, error) {
+	candidates, err := e.filterAndSortCandidates(ctx, nodes)
 	if err != nil {
-		return Command{}, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
+		return Command{}, fmt.Errorf("filtering candidates, %w", err)
 	}
-	// filter out nodes that can't be terminated
-	candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool {
-		if !cn.Node.DeletionTimestamp.IsZero() {
-			return false
-		}
-		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
-			e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
-			return false
-		}
-		if p, ok := hasDoNotEvictPod(cn); ok {
-			e.recorder.Publish(deprovisioningevents.Blocked(cn.Node,
-				fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
-			return false
-		}
-		return true
-	})
 
 	for _, candidate := range candidates {
 		// Check if we need to create any nodes.
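With expiration switched over as well, every deprovisioner now funnels its nodes through the same gate before computing a command. A minimal sketch of that shared call pattern follows; exampleDeprovisioner is hypothetical (not in this diff), while filterCandidates, Candidate, and Command come from this package, and the real deprovisioners additionally sort candidates, simulate scheduling, and build replacements.

```go
// exampleDeprovisioner is a hypothetical deprovisioner used only for illustration.
type exampleDeprovisioner struct {
	kubeClient client.Client
	recorder   events.Recorder
}

// Sketch: the pattern each ComputeCommand follows after this change.
func (d *exampleDeprovisioner) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Command, error) {
	// Drop candidates that are already deleting, blocked by a PDB, or carrying a
	// do-not-evict pod; filterCandidates publishes a Blocked event for each skip.
	candidates, err := filterCandidates(ctx, d.kubeClient, d.recorder, nodes)
	if err != nil {
		return Command{}, fmt.Errorf("filtering candidates, %w", err)
	}
	if len(candidates) == 0 {
		return Command{}, nil
	}
	// Real deprovisioners go on to pick replacements; here we simply act on the
	// first surviving candidate.
	return Command{candidates: candidates[:1]}, nil
}
```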
diff --git a/pkg/controllers/deprovisioning/helpers.go b/pkg/controllers/deprovisioning/helpers.go
index 8abc573172..4ec13a88c7 100644
--- a/pkg/controllers/deprovisioning/helpers.go
+++ b/pkg/controllers/deprovisioning/helpers.go
@@ -24,9 +24,11 @@ import (
 
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
+	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	pscheduling "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
+	"github.com/aws/karpenter-core/pkg/events"
 	"github.com/aws/karpenter-core/pkg/scheduling"
 	"github.com/aws/karpenter-core/pkg/utils/pod"
 
@@ -37,6 +39,31 @@ import (
 
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
+func filterCandidates(ctx context.Context, kubeClient client.Client, recorder events.Recorder, nodes []*Candidate) ([]*Candidate, error) {
+	pdbs, err := NewPDBLimits(ctx, kubeClient)
+	if err != nil {
+		return nil, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
+	}
+
+	// filter out nodes that can't be terminated
+	nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool {
+		if !cn.Node.DeletionTimestamp.IsZero() {
+			recorder.Publish(deprovisioningevents.Blocked(cn.Node, "in the process of deletion")...)
+			return false
+		}
+		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
+			recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
+			return false
+		}
+		if p, ok := hasDoNotEvictPod(cn); ok {
+			recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
+			return false
+		}
+		return true
+	})
+	return nodes, nil
+}
+
 //nolint:gocyclo
 func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, candidates ...*Candidate) (newMachines []*pscheduling.Machine, allPodsScheduled bool, err error) {
diff --git a/pkg/controllers/deprovisioning/multimachineconsolidation.go b/pkg/controllers/deprovisioning/multimachineconsolidation.go
index f425228584..f487e4bca4 100644
--- a/pkg/controllers/deprovisioning/multimachineconsolidation.go
+++ b/pkg/controllers/deprovisioning/multimachineconsolidation.go
@@ -58,7 +58,7 @@ func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidat
 		return cmd, nil
 	}
 
-	v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider)
+	v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder)
 	isValid, err := v.IsValid(ctx, cmd)
 	if err != nil {
 		return Command{}, fmt.Errorf("validating, %w", err)
diff --git a/pkg/controllers/deprovisioning/singlemachineconsolidation.go b/pkg/controllers/deprovisioning/singlemachineconsolidation.go
index d05d700762..3b99c7106f 100644
--- a/pkg/controllers/deprovisioning/singlemachineconsolidation.go
+++ b/pkg/controllers/deprovisioning/singlemachineconsolidation.go
@@ -49,7 +49,7 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
 		return Command{}, fmt.Errorf("sorting candidates, %w", err)
 	}
 
-	v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider)
+	v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder)
 	var failedValidation bool
 	for _, candidate := range candidates {
 		// compute a possible consolidation option
diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go
index 2ab65cd23d..6d6ea21185 100644
--- a/pkg/controllers/deprovisioning/suite_test.go
+++ b/pkg/controllers/deprovisioning/suite_test.go
@@ -1546,6 +1546,89 @@ var _ = Describe("Consolidation TTL", func() {
 			},
 		})
 	})
+	It("should not deprovision nodes that receive blocking pods during the TTL", func() {
+		labels := map[string]string{
+			"app": "test",
+		}
+		// create our RS so we can link a pod to it
+		rs := test.ReplicaSet()
+		ExpectApplied(ctx, env.Client, rs)
+		Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
+
+		pod := test.Pod(test.PodOptions{
+			ObjectMeta: metav1.ObjectMeta{Labels: labels,
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion:         "apps/v1",
+						Kind:               "ReplicaSet",
+						Name:               rs.Name,
+						UID:                rs.UID,
+						Controller:         ptr.Bool(true),
+						BlockOwnerDeletion: ptr.Bool(true),
+					},
+				},
+			},
+			ResourceRequirements: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceCPU: resource.MustParse("1"),
+				},
+			}})
+		noEvictPod := test.Pod(test.PodOptions{
+			ObjectMeta: metav1.ObjectMeta{Labels: labels,
+				Annotations: map[string]string{v1alpha5.DoNotEvictPodAnnotationKey: "true"},
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion:         "apps/v1",
+						Kind:               "ReplicaSet",
+						Name:               rs.Name,
+						UID:                rs.UID,
+						Controller:         ptr.Bool(true),
+						BlockOwnerDeletion: ptr.Bool(true),
+					},
+				},
+			},
+			ResourceRequirements: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceCPU: resource.MustParse("1"),
+				},
+			}})
+		ExpectApplied(ctx, env.Client, machine1, node1, prov, pod, noEvictPod)
+		ExpectManualBinding(ctx, env.Client, pod, node1)
+
+		// inform cluster state about nodes and machines
+		ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1})
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		finished := atomic.Bool{}
+		go func() {
+			defer GinkgoRecover()
+			defer wg.Done()
+			defer finished.Store(true)
+			ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
+		}()
+
+		// wait for the deprovisioningController to block on the validation timeout
+		Eventually(fakeClock.HasWaiters, time.Second*10).Should(BeTrue())
+		// controller should be blocking during the timeout
+		Expect(finished.Load()).To(BeFalse())
+		// and the node should not be deleted yet
+		ExpectExists(ctx, env.Client, node1)
+
+		// bind the do-not-evict pod to the node so the candidate is no longer deprovisionable
+		ExpectManualBinding(ctx, env.Client, noEvictPod, node1)
+		ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1))
+
+		// advance the clock so that the timeout expires
+		fakeClock.Step(31 * time.Second)
+		// controller should finish
+		Eventually(finished.Load, 10*time.Second).Should(BeTrue())
+		wg.Wait()
+
+		// nothing should be removed since the node now has a blocking do-not-evict pod
+		Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
+		ExpectExists(ctx, env.Client, node1)
+	})
 	It("should wait for the node TTL for empty nodes before consolidating", func() {
 		ExpectApplied(ctx, env.Client, machine1, node1, prov)
 
diff --git a/pkg/controllers/deprovisioning/validation.go b/pkg/controllers/deprovisioning/validation.go
index 4f20a178ea..0272bfc993 100644
--- a/pkg/controllers/deprovisioning/validation.go
+++ b/pkg/controllers/deprovisioning/validation.go
@@ -29,6 +29,7 @@ import (
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/controllers/provisioning" "github.com/aws/karpenter-core/pkg/controllers/state" + "github.com/aws/karpenter-core/pkg/events" ) // Validation is used to perform validation on a consolidation command. It makes an assumption that when re-used, all @@ -43,12 +44,13 @@ type Validation struct { cloudProvider cloudprovider.CloudProvider provisioner *provisioning.Provisioner once sync.Once + recorder events.Recorder // validationCandidates are the cached validation candidates. We capture these when validating the first command and reuse them for // validating subsequent commands. validationCandidates []*Candidate } -func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider) *Validation { +func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, recorder events.Recorder) *Validation { return &Validation{ validationPeriod: validationPeriod, clock: clk, @@ -56,6 +58,7 @@ func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *sta kubeClient: kubeClient, provisioner: provisioner, cloudProvider: cp, + recorder: recorder, } } @@ -73,14 +76,20 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) { case <-v.clock.After(waitDuration): } } - if len(v.validationCandidates) == 0 { v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.clock, v.cloudProvider, v.ShouldDeprovision) if err != nil { return false, fmt.Errorf("constructing validation candidates, %w", err) } } - + nodes, err := filterCandidates(ctx, v.kubeClient, v.recorder, cmd.candidates) + if err != nil { + return false, fmt.Errorf("filtering candidates, %w", err) + } + // If we filtered out any candidates, return false as some nodes in the consolidation decision have changed. + if len(nodes) != len(cmd.candidates) { + return false, nil + } // a candidate we are about to delete is a target of a currently pending pod, wait for that to settle // before continuing consolidation for _, n := range cmd.candidates {