From 9d26cd809dd866f703e0b7487867d271e1f40edb Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Thu, 16 Mar 2023 15:23:43 -0700 Subject: [PATCH] Revert "Revert machine migration changes (#176) (#241)" This reverts commit 9973eac0c293f6c4d9c34fcd696b5f790ca1967e. --- .../templates/karpenter.sh_machines.yaml | 1 + go.mod | 2 +- go.sum | 4 +- pkg/cloudprovider/fake/cloudprovider.go | 3 +- pkg/controllers/controllers.go | 3 + .../deprovisioning/consolidation.go | 22 +- pkg/controllers/deprovisioning/controller.go | 36 +- pkg/controllers/deprovisioning/drift.go | 8 +- pkg/controllers/deprovisioning/drift_test.go | 40 ++- pkg/controllers/deprovisioning/emptiness.go | 2 +- .../deprovisioning/events/events.go | 49 ++- pkg/controllers/deprovisioning/expiration.go | 6 +- .../deprovisioning/expiration_test.go | 37 +- pkg/controllers/deprovisioning/metrics.go | 4 +- pkg/controllers/deprovisioning/suite_test.go | 237 ++++++++----- pkg/controllers/deprovisioning/types.go | 3 + pkg/controllers/inflightchecks/controller.go | 36 +- .../inflightchecks/events/events.go | 10 +- pkg/controllers/inflightchecks/failedinit.go | 46 +-- pkg/controllers/inflightchecks/nodeshape.go | 39 +- pkg/controllers/inflightchecks/suite_test.go | 18 +- pkg/controllers/inflightchecks/termination.go | 5 +- pkg/controllers/node/controller.go | 33 +- pkg/controllers/node/drift.go | 31 +- pkg/controllers/node/emptiness.go | 5 - pkg/controllers/node/finalizer.go | 49 --- pkg/controllers/node/initialization.go | 125 ------- pkg/controllers/node/suite_test.go | 270 +------------- pkg/controllers/provisioning/provisioner.go | 57 +-- .../provisioning/scheduling/events/events.go | 24 +- .../provisioning/scheduling/scheduler.go | 2 +- .../provisioning/scheduling/suite_test.go | 19 +- pkg/controllers/state/cluster.go | 14 +- pkg/controllers/state/suite_test.go | 2 - pkg/controllers/termination/controller.go | 33 ++ pkg/controllers/termination/suite_test.go | 334 ++---------------- pkg/events/suite_test.go | 16 +- pkg/metrics/metrics.go | 36 +- pkg/operator/operator.go | 7 + pkg/test/expectations/expectations.go | 20 +- pkg/test/nodes.go | 1 - 41 files changed, 561 insertions(+), 1128 deletions(-) create mode 120000 charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml delete mode 100644 pkg/controllers/node/finalizer.go delete mode 100644 pkg/controllers/node/initialization.go diff --git a/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml b/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml new file mode 120000 index 0000000000..10e93ba5ee --- /dev/null +++ b/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml @@ -0,0 +1 @@ +../../pkg/apis/crds/karpenter.sh_machines.yaml \ No newline at end of file diff --git a/go.mod b/go.mod index f1ad61d5ba..fcf053d906 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( k8s.io/client-go v0.25.4 k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a - sigs.k8s.io/controller-runtime v0.13.1 + sigs.k8s.io/controller-runtime v0.13.0 ) require ( diff --git a/go.sum b/go.sum index 4534ee48f4..a795a3ca7a 100644 --- a/go.sum +++ b/go.sum @@ -848,8 +848,8 @@ knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a/go.mod h1:fckNBPf9bu5/p1RbnOh rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/controller-runtime v0.13.1 
h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE3xSlg= -sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI= +sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRMc2WIQ= +sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index be415b09c4..dc6198e3c5 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -111,10 +111,9 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) ( break } } - name := test.RandomName() created := &v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: machine.Name, Labels: lo.Assign(labels, machine.Labels), Annotations: machine.Annotations, }, diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 65cee3d11b..501a24ccf5 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -25,6 +25,7 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/counter" "github.com/aws/karpenter-core/pkg/controllers/deprovisioning" "github.com/aws/karpenter-core/pkg/controllers/inflightchecks" + "github.com/aws/karpenter-core/pkg/controllers/machine" "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod" metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner" @@ -64,11 +65,13 @@ func NewControllers( informer.NewNodeController(kubeClient, cluster), informer.NewPodController(kubeClient, cluster), informer.NewProvisionerController(kubeClient, cluster), + informer.NewMachineController(kubeClient, cluster), node.NewController(clock, kubeClient, cloudProvider, cluster), termination.NewController(kubeClient, cloudProvider, terminator, recorder), metricspod.NewController(kubeClient), metricsprovisioner.NewController(kubeClient), counter.NewController(kubeClient, cluster), inflightchecks.NewController(clock, kubeClient, recorder, cloudProvider), + machine.NewController(clock, kubeClient, cloudProvider, terminator, recorder), } } diff --git a/pkg/controllers/deprovisioning/consolidation.go b/pkg/controllers/deprovisioning/consolidation.go index a3a93f3eb0..a58e8512fb 100644 --- a/pkg/controllers/deprovisioning/consolidation.go +++ b/pkg/controllers/deprovisioning/consolidation.go @@ -80,16 +80,16 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca // filter out nodes that can't be terminated nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool { - if !cn.Node.DeletionTimestamp.IsZero() { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "in the process of deletion")...) + if !cn.Machine.DeletionTimestamp.IsZero() { + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "in the process of deletion")...) return false } if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) 
+ c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) return false } if p, ok := hasDoNotEvictPod(cn); ok { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) return false } return true @@ -104,15 +104,15 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca // ShouldDeprovision is a predicate used to filter deprovisionable nodes func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool { if val, ok := cn.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...) return val != "true" } if cn.provisioner == nil { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "provisioner is unknown")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "provisioner is unknown")...) return false } if cn.provisioner.Spec.Consolidation == nil || !ptr.BoolValue(cn.provisioner.Spec.Consolidation.Enabled) { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...) return false } return true @@ -201,7 +201,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if !allPodsScheduled { // This method is used by multi-node consolidation as well, so we'll only report in the single node case if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "not all pods would schedule")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "not all pods would schedule")...) } return Command{action: actionDoNothing}, nil } @@ -217,7 +217,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... // we're not going to turn a single node into multiple candidates if len(newMachines) != 1 { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...) } return Command{action: actionDoNothing}, nil } @@ -231,7 +231,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... newMachines[0].InstanceTypeOptions = filterByPrice(newMachines[0].InstanceTypeOptions, newMachines[0].Requirements, nodesPrice) if len(newMachines[0].InstanceTypeOptions) == 0 { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace with a cheaper node")...) 
+ c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace with a cheaper node")...) } // no instance types remain after filtering by price return Command{action: actionDoNothing}, nil @@ -250,7 +250,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if allExistingAreSpot && newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace a spot node with a spot node")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace a spot node with a spot node")...) } return Command{action: actionDoNothing}, nil } diff --git a/pkg/controllers/deprovisioning/controller.go b/pkg/controllers/deprovisioning/controller.go index a94c114866..2f64f1402a 100644 --- a/pkg/controllers/deprovisioning/controller.go +++ b/pkg/controllers/deprovisioning/controller.go @@ -117,7 +117,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc for _, d := range c.deprovisioners { candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision) if err != nil { - return reconcile.Result{}, fmt.Errorf("determining candidates, %w", err) + return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err) } // If there are no candidate nodes, move to the next deprovisioner if len(candidates) == 0 { @@ -138,7 +138,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc // Attempt to deprovision if err := c.executeCommand(ctx, d, cmd); err != nil { - return reconcile.Result{}, fmt.Errorf("deprovisioning candidates, %w", err) + return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err) } return reconcile.Result{Requeue: true}, nil } @@ -165,15 +165,15 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman } for _, candidate := range command.candidates { - c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, command.String())...) + c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, candidate.Machine, command.String())...) - if err := c.kubeClient.Delete(ctx, candidate.Node); err != nil { + if err := c.kubeClient.Delete(ctx, candidate.Machine); err != nil { if errors.IsNotFound(err) { continue } logging.FromContext(ctx).Errorf("terminating machine, %s", err) } else { - metrics.NodesTerminatedCounter.With(prometheus.Labels{ + metrics.MachinesTerminatedCounter.With(prometheus.Labels{ metrics.ReasonLabel: reason, metrics.ProvisionerLabel: candidate.provisioner.Name, }).Inc() @@ -183,7 +183,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman // We wait for nodes to delete to ensure we don't start another round of deprovisioning until this node is fully // deleted. 
for _, oldCandidate := range command.candidates { - c.waitForDeletion(ctx, oldCandidate.Node) + c.waitForDeletion(ctx, oldCandidate.Machine) } return nil } @@ -233,8 +233,8 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name var once sync.Once pollStart := time.Now() return retry.Do(func() error { - node := &v1.Node{} - if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, node); err != nil { + machine := &v1alpha5.Machine{} + if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, machine); err != nil { // If the machine was deleted after a few seconds (to give the cache time to update), then we assume // that the machine was deleted due to an Insufficient Capacity error if errors.IsNotFound(err) && c.clock.Since(pollStart) > time.Second*5 { @@ -243,12 +243,12 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name return fmt.Errorf("getting machine, %w", err) } once.Do(func() { - c.recorder.Publish(deprovisioningevents.Launching(node, action.String())) + c.recorder.Publish(deprovisioningevents.Launching(machine, action.String())) }) - if _, ok := node.Labels[v1alpha5.LabelNodeInitialized]; !ok { + if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() { // make the user aware of why deprovisioning is paused - c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(node)) - return fmt.Errorf("node is not initialized") + c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(machine)) + return fmt.Errorf("machine is not initialized") } return nil }, waitRetryOptions...) @@ -257,21 +257,21 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name // waitForDeletion waits for the specified machine to be removed from the API server. This deletion can take some period // of time if there are PDBs that govern pods on the machine as we need to wait until the node drains before // it's actually deleted. -func (c *Controller) waitForDeletion(ctx context.Context, node *v1.Node) { +func (c *Controller) waitForDeletion(ctx context.Context, machine *v1alpha5.Machine) { if err := retry.Do(func() error { - m := &v1.Node{} - nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(node), m) + m := &v1alpha5.Machine{} + nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(machine), m) // We expect the not machine found error, at which point we know the machine is deleted. 
if errors.IsNotFound(nerr) { return nil } // make the user aware of why deprovisioning is paused - c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(node)) + c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(machine)) if nerr != nil { - return fmt.Errorf("expected node to be not found, %w", nerr) + return fmt.Errorf("expected machine to be not found, %w", nerr) } // the machine still exists - return fmt.Errorf("expected node to be not found") + return fmt.Errorf("expected machine to be not found") }, waitRetryOptions..., ); err != nil { logging.FromContext(ctx).Errorf("Waiting on machine deletion, %s", err) diff --git a/pkg/controllers/deprovisioning/drift.go b/pkg/controllers/deprovisioning/drift.go index 8754a9df55..851d73b54c 100644 --- a/pkg/controllers/deprovisioning/drift.go +++ b/pkg/controllers/deprovisioning/drift.go @@ -66,15 +66,15 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C } // filter out machines that can't be terminated candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool { - if !cn.Node.DeletionTimestamp.IsZero() { + if !cn.Machine.DeletionTimestamp.IsZero() { return false } if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok { - d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) + d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) return false } if p, ok := hasDoNotEvictPod(cn); ok { - d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, + d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) return false } @@ -93,7 +93,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C } // Log when all pods can't schedule, as the command will get executed immediately. 
if !allPodsScheduled { - logging.FromContext(ctx).With("node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods") + logging.FromContext(ctx).With("machine", candidate.Machine.Name, "node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods") } if len(newMachines) == 0 { return Command{ diff --git a/pkg/controllers/deprovisioning/drift_test.go b/pkg/controllers/deprovisioning/drift_test.go index fa201c4349..9d8ae9874d 100644 --- a/pkg/controllers/deprovisioning/drift_test.go +++ b/pkg/controllers/deprovisioning/drift_test.go @@ -76,8 +76,8 @@ var _ = Describe("Drift", func() { wg.Wait() // Expect to not create or delete more machines - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) }) It("should ignore nodes with the drift label, but not the drifted value", func() { node.Annotations = lo.Assign(node.Annotations, map[string]string{ @@ -96,8 +96,8 @@ var _ = Describe("Drift", func() { wg.Wait() // Expect to not create or delete more machines - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) }) It("should ignore nodes without the drift label", func() { ExpectApplied(ctx, env.Client, machine, node, prov) @@ -110,8 +110,8 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) // Expect to not create or delete more machines - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) }) It("can delete drifted nodes", func() { node.Annotations = lo.Assign(node.Annotations, map[string]string{ @@ -129,9 +129,13 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // We should delete the machine that has drifted + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("can replace drifted nodes", func() { labels := map[string]string{ @@ -171,15 +175,21 @@ var _ = Describe("Drift", func() { // deprovisioning won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - ExpectNotFound(ctx, env.Client, node) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + + ExpectNotFound(ctx, env.Client, machine, node) // Expect that the new machine was created and its different than the original + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) It("can 
replace drifted nodes with multiple nodes", func() { @@ -269,12 +279,16 @@ var _ = Describe("Drift", func() { // deprovisioning won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // expect that drift provisioned three nodes, one for each pod - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) }) It("should delete one drifted node at a time", func() { @@ -324,7 +338,11 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) + // Expect one of the nodes to be deleted + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) }) }) diff --git a/pkg/controllers/deprovisioning/emptiness.go b/pkg/controllers/deprovisioning/emptiness.go index 1b019706c6..adf8351c2c 100644 --- a/pkg/controllers/deprovisioning/emptiness.go +++ b/pkg/controllers/deprovisioning/emptiness.go @@ -64,7 +64,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, c *Candidate) bool { // ComputeCommand generates a deprovisioning command given deprovisionable machines func (e *Emptiness) ComputeCommand(_ context.Context, candidates ...*Candidate) (Command, error) { emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool { - return cn.Node.DeletionTimestamp.IsZero() && len(cn.pods) == 0 + return cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0 }) if len(emptyCandidates) == 0 { diff --git a/pkg/controllers/deprovisioning/events/events.go b/pkg/controllers/deprovisioning/events/events.go index 337cd5695c..53cd07a6c4 100644 --- a/pkg/controllers/deprovisioning/events/events.go +++ b/pkg/controllers/deprovisioning/events/events.go @@ -20,10 +20,11 @@ import ( v1 "k8s.io/api/core/v1" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) -func Blocked(node *v1.Node, reason string) []events.Event { +func Blocked(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -32,10 +33,17 @@ func Blocked(node *v1.Node, reason string) []events.Event { Message: fmt.Sprintf("Cannot deprovision node due to %s", reason), DedupeValues: []string{node.Name, reason}, }, + { + InvolvedObject: machine, + Type: v1.EventTypeNormal, + Reason: "DeprovisioningBlocked", + Message: fmt.Sprintf("Cannot deprovision machine due to %s", reason), + DedupeValues: []string{machine.Name, reason}, + }, } } -func Terminating(node *v1.Node, reason string) []events.Event { +func Terminating(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -44,40 +52,47 @@ func Terminating(node *v1.Node, reason string) []events.Event { Message: fmt.Sprintf("Deprovisioning node via %s", reason), DedupeValues: []string{node.Name, reason}, }, + { + InvolvedObject: machine, + Type: v1.EventTypeNormal, + Reason: 
"DeprovisioningTerminating", + Message: fmt.Sprintf("Deprovisioning machine via %s", reason), + DedupeValues: []string{machine.Name, reason}, + }, } } -func Launching(node *v1.Node, reason string) events.Event { +func Launching(machine *v1alpha5.Machine, reason string) events.Event { return events.Event{ - InvolvedObject: node, + InvolvedObject: machine, Type: v1.EventTypeNormal, Reason: "DeprovisioningLaunching", - Message: fmt.Sprintf("Launching node for %s", reason), - DedupeValues: []string{node.Name, reason}, + Message: fmt.Sprintf("Launching machine for %s", reason), + DedupeValues: []string{machine.Name, reason}, } } -func WaitingOnReadiness(node *v1.Node) events.Event { +func WaitingOnReadiness(machine *v1alpha5.Machine) events.Event { return events.Event{ - InvolvedObject: node, + InvolvedObject: machine, Type: v1.EventTypeNormal, Reason: "DeprovisioningWaitingReadiness", Message: "Waiting on readiness to continue deprovisioning", - DedupeValues: []string{node.Name}, + DedupeValues: []string{machine.Name}, } } -func WaitingOnDeletion(node *v1.Node) events.Event { +func WaitingOnDeletion(machine *v1alpha5.Machine) events.Event { return events.Event{ - InvolvedObject: node, + InvolvedObject: machine, Type: v1.EventTypeNormal, Reason: "DeprovisioningWaitingDeletion", Message: "Waiting on deletion to continue deprovisioning", - DedupeValues: []string{node.Name}, + DedupeValues: []string{machine.Name}, } } -func Unconsolidatable(node *v1.Node, reason string) []events.Event { +func Unconsolidatable(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -87,5 +102,13 @@ func Unconsolidatable(node *v1.Node, reason string) []events.Event { DedupeValues: []string{node.Name}, DedupeTimeout: time.Minute * 15, }, + { + InvolvedObject: machine, + Type: v1.EventTypeNormal, + Reason: "Unconsolidatable", + Message: reason, + DedupeValues: []string{machine.Name}, + DedupeTimeout: time.Minute * 15, + }, } } diff --git a/pkg/controllers/deprovisioning/expiration.go b/pkg/controllers/deprovisioning/expiration.go index 9a445a6432..79e275098d 100644 --- a/pkg/controllers/deprovisioning/expiration.go +++ b/pkg/controllers/deprovisioning/expiration.go @@ -79,15 +79,15 @@ func (e *Expiration) ComputeCommand(ctx context.Context, candidates ...*Candidat } // filter out nodes that can't be terminated candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool { - if !cn.Node.DeletionTimestamp.IsZero() { + if !cn.Machine.DeletionTimestamp.IsZero() { return false } if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok { - e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) + e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) return false } if p, ok := hasDoNotEvictPod(cn); ok { - e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, + e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) 
return false } diff --git a/pkg/controllers/deprovisioning/expiration_test.go b/pkg/controllers/deprovisioning/expiration_test.go index 87c2a500c9..ca10ac7dfe 100644 --- a/pkg/controllers/deprovisioning/expiration_test.go +++ b/pkg/controllers/deprovisioning/expiration_test.go @@ -70,8 +70,9 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("can delete expired nodes", func() { prov.Spec.TTLSecondsUntilExpired = ptr.Int64(60) @@ -88,9 +89,13 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // Expect that the expired machine is gone + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("should expire one node at a time, starting with most expired", func() { expireProv := test.Provisioner(test.ProvisionerOptions{ @@ -139,9 +144,13 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machineToExpire) + // Expect that one of the expired machines is gone + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, nodeToExpire) + ExpectNotFound(ctx, env.Client, machineToExpire, nodeToExpire) }) It("can replace node for expiration", func() { labels := map[string]string{ @@ -179,18 +188,23 @@ var _ = Describe("Expiration", func() { // deprovisioning won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // Expect that the new machine was created, and it's different than the original - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) - It("should uncordon nodes when expiration replacement fails", func() { - cloudProvider.AllowedCreateCalls = 0 // fail the replacement and expect it to uncordon + It("should uncordon nodes when expiration replacement partially fails", func() { prov.Spec.TTLSecondsUntilExpired = ptr.Int64(30) labels := map[string]string{ @@ -226,6 +240,7 @@ var _ = Describe("Expiration", func() { fakeClock.Step(10 * time.Minute) var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) + ExpectNewMachinesDeleted(ctx, env.Client, &wg, 1) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) 
Expect(err).To(HaveOccurred()) wg.Wait() @@ -318,11 +333,15 @@ var _ = Describe("Expiration", func() { // deprovisioning won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) }) diff --git a/pkg/controllers/deprovisioning/metrics.go b/pkg/controllers/deprovisioning/metrics.go index 554b3c26bf..99e00ba0c1 100644 --- a/pkg/controllers/deprovisioning/metrics.go +++ b/pkg/controllers/deprovisioning/metrics.go @@ -44,8 +44,8 @@ var deprovisioningReplacementNodeInitializedHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: metrics.Namespace, Subsystem: deprovisioningSubsystem, - Name: "replacement_node_initialized_seconds", - Help: "Amount of time required for a replacement node to become initialized.", + Name: "replacement_machine_initialized_seconds", + Help: "Amount of time required for a replacement machine to become initialized.", Buckets: metrics.DurationBuckets(), }) diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go index 2ab65cd23d..6fd8574b97 100644 --- a/pkg/controllers/deprovisioning/suite_test.go +++ b/pkg/controllers/deprovisioning/suite_test.go @@ -260,21 +260,26 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // should create a new machine as there is a cheaper one that can hold the pod + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) // Expect that the new machine does not request the most expensive instance type - Expect(nodes[0].Name).ToNot(Equal(node.Name)) - Expect(scheduling.NewLabelRequirements(nodes[0].Labels).Has(v1.LabelInstanceTypeStable)).To(BeTrue()) - Expect(scheduling.NewLabelRequirements(nodes[0].Labels).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(BeFalse()) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) + Expect(scheduling.NewNodeSelectorRequirements(machines[0].Spec.Requirements...).Has(v1.LabelInstanceTypeStable)).To(BeTrue()) + Expect(scheduling.NewNodeSelectorRequirements(machines[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(BeFalse()) // and delete the old one - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("can replace nodes, considers PDB", func() { labels := map[string]string{ @@ -350,7 +355,7 @@ var _ = Describe("Replace Nodes", func() { // we didn't create a new machine or delete the old one Expect(ExpectMachines(ctx, 
env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("can replace nodes, PDB namespace must match", func() { labels := map[string]string{ @@ -419,13 +424,17 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // should create a new machine as there is a cheaper one that can hold the pod + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("can replace nodes, considers do-not-consolidate annotation", func() { labels := map[string]string{ @@ -509,9 +518,13 @@ var _ = Describe("Replace Nodes", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, regularMachine) + // we should delete the non-annotated node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, regularNode) + ExpectNotFound(ctx, env.Client, regularMachine, regularNode) }) It("won't replace node if any spot replacement is more expensive", func() { currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ @@ -609,8 +622,9 @@ var _ = Describe("Replace Nodes", func() { Expect(cluster.Consolidated()).To(BeTrue()) // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("won't replace on-demand node if on-demand replacement is more expensive", func() { currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ @@ -723,8 +737,9 @@ var _ = Describe("Replace Nodes", func() { Expect(cluster.Consolidated()).To(BeTrue()) // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("waits for node deletion to finish", func() { labels := map[string]string{ @@ -753,6 +768,7 @@ var _ = Describe("Replace Nodes", func() { }) machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{"unit-test.com/block-deletion"}, Labels: map[string]string{ v1alpha5.ProvisionerNameLabelKey: prov.Name, v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, @@ -765,7 +781,6 @@ var _ = Describe("Replace Nodes", func() { Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("32")}, }, }) - node.Finalizers = []string{"unit-test.com/block-deletion"} ExpectApplied(ctx, env.Client, rs, pod, machine, node, prov) @@ -780,7 +795,7 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup 
ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) var consolidationFinished atomic.Bool go func() { @@ -791,25 +806,31 @@ var _ = Describe("Replace Nodes", func() { }() wg.Wait() - // node should still exist - ExpectExists(ctx, env.Client, node) + // machine should still exist + ExpectExists(ctx, env.Client, machine) // and consolidation should still be running waiting on the machine's deletion Expect(consolidationFinished.Load()).To(BeFalse()) - // fetch the latest node object and remove the finalizer - node = ExpectExists(ctx, env.Client, node) - ExpectFinalizersRemoved(ctx, env.Client, node) + // fetch the latest machine object and remove the finalizer + machine = ExpectExists(ctx, env.Client, machine) + ExpectFinalizersRemoved(ctx, env.Client, machine) // consolidation should complete now that the finalizer on the machine is gone and it can // was actually deleted Eventually(consolidationFinished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() - ExpectNotFound(ctx, env.Client, node) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + + ExpectNotFound(ctx, env.Client, machine, node) // Expect that the new machine was created and its different than the original + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) }) @@ -894,10 +915,14 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) + // we don't need a new node, but we should evict everything off one of node2 which only has a single pod + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // and delete the old one - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine2, node2) }) It("can delete nodes, considers PDB", func() { var nl v1.NodeList @@ -955,11 +980,15 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we don't need a new node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with more pods (node1) as the pod on machine2 has a PDB preventing // eviction - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("can delete nodes, considers do-not-evict", func() { // create our RS, so we can link a pod to it @@ -1002,10 +1031,14 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we don't need a new node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with more pods (machine1) as the pod on machine2 has a do-not-evict annotation - ExpectNotFound(ctx, env.Client, node1) + 
ExpectNotFound(ctx, env.Client, machine1, node1) }) It("can delete nodes, evicts pods without an ownerRef", func() { // create our RS so we can link a pod to it @@ -1046,11 +1079,15 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) + // we don't need a new node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with the fewest pods (machine 2) even though the pod has no ownerRefs // and will not be recreated - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine2, node2) }) }) @@ -1143,10 +1180,14 @@ var _ = Describe("Node Lifetime Consideration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // the second node has more pods, so it would normally not be picked for consolidation, except it very little // lifetime remaining, so it should be deleted + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) }) @@ -1154,7 +1195,7 @@ var _ = Describe("Topology Consideration", func() { var prov *v1alpha5.Provisioner var zone1Machine, zone2Machine, zone3Machine *v1alpha5.Machine var zone1Node, zone2Node, zone3Node *v1.Node - var oldNodeNames sets.String + var oldMachineNames sets.String BeforeEach(func() { testZone1Instance := leastExpensiveInstanceWithZone("test-zone-1") @@ -1206,7 +1247,7 @@ var _ = Describe("Topology Consideration", func() { Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("1")}, }, }) - oldNodeNames = sets.NewString(zone1Node.Name, zone2Node.Name, zone3Node.Name) + oldMachineNames = sets.NewString(zone1Machine.Name, zone2Machine.Name, zone3Machine.Name) }) It("can replace node maintaining zonal topology spread", func() { labels := map[string]string{ @@ -1255,17 +1296,25 @@ var _ = Describe("Topology Consideration", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, zone2Machine) + // should create a new node as there is a cheaper one that can hold the pod + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectNotFound(ctx, env.Client, zone2Node) + ExpectNotFound(ctx, env.Client, zone2Machine, zone2Node) - // Find the new node + // Find the new node associated with the machine + newMachine, ok := lo.Find(ExpectMachines(ctx, env.Client), func(m *v1alpha5.Machine) bool { + return !oldMachineNames.Has(m.Name) + }) + Expect(ok).To(BeTrue()) newNode, ok := lo.Find(ExpectNodes(ctx, env.Client), func(n *v1.Node) bool { - return !oldNodeNames.Has(n.Name) + return newMachine.Status.ProviderID == n.Spec.ProviderID }) Expect(ok).To(BeTrue()) @@ -1338,10 +1387,11 @@ var _ = 
Describe("Topology Consideration", func() { // our nodes are already the cheapest available, so we can't replace them. If we delete, it would // violate the anti-affinity rule, so we can't do anything. + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectExists(ctx, env.Client, zone1Node) - ExpectExists(ctx, env.Client, zone2Node) - ExpectExists(ctx, env.Client, zone3Node) + ExpectExists(ctx, env.Client, zone1Machine) + ExpectExists(ctx, env.Client, zone2Machine) + ExpectExists(ctx, env.Client, zone3Machine) }) }) @@ -1400,9 +1450,13 @@ var _ = Describe("Empty Nodes", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we should delete the empty node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("can delete multiple empty nodes with consolidation", func() { ExpectApplied(ctx, env.Client, machine1, node1, machine2, node2, prov) @@ -1415,10 +1469,14 @@ var _ = Describe("Empty Nodes", func() { ExpectTriggerVerifyAction(&wg) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) + // we should delete the empty nodes + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine1) + ExpectNotFound(ctx, env.Client, machine2) }) It("can delete empty nodes with TTLSecondsAfterEmpty with the emptiness timestamp", func() { prov = test.Provisioner(test.ProvisionerOptions{TTLSecondsAfterEmpty: ptr.Int64(10)}) @@ -1444,9 +1502,13 @@ var _ = Describe("Empty Nodes", func() { ExpectTriggerVerifyAction(&wg) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we should delete the empty node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("considers pending pods when consolidating", func() { machine1, node1 = test.MachineAndNode(v1alpha5.Machine{ @@ -1499,8 +1561,9 @@ var _ = Describe("Empty Nodes", func() { // we don't need any new nodes and consolidation should notice the huge pending pod that needs the large // node to schedule, which prevents the large expensive node from being replaced + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) }) }) @@ -1567,7 +1630,7 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -1575,9 +1638,13 @@ var _ = Describe("Consolidation TTL", func() { 
Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // machine should be deleted after the TTL due to emptiness + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("should wait for the node TTL for non-empty nodes before consolidating", func() { labels := map[string]string{ @@ -1647,8 +1714,8 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) - ExpectExists(ctx, env.Client, node2) + ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, machine2) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -1656,9 +1723,13 @@ var _ = Describe("Consolidation TTL", func() { Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) + // machine should be deleted after the TTL due to emptiness + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine2, node2) }) It("should not consolidate if the action becomes invalid during the node TTL wait", func() { pod := test.Pod() @@ -1682,7 +1753,7 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) // make the node non-empty by binding it ExpectManualBinding(ctx, env.Client, pod, node1) @@ -1695,8 +1766,9 @@ var _ = Describe("Consolidation TTL", func() { wg.Wait() // nothing should be removed since the node is no longer empty + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) }) }) @@ -1764,7 +1836,7 @@ var _ = Describe("Parallelization", func() { // Run the processing loop in parallel in the background with environment context var wg sync.WaitGroup - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectTriggerVerifyAction(&wg) go func() { defer GinkgoRecover() @@ -1772,11 +1844,12 @@ var _ = Describe("Parallelization", func() { }() wg.Wait() - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(2)) // Add a new pending pod that should schedule while node is not yet deleted pod = test.UnschedulablePod() ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, provisioner, pod) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(2)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) ExpectScheduled(ctx, env.Client, pod) }) @@ -1820,6 +1893,8 @@ var _ = Describe("Parallelization", func() { ExpectApplied(ctx, env.Client, rs, prov) ExpectProvisionedNoBinding(ctx, env.Client, cluster, cloudProvider, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...) 
+ machines := ExpectMachines(ctx, env.Client) + Expect(machines).To(HaveLen(1)) nodes := ExpectNodes(ctx, env.Client) Expect(nodes).To(HaveLen(1)) @@ -1953,13 +2028,17 @@ var _ = Describe("Multi-Node Consolidation", func() { var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2, machine3) + // three machines should be replaced with a single machine + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node1, node2, node3) + ExpectNotFound(ctx, env.Client, machine1, node1, machine2, node2, machine3, node3) }) It("won't merge 2 nodes into 1 of the same type", func() { labels := map[string]string{ @@ -2020,15 +2099,20 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // We have [cheap-node, cheap-node] which multi-node consolidation could consolidate via // [delete cheap-node, delete cheap-node, launch cheap-node]. This isn't the best method though // as we should instead just delete one of the nodes instead of deleting both and launching a single // identical replacement. This test verifies the filterOutSameType function from multi-node consolidation // works to ensure we perform the least-disruptive action. + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // should have just deleted the node with the fewest pods - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) // and left the other node alone + ExpectExists(ctx, env.Client, machine2) ExpectExists(ctx, env.Client, node2) }) It("should wait for the node TTL for non-empty nodes before consolidating (multi-node)", func() { @@ -2062,7 +2146,7 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1, node2}, []*v1alpha5.Machine{machine1, machine2}) var wg sync.WaitGroup - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) wg.Add(1) finished := atomic.Bool{} @@ -2078,8 +2162,8 @@ var _ = Describe("Multi-Node Consolidation", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) - ExpectExists(ctx, env.Client, node2) + ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, machine2) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -2087,10 +2171,14 @@ var _ = Describe("Multi-Node Consolidation", func() { Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) + // should launch a single smaller replacement node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // and delete the two 
large ones - ExpectNotFound(ctx, env.Client, node1, node2) + ExpectNotFound(ctx, env.Client, machine1, node1, machine2, node2) }) }) @@ -2183,47 +2271,6 @@ func ExpectNewMachinesDeleted(ctx context.Context, c client.Client, wg *sync.Wai }() } -func ExpectMakeNewNodesReady(ctx context.Context, c client.Client, wg *sync.WaitGroup, numNewNodes int) { - existingNodes := ExpectNodes(ctx, c) - existingNodeNames := sets.NewString(lo.Map(existingNodes, func(n *v1.Node, _ int) string { - return n.Name - })...) - - wg.Add(1) - go func() { - nodesMadeReady := 0 - ctx, cancel := context.WithTimeout(ctx, time.Second*10) // give up after 10s - defer GinkgoRecover() - defer wg.Done() - defer cancel() - for { - select { - case <-time.After(50 * time.Millisecond): - nodeList := &v1.NodeList{} - if err := c.List(ctx, nodeList); err != nil { - continue - } - for i := range nodeList.Items { - n := &nodeList.Items[i] - if existingNodeNames.Has(n.Name) { - continue - } - ExpectMakeNodesReady(ctx, c, n) - - nodesMadeReady++ - existingNodeNames.Insert(n.Name) - // did we make all the nodes ready that we expected? - if nodesMadeReady == numNewNodes { - return - } - } - case <-ctx.Done(): - Fail(fmt.Sprintf("waiting for nodes to be ready, %s", ctx.Err())) - } - } - }() -} - func ExpectMakeNewMachinesReady(ctx context.Context, c client.Client, wg *sync.WaitGroup, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, numNewMachines int) { diff --git a/pkg/controllers/deprovisioning/types.go b/pkg/controllers/deprovisioning/types.go index 57ac93f175..f980a2438d 100644 --- a/pkg/controllers/deprovisioning/types.go +++ b/pkg/controllers/deprovisioning/types.go @@ -90,6 +90,9 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock if node.Nominated() { return nil, fmt.Errorf("state node is nominated") } + if node.Node == nil || node.Machine == nil { + return nil, fmt.Errorf("state node doesn't contain both a node and a machine") + } pods, err := node.Pods(ctx, kubeClient) if err != nil { diff --git a/pkg/controllers/inflightchecks/controller.go b/pkg/controllers/inflightchecks/controller.go index 5b4a4ff5d0..d9de8bb1d5 100644 --- a/pkg/controllers/inflightchecks/controller.go +++ b/pkg/controllers/inflightchecks/controller.go @@ -27,15 +27,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" inflightchecksevents "github.com/aws/karpenter-core/pkg/controllers/inflightchecks/events" "github.com/aws/karpenter-core/pkg/events" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" + machineutil "github.com/aws/karpenter-core/pkg/utils/machine" ) -var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil) +var _ corecontroller.TypedController[*v1alpha5.Machine] = (*Controller)(nil) type Controller struct { clock clock.Clock @@ -50,7 +52,7 @@ type Issue string type Check interface { // Check performs the inflight check, this should return a list of slice discovered, or an empty // slice if no issues were found - Check(context.Context, *v1.Node) ([]Issue, error) + Check(context.Context, *v1.Node, *v1alpha5.Machine) ([]Issue, error) } // scanPeriod is how often we inspect and report issues that are found. 
@@ -59,15 +61,15 @@ const scanPeriod = 10 * time.Minute func NewController(clk clock.Clock, kubeClient client.Client, recorder events.Recorder, provider cloudprovider.CloudProvider) corecontroller.Controller { - return corecontroller.Typed[*v1.Node](kubeClient, &Controller{ + return corecontroller.Typed[*v1alpha5.Machine](kubeClient, &Controller{ clock: clk, kubeClient: kubeClient, recorder: recorder, lastScanned: cache.New(scanPeriod, 1*time.Minute), checks: []Check{ - NewFailedInit(clk, kubeClient, provider), + NewFailedInit(clk, provider), NewTermination(kubeClient), - NewNodeShape(kubeClient, provider), + NewNodeShape(provider), }}, ) } @@ -76,12 +78,12 @@ func (c *Controller) Name() string { return "inflightchecks" } -func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Result, error) { - if node.Labels[v1alpha5.ProvisionerNameLabelKey] == "" { +func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + if machine.Status.ProviderID == "" { return reconcile.Result{}, nil } // If we get an event before we should check for inflight checks, we ignore and wait - if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(node).String()); ok { + if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(machine).String()); ok { if lastTime, ok := lastTime.(time.Time); ok { remaining := scanPeriod - c.clock.Since(lastTime) return reconcile.Result{RequeueAfter: remaining}, nil @@ -89,11 +91,15 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re // the above should always succeed return reconcile.Result{RequeueAfter: scanPeriod}, nil } - c.lastScanned.SetDefault(client.ObjectKeyFromObject(node).String(), c.clock.Now()) + c.lastScanned.SetDefault(client.ObjectKeyFromObject(machine).String(), c.clock.Now()) + node, err := machineutil.NodeForMachine(ctx, c.kubeClient, machine) + if err != nil { + return reconcile.Result{}, machineutil.IgnoreDuplicateNodeError(machineutil.IgnoreNodeNotFoundError(err)) + } var allIssues []Issue for _, check := range c.checks { - issues, err := check.Check(ctx, node) + issues, err := check.Check(ctx, node, machine) if err != nil { logging.FromContext(ctx).Errorf("checking node with %T, %s", check, err) } @@ -101,15 +107,19 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re } for _, issue := range allIssues { logging.FromContext(ctx).Infof("inflight check failed, %s", issue) - c.recorder.Publish(inflightchecksevents.InflightCheck(node, string(issue))...) + c.recorder.Publish(inflightchecksevents.InflightCheck(node, machine, string(issue))...) } return reconcile.Result{RequeueAfter: scanPeriod}, nil } -func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder { +func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder { return corecontroller.Adapt(controllerruntime. NewControllerManagedBy(m). - For(&v1.Node{}). + For(&v1alpha5.Machine{}). + Watches( + &source.Kind{Type: &v1.Node{}}, + machineutil.NodeEventHandler(ctx, c.kubeClient), + ). 
WithOptions(controller.Options{MaxConcurrentReconciles: 10}), ) } diff --git a/pkg/controllers/inflightchecks/events/events.go b/pkg/controllers/inflightchecks/events/events.go index e2d3a06b8e..69e8d81d92 100644 --- a/pkg/controllers/inflightchecks/events/events.go +++ b/pkg/controllers/inflightchecks/events/events.go @@ -17,10 +17,11 @@ package events import ( v1 "k8s.io/api/core/v1" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) -func InflightCheck(node *v1.Node, message string) []events.Event { +func InflightCheck(node *v1.Node, machine *v1alpha5.Machine, message string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -29,5 +30,12 @@ func InflightCheck(node *v1.Node, message string) []events.Event { Message: message, DedupeValues: []string{node.Name, message}, }, + { + InvolvedObject: machine, + Type: v1.EventTypeWarning, + Reason: "FailedInflightCheck", + Message: message, + DedupeValues: []string{machine.Name, message}, + }, } } diff --git a/pkg/controllers/inflightchecks/failedinit.go b/pkg/controllers/inflightchecks/failedinit.go index f30df1cc88..08b4727bc1 100644 --- a/pkg/controllers/inflightchecks/failedinit.go +++ b/pkg/controllers/inflightchecks/failedinit.go @@ -19,15 +19,12 @@ import ( "fmt" "time" - "github.com/samber/lo" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/utils/clock" - "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" - "github.com/aws/karpenter-core/pkg/controllers/node" + "github.com/aws/karpenter-core/pkg/controllers/machine" ) // initFailureTime is the time after which we start reporting a node as having failed to initialize. This is set @@ -37,51 +34,34 @@ const initFailureTime = time.Hour // FailedInit detects nodes that fail to initialize within an hour and reports the reason for the initialization // failure type FailedInit struct { - clock clock.Clock - kubeClient client.Client - provider cloudprovider.CloudProvider + clock clock.Clock + provider cloudprovider.CloudProvider } -func NewFailedInit(clk clock.Clock, kubeClient client.Client, provider cloudprovider.CloudProvider) Check { - return &FailedInit{clock: clk, kubeClient: kubeClient, provider: provider} +func NewFailedInit(clk clock.Clock, provider cloudprovider.CloudProvider) Check { + return &FailedInit{clock: clk, provider: provider} } -func (f FailedInit) Check(ctx context.Context, n *v1.Node) ([]Issue, error) { - // ignore nodes that are deleting - if !n.DeletionTimestamp.IsZero() { +func (f FailedInit) Check(_ context.Context, node *v1.Node, m *v1alpha5.Machine) ([]Issue, error) { + // ignore machines that are deleting + if !m.DeletionTimestamp.IsZero() { return nil, nil } - - nodeAge := f.clock.Since(n.CreationTimestamp.Time) - // n is already initialized or not old enough - if n.Labels[v1alpha5.LabelNodeInitialized] == "true" || nodeAge < initFailureTime { + // machine is already initialized or isn't old enough + if m.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() || + f.clock.Now().Before(m.CreationTimestamp.Time.Add(initFailureTime)) { return nil, nil } - provisioner := &v1alpha5.Provisioner{} - if err := f.kubeClient.Get(ctx, types.NamespacedName{Name: n.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil { - // provisioner is missing, node should be removed soon - return nil, client.IgnoreNotFound(err) - } - instanceTypes, err := f.provider.GetInstanceTypes(ctx, 
provisioner)
-	if err != nil {
-		return nil, err
-	}
-	instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == n.Labels[v1.LabelInstanceTypeStable] })
-	if !ok {
-		return []Issue{Issue(fmt.Sprintf("Instance Type %q not found", n.Labels[v1.LabelInstanceTypeStable]))}, nil
-	}
 	// detect startup taints which should be removed
 	var result []Issue
-	if taint, ok := node.IsStartupTaintRemoved(n, provisioner); !ok {
+	if taint, ok := machine.IsStartupTaintRemoved(node, m); !ok {
 		result = append(result, Issue(fmt.Sprintf("Startup taint %q is still on the node", formatTaint(taint))))
 	}
-
 	// and extended resources which never registered
-	if resource, ok := node.IsExtendedResourceRegistered(n, instanceType); !ok {
+	if resource, ok := machine.RequestedResourcesRegistered(node, m); !ok {
 		result = append(result, Issue(fmt.Sprintf("Expected resource %q didn't register on the node", resource)))
 	}
-
 	return result, nil
 }
diff --git a/pkg/controllers/inflightchecks/nodeshape.go b/pkg/controllers/inflightchecks/nodeshape.go
index c3648c4bbf..3895c4679e 100644
--- a/pkg/controllers/inflightchecks/nodeshape.go
+++ b/pkg/controllers/inflightchecks/nodeshape.go
@@ -18,10 +18,7 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/samber/lo"
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
@@ -29,44 +26,29 @@
 
 // NodeShape detects nodes that have launched with 10% or less of any resource than was expected.
 type NodeShape struct {
-	kubeClient client.Client
-	provider   cloudprovider.CloudProvider
+	provider cloudprovider.CloudProvider
 }
 
-func NewNodeShape(kubeClient client.Client, provider cloudprovider.CloudProvider) Check {
+func NewNodeShape(provider cloudprovider.CloudProvider) Check {
 	return &NodeShape{
-		kubeClient: kubeClient,
-		provider:   provider,
+		provider: provider,
 	}
 }
 
-func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) {
-	// ignore nodes that are deleting
-	if !node.DeletionTimestamp.IsZero() {
+func (n *NodeShape) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
+	// ignore machines that are deleting
+	if !machine.DeletionTimestamp.IsZero() {
 		return nil, nil
 	}
-	// and nodes that haven't initialized yet
-	if node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
+	// and machines that haven't initialized yet
+	if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
 		return nil, nil
 	}
-	provisioner := &v1alpha5.Provisioner{}
-	if err := n.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
-		// provisioner is missing, node should be removed soon
-		return nil, client.IgnoreNotFound(err)
-	}
-	instanceTypes, err := n.provider.GetInstanceTypes(ctx, provisioner)
-	if err != nil {
-		return nil, err
-	}
-	instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == node.Labels[v1.LabelInstanceTypeStable] })
-	if !ok {
-		return []Issue{Issue(fmt.Sprintf("Instance Type %q not found", node.Labels[v1.LabelInstanceTypeStable]))}, nil
-	}
 	var issues []Issue
-	for resourceName, expectedQuantity := range instanceType.Capacity {
+	for resourceName, expectedQuantity := range machine.Status.Capacity {
 		nodeQuantity, ok := node.Status.Capacity[resourceName]
 		if !ok && !expectedQuantity.IsZero() {
-			issues = append(issues, 
Issue(fmt.Sprintf("Expected resource %s not found", resourceName))) + issues = append(issues, Issue(fmt.Sprintf("Expected resource \"%s\" not found", resourceName))) continue } @@ -75,6 +57,7 @@ func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) { issues = append(issues, Issue(fmt.Sprintf("Expected %s of resource %s, but found %s (%0.1f%% of expected)", expectedQuantity.String(), resourceName, nodeQuantity.String(), pct*100))) } + } return issues, nil } diff --git a/pkg/controllers/inflightchecks/suite_test.go b/pkg/controllers/inflightchecks/suite_test.go index 57c67caefa..ee3a5d968c 100644 --- a/pkg/controllers/inflightchecks/suite_test.go +++ b/pkg/controllers/inflightchecks/suite_test.go @@ -124,16 +124,10 @@ var _ = Describe("Controller", func() { } ExpectApplied(ctx, env.Client, provisioner, machine, node) fakeClock.Step(2 * time.Hour) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent("Expected resource \"fake.com/vendor-a\" didn't register on the node")).To(BeTrue()) }) It("should detect issues with nodes that have a startup taint which isn't removed", func() { - provisioner.Spec.StartupTaints = []v1.Taint{ - { - Key: "my.startup.taint", - Effect: v1.TaintEffectNoSchedule, - }, - } machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -160,7 +154,7 @@ var _ = Describe("Controller", func() { }) ExpectApplied(ctx, env.Client, provisioner, machine, node) fakeClock.Step(2 * time.Hour) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent("Startup taint \"my.startup.taint:NoSchedule\" is still on the node")).To(BeTrue()) }) }) @@ -188,12 +182,12 @@ var _ = Describe("Controller", func() { Labels: podsLabels, MaxUnavailable: &intstr.IntOrString{IntVal: 0, Type: intstr.Int}, }) - node.Finalizers = []string{"prevent.deletion/now"} + machine.Finalizers = []string{"prevent.deletion/now"} p := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: podsLabels}}) ExpectApplied(ctx, env.Client, provisioner, machine, node, p, pdb) ExpectManualBinding(ctx, env.Client, p, node) - _ = env.Client.Delete(ctx, node) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) + _ = env.Client.Delete(ctx, machine) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent(fmt.Sprintf("Can't drain node, PDB %s/%s is blocking evictions", pdb.Namespace, pdb.Name))).To(BeTrue()) }) }) @@ -223,7 +217,7 @@ var _ = Describe("Controller", func() { v1.ResourcePods: resource.MustParse("10"), } ExpectApplied(ctx, env.Client, provisioner, machine, node) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent("Expected 128Gi of resource memory, but found 64Gi (50.0% of expected)")).To(BeTrue()) }) }) diff --git a/pkg/controllers/inflightchecks/termination.go b/pkg/controllers/inflightchecks/termination.go index b4116c1917..836de78496 100644 --- a/pkg/controllers/inflightchecks/termination.go +++ b/pkg/controllers/inflightchecks/termination.go @@ -21,6 +21,7 @@ import ( v1 "k8s.io/api/core/v1" 
"sigs.k8s.io/controller-runtime/pkg/client" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/controllers/deprovisioning" nodeutils "github.com/aws/karpenter-core/pkg/utils/node" ) @@ -36,9 +37,9 @@ func NewTermination(kubeClient client.Client) Check { } } -func (t *Termination) Check(ctx context.Context, node *v1.Node) ([]Issue, error) { +func (t *Termination) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) { // we are only looking at nodes that are hung deleting - if node.DeletionTimestamp.IsZero() { + if machine.DeletionTimestamp.IsZero() { return nil, nil } pdbs, err := deprovisioning.NewPDBLimits(ctx, t.kubeClient) diff --git a/pkg/controllers/node/controller.go b/pkg/controllers/node/controller.go index 3a9e9ae57b..3304ed9ec5 100644 --- a/pkg/controllers/node/controller.go +++ b/pkg/controllers/node/controller.go @@ -49,22 +49,21 @@ var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil) // Controller manages a set of properties on karpenter provisioned nodes, such as // taints, labels, finalizers. type Controller struct { - kubeClient client.Client - cluster *state.Cluster - initialization *Initialization - emptiness *Emptiness - finalizer *Finalizer - drift *Drift + kubeClient client.Client + cluster *state.Cluster + + emptiness *Emptiness + drift *Drift } // NewController constructs a nodeController instance func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) corecontroller.Controller { return corecontroller.Typed[*v1.Node](kubeClient, &Controller{ - kubeClient: kubeClient, - cluster: cluster, - initialization: &Initialization{kubeClient: kubeClient, cloudProvider: cloudProvider}, - emptiness: &Emptiness{kubeClient: kubeClient, clock: clk, cluster: cluster}, - drift: &Drift{kubeClient: kubeClient, cloudProvider: cloudProvider}, + kubeClient: kubeClient, + cluster: cluster, + + emptiness: &Emptiness{kubeClient: kubeClient, clock: clk, cluster: cluster}, + drift: &Drift{kubeClient: kubeClient, cloudProvider: cloudProvider}, }) } @@ -81,24 +80,22 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re if !node.DeletionTimestamp.IsZero() { return reconcile.Result{}, nil } - provisioner := &v1alpha5.Provisioner{} if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil { return reconcile.Result{}, client.IgnoreNotFound(err) } - - // Execute Reconcilers - var results []reconcile.Result - var errs error + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provisioner", provisioner.Name)) reconcilers := []nodeReconciler{ - c.initialization, c.emptiness, - c.finalizer, } if settings.FromContext(ctx).DriftEnabled { reconcilers = append(reconcilers, c.drift) } + + // Execute Reconcilers + var results []reconcile.Result + var errs error for _, reconciler := range reconcilers { res, err := reconciler.Reconcile(ctx, provisioner, node) errs = multierr.Append(errs, err) diff --git a/pkg/controllers/node/drift.go b/pkg/controllers/node/drift.go index ce19347635..dba1d62b4f 100644 --- a/pkg/controllers/node/drift.go +++ b/pkg/controllers/node/drift.go @@ -19,15 +19,15 @@ import ( "fmt" "time" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/samber/lo" - 
"github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" - "github.com/aws/karpenter-core/pkg/utils/machine" ) type Drift struct { @@ -39,17 +39,30 @@ func (d *Drift) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner if _, ok := node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey]; ok { return reconcile.Result{}, nil } - + machineName, ok := node.Labels[v1alpha5.MachineNameLabelKey] + if !ok { + return reconcile.Result{}, nil + } + machine := &v1alpha5.Machine{} + if err := d.kubeClient.Get(ctx, types.NamespacedName{Name: machineName}, machine); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + if !machine.StatusConditions().GetCondition(v1alpha5.MachineCreated).IsTrue() { + return reconcile.Result{Requeue: true}, nil + } // TODO: Add Provisioner Drift - drifted, err := d.cloudProvider.IsMachineDrifted(ctx, machine.NewFromNode(node)) + drifted, err := d.cloudProvider.IsMachineDrifted(ctx, machine) if err != nil { return reconcile.Result{}, fmt.Errorf("getting drift for node, %w", err) } - if drifted { - node.Annotations = lo.Assign(node.Annotations, map[string]string{ - v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue, - }) + if !drifted { + // Requeue after 5 minutes for the cache TTL + return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil } + node.Annotations = lo.Assign(node.Annotations, map[string]string{ + v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue, + }) + logging.FromContext(ctx).Debugf("annotated node as drifted") // Requeue after 5 minutes for the cache TTL return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil } diff --git a/pkg/controllers/node/emptiness.go b/pkg/controllers/node/emptiness.go index fbbc363e65..eeb3656dae 100644 --- a/pkg/controllers/node/emptiness.go +++ b/pkg/controllers/node/emptiness.go @@ -46,22 +46,18 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi if provisioner.Spec.TTLSecondsAfterEmpty == nil { return reconcile.Result{}, nil } - // node is not ready yet, so we don't consider it to possibly be empty if n.Labels[v1alpha5.LabelNodeInitialized] != "true" { return reconcile.Result{}, nil } - empty, err := r.isEmpty(ctx, n) if err != nil { return reconcile.Result{}, err } - // node is empty, but it is in-use per the last scheduling round so we don't consider it empty if r.cluster.IsNodeNominated(n.Name) { return reconcile.Result{}, nil } - _, hasEmptinessTimestamp := n.Annotations[v1alpha5.EmptinessTimestampAnnotationKey] if !empty && hasEmptinessTimestamp { delete(n.Annotations, v1alpha5.EmptinessTimestampAnnotationKey) @@ -72,7 +68,6 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi }) logging.FromContext(ctx).Infof("added TTL to empty node") } - // Short requeue result so that we requeue to check for emptiness when the node nomination time ends return reconcile.Result{RequeueAfter: time.Minute}, nil } diff --git a/pkg/controllers/node/finalizer.go b/pkg/controllers/node/finalizer.go deleted file mode 100644 index 2a160553b1..0000000000 --- a/pkg/controllers/node/finalizer.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package node - -import ( - "context" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/pkg/ptr" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" -) - -// Finalizer is a subreconciler that ensures nodes have the termination -// finalizer. This protects against instances that launch when Karpenter fails -// to create the node object. In this case, the node will come online without -// the termination finalizer. This controller will update the node accordingly. -type Finalizer struct{} - -// Reconcile reconciles the node -func (r *Finalizer) Reconcile(_ context.Context, provisioner *v1alpha5.Provisioner, node *v1.Node) (reconcile.Result, error) { - if !node.DeletionTimestamp.IsZero() { - return reconcile.Result{}, nil - } - node.OwnerReferences = []metav1.OwnerReference{{ - APIVersion: v1alpha5.SchemeGroupVersion.String(), - Kind: "Provisioner", - Name: provisioner.Name, - UID: provisioner.UID, - BlockOwnerDeletion: ptr.Bool(true), - }} - controllerutil.AddFinalizer(node, v1alpha5.TerminationFinalizer) - return reconcile.Result{}, nil -} diff --git a/pkg/controllers/node/initialization.go b/pkg/controllers/node/initialization.go deleted file mode 100644 index 37da64b16a..0000000000 --- a/pkg/controllers/node/initialization.go +++ /dev/null @@ -1,125 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package node - -import ( - "context" - "fmt" - - "github.com/samber/lo" - v1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" - - "github.com/aws/karpenter-core/pkg/cloudprovider" - "github.com/aws/karpenter-core/pkg/utils/node" - "github.com/aws/karpenter-core/pkg/utils/resources" -) - -type Initialization struct { - kubeClient client.Client - cloudProvider cloudprovider.CloudProvider -} - -// Reconcile reconciles the node -func (r *Initialization) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner, n *v1.Node) (reconcile.Result, error) { - // node has been previously determined to be ready, so there's nothing to do - if n.Labels[v1alpha5.LabelNodeInitialized] == "true" { - return reconcile.Result{}, nil - } - - // node is not ready per the label, we need to check if kubelet indicates that the node is ready as well as if - // startup taints are removed and extended resources have been initialized - instanceType, err := r.getInstanceType(ctx, provisioner, n.Labels[v1.LabelInstanceTypeStable]) - if err != nil { - return reconcile.Result{}, fmt.Errorf("determining instance type, %w", err) - } - if !r.isInitialized(n, provisioner, instanceType) { - return reconcile.Result{}, nil - } - - n.Labels[v1alpha5.LabelNodeInitialized] = "true" - return reconcile.Result{}, nil -} - -func (r *Initialization) getInstanceType(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypeName string) (*cloudprovider.InstanceType, error) { - instanceTypes, err := r.cloudProvider.GetInstanceTypes(ctx, provisioner) - if err != nil { - return nil, err - } - // The instance type may not be found which can occur if the instance type label was removed/edited. This shouldn't occur, - // but if it does we only lose the ability to check for extended resources. - return lo.FindOrElse(instanceTypes, nil, func(it *cloudprovider.InstanceType) bool { return it.Name == instanceTypeName }), nil -} - -// isInitialized returns true if the node has: -// a) its current status is set to Ready -// b) all the startup taints have been removed from the node -// c) all extended resources have been registered -// This method handles both nil provisioners and nodes without extended resources gracefully. 
-func (r *Initialization) isInitialized(n *v1.Node, provisioner *v1alpha5.Provisioner, instanceType *cloudprovider.InstanceType) bool { - // fast checks first - if node.GetCondition(n, v1.NodeReady).Status != v1.ConditionTrue { - return false - } - if _, ok := IsStartupTaintRemoved(n, provisioner); !ok { - return false - } - - if _, ok := IsExtendedResourceRegistered(n, instanceType); !ok { - return false - } - return true -} - -// IsStartupTaintRemoved returns true if there are no startup taints registered for the provisioner, or if all startup -// taints have been removed from the node -func IsStartupTaintRemoved(node *v1.Node, provisioner *v1alpha5.Provisioner) (*v1.Taint, bool) { - if provisioner != nil { - for _, startupTaint := range provisioner.Spec.StartupTaints { - for i := 0; i < len(node.Spec.Taints); i++ { - // if the node still has a startup taint applied, it's not ready - if startupTaint.MatchTaint(&node.Spec.Taints[i]) { - return &node.Spec.Taints[i], false - } - } - } - } - return nil, true -} - -// IsExtendedResourceRegistered returns true if there are no extended resources on the node, or they have all been -// registered by device plugins -func IsExtendedResourceRegistered(node *v1.Node, instanceType *cloudprovider.InstanceType) (v1.ResourceName, bool) { - if instanceType == nil { - // no way to know, so assume they're registered - return "", true - } - for resourceName, quantity := range instanceType.Capacity { - if quantity.IsZero() { - continue - } - // kubelet will zero out both the capacity and allocatable for an extended resource on startup, so if our - // annotation says the resource should be there, but it's zero'd in both then the device plugin hasn't - // registered it yet. - // We wait on allocatable since this is the value that is used in scheduling - if resources.IsZero(node.Status.Allocatable[resourceName]) { - return resourceName, false - } - } - return "", true -} diff --git a/pkg/controllers/node/suite_test.go b/pkg/controllers/node/suite_test.go index 7ed54dbbdb..8190de34ff 100644 --- a/pkg/controllers/node/suite_test.go +++ b/pkg/controllers/node/suite_test.go @@ -20,7 +20,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clock "k8s.io/utils/clock/testing" @@ -120,220 +119,26 @@ var _ = Describe("Controller", func() { }) It("should annotate the node when it has drifted in the cloud provider", func() { cp.Drifted = true - node := test.Node(test.NodeOptions{ + machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ v1alpha5.ProvisionerNameLabelKey: provisioner.Name, v1.LabelInstanceTypeStable: test.RandomName(), }, }, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Annotations).To(HaveKeyWithValue(v1alpha5.VoluntaryDisruptionAnnotationKey, v1alpha5.VoluntaryDisruptionDriftedAnnotationValue)) - }) - }) - - Context("Initialization", func() { - It("should initialize the node when ready", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - ReadyStatus: v1.ConditionTrue, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - 
node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when not ready", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - ReadyStatus: v1.ConditionFalse, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should initialize the node when extended resources are registered", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", - }, - }, - ReadyStatus: v1.ConditionTrue, - Allocatable: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - fake.ResourceGPUVendorA: resource.MustParse("2"), - }, - Capacity: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - fake.ResourceGPUVendorA: resource.MustParse("2"), - }, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when extended resource isn't registered", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", - }, - }, - ReadyStatus: v1.ConditionTrue, - Allocatable: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - }, - Capacity: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - }, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when capacity is filled but allocatable isn't set", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", - }, - }, - ReadyStatus: v1.ConditionTrue, - Allocatable: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - }, - Capacity: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - fake.ResourceGPUVendorA: resource.MustParse("2"), - }, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = 
ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should initialize the node when startup taints are removed", func() { - provisioner.Spec.StartupTaints = []v1.Taint{ - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: "example.com/startup-taint2", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - } - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - ReadyStatus: v1.ConditionTrue, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when startup taints aren't removed", func() { - provisioner.Spec.StartupTaints = []v1.Taint{ - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: "example.com/startup-taint2", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - } - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - Taints: []v1.Taint{ - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.RandomProviderID(), }, - ReadyStatus: v1.ConditionTrue, }) - ExpectApplied(ctx, env.Client, provisioner, node) + ExpectApplied(ctx, env.Client, provisioner, machine, node) + ExpectMakeMachinesReady(ctx, env.Client, machine) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) + Expect(node.Annotations).To(HaveKeyWithValue(v1alpha5.VoluntaryDisruptionAnnotationKey, v1alpha5.VoluntaryDisruptionDriftedAnnotationValue)) }) }) Context("Emptiness", func() { - It("should not TTL nodes that have ready status unknown", func() { - provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, - ReadyStatus: v1.ConditionUnknown, - }) - - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Annotations).ToNot(HaveKey(v1alpha5.EmptinessTimestampAnnotationKey)) - }) - It("should not TTL nodes that have ready status false", func() { + It("should not TTL nodes that are not initialized", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, @@ -349,7 +154,10 @@ var _ = Describe("Controller", func() { It("should label nodes as underutilized and add TTL", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: 
map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1alpha5.LabelNodeInitialized: "true", + }, }}) ExpectApplied(ctx, env.Client, provisioner, node) @@ -367,7 +175,10 @@ var _ = Describe("Controller", func() { It("should remove labels from non-empty nodes", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1alpha5.LabelNodeInitialized: "true", + }, Annotations: map[string]string{ v1alpha5.EmptinessTimestampAnnotationKey: fakeClock.Now().Add(100 * time.Second).Format(time.RFC3339), }}, @@ -384,57 +195,6 @@ var _ = Describe("Controller", func() { Expect(node.Annotations).ToNot(HaveKey(v1alpha5.EmptinessTimestampAnnotationKey)) }) }) - Context("Finalizer", func() { - It("should add the termination finalizer if missing", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - Finalizers: []string{"fake.com/finalizer"}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.Finalizers).To(ConsistOf(n.Finalizers[0], v1alpha5.TerminationFinalizer)) - }) - It("should do nothing if terminating", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - Finalizers: []string{"fake.com/finalizer"}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - Expect(env.Client.Delete(ctx, n)).To(Succeed()) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.Finalizers).To(Equal(n.Finalizers)) - }) - It("should do nothing if the termination finalizer already exists", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - Finalizers: []string{v1alpha5.TerminationFinalizer, "fake.com/finalizer"}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.Finalizers).To(Equal(n.Finalizers)) - }) - It("should add an owner reference to the node", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.OwnerReferences).To(Equal([]metav1.OwnerReference{{ - APIVersion: v1alpha5.SchemeGroupVersion.String(), - Kind: "Provisioner", - Name: provisioner.Name, - UID: provisioner.UID, - BlockOwnerDeletion: ptr.Bool(true), - }})) - }) - }) Context("Filters", func() { BeforeEach(func() { innerCtx, cancel := context.WithCancel(ctx) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 772feb626d..7b940eecfb 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ 
b/pkg/controllers/provisioning/provisioner.go @@ -19,14 +19,11 @@ import ( "fmt" "time" - "github.com/imdario/mergo" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "go.uber.org/multierr" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -317,6 +314,7 @@ func (p *Provisioner) Schedule(ctx context.Context) ([]*scheduler.Machine, []*sc func (p *Provisioner) Launch(ctx context.Context, m *scheduler.Machine, opts ...functional.Option[LaunchOptions]) (string, error) { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provisioner", m.Labels[v1alpha5.ProvisionerNameLabelKey])) + // Check limits latest := &v1alpha5.Provisioner{} if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: m.ProvisionerName}, latest); err != nil { @@ -328,56 +326,21 @@ func (p *Provisioner) Launch(ctx context.Context, m *scheduler.Machine, opts ... options := functional.ResolveOptions(opts...) logging.FromContext(ctx).Infof("launching %s", m) - created, err := p.cloudProvider.Create( - logging.WithLogger(ctx, logging.FromContext(ctx).Named("cloudprovider")), - m.ToMachine(latest), - ) - if err != nil { - return "", fmt.Errorf("creating cloud provider instance, %w", err) - } - k8sNode := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: created.Name, - Labels: created.Labels, - }, - Spec: v1.NodeSpec{ - ProviderID: created.Status.ProviderID, - }, - } - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", k8sNode.Name)) - - if err := mergo.Merge(k8sNode, m.ToNode()); err != nil { - return "", fmt.Errorf("merging cloud provider node, %w", err) - } - // ensure we clear out the status - k8sNode.Status = v1.NodeStatus{} - - // Idempotently create a node. In rare cases, nodes can come online and - // self register before the controller is able to register a node object - // with the API server. In the common case, we create the node object - // ourselves to enforce the binding decision and enable images to be pulled - // before the node is fully Ready. - if _, err := p.coreV1Client.Nodes().Create(ctx, k8sNode, metav1.CreateOptions{}); err != nil { - if errors.IsAlreadyExists(err) { - logging.FromContext(ctx).Debugf("node already registered") - } else { - return "", fmt.Errorf("creating node %s, %w", k8sNode.Name, err) - } - } - if err := p.cluster.UpdateNode(ctx, k8sNode); err != nil { - return "", fmt.Errorf("updating cluster state, %w", err) + machine := m.ToMachine(latest) + if err := p.kubeClient.Create(ctx, machine); err != nil { + return "", err } - metrics.NodesCreatedCounter.With(prometheus.Labels{ + p.cluster.NominateNodeForPod(ctx, machine.Name) + metrics.MachinesCreatedCounter.With(prometheus.Labels{ metrics.ReasonLabel: options.Reason, - metrics.ProvisionerLabel: k8sNode.Labels[v1alpha5.ProvisionerNameLabelKey], + metrics.ProvisionerLabel: machine.Labels[v1alpha5.ProvisionerNameLabelKey], }).Inc() - p.cluster.NominateNodeForPod(ctx, k8sNode.Name) - if options.RecordPodNomination { + if functional.ResolveOptions(opts...).RecordPodNomination { for _, pod := range m.Pods { - p.recorder.Publish(schedulingevents.NominatePod(pod, k8sNode)...) + p.recorder.Publish(schedulingevents.NominatePod(pod, nil, machine)...) 
} } - return k8sNode.Name, nil + return machine.Name, nil } func (p *Provisioner) getDaemonSetPods(ctx context.Context) ([]*v1.Pod, error) { diff --git a/pkg/controllers/provisioning/scheduling/events/events.go b/pkg/controllers/provisioning/scheduling/events/events.go index 35d3120019..90e90011a7 100644 --- a/pkg/controllers/provisioning/scheduling/events/events.go +++ b/pkg/controllers/provisioning/scheduling/events/events.go @@ -20,23 +20,39 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/flowcontrol" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) // PodNominationRateLimiter is a pointer so it rate-limits across events var PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) -func NominatePod(pod *v1.Pod, node *v1.Node) []events.Event { - return []events.Event{ - { +// PodNominationRateLimiterForMachine is a pointer so it rate-limits across events +var PodNominationRateLimiterForMachine = flowcontrol.NewTokenBucketRateLimiter(5, 10) + +func NominatePod(pod *v1.Pod, node *v1.Node, machine *v1alpha5.Machine) []events.Event { + var evts []events.Event + if node != nil { + evts = append(evts, events.Event{ InvolvedObject: pod, Type: v1.EventTypeNormal, Reason: "Nominated", Message: fmt.Sprintf("Pod should schedule on node: %s", node.Name), DedupeValues: []string{string(pod.UID)}, RateLimiter: PodNominationRateLimiter, - }, + }) + } + if machine != nil { + evts = append(evts, events.Event{ + InvolvedObject: pod, + Type: v1.EventTypeNormal, + Reason: "NominatedMachine", + Message: fmt.Sprintf("Pod should schedule on node associated with machine: %s", machine.Name), + DedupeValues: []string{string(pod.UID)}, + RateLimiter: PodNominationRateLimiterForMachine, + }) } + return evts } func PodFailedToSchedule(pod *v1.Pod, err error) events.Event { diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 539d5e6ede..d051b39151 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -145,7 +145,7 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, s.cluster.NominateNodeForPod(ctx, existing.Name()) } for _, pod := range existing.Pods { - s.recorder.Publish(schedulingevents.NominatePod(pod, existing.Node)...) + s.recorder.Publish(schedulingevents.NominatePod(pod, existing.Node, existing.Machine)...) 
} } diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 84a9b767be..4f5256a5cd 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -1800,12 +1800,15 @@ var _ = Describe("In-Flight Nodes", func() { bindings := ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, initialPod) ExpectScheduled(ctx, env.Client, initialPod) - // delete the node + // delete the node/machine + machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node + machine1.Finalizers = nil node1.Finalizers = nil - ExpectApplied(ctx, env.Client, node1) - ExpectDeleted(ctx, env.Client, node1) + ExpectApplied(ctx, env.Client, machine1, node1) + ExpectDeleted(ctx, env.Client, machine1, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) + ExpectReconcileSucceeded(ctx, machineStateController, client.ObjectKeyFromObject(machine1)) secondPod := test.UnschedulablePod(opts) ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, secondPod) @@ -1919,7 +1922,9 @@ var _ = Describe("In-Flight Nodes", func() { bindings := ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, initialPod) ExpectScheduled(ctx, env.Client, initialPod) + machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node + machine1.StatusConditions().MarkTrue(v1alpha5.MachineInitialized) // delete the pod so that the node is empty ExpectDeleted(ctx, env.Client, initialPod) @@ -1929,7 +1934,7 @@ var _ = Describe("In-Flight Nodes", func() { Value: "tainted", Effect: v1.TaintEffectNoSchedule, }) - ExpectApplied(ctx, env.Client, node1) + ExpectApplied(ctx, env.Client, machine1, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) secondPod := test.UnschedulablePod() @@ -1983,11 +1988,13 @@ var _ = Describe("In-Flight Nodes", func() { // Mark it initialized which only occurs once the startup taint was removed and re-apply only the startup taint. // We also need to add resource capacity as after initialization we assume that kubelet has recorded them. 
+ + machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node - node1.Labels[v1alpha5.LabelNodeInitialized] = "true" + machine1.StatusConditions().MarkTrue(v1alpha5.MachineInitialized) node1.Spec.Taints = []v1.Taint{startupTaint} node1.Status.Capacity = v1.ResourceList{v1.ResourcePods: resource.MustParse("10")} - ExpectApplied(ctx, env.Client, node1) + ExpectApplied(ctx, env.Client, machine1, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index faa309f926..0e27aac08e 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -80,6 +80,11 @@ func NewCluster(clk clock.Clock, client client.Client, cp cloudprovider.CloudPro // of the cluster is as close to correct as it can be when we begin to perform operations // utilizing the cluster state as our source of truth func (c *Cluster) Synced(ctx context.Context) bool { + machineList := &v1alpha5.MachineList{} + if err := c.kubeClient.List(ctx, machineList); err != nil { + logging.FromContext(ctx).Errorf("checking cluster state sync, %v", err) + return false + } nodeList := &v1.NodeList{} if err := c.kubeClient.List(ctx, nodeList); err != nil { logging.FromContext(ctx).Errorf("checking cluster state sync, %v", err) @@ -90,6 +95,13 @@ func (c *Cluster) Synced(ctx context.Context) bool { c.mu.RUnlock() providerIDs := sets.New[string]() + for _, machine := range machineList.Items { + // If the machine hasn't resolved its provider id, then it hasn't resolved its status + if machine.Status.ProviderID == "" { + return false + } + providerIDs.Insert(machine.Status.ProviderID) + } for _, node := range nodeList.Items { if node.Spec.ProviderID == "" { node.Spec.ProviderID = node.Name @@ -98,7 +110,7 @@ func (c *Cluster) Synced(ctx context.Context) bool { } // The provider ids tracked in-memory should at least have all the data that is in the api-server // This doesn't ensure that the two states are exactly aligned (we could still not be tracking a node - // that exists in the cluster state but not in the apiserver) but it ensures that we have a state + // that exists on the apiserver but not in the cluster state) but it ensures that we have a state // representation for every node/machine that exists on the apiserver return stateProviderIDs.IsSuperset(providerIDs) } diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 2aa44fb395..6ae08afe8c 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -1331,7 +1331,6 @@ var _ = Describe("Cluster State Sync", func() { Expect(cluster.Synced(ctx)).To(BeTrue()) }) It("shouldn't consider the cluster state synced if a machine hasn't resolved its provider id", func() { - Skip("enable this test when cluster state sync relies on machines") // Deploy 1000 machines and sync them all with the cluster for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ @@ -1349,7 +1348,6 @@ var _ = Describe("Cluster State Sync", func() { Expect(cluster.Synced(ctx)).To(BeFalse()) }) It("shouldn't consider the cluster state synced if a machine isn't tracked", func() { - Skip("enable this test when cluster state sync relies on machines") // Deploy 1000 machines and sync them all with the cluster for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ diff --git a/pkg/controllers/termination/controller.go b/pkg/controllers/termination/controller.go index 
213064c6b1..c39021b1e4 100644 --- a/pkg/controllers/termination/controller.go +++ b/pkg/controllers/termination/controller.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "go.uber.org/multierr" "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -30,12 +31,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" terminatorevents "github.com/aws/karpenter-core/pkg/controllers/machine/terminator/events" "github.com/aws/karpenter-core/pkg/events" + "github.com/aws/karpenter-core/pkg/metrics" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" machineutil "github.com/aws/karpenter-core/pkg/utils/machine" ) @@ -73,6 +76,17 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res if !controllerutil.ContainsFinalizer(node, v1alpha5.TerminationFinalizer) { return reconcile.Result{}, nil } + allRemoved, err := c.ensureMachinesRemoved(ctx, node) + if err != nil { + return reconcile.Result{}, fmt.Errorf("removing machines, %w", err) + } + if !allRemoved { + return reconcile.Result{}, nil + } + // TODO @joinnis: Remove this section after v1beta1 migration is completed + // We need to keep the full termination flow in here during the migration timeframe + // This is because there is a short time where a node with the karpenter.sh/termination finalizer + // may not have a machine owner and we should still terminate gracefully if err := c.terminator.Cordon(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("cordoning node, %w", err) } @@ -92,15 +106,34 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res if err := c.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { return reconcile.Result{}, client.IgnoreNotFound(err) } + metrics.NodesTerminatedCounter.Inc() logging.FromContext(ctx).Infof("deleted node") } return reconcile.Result{}, nil } +func (c *Controller) ensureMachinesRemoved(ctx context.Context, n *v1.Node) (allRemoved bool, err error) { + machineList := &v1alpha5.MachineList{} + if err = c.kubeClient.List(ctx, machineList, client.MatchingFields{"status.providerID": n.Spec.ProviderID}); err != nil { + return false, err + } + if len(machineList.Items) == 0 { + return true, nil + } + for i := range machineList.Items { + err = multierr.Append(err, client.IgnoreNotFound(c.kubeClient.Delete(ctx, &machineList.Items[i]))) + } + return false, err +} + func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder { return corecontroller.Adapt(controllerruntime. NewControllerManagedBy(m). For(&v1.Node{}). + Watches( + &source.Kind{Type: &v1alpha5.Machine{}}, + machineutil.EventHandler(ctx, c.kubeClient), + ). WithOptions( controller.Options{ RateLimiter: workqueue.NewMaxOfRateLimiter( diff --git a/pkg/controllers/termination/suite_test.go b/pkg/controllers/termination/suite_test.go index c272d1689a..658b9285a2 100644 --- a/pkg/controllers/termination/suite_test.go +++ b/pkg/controllers/termination/suite_test.go @@ -16,15 +16,19 @@ package termination_test import ( "context" - "fmt" "sync" "testing" "time" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" - - "github.com/samber/lo" + . "knative.dev/pkg/logging/testing" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter-core/pkg/apis" @@ -36,22 +40,12 @@ import ( "github.com/aws/karpenter-core/pkg/operator/controller" "github.com/aws/karpenter-core/pkg/operator/scheme" "github.com/aws/karpenter-core/pkg/test" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" - . "knative.dev/pkg/logging/testing" - . "github.com/aws/karpenter-core/pkg/test/expectations" ) var ctx context.Context var terminationController controller.Controller -var evictionQueue *terminator.EvictionQueue var env *test.Environment -var defaultOwnerRefs = []metav1.OwnerReference{{Kind: "ReplicaSet", APIVersion: "appsv1", Name: "rs", UID: "1234567890"}} var fakeClock *clock.FakeClock func TestAPIs(t *testing.T) { @@ -62,11 +56,14 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { fakeClock = clock.NewFakeClock(time.Now()) - env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...)) - - cloudProvider := fake.NewCloudProvider() - evictionQueue = terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{})) - terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, evictionQueue), events.NewRecorder(&record.FakeRecorder{})) + env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...), test.WithFieldIndexers(func(c cache.Cache) error { + return c.IndexField(ctx, &v1alpha5.Machine{}, "status.providerID", func(obj client.Object) []string { + return []string{obj.(*v1alpha5.Machine).Status.ProviderID} + }) + })) + evictionQueue := terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{})) + terminator := terminator.NewTerminator(fakeClock, env.Client, evictionQueue) + terminationController = termination.NewController(env.Client, fake.NewCloudProvider(), terminator, events.NewRecorder(&record.FakeRecorder{})) }) var _ = AfterSuite(func() { @@ -74,26 +71,33 @@ var _ = AfterSuite(func() { }) var _ = Describe("Termination", func() { + var machine *v1alpha5.Machine var node *v1.Node BeforeEach(func() { - node = test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1alpha5.TerminationFinalizer}}}) + machine = test.Machine(v1alpha5.Machine{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1alpha5.TerminationFinalizer}}, Status: v1alpha5.MachineStatus{ProviderID: test.RandomProviderID()}}) + node = test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1alpha5.TerminationFinalizer}}, ProviderID: machine.Status.ProviderID}) }) - AfterEach(func() { ExpectCleanedUp(ctx, env.Client) fakeClock.SetTime(time.Now()) }) - Context("Reconciliation", func() { - It("should delete nodes", func() { + It("should not delete node if machine still exists", func() { + ExpectApplied(ctx, env.Client, machine, node) + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectExists(ctx, env.Client, node) 
+ }) + It("should delete nodes if machine doesn't exist", func() { ExpectApplied(ctx, env.Client, node) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) ExpectNotFound(ctx, env.Client, node) }) - It("should not race if deleting nodes in parallel", func() { + It("should not race if deleting nodes in parallel if machines don't exist", func() { var nodes []*v1.Node for i := 0; i < 10; i++ { node = test.Node(test.NodeOptions{ @@ -120,289 +124,5 @@ var _ = Describe("Termination", func() { wg.Wait() ExpectNotFound(ctx, env.Client, lo.Map(nodes, func(n *v1.Node, _ int) client.Object { return n })...) }) - It("should exclude nodes from load balancers when terminating", func() { - // This is a kludge to prevent the node from being deleted before we can - // inspect its labels - podNoEvict := test.Pod(test.PodOptions{ - NodeName: node.Name, - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{v1alpha5.DoNotEvictPodAnnotationKey: "true"}, - OwnerReferences: defaultOwnerRefs, - }, - }) - - ExpectApplied(ctx, env.Client, node, podNoEvict) - - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels[v1.LabelNodeExcludeBalancers]).Should(Equal("karpenter")) - }) - It("should not evict pods that tolerate unschedulable taint", func() { - podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - podSkip := test.Pod(test.PodOptions{ - NodeName: node.Name, - Tolerations: []v1.Toleration{{Key: v1.TaintNodeUnschedulable, Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule}}, - ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}, - }) - ExpectApplied(ctx, env.Client, node, podEvict, podSkip) - - // Trigger Termination Controller - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Expect node to exist and be draining - ExpectNodeDraining(env.Client, node.Name) - - // Expect podEvict to be evicting, and delete it - ExpectEvicted(env.Client, podEvict) - ExpectDeleted(ctx, env.Client, podEvict) - - // Reconcile to delete node - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotFound(ctx, env.Client, node) - }) - It("should delete nodes that have pods without an ownerRef", func() { - pod := test.Pod(test.PodOptions{ - NodeName: node.Name, - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: nil, - }, - }) - - ExpectApplied(ctx, env.Client, node, pod) - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Expect pod with no owner ref to be enqueued for eviction - ExpectEvicted(env.Client, pod) - - // Expect node to exist and be draining - ExpectNodeDraining(env.Client, node.Name) - - // Delete no owner refs pod to simulate successful eviction - ExpectDeleted(ctx, env.Client, pod) - - // Reconcile node to evict pod - node = ExpectNodeExists(ctx, env.Client, node.Name) - 
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Reconcile to delete node - ExpectNotFound(ctx, env.Client, node) - }) - It("should delete nodes with terminal pods", func() { - podEvictPhaseSucceeded := test.Pod(test.PodOptions{ - NodeName: node.Name, - Phase: v1.PodSucceeded, - }) - podEvictPhaseFailed := test.Pod(test.PodOptions{ - NodeName: node.Name, - Phase: v1.PodFailed, - }) - - ExpectApplied(ctx, env.Client, node, podEvictPhaseSucceeded, podEvictPhaseFailed) - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - // Trigger Termination Controller, which should ignore these pods and delete the node - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotFound(ctx, env.Client, node) - }) - It("should fail to evict pods that violate a PDB", func() { - minAvailable := intstr.FromInt(1) - labelSelector := map[string]string{test.RandomName(): test.RandomName()} - pdb := test.PodDisruptionBudget(test.PDBOptions{ - Labels: labelSelector, - // Don't let any pod evict - MinAvailable: &minAvailable, - }) - podNoEvict := test.Pod(test.PodOptions{ - NodeName: node.Name, - ObjectMeta: metav1.ObjectMeta{ - Labels: labelSelector, - OwnerReferences: defaultOwnerRefs, - }, - Phase: v1.PodRunning, - }) - - ExpectApplied(ctx, env.Client, node, podNoEvict, pdb) - - // Trigger Termination Controller - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Expect node to exist and be draining - ExpectNodeDraining(env.Client, node.Name) - - // Expect podNoEvict to fail eviction due to PDB, and be retried - Eventually(func() int { - return evictionQueue.NumRequeues(client.ObjectKeyFromObject(podNoEvict)) - }).Should(BeNumerically(">=", 1)) - - // Delete pod to simulate successful eviction - ExpectDeleted(ctx, env.Client, podNoEvict) - ExpectNotFound(ctx, env.Client, podNoEvict) - - // Reconcile to delete node - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotFound(ctx, env.Client, node) - }) - It("should evict non-critical pods first", func() { - podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - podNodeCritical := test.Pod(test.PodOptions{NodeName: node.Name, PriorityClassName: "system-node-critical", ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - podClusterCritical := test.Pod(test.PodOptions{NodeName: node.Name, PriorityClassName: "system-cluster-critical", ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - - ExpectApplied(ctx, env.Client, node, podEvict, podNodeCritical, podClusterCritical) - - // Trigger Termination Controller - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Expect node to exist and be draining - ExpectNodeDraining(env.Client, node.Name) - - // Expect podEvict to be evicting, and delete it - ExpectEvicted(env.Client, podEvict) - ExpectDeleted(ctx, env.Client, podEvict) - - // Expect the critical pods to be evicted and deleted - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, 
client.ObjectKeyFromObject(node)) - ExpectEvicted(env.Client, podNodeCritical) - ExpectDeleted(ctx, env.Client, podNodeCritical) - ExpectEvicted(env.Client, podClusterCritical) - ExpectDeleted(ctx, env.Client, podClusterCritical) - - // Reconcile to delete node - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotFound(ctx, env.Client, node) - }) - It("should not evict static pods", func() { - ExpectApplied(ctx, env.Client, node) - podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - ExpectApplied(ctx, env.Client, node, podEvict) - - podNoEvict := test.Pod(test.PodOptions{ - NodeName: node.Name, - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - APIVersion: "v1", - Kind: "Node", - Name: node.Name, - UID: node.UID, - }}, - }, - }) - ExpectApplied(ctx, env.Client, podNoEvict) - - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Expect mirror pod to not be queued for eviction - ExpectNotEnqueuedForEviction(evictionQueue, podNoEvict) - - // Expect podEvict to be enqueued for eviction then be successful - ExpectEvicted(env.Client, podEvict) - - // Expect node to exist and be draining - ExpectNodeDraining(env.Client, node.Name) - - // Reconcile node to evict pod - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Delete pod to simulate successful eviction - ExpectDeleted(ctx, env.Client, podEvict) - - // Reconcile to delete node - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotFound(ctx, env.Client, node) - - }) - It("should not delete nodes until all pods are deleted", func() { - pods := []*v1.Pod{ - test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}), - test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}), - } - ExpectApplied(ctx, env.Client, node, pods[0], pods[1]) - - // Trigger Termination Controller - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - - // Expect the pods to be evicted - ExpectEvicted(env.Client, pods[0], pods[1]) - - // Expect node to exist and be draining, but not deleted - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNodeDraining(env.Client, node.Name) - - ExpectDeleted(ctx, env.Client, pods[1]) - - // Expect node to exist and be draining, but not deleted - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNodeDraining(env.Client, node.Name) - - ExpectDeleted(ctx, env.Client, pods[0]) - - // Reconcile to delete node - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotFound(ctx, env.Client, node) - }) - It("should wait for pods to terminate", func() { - pod := 
test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - fakeClock.SetTime(time.Now()) // make our fake clock match the pod creation time - ExpectApplied(ctx, env.Client, node, pod) - - // Before grace period, node should not delete - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNodeExists(ctx, env.Client, node.Name) - ExpectEvicted(env.Client, pod) - - // After grace period, node should delete. The deletion timestamps are from etcd which we can't control, so - // to eliminate test-flakiness we reset the time to current time + 90 seconds instead of just advancing - // the clock by 90 seconds. - fakeClock.SetTime(time.Now().Add(90 * time.Second)) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotFound(ctx, env.Client, node) - }) }) }) - -func ExpectNotEnqueuedForEviction(e *terminator.EvictionQueue, pods ...*v1.Pod) { - for _, pod := range pods { - ExpectWithOffset(1, e.Contains(client.ObjectKeyFromObject(pod))).To(BeFalse()) - } -} - -func ExpectEvicted(c client.Client, pods ...*v1.Pod) { - for _, pod := range pods { - EventuallyWithOffset(1, func() bool { - return ExpectPodExists(ctx, c, pod.Name, pod.Namespace).GetDeletionTimestamp().IsZero() - }, ReconcilerPropagationTime, RequestInterval).Should(BeFalse(), func() string { - return fmt.Sprintf("expected %s/%s to be evicting, but it isn't", pod.Namespace, pod.Name) - }) - } -} - -func ExpectNodeDraining(c client.Client, nodeName string) *v1.Node { - node := ExpectNodeExistsWithOffset(1, ctx, c, nodeName) - ExpectWithOffset(1, node.Spec.Unschedulable).To(BeTrue()) - ExpectWithOffset(1, lo.Contains(node.Finalizers, v1alpha5.TerminationFinalizer)).To(BeTrue()) - ExpectWithOffset(1, node.DeletionTimestamp.IsZero()).To(BeFalse()) - return node -} diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go index f7d234bd17..95c8eb6d4d 100644 --- a/pkg/events/suite_test.go +++ b/pkg/events/suite_test.go @@ -77,12 +77,14 @@ var _ = BeforeEach(func() { internalRecorder = NewInternalRecorder() eventRecorder = events.NewRecorder(internalRecorder) schedulingevents.PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) + schedulingevents.PodNominationRateLimiterForMachine = flowcontrol.NewTokenBucketRateLimiter(5, 10) }) var _ = Describe("Event Creation", func() { It("should create a NominatePod event", func() { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(1)) + eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(1)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(1)) }) It("should create a EvictPod event", func() { eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID())) @@ -133,18 +135,20 @@ var _ = Describe("Dedupe", func() { var _ = Describe("Rate Limiting", func() { It("should only create max-burst when many events are created quickly", func() { for i := 0; i < 100; i++ { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) 
+ eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) } - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(10)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(10)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(10)) }) It("should allow many events over time due to smoothed rate limiting", func() { for i := 0; i < 3; i++ { for j := 0; j < 5; j++ { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) + eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) } time.Sleep(time.Second) } - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(15)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(15)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(15)) }) }) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index b5e1d32570..32e56ada26 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -25,36 +25,12 @@ const ( ) var ( - NodesCreatedCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: Namespace, - Subsystem: nodeSubsystem, - Name: "created", - Help: "Number of nodes created in total by Karpenter. Labeled by reason the node was created and the owning provisioner.", - }, - []string{ - ReasonLabel, - ProvisionerLabel, - }, - ) - NodesTerminatedCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: Namespace, - Subsystem: nodeSubsystem, - Name: "terminated", - Help: "Number of nodes terminated in total by Karpenter. Labeled by reason the node was terminated and the owning provisioner.", - }, - []string{ - ReasonLabel, - ProvisionerLabel, - }, - ) MachinesCreatedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, Subsystem: machineSubsystem, Name: "created", - Help: "Number of machines created in total by Karpenter. Labeled by reason the machine was terminated and the owning provisioner.", + Help: "Number of machines created in total by Karpenter. Labeled by reason the machine was created.", }, []string{ ReasonLabel, @@ -66,15 +42,21 @@ var ( Namespace: Namespace, Subsystem: machineSubsystem, Name: "terminated", - Help: "Number of machines terminated in total by Karpenter. Labeled by reason the machine was terminated and the owning provisioner.", + Help: "Number of machines terminated in total by Karpenter. 
Labeled by reason the machine was terminated.", }, []string{ ReasonLabel, ProvisionerLabel, }, ) + NodesTerminatedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: nodeSubsystem, + Name: "terminated", + Help: "Number of nodes terminated in total by Karpenter.", + }) ) func MustRegister() { - crmetrics.Registry.MustRegister(NodesCreatedCounter, NodesTerminatedCounter) + crmetrics.Registry.MustRegister(MachinesCreatedCounter, MachinesTerminatedCounter, NodesTerminatedCounter) } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 596669524c..40de0b3204 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/aws/karpenter-core/pkg/apis" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" "github.com/aws/karpenter-core/pkg/operator/injection" @@ -127,6 +128,12 @@ func NewOperator() (context.Context, *Operator) { lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string { return []string{o.(*v1.Pod).Spec.NodeName} }), "failed to setup pod indexer") + lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1.Node{}, "spec.providerID", func(o client.Object) []string { + return []string{o.(*v1.Node).Spec.ProviderID} + }), "failed to setup node provider id indexer") + lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1alpha5.Machine{}, "status.providerID", func(o client.Object) []string { + return []string{o.(*v1alpha5.Machine).Status.ProviderID} + }), "failed to setup machine provider id indexer") return ctx, &Operator{ Manager: manager, diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index 5f4351cccb..f7d0cb83f6 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -240,6 +240,7 @@ func ExpectFinalizersRemoved(ctx context.Context, c client.Client, objs ...clien ExpectWithOffset(1, client.IgnoreNotFound(c.Patch(ctx, obj, client.MergeFrom(stored)))).To(Succeed()) } } + func ExpectProvisioned(ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { bindings := ExpectProvisionedNoBindingWithOffset(1, ctx, c, cluster, cloudProvider, provisioner, pods...) podKeys := sets.NewString(lo.Map(pods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() })...) @@ -257,7 +258,7 @@ func ExpectProvisionedNoBinding(ctx context.Context, c client.Client, cluster *s return ExpectProvisionedNoBindingWithOffset(1, ctx, c, cluster, cloudProvider, provisioner, pods...) 
} -func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c client.Client, _ *state.Cluster, _ cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { +func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { // Persist objects for _, pod := range pods { ExpectAppliedWithOffset(offset+1, ctx, c, pod) @@ -271,13 +272,24 @@ func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c cli if err != nil { return bindings } - for _, pod := range m.Pods { - bindings[pod] = &Binding{Node: ExpectNodeExistsWithOffset(offset+1, ctx, c, name)} + machine := &v1alpha5.Machine{} + ExpectWithOffset(offset+1, c.Get(ctx, types.NamespacedName{Name: name}, machine)).To(Succeed()) + machine, node := ExpectMachineDeployedWithOffset(offset+1, ctx, c, cluster, cloudProvider, machine) + if machine != nil && node != nil { + for _, pod := range m.Pods { + bindings[pod] = &Binding{ + Machine: machine, + Node: node, + } + } } } for _, node := range nodes { for _, pod := range node.Pods { - bindings[pod] = &Binding{Node: node.Node} + bindings[pod] = &Binding{ + Node: node.Node, + Machine: node.Machine, + } } } return bindings diff --git a/pkg/test/nodes.go b/pkg/test/nodes.go index eba25e448f..dce019738e 100644 --- a/pkg/test/nodes.go +++ b/pkg/test/nodes.go @@ -71,7 +71,6 @@ func MachineLinkedNode(machine *v1alpha5.Machine) *v1.Node { ObjectMeta: metav1.ObjectMeta{ Labels: machine.Labels, Annotations: machine.Annotations, - Finalizers: machine.Finalizers, }, Taints: append(machine.Spec.Taints, machine.Spec.StartupTaints...), Capacity: machine.Status.Capacity,