diff --git a/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml b/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml deleted file mode 120000 index 10e93ba5ee..0000000000 --- a/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml +++ /dev/null @@ -1 +0,0 @@ -../../pkg/apis/crds/karpenter.sh_machines.yaml \ No newline at end of file diff --git a/go.mod b/go.mod index fcf053d906..f1ad61d5ba 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( k8s.io/client-go v0.25.4 k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a - sigs.k8s.io/controller-runtime v0.13.0 + sigs.k8s.io/controller-runtime v0.13.1 ) require ( diff --git a/go.sum b/go.sum index a795a3ca7a..4534ee48f4 100644 --- a/go.sum +++ b/go.sum @@ -848,8 +848,8 @@ knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a/go.mod h1:fckNBPf9bu5/p1RbnOh rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRMc2WIQ= -sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI= +sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE3xSlg= +sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index dc6198e3c5..be415b09c4 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -111,9 +111,10 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) ( break } } + name := test.RandomName() created := &v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ - Name: machine.Name, + Name: name, Labels: lo.Assign(labels, machine.Labels), Annotations: machine.Annotations, }, diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 501a24ccf5..65cee3d11b 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -25,7 +25,6 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/counter" "github.com/aws/karpenter-core/pkg/controllers/deprovisioning" "github.com/aws/karpenter-core/pkg/controllers/inflightchecks" - "github.com/aws/karpenter-core/pkg/controllers/machine" "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod" metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner" @@ -65,13 +64,11 @@ func NewControllers( informer.NewNodeController(kubeClient, cluster), informer.NewPodController(kubeClient, cluster), informer.NewProvisionerController(kubeClient, cluster), - informer.NewMachineController(kubeClient, cluster), node.NewController(clock, kubeClient, cloudProvider, cluster), termination.NewController(kubeClient, cloudProvider, terminator, recorder), metricspod.NewController(kubeClient), metricsprovisioner.NewController(kubeClient), counter.NewController(kubeClient, cluster), 
inflightchecks.NewController(clock, kubeClient, recorder, cloudProvider), - machine.NewController(clock, kubeClient, cloudProvider, terminator, recorder), } } diff --git a/pkg/controllers/deprovisioning/consolidation.go b/pkg/controllers/deprovisioning/consolidation.go index a58e8512fb..a3a93f3eb0 100644 --- a/pkg/controllers/deprovisioning/consolidation.go +++ b/pkg/controllers/deprovisioning/consolidation.go @@ -80,16 +80,16 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca // filter out nodes that can't be terminated nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool { - if !cn.Machine.DeletionTimestamp.IsZero() { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "in the process of deletion")...) + if !cn.Node.DeletionTimestamp.IsZero() { + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "in the process of deletion")...) return false } if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) return false } if p, ok := hasDoNotEvictPod(cn); ok { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) return false } return true @@ -104,15 +104,15 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca // ShouldDeprovision is a predicate used to filter deprovisionable nodes func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool { if val, ok := cn.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...) return val != "true" } if cn.provisioner == nil { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "provisioner is unknown")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "provisioner is unknown")...) return false } if cn.provisioner.Spec.Consolidation == nil || !ptr.BoolValue(cn.provisioner.Spec.Consolidation.Enabled) { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...) return false } return true @@ -201,7 +201,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if !allPodsScheduled { // This method is used by multi-node consolidation as well, so we'll only report in the single node case if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "not all pods would schedule")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "not all pods would schedule")...) 
} return Command{action: actionDoNothing}, nil } @@ -217,7 +217,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... // we're not going to turn a single node into multiple candidates if len(newMachines) != 1 { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...) } return Command{action: actionDoNothing}, nil } @@ -231,7 +231,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... newMachines[0].InstanceTypeOptions = filterByPrice(newMachines[0].InstanceTypeOptions, newMachines[0].Requirements, nodesPrice) if len(newMachines[0].InstanceTypeOptions) == 0 { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace with a cheaper node")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace with a cheaper node")...) } // no instance types remain after filtering by price return Command{action: actionDoNothing}, nil @@ -250,7 +250,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if allExistingAreSpot && newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace a spot node with a spot node")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace a spot node with a spot node")...) } return Command{action: actionDoNothing}, nil } diff --git a/pkg/controllers/deprovisioning/controller.go b/pkg/controllers/deprovisioning/controller.go index 12dad0d3b9..74f0cb0bf1 100644 --- a/pkg/controllers/deprovisioning/controller.go +++ b/pkg/controllers/deprovisioning/controller.go @@ -114,7 +114,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc for _, d := range c.deprovisioners { candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision) if err != nil { - return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err) + return reconcile.Result{}, fmt.Errorf("determining candidates, %w", err) } // If there are no candidate nodes, move to the next deprovisioner if len(candidates) == 0 { @@ -135,7 +135,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc // Attempt to deprovision if err := c.executeCommand(ctx, d, cmd); err != nil { - return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err) + return reconcile.Result{}, fmt.Errorf("deprovisioning candidates, %w", err) } return reconcile.Result{Requeue: true}, nil } @@ -162,15 +162,15 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman } for _, candidate := range command.candidates { - c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, candidate.Machine, command.String())...) + c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, command.String())...) 
- if err := c.kubeClient.Delete(ctx, candidate.Machine); err != nil { + if err := c.kubeClient.Delete(ctx, candidate.Node); err != nil { if errors.IsNotFound(err) { continue } logging.FromContext(ctx).Errorf("terminating machine, %s", err) } else { - metrics.MachinesTerminatedCounter.With(prometheus.Labels{ + metrics.NodesTerminatedCounter.With(prometheus.Labels{ metrics.ReasonLabel: reason, metrics.ProvisionerLabel: candidate.provisioner.Name, }).Inc() @@ -180,7 +180,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman // We wait for nodes to delete to ensure we don't start another round of deprovisioning until this node is fully // deleted. for _, oldCandidate := range command.candidates { - c.waitForDeletion(ctx, oldCandidate.Machine) + c.waitForDeletion(ctx, oldCandidate.Node) } return nil } @@ -230,8 +230,8 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name var once sync.Once pollStart := time.Now() return retry.Do(func() error { - machine := &v1alpha5.Machine{} - if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, machine); err != nil { + node := &v1.Node{} + if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, node); err != nil { // If the machine was deleted after a few seconds (to give the cache time to update), then we assume // that the machine was deleted due to an Insufficient Capacity error if errors.IsNotFound(err) && c.clock.Since(pollStart) > time.Second*5 { @@ -240,12 +240,12 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name return fmt.Errorf("getting machine, %w", err) } once.Do(func() { - c.recorder.Publish(deprovisioningevents.Launching(machine, action.String())) + c.recorder.Publish(deprovisioningevents.Launching(node, action.String())) }) - if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() { + if _, ok := node.Labels[v1alpha5.LabelNodeInitialized]; !ok { // make the user aware of why deprovisioning is paused - c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(machine)) - return fmt.Errorf("machine is not initialized") + c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(node)) + return fmt.Errorf("node is not initialized") } return nil }, waitRetryOptions...) @@ -254,21 +254,21 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name // waitForDeletion waits for the specified machine to be removed from the API server. This deletion can take some period // of time if there are PDBs that govern pods on the machine as we need to wait until the node drains before // it's actually deleted. -func (c *Controller) waitForDeletion(ctx context.Context, machine *v1alpha5.Machine) { +func (c *Controller) waitForDeletion(ctx context.Context, node *v1.Node) { if err := retry.Do(func() error { - m := &v1alpha5.Machine{} - nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(machine), m) + m := &v1.Node{} + nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(node), m) // We expect the not machine found error, at which point we know the machine is deleted. 
if errors.IsNotFound(nerr) { return nil } // make the user aware of why deprovisioning is paused - c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(machine)) + c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(node)) if nerr != nil { - return fmt.Errorf("expected machine to be not found, %w", nerr) + return fmt.Errorf("expected node to be not found, %w", nerr) } // the machine still exists - return fmt.Errorf("expected machine to be not found") + return fmt.Errorf("expected node to be not found") }, waitRetryOptions..., ); err != nil { logging.FromContext(ctx).Errorf("Waiting on machine deletion, %s", err) diff --git a/pkg/controllers/deprovisioning/drift.go b/pkg/controllers/deprovisioning/drift.go index 851d73b54c..8754a9df55 100644 --- a/pkg/controllers/deprovisioning/drift.go +++ b/pkg/controllers/deprovisioning/drift.go @@ -66,15 +66,15 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C } // filter out machines that can't be terminated candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool { - if !cn.Machine.DeletionTimestamp.IsZero() { + if !cn.Node.DeletionTimestamp.IsZero() { return false } if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok { - d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) + d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) return false } if p, ok := hasDoNotEvictPod(cn); ok { - d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, + d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) return false } @@ -93,7 +93,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C } // Log when all pods can't schedule, as the command will get executed immediately. 
if !allPodsScheduled { - logging.FromContext(ctx).With("machine", candidate.Machine.Name, "node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods") + logging.FromContext(ctx).With("node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods") } if len(newMachines) == 0 { return Command{ diff --git a/pkg/controllers/deprovisioning/drift_test.go b/pkg/controllers/deprovisioning/drift_test.go index 9d8ae9874d..fa201c4349 100644 --- a/pkg/controllers/deprovisioning/drift_test.go +++ b/pkg/controllers/deprovisioning/drift_test.go @@ -76,8 +76,8 @@ var _ = Describe("Drift", func() { wg.Wait() // Expect to not create or delete more machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, node) }) It("should ignore nodes with the drift label, but not the drifted value", func() { node.Annotations = lo.Assign(node.Annotations, map[string]string{ @@ -96,8 +96,8 @@ var _ = Describe("Drift", func() { wg.Wait() // Expect to not create or delete more machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, node) }) It("should ignore nodes without the drift label", func() { ExpectApplied(ctx, env.Client, machine, node, prov) @@ -110,8 +110,8 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) // Expect to not create or delete more machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, node) }) It("can delete drifted nodes", func() { node.Annotations = lo.Assign(node.Annotations, map[string]string{ @@ -129,13 +129,9 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - // We should delete the machine that has drifted - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine, node) + ExpectNotFound(ctx, env.Client, node) }) It("can replace drifted nodes", func() { labels := map[string]string{ @@ -175,21 +171,15 @@ var _ = Describe("Drift", func() { // deprovisioning won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - - ExpectNotFound(ctx, env.Client, machine, node) + ExpectNotFound(ctx, env.Client, node) // Expect that the new machine was created and its different than the original - machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) - Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) - Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) It("can 
replace drifted nodes with multiple nodes", func() { @@ -279,16 +269,12 @@ var _ = Describe("Drift", func() { // deprovisioning won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - // expect that drift provisioned three nodes, one for each pod - ExpectNotFound(ctx, env.Client, machine, node) - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) + ExpectNotFound(ctx, env.Client, node) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) }) It("should delete one drifted node at a time", func() { @@ -338,11 +324,7 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) - // Expect one of the nodes to be deleted - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) }) }) diff --git a/pkg/controllers/deprovisioning/emptiness.go b/pkg/controllers/deprovisioning/emptiness.go index adf8351c2c..1b019706c6 100644 --- a/pkg/controllers/deprovisioning/emptiness.go +++ b/pkg/controllers/deprovisioning/emptiness.go @@ -64,7 +64,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, c *Candidate) bool { // ComputeCommand generates a deprovisioning command given deprovisionable machines func (e *Emptiness) ComputeCommand(_ context.Context, candidates ...*Candidate) (Command, error) { emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool { - return cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0 + return cn.Node.DeletionTimestamp.IsZero() && len(cn.pods) == 0 }) if len(emptyCandidates) == 0 { diff --git a/pkg/controllers/deprovisioning/events/events.go b/pkg/controllers/deprovisioning/events/events.go index 53cd07a6c4..337cd5695c 100644 --- a/pkg/controllers/deprovisioning/events/events.go +++ b/pkg/controllers/deprovisioning/events/events.go @@ -20,11 +20,10 @@ import ( v1 "k8s.io/api/core/v1" - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) -func Blocked(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { +func Blocked(node *v1.Node, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -33,17 +32,10 @@ func Blocked(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.E Message: fmt.Sprintf("Cannot deprovision node due to %s", reason), DedupeValues: []string{node.Name, reason}, }, - { - InvolvedObject: machine, - Type: v1.EventTypeNormal, - Reason: "DeprovisioningBlocked", - Message: fmt.Sprintf("Cannot deprovision machine due to %s", reason), - DedupeValues: []string{machine.Name, reason}, - }, } } -func Terminating(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { +func Terminating(node *v1.Node, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -52,47 +44,40 @@ func Terminating(node *v1.Node, machine *v1alpha5.Machine, reason string) []even Message: fmt.Sprintf("Deprovisioning node via %s", reason), DedupeValues: []string{node.Name, reason}, }, - { - InvolvedObject: machine, - 
Type: v1.EventTypeNormal, - Reason: "DeprovisioningTerminating", - Message: fmt.Sprintf("Deprovisioning machine via %s", reason), - DedupeValues: []string{machine.Name, reason}, - }, } } -func Launching(machine *v1alpha5.Machine, reason string) events.Event { +func Launching(node *v1.Node, reason string) events.Event { return events.Event{ - InvolvedObject: machine, + InvolvedObject: node, Type: v1.EventTypeNormal, Reason: "DeprovisioningLaunching", - Message: fmt.Sprintf("Launching machine for %s", reason), - DedupeValues: []string{machine.Name, reason}, + Message: fmt.Sprintf("Launching node for %s", reason), + DedupeValues: []string{node.Name, reason}, } } -func WaitingOnReadiness(machine *v1alpha5.Machine) events.Event { +func WaitingOnReadiness(node *v1.Node) events.Event { return events.Event{ - InvolvedObject: machine, + InvolvedObject: node, Type: v1.EventTypeNormal, Reason: "DeprovisioningWaitingReadiness", Message: "Waiting on readiness to continue deprovisioning", - DedupeValues: []string{machine.Name}, + DedupeValues: []string{node.Name}, } } -func WaitingOnDeletion(machine *v1alpha5.Machine) events.Event { +func WaitingOnDeletion(node *v1.Node) events.Event { return events.Event{ - InvolvedObject: machine, + InvolvedObject: node, Type: v1.EventTypeNormal, Reason: "DeprovisioningWaitingDeletion", Message: "Waiting on deletion to continue deprovisioning", - DedupeValues: []string{machine.Name}, + DedupeValues: []string{node.Name}, } } -func Unconsolidatable(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { +func Unconsolidatable(node *v1.Node, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -102,13 +87,5 @@ func Unconsolidatable(node *v1.Node, machine *v1alpha5.Machine, reason string) [ DedupeValues: []string{node.Name}, DedupeTimeout: time.Minute * 15, }, - { - InvolvedObject: machine, - Type: v1.EventTypeNormal, - Reason: "Unconsolidatable", - Message: reason, - DedupeValues: []string{machine.Name}, - DedupeTimeout: time.Minute * 15, - }, } } diff --git a/pkg/controllers/deprovisioning/expiration.go b/pkg/controllers/deprovisioning/expiration.go index 79e275098d..9a445a6432 100644 --- a/pkg/controllers/deprovisioning/expiration.go +++ b/pkg/controllers/deprovisioning/expiration.go @@ -79,15 +79,15 @@ func (e *Expiration) ComputeCommand(ctx context.Context, candidates ...*Candidat } // filter out nodes that can't be terminated candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool { - if !cn.Machine.DeletionTimestamp.IsZero() { + if !cn.Node.DeletionTimestamp.IsZero() { return false } if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok { - e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) + e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) return false } if p, ok := hasDoNotEvictPod(cn); ok { - e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, + e.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) 
return false } diff --git a/pkg/controllers/deprovisioning/expiration_test.go b/pkg/controllers/deprovisioning/expiration_test.go index ca10ac7dfe..87c2a500c9 100644 --- a/pkg/controllers/deprovisioning/expiration_test.go +++ b/pkg/controllers/deprovisioning/expiration_test.go @@ -70,9 +70,8 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) // Expect to not create or delete more machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) + ExpectExists(ctx, env.Client, node) }) It("can delete expired nodes", func() { prov.Spec.TTLSecondsUntilExpired = ptr.Int64(60) @@ -89,13 +88,9 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - // Expect that the expired machine is gone - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine, node) + ExpectNotFound(ctx, env.Client, node) }) It("should expire one node at a time, starting with most expired", func() { expireProv := test.Provisioner(test.ProvisionerOptions{ @@ -144,13 +139,9 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machineToExpire) - // Expect that one of the expired machines is gone - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, machineToExpire, nodeToExpire) + ExpectNotFound(ctx, env.Client, nodeToExpire) }) It("can replace node for expiration", func() { labels := map[string]string{ @@ -188,23 +179,18 @@ var _ = Describe("Expiration", func() { // deprovisioning won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - // Expect that the new machine was created, and it's different than the original - ExpectNotFound(ctx, env.Client, machine, node) - machines := ExpectMachines(ctx, env.Client) + ExpectNotFound(ctx, env.Client, node) nodes := ExpectNodes(ctx, env.Client) - Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) - Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) - It("should uncordon nodes when expiration replacement partially fails", func() { + It("should uncordon nodes when expiration replacement fails", func() { + cloudProvider.AllowedCreateCalls = 0 // fail the replacement and expect it to uncordon prov.Spec.TTLSecondsUntilExpired = ptr.Int64(30) labels := map[string]string{ @@ -240,7 +226,6 @@ var _ = Describe("Expiration", func() { fakeClock.Step(10 * time.Minute) var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectNewMachinesDeleted(ctx, env.Client, &wg, 1) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) 
Expect(err).To(HaveOccurred()) wg.Wait() @@ -333,15 +318,11 @@ var _ = Describe("Expiration", func() { // deprovisioning won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectNotFound(ctx, env.Client, machine, node) + ExpectNotFound(ctx, env.Client, node) }) }) diff --git a/pkg/controllers/deprovisioning/metrics.go b/pkg/controllers/deprovisioning/metrics.go index 99e00ba0c1..554b3c26bf 100644 --- a/pkg/controllers/deprovisioning/metrics.go +++ b/pkg/controllers/deprovisioning/metrics.go @@ -44,8 +44,8 @@ var deprovisioningReplacementNodeInitializedHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: metrics.Namespace, Subsystem: deprovisioningSubsystem, - Name: "replacement_machine_initialized_seconds", - Help: "Amount of time required for a replacement machine to become initialized.", + Name: "replacement_node_initialized_seconds", + Help: "Amount of time required for a replacement node to become initialized.", Buckets: metrics.DurationBuckets(), }) diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go index 6fd8574b97..2ab65cd23d 100644 --- a/pkg/controllers/deprovisioning/suite_test.go +++ b/pkg/controllers/deprovisioning/suite_test.go @@ -260,26 +260,21 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - // should create a new machine as there is a cheaper one that can hold the pod - machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) - Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) // Expect that the new machine does not request the most expensive instance type - Expect(machines[0].Name).ToNot(Equal(machine.Name)) - Expect(scheduling.NewNodeSelectorRequirements(machines[0].Spec.Requirements...).Has(v1.LabelInstanceTypeStable)).To(BeTrue()) - Expect(scheduling.NewNodeSelectorRequirements(machines[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(BeFalse()) + Expect(nodes[0].Name).ToNot(Equal(node.Name)) + Expect(scheduling.NewLabelRequirements(nodes[0].Labels).Has(v1.LabelInstanceTypeStable)).To(BeTrue()) + Expect(scheduling.NewLabelRequirements(nodes[0].Labels).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(BeFalse()) // and delete the old one - ExpectNotFound(ctx, env.Client, machine, node) + ExpectNotFound(ctx, env.Client, node) }) It("can replace nodes, considers PDB", func() { labels := map[string]string{ @@ -355,7 +350,7 @@ var _ = Describe("Replace Nodes", func() { // we didn't create a new machine or delete the old one Expect(ExpectMachines(ctx, 
env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) + ExpectExists(ctx, env.Client, node) }) It("can replace nodes, PDB namespace must match", func() { labels := map[string]string{ @@ -424,17 +419,13 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - // should create a new machine as there is a cheaper one that can hold the pod - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, machine, node) + ExpectNotFound(ctx, env.Client, node) }) It("can replace nodes, considers do-not-consolidate annotation", func() { labels := map[string]string{ @@ -518,13 +509,9 @@ var _ = Describe("Replace Nodes", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, regularMachine) - // we should delete the non-annotated node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, regularMachine, regularNode) + ExpectNotFound(ctx, env.Client, regularNode) }) It("won't replace node if any spot replacement is more expensive", func() { currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ @@ -622,9 +609,8 @@ var _ = Describe("Replace Nodes", func() { Expect(cluster.Consolidated()).To(BeTrue()) // Expect to not create or delete more machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) + ExpectExists(ctx, env.Client, node) }) It("won't replace on-demand node if on-demand replacement is more expensive", func() { currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ @@ -737,9 +723,8 @@ var _ = Describe("Replace Nodes", func() { Expect(cluster.Consolidated()).To(BeTrue()) // Expect to not create or delete more machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) + ExpectExists(ctx, env.Client, node) }) It("waits for node deletion to finish", func() { labels := map[string]string{ @@ -768,7 +753,6 @@ var _ = Describe("Replace Nodes", func() { }) machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"unit-test.com/block-deletion"}, Labels: map[string]string{ v1alpha5.ProvisionerNameLabelKey: prov.Name, v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, @@ -781,6 +765,7 @@ var _ = Describe("Replace Nodes", func() { Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("32")}, }, }) + node.Finalizers = []string{"unit-test.com/block-deletion"} ExpectApplied(ctx, env.Client, rs, pod, machine, node, prov) @@ -795,7 +780,7 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup 
ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) var consolidationFinished atomic.Bool go func() { @@ -806,31 +791,25 @@ var _ = Describe("Replace Nodes", func() { }() wg.Wait() - // machine should still exist - ExpectExists(ctx, env.Client, machine) + // node should still exist + ExpectExists(ctx, env.Client, node) // and consolidation should still be running waiting on the machine's deletion Expect(consolidationFinished.Load()).To(BeFalse()) - // fetch the latest machine object and remove the finalizer - machine = ExpectExists(ctx, env.Client, machine) - ExpectFinalizersRemoved(ctx, env.Client, machine) + // fetch the latest node object and remove the finalizer + node = ExpectExists(ctx, env.Client, node) + ExpectFinalizersRemoved(ctx, env.Client, node) // consolidation should complete now that the finalizer on the machine is gone and it can // was actually deleted Eventually(consolidationFinished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - - ExpectNotFound(ctx, env.Client, machine, node) + ExpectNotFound(ctx, env.Client, node) // Expect that the new machine was created and its different than the original - machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) - Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) - Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) }) @@ -915,14 +894,10 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) - // we don't need a new node, but we should evict everything off one of node2 which only has a single pod - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // and delete the old one - ExpectNotFound(ctx, env.Client, machine2, node2) + ExpectNotFound(ctx, env.Client, node2) }) It("can delete nodes, considers PDB", func() { var nl v1.NodeList @@ -980,15 +955,11 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - // we don't need a new node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with more pods (node1) as the pod on machine2 has a PDB preventing // eviction - ExpectNotFound(ctx, env.Client, machine1, node1) + ExpectNotFound(ctx, env.Client, node1) }) It("can delete nodes, considers do-not-evict", func() { // create our RS, so we can link a pod to it @@ -1031,14 +1002,10 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - // we don't need a new node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with more pods (machine1) as the pod on machine2 has a do-not-evict annotation - ExpectNotFound(ctx, env.Client, machine1, 
node1) + ExpectNotFound(ctx, env.Client, node1) }) It("can delete nodes, evicts pods without an ownerRef", func() { // create our RS so we can link a pod to it @@ -1079,15 +1046,11 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) - // we don't need a new node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with the fewest pods (machine 2) even though the pod has no ownerRefs // and will not be recreated - ExpectNotFound(ctx, env.Client, machine2, node2) + ExpectNotFound(ctx, env.Client, node2) }) }) @@ -1180,14 +1143,10 @@ var _ = Describe("Node Lifetime Consideration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - // the second node has more pods, so it would normally not be picked for consolidation, except it very little // lifetime remaining, so it should be deleted - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, machine1, node1) + ExpectNotFound(ctx, env.Client, node1) }) }) @@ -1195,7 +1154,7 @@ var _ = Describe("Topology Consideration", func() { var prov *v1alpha5.Provisioner var zone1Machine, zone2Machine, zone3Machine *v1alpha5.Machine var zone1Node, zone2Node, zone3Node *v1.Node - var oldMachineNames sets.String + var oldNodeNames sets.String BeforeEach(func() { testZone1Instance := leastExpensiveInstanceWithZone("test-zone-1") @@ -1247,7 +1206,7 @@ var _ = Describe("Topology Consideration", func() { Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("1")}, }, }) - oldMachineNames = sets.NewString(zone1Machine.Name, zone2Machine.Name, zone3Machine.Name) + oldNodeNames = sets.NewString(zone1Node.Name, zone2Node.Name, zone3Node.Name) }) It("can replace node maintaining zonal topology spread", func() { labels := map[string]string{ @@ -1296,25 +1255,17 @@ var _ = Describe("Topology Consideration", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, zone2Machine) - // should create a new node as there is a cheaper one that can hold the pod - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectNotFound(ctx, env.Client, zone2Machine, zone2Node) + ExpectNotFound(ctx, env.Client, zone2Node) - // Find the new node associated with the machine - newMachine, ok := lo.Find(ExpectMachines(ctx, env.Client), func(m *v1alpha5.Machine) bool { - return !oldMachineNames.Has(m.Name) - }) - Expect(ok).To(BeTrue()) + // Find the new node newNode, ok := lo.Find(ExpectNodes(ctx, env.Client), func(n *v1.Node) bool { - return newMachine.Status.ProviderID == n.Spec.ProviderID + return !oldNodeNames.Has(n.Name) }) Expect(ok).To(BeTrue()) @@ -1387,11 +1338,10 @@ var _ = 
Describe("Topology Consideration", func() { // our nodes are already the cheapest available, so we can't replace them. If we delete, it would // violate the anti-affinity rule, so we can't do anything. - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectExists(ctx, env.Client, zone1Machine) - ExpectExists(ctx, env.Client, zone2Machine) - ExpectExists(ctx, env.Client, zone3Machine) + ExpectExists(ctx, env.Client, zone1Node) + ExpectExists(ctx, env.Client, zone2Node) + ExpectExists(ctx, env.Client, zone3Node) }) }) @@ -1450,13 +1400,9 @@ var _ = Describe("Empty Nodes", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - // we should delete the empty node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine1, node1) + ExpectNotFound(ctx, env.Client, node1) }) It("can delete multiple empty nodes with consolidation", func() { ExpectApplied(ctx, env.Client, machine1, node1, machine2, node2, prov) @@ -1469,14 +1415,10 @@ var _ = Describe("Empty Nodes", func() { ExpectTriggerVerifyAction(&wg) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) - // we should delete the empty nodes - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine1) - ExpectNotFound(ctx, env.Client, machine2) + ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, node2) }) It("can delete empty nodes with TTLSecondsAfterEmpty with the emptiness timestamp", func() { prov = test.Provisioner(test.ProvisionerOptions{TTLSecondsAfterEmpty: ptr.Int64(10)}) @@ -1502,13 +1444,9 @@ var _ = Describe("Empty Nodes", func() { ExpectTriggerVerifyAction(&wg) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - // we should delete the empty node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine1, node1) + ExpectNotFound(ctx, env.Client, node1) }) It("considers pending pods when consolidating", func() { machine1, node1 = test.MachineAndNode(v1alpha5.Machine{ @@ -1561,9 +1499,8 @@ var _ = Describe("Empty Nodes", func() { // we don't need any new nodes and consolidation should notice the huge pending pod that needs the large // node to schedule, which prevents the large expensive node from being replaced - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, node1) }) }) @@ -1630,7 +1567,7 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, node1) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -1638,13 +1575,9 @@ var _ = Describe("Consolidation TTL", func() { 
Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - // machine should be deleted after the TTL due to emptiness - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine1, node1) + ExpectNotFound(ctx, env.Client, node1) }) It("should wait for the node TTL for non-empty nodes before consolidating", func() { labels := map[string]string{ @@ -1714,8 +1647,8 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, machine1) - ExpectExists(ctx, env.Client, machine2) + ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, node2) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -1723,13 +1656,9 @@ var _ = Describe("Consolidation TTL", func() { Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) - // machine should be deleted after the TTL due to emptiness - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, machine2, node2) + ExpectNotFound(ctx, env.Client, node2) }) It("should not consolidate if the action becomes invalid during the node TTL wait", func() { pod := test.Pod() @@ -1753,7 +1682,7 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, node1) // make the node non-empty by binding it ExpectManualBinding(ctx, env.Client, pod, node1) @@ -1766,9 +1695,8 @@ var _ = Describe("Consolidation TTL", func() { wg.Wait() // nothing should be removed since the node is no longer empty - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, node1) }) }) @@ -1836,7 +1764,7 @@ var _ = Describe("Parallelization", func() { // Run the processing loop in parallel in the background with environment context var wg sync.WaitGroup - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) ExpectTriggerVerifyAction(&wg) go func() { defer GinkgoRecover() @@ -1844,12 +1772,11 @@ var _ = Describe("Parallelization", func() { }() wg.Wait() - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(2)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) // Add a new pending pod that should schedule while node is not yet deleted pod = test.UnschedulablePod() ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, provisioner, pod) - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(2)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) ExpectScheduled(ctx, env.Client, pod) }) @@ -1893,8 +1820,6 @@ var _ = Describe("Parallelization", func() { ExpectApplied(ctx, env.Client, rs, prov) ExpectProvisionedNoBinding(ctx, env.Client, cluster, cloudProvider, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...) 
- machines := ExpectMachines(ctx, env.Client) - Expect(machines).To(HaveLen(1)) nodes := ExpectNodes(ctx, env.Client) Expect(nodes).To(HaveLen(1)) @@ -2028,17 +1953,13 @@ var _ = Describe("Multi-Node Consolidation", func() { var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2, machine3) - // three machines should be replaced with a single machine - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, machine1, node1, machine2, node2, machine3, node3) + ExpectNotFound(ctx, env.Client, node1, node2, node3) }) It("won't merge 2 nodes into 1 of the same type", func() { labels := map[string]string{ @@ -2099,20 +2020,15 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - // We have [cheap-node, cheap-node] which multi-node consolidation could consolidate via // [delete cheap-node, delete cheap-node, launch cheap-node]. This isn't the best method though // as we should instead just delete one of the nodes instead of deleting both and launching a single // identical replacement. This test verifies the filterOutSameType function from multi-node consolidation // works to ensure we perform the least-disruptive action. - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // should have just deleted the node with the fewest pods - ExpectNotFound(ctx, env.Client, machine1, node1) + ExpectNotFound(ctx, env.Client, node1) // and left the other node alone - ExpectExists(ctx, env.Client, machine2) ExpectExists(ctx, env.Client, node2) }) It("should wait for the node TTL for non-empty nodes before consolidating (multi-node)", func() { @@ -2146,7 +2062,7 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1, node2}, []*v1alpha5.Machine{machine1, machine2}) var wg sync.WaitGroup - ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) wg.Add(1) finished := atomic.Bool{} @@ -2162,8 +2078,8 @@ var _ = Describe("Multi-Node Consolidation", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, machine1) - ExpectExists(ctx, env.Client, machine2) + ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, node2) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -2171,14 +2087,10 @@ var _ = Describe("Multi-Node Consolidation", func() { Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) - // should launch a single smaller replacement node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // and delete the two 
large ones - ExpectNotFound(ctx, env.Client, machine1, node1, machine2, node2) + ExpectNotFound(ctx, env.Client, node1, node2) }) }) @@ -2271,6 +2183,47 @@ func ExpectNewMachinesDeleted(ctx context.Context, c client.Client, wg *sync.Wai }() } +func ExpectMakeNewNodesReady(ctx context.Context, c client.Client, wg *sync.WaitGroup, numNewNodes int) { + existingNodes := ExpectNodes(ctx, c) + existingNodeNames := sets.NewString(lo.Map(existingNodes, func(n *v1.Node, _ int) string { + return n.Name + })...) + + wg.Add(1) + go func() { + nodesMadeReady := 0 + ctx, cancel := context.WithTimeout(ctx, time.Second*10) // give up after 10s + defer GinkgoRecover() + defer wg.Done() + defer cancel() + for { + select { + case <-time.After(50 * time.Millisecond): + nodeList := &v1.NodeList{} + if err := c.List(ctx, nodeList); err != nil { + continue + } + for i := range nodeList.Items { + n := &nodeList.Items[i] + if existingNodeNames.Has(n.Name) { + continue + } + ExpectMakeNodesReady(ctx, c, n) + + nodesMadeReady++ + existingNodeNames.Insert(n.Name) + // did we make all the nodes ready that we expected? + if nodesMadeReady == numNewNodes { + return + } + } + case <-ctx.Done(): + Fail(fmt.Sprintf("waiting for nodes to be ready, %s", ctx.Err())) + } + } + }() +} + func ExpectMakeNewMachinesReady(ctx context.Context, c client.Client, wg *sync.WaitGroup, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, numNewMachines int) { diff --git a/pkg/controllers/deprovisioning/types.go b/pkg/controllers/deprovisioning/types.go index f980a2438d..57ac93f175 100644 --- a/pkg/controllers/deprovisioning/types.go +++ b/pkg/controllers/deprovisioning/types.go @@ -90,9 +90,6 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock if node.Nominated() { return nil, fmt.Errorf("state node is nominated") } - if node.Node == nil || node.Machine == nil { - return nil, fmt.Errorf("state node doesn't contain both a node and a machine") - } pods, err := node.Pods(ctx, kubeClient) if err != nil { diff --git a/pkg/controllers/inflightchecks/controller.go b/pkg/controllers/inflightchecks/controller.go index d9de8bb1d5..5b4a4ff5d0 100644 --- a/pkg/controllers/inflightchecks/controller.go +++ b/pkg/controllers/inflightchecks/controller.go @@ -27,17 +27,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" inflightchecksevents "github.com/aws/karpenter-core/pkg/controllers/inflightchecks/events" "github.com/aws/karpenter-core/pkg/events" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" - machineutil "github.com/aws/karpenter-core/pkg/utils/machine" ) -var _ corecontroller.TypedController[*v1alpha5.Machine] = (*Controller)(nil) +var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil) type Controller struct { clock clock.Clock @@ -52,7 +50,7 @@ type Issue string type Check interface { // Check performs the inflight check, this should return a list of slice discovered, or an empty // slice if no issues were found - Check(context.Context, *v1.Node, *v1alpha5.Machine) ([]Issue, error) + Check(context.Context, *v1.Node) ([]Issue, error) } // scanPeriod is how often we inspect and report issues that are found. 
@@ -61,15 +59,15 @@ const scanPeriod = 10 * time.Minute func NewController(clk clock.Clock, kubeClient client.Client, recorder events.Recorder, provider cloudprovider.CloudProvider) corecontroller.Controller { - return corecontroller.Typed[*v1alpha5.Machine](kubeClient, &Controller{ + return corecontroller.Typed[*v1.Node](kubeClient, &Controller{ clock: clk, kubeClient: kubeClient, recorder: recorder, lastScanned: cache.New(scanPeriod, 1*time.Minute), checks: []Check{ - NewFailedInit(clk, provider), + NewFailedInit(clk, kubeClient, provider), NewTermination(kubeClient), - NewNodeShape(provider), + NewNodeShape(kubeClient, provider), }}, ) } @@ -78,12 +76,12 @@ func (c *Controller) Name() string { return "inflightchecks" } -func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { - if machine.Status.ProviderID == "" { +func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Result, error) { + if node.Labels[v1alpha5.ProvisionerNameLabelKey] == "" { return reconcile.Result{}, nil } // If we get an event before we should check for inflight checks, we ignore and wait - if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(machine).String()); ok { + if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(node).String()); ok { if lastTime, ok := lastTime.(time.Time); ok { remaining := scanPeriod - c.clock.Since(lastTime) return reconcile.Result{RequeueAfter: remaining}, nil @@ -91,15 +89,11 @@ func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) ( // the above should always succeed return reconcile.Result{RequeueAfter: scanPeriod}, nil } - c.lastScanned.SetDefault(client.ObjectKeyFromObject(machine).String(), c.clock.Now()) + c.lastScanned.SetDefault(client.ObjectKeyFromObject(node).String(), c.clock.Now()) - node, err := machineutil.NodeForMachine(ctx, c.kubeClient, machine) - if err != nil { - return reconcile.Result{}, machineutil.IgnoreDuplicateNodeError(machineutil.IgnoreNodeNotFoundError(err)) - } var allIssues []Issue for _, check := range c.checks { - issues, err := check.Check(ctx, node, machine) + issues, err := check.Check(ctx, node) if err != nil { logging.FromContext(ctx).Errorf("checking node with %T, %s", check, err) } @@ -107,19 +101,15 @@ func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) ( } for _, issue := range allIssues { logging.FromContext(ctx).Infof("inflight check failed, %s", issue) - c.recorder.Publish(inflightchecksevents.InflightCheck(node, machine, string(issue))...) + c.recorder.Publish(inflightchecksevents.InflightCheck(node, string(issue))...) } return reconcile.Result{RequeueAfter: scanPeriod}, nil } -func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder { +func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder { return corecontroller.Adapt(controllerruntime. NewControllerManagedBy(m). - For(&v1alpha5.Machine{}). - Watches( - &source.Kind{Type: &v1.Node{}}, - machineutil.NodeEventHandler(ctx, c.kubeClient), - ). + For(&v1.Node{}). 
WithOptions(controller.Options{MaxConcurrentReconciles: 10}), ) } diff --git a/pkg/controllers/inflightchecks/events/events.go b/pkg/controllers/inflightchecks/events/events.go index 69e8d81d92..e2d3a06b8e 100644 --- a/pkg/controllers/inflightchecks/events/events.go +++ b/pkg/controllers/inflightchecks/events/events.go @@ -17,11 +17,10 @@ package events import ( v1 "k8s.io/api/core/v1" - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) -func InflightCheck(node *v1.Node, machine *v1alpha5.Machine, message string) []events.Event { +func InflightCheck(node *v1.Node, message string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -30,12 +29,5 @@ func InflightCheck(node *v1.Node, machine *v1alpha5.Machine, message string) []e Message: message, DedupeValues: []string{node.Name, message}, }, - { - InvolvedObject: machine, - Type: v1.EventTypeWarning, - Reason: "FailedInflightCheck", - Message: message, - DedupeValues: []string{machine.Name, message}, - }, } } diff --git a/pkg/controllers/inflightchecks/failedinit.go b/pkg/controllers/inflightchecks/failedinit.go index 08b4727bc1..f30df1cc88 100644 --- a/pkg/controllers/inflightchecks/failedinit.go +++ b/pkg/controllers/inflightchecks/failedinit.go @@ -19,12 +19,15 @@ import ( "fmt" "time" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" - "github.com/aws/karpenter-core/pkg/controllers/machine" + "github.com/aws/karpenter-core/pkg/controllers/node" ) // initFailureTime is the time after which we start reporting a node as having failed to initialize. 
This is set @@ -34,34 +37,51 @@ const initFailureTime = time.Hour // FailedInit detects nodes that fail to initialize within an hour and reports the reason for the initialization // failure type FailedInit struct { - clock clock.Clock - provider cloudprovider.CloudProvider + clock clock.Clock + kubeClient client.Client + provider cloudprovider.CloudProvider } -func NewFailedInit(clk clock.Clock, provider cloudprovider.CloudProvider) Check { - return &FailedInit{clock: clk, provider: provider} +func NewFailedInit(clk clock.Clock, kubeClient client.Client, provider cloudprovider.CloudProvider) Check { + return &FailedInit{clock: clk, kubeClient: kubeClient, provider: provider} } -func (f FailedInit) Check(_ context.Context, node *v1.Node, m *v1alpha5.Machine) ([]Issue, error) { - // ignore machines that are deleting - if !m.DeletionTimestamp.IsZero() { +func (f FailedInit) Check(ctx context.Context, n *v1.Node) ([]Issue, error) { + // ignore nodes that are deleting + if !n.DeletionTimestamp.IsZero() { return nil, nil } - // machine is already initialized or isn't old enough - if m.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() || - f.clock.Now().Before(m.CreationTimestamp.Time.Add(initFailureTime)) { + + nodeAge := f.clock.Since(n.CreationTimestamp.Time) + // n is already initialized or not old enough + if n.Labels[v1alpha5.LabelNodeInitialized] == "true" || nodeAge < initFailureTime { return nil, nil } + provisioner := &v1alpha5.Provisioner{} + if err := f.kubeClient.Get(ctx, types.NamespacedName{Name: n.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil { + // provisioner is missing, node should be removed soon + return nil, client.IgnoreNotFound(err) + } + instanceTypes, err := f.provider.GetInstanceTypes(ctx, provisioner) + if err != nil { + return nil, err + } + instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == n.Labels[v1.LabelInstanceTypeStable] }) + if !ok { + return []Issue{Issue(fmt.Sprintf("Instance Type %q not found", n.Labels[v1.LabelInstanceTypeStable]))}, nil + } // detect startup taints which should be removed var result []Issue - if taint, ok := machine.IsStartupTaintRemoved(node, m); !ok { + if taint, ok := node.IsStartupTaintRemoved(n, provisioner); !ok { result = append(result, Issue(fmt.Sprintf("Startup taint %q is still on the node", formatTaint(taint)))) } + // and extended resources which never registered - if resource, ok := machine.RequestedResourcesRegistered(node, m); !ok { + if resource, ok := node.IsExtendedResourceRegistered(n, instanceType); !ok { result = append(result, Issue(fmt.Sprintf("Expected resource %q didn't register on the node", resource))) } + return result, nil } diff --git a/pkg/controllers/inflightchecks/nodeshape.go b/pkg/controllers/inflightchecks/nodeshape.go index 3895c4679e..c3648c4bbf 100644 --- a/pkg/controllers/inflightchecks/nodeshape.go +++ b/pkg/controllers/inflightchecks/nodeshape.go @@ -18,7 +18,10 @@ import ( "context" "fmt" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" @@ -26,29 +29,44 @@ import ( // NodeShape detects nodes that have launched with 10% or less of any resource than was expected. 
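Note: the FailedInit check above and the NodeShape check below now repeat the same provisioner and instance-type lookup from node labels. A shared helper along the following lines could factor that out; this is a sketch only, and the instanceTypeForNode name is hypothetical rather than part of this diff.

package inflightchecks

import (
	"context"

	"github.com/samber/lo"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
	"github.com/aws/karpenter-core/pkg/cloudprovider"
)

// instanceTypeForNode resolves the node's provisioner from its label, lists the
// provisioner's instance types, and returns the one matching the node's
// instance-type label. A nil result with a nil error means the provisioner or
// the instance type could not be found.
func instanceTypeForNode(ctx context.Context, kubeClient client.Client, provider cloudprovider.CloudProvider, n *v1.Node) (*cloudprovider.InstanceType, error) {
	provisioner := &v1alpha5.Provisioner{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: n.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
		return nil, client.IgnoreNotFound(err)
	}
	instanceTypes, err := provider.GetInstanceTypes(ctx, provisioner)
	if err != nil {
		return nil, err
	}
	return lo.FindOrElse(instanceTypes, nil, func(it *cloudprovider.InstanceType) bool {
		return it.Name == n.Labels[v1.LabelInstanceTypeStable]
	}), nil
}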
type NodeShape struct { - provider cloudprovider.CloudProvider + kubeClient client.Client + provider cloudprovider.CloudProvider } -func NewNodeShape(provider cloudprovider.CloudProvider) Check { +func NewNodeShape(kubeClient client.Client, provider cloudprovider.CloudProvider) Check { return &NodeShape{ - provider: provider, + kubeClient: kubeClient, + provider: provider, } } -func (n *NodeShape) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) { - // ignore machines that are deleting - if !machine.DeletionTimestamp.IsZero() { +func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) { + // ignore nodes that are deleting + if !node.DeletionTimestamp.IsZero() { return nil, nil } - // and machines that haven't initialized yet - if machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() { + // and nodes that haven't initialized yet + if node.Labels[v1alpha5.LabelNodeInitialized] != "true" { return nil, nil } + provisioner := &v1alpha5.Provisioner{} + if err := n.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil { + // provisioner is missing, node should be removed soon + return nil, client.IgnoreNotFound(err) + } + instanceTypes, err := n.provider.GetInstanceTypes(ctx, provisioner) + if err != nil { + return nil, err + } + instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == node.Labels[v1.LabelInstanceTypeStable] }) + if !ok { + return []Issue{Issue(fmt.Sprintf("Instance Type %q not found", node.Labels[v1.LabelInstanceTypeStable]))}, nil + } var issues []Issue - for resourceName, expectedQuantity := range machine.Status.Capacity { + for resourceName, expectedQuantity := range instanceType.Capacity { nodeQuantity, ok := node.Status.Capacity[resourceName] if !ok && !expectedQuantity.IsZero() { - issues = append(issues, Issue(fmt.Sprintf("Expected resource \"%s\" not found", resourceName))) + issues = append(issues, Issue(fmt.Sprintf("Expected resource %s not found", resourceName))) continue } @@ -57,7 +75,6 @@ func (n *NodeShape) Check(ctx context.Context, node *v1.Node, machine *v1alpha5. 
issues = append(issues, Issue(fmt.Sprintf("Expected %s of resource %s, but found %s (%0.1f%% of expected)", expectedQuantity.String(), resourceName, nodeQuantity.String(), pct*100))) } - } return issues, nil } diff --git a/pkg/controllers/inflightchecks/suite_test.go b/pkg/controllers/inflightchecks/suite_test.go index ee3a5d968c..57c67caefa 100644 --- a/pkg/controllers/inflightchecks/suite_test.go +++ b/pkg/controllers/inflightchecks/suite_test.go @@ -124,10 +124,16 @@ var _ = Describe("Controller", func() { } ExpectApplied(ctx, env.Client, provisioner, machine, node) fakeClock.Step(2 * time.Hour) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) Expect(recorder.DetectedEvent("Expected resource \"fake.com/vendor-a\" didn't register on the node")).To(BeTrue()) }) It("should detect issues with nodes that have a startup taint which isn't removed", func() { + provisioner.Spec.StartupTaints = []v1.Taint{ + { + Key: "my.startup.taint", + Effect: v1.TaintEffectNoSchedule, + }, + } machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -154,7 +160,7 @@ var _ = Describe("Controller", func() { }) ExpectApplied(ctx, env.Client, provisioner, machine, node) fakeClock.Step(2 * time.Hour) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) Expect(recorder.DetectedEvent("Startup taint \"my.startup.taint:NoSchedule\" is still on the node")).To(BeTrue()) }) }) @@ -182,12 +188,12 @@ var _ = Describe("Controller", func() { Labels: podsLabels, MaxUnavailable: &intstr.IntOrString{IntVal: 0, Type: intstr.Int}, }) - machine.Finalizers = []string{"prevent.deletion/now"} + node.Finalizers = []string{"prevent.deletion/now"} p := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: podsLabels}}) ExpectApplied(ctx, env.Client, provisioner, machine, node, p, pdb) ExpectManualBinding(ctx, env.Client, p, node) - _ = env.Client.Delete(ctx, machine) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) + _ = env.Client.Delete(ctx, node) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) Expect(recorder.DetectedEvent(fmt.Sprintf("Can't drain node, PDB %s/%s is blocking evictions", pdb.Namespace, pdb.Name))).To(BeTrue()) }) }) @@ -217,7 +223,7 @@ var _ = Describe("Controller", func() { v1.ResourcePods: resource.MustParse("10"), } ExpectApplied(ctx, env.Client, provisioner, machine, node) - ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(machine)) + ExpectReconcileSucceeded(ctx, inflightController, client.ObjectKeyFromObject(node)) Expect(recorder.DetectedEvent("Expected 128Gi of resource memory, but found 64Gi (50.0% of expected)")).To(BeTrue()) }) }) diff --git a/pkg/controllers/inflightchecks/termination.go b/pkg/controllers/inflightchecks/termination.go index 836de78496..b4116c1917 100644 --- a/pkg/controllers/inflightchecks/termination.go +++ b/pkg/controllers/inflightchecks/termination.go @@ -21,7 +21,6 @@ import ( v1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/controllers/deprovisioning" nodeutils "github.com/aws/karpenter-core/pkg/utils/node" ) @@ -37,9 +36,9 @@ func NewTermination(kubeClient 
client.Client) Check { } } -func (t *Termination) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) { +func (t *Termination) Check(ctx context.Context, node *v1.Node) ([]Issue, error) { // we are only looking at nodes that are hung deleting - if machine.DeletionTimestamp.IsZero() { + if node.DeletionTimestamp.IsZero() { return nil, nil } pdbs, err := deprovisioning.NewPDBLimits(ctx, t.kubeClient) diff --git a/pkg/controllers/node/controller.go b/pkg/controllers/node/controller.go index 3304ed9ec5..3a9e9ae57b 100644 --- a/pkg/controllers/node/controller.go +++ b/pkg/controllers/node/controller.go @@ -49,21 +49,22 @@ var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil) // Controller manages a set of properties on karpenter provisioned nodes, such as // taints, labels, finalizers. type Controller struct { - kubeClient client.Client - cluster *state.Cluster - - emptiness *Emptiness - drift *Drift + kubeClient client.Client + cluster *state.Cluster + initialization *Initialization + emptiness *Emptiness + finalizer *Finalizer + drift *Drift } // NewController constructs a nodeController instance func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) corecontroller.Controller { return corecontroller.Typed[*v1.Node](kubeClient, &Controller{ - kubeClient: kubeClient, - cluster: cluster, - - emptiness: &Emptiness{kubeClient: kubeClient, clock: clk, cluster: cluster}, - drift: &Drift{kubeClient: kubeClient, cloudProvider: cloudProvider}, + kubeClient: kubeClient, + cluster: cluster, + initialization: &Initialization{kubeClient: kubeClient, cloudProvider: cloudProvider}, + emptiness: &Emptiness{kubeClient: kubeClient, clock: clk, cluster: cluster}, + drift: &Drift{kubeClient: kubeClient, cloudProvider: cloudProvider}, }) } @@ -80,22 +81,24 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re if !node.DeletionTimestamp.IsZero() { return reconcile.Result{}, nil } + provisioner := &v1alpha5.Provisioner{} if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil { return reconcile.Result{}, client.IgnoreNotFound(err) } - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provisioner", provisioner.Name)) + + // Execute Reconcilers + var results []reconcile.Result + var errs error reconcilers := []nodeReconciler{ + c.initialization, c.emptiness, + c.finalizer, } if settings.FromContext(ctx).DriftEnabled { reconcilers = append(reconcilers, c.drift) } - - // Execute Reconcilers - var results []reconcile.Result - var errs error for _, reconciler := range reconcilers { res, err := reconciler.Reconcile(ctx, provisioner, node) errs = multierr.Append(errs, err) diff --git a/pkg/controllers/node/drift.go b/pkg/controllers/node/drift.go index dba1d62b4f..ce19347635 100644 --- a/pkg/controllers/node/drift.go +++ b/pkg/controllers/node/drift.go @@ -19,15 +19,15 @@ import ( "fmt" "time" - "github.com/samber/lo" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/samber/lo" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" + "github.com/aws/karpenter-core/pkg/utils/machine" ) type Drift struct { @@ -39,30 +39,17 @@ func (d *Drift) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner 
if _, ok := node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey]; ok { return reconcile.Result{}, nil } - machineName, ok := node.Labels[v1alpha5.MachineNameLabelKey] - if !ok { - return reconcile.Result{}, nil - } - machine := &v1alpha5.Machine{} - if err := d.kubeClient.Get(ctx, types.NamespacedName{Name: machineName}, machine); err != nil { - return reconcile.Result{}, client.IgnoreNotFound(err) - } - if !machine.StatusConditions().GetCondition(v1alpha5.MachineCreated).IsTrue() { - return reconcile.Result{Requeue: true}, nil - } + // TODO: Add Provisioner Drift - drifted, err := d.cloudProvider.IsMachineDrifted(ctx, machine) + drifted, err := d.cloudProvider.IsMachineDrifted(ctx, machine.NewFromNode(node)) if err != nil { return reconcile.Result{}, fmt.Errorf("getting drift for node, %w", err) } - if !drifted { - // Requeue after 5 minutes for the cache TTL - return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil + if drifted { + node.Annotations = lo.Assign(node.Annotations, map[string]string{ + v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue, + }) } - node.Annotations = lo.Assign(node.Annotations, map[string]string{ - v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue, - }) - logging.FromContext(ctx).Debugf("annotated node as drifted") // Requeue after 5 minutes for the cache TTL return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil } diff --git a/pkg/controllers/node/emptiness.go b/pkg/controllers/node/emptiness.go index eeb3656dae..fbbc363e65 100644 --- a/pkg/controllers/node/emptiness.go +++ b/pkg/controllers/node/emptiness.go @@ -46,18 +46,22 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi if provisioner.Spec.TTLSecondsAfterEmpty == nil { return reconcile.Result{}, nil } + // node is not ready yet, so we don't consider it to possibly be empty if n.Labels[v1alpha5.LabelNodeInitialized] != "true" { return reconcile.Result{}, nil } + empty, err := r.isEmpty(ctx, n) if err != nil { return reconcile.Result{}, err } + // node is empty, but it is in-use per the last scheduling round so we don't consider it empty if r.cluster.IsNodeNominated(n.Name) { return reconcile.Result{}, nil } + _, hasEmptinessTimestamp := n.Annotations[v1alpha5.EmptinessTimestampAnnotationKey] if !empty && hasEmptinessTimestamp { delete(n.Annotations, v1alpha5.EmptinessTimestampAnnotationKey) @@ -68,6 +72,7 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi }) logging.FromContext(ctx).Infof("added TTL to empty node") } + // Short requeue result so that we requeue to check for emptiness when the node nomination time ends return reconcile.Result{RequeueAfter: time.Minute}, nil } diff --git a/pkg/controllers/node/finalizer.go b/pkg/controllers/node/finalizer.go new file mode 100644 index 0000000000..2a160553b1 --- /dev/null +++ b/pkg/controllers/node/finalizer.go @@ -0,0 +1,49 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "context" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/ptr" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" +) + +// Finalizer is a subreconciler that ensures nodes have the termination +// finalizer. This protects against instances that launch when Karpenter fails +// to create the node object. In this case, the node will come online without +// the termination finalizer. This controller will update the node accordingly. +type Finalizer struct{} + +// Reconcile reconciles the node +func (r *Finalizer) Reconcile(_ context.Context, provisioner *v1alpha5.Provisioner, node *v1.Node) (reconcile.Result, error) { + if !node.DeletionTimestamp.IsZero() { + return reconcile.Result{}, nil + } + node.OwnerReferences = []metav1.OwnerReference{{ + APIVersion: v1alpha5.SchemeGroupVersion.String(), + Kind: "Provisioner", + Name: provisioner.Name, + UID: provisioner.UID, + BlockOwnerDeletion: ptr.Bool(true), + }} + controllerutil.AddFinalizer(node, v1alpha5.TerminationFinalizer) + return reconcile.Result{}, nil +} diff --git a/pkg/controllers/node/initialization.go b/pkg/controllers/node/initialization.go new file mode 100644 index 0000000000..37da64b16a --- /dev/null +++ b/pkg/controllers/node/initialization.go @@ -0,0 +1,125 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "context" + "fmt" + + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + + "github.com/aws/karpenter-core/pkg/cloudprovider" + "github.com/aws/karpenter-core/pkg/utils/node" + "github.com/aws/karpenter-core/pkg/utils/resources" +) + +type Initialization struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +// Reconcile reconciles the node +func (r *Initialization) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner, n *v1.Node) (reconcile.Result, error) { + // node has been previously determined to be ready, so there's nothing to do + if n.Labels[v1alpha5.LabelNodeInitialized] == "true" { + return reconcile.Result{}, nil + } + + // node is not ready per the label, we need to check if kubelet indicates that the node is ready as well as if + // startup taints are removed and extended resources have been initialized + instanceType, err := r.getInstanceType(ctx, provisioner, n.Labels[v1.LabelInstanceTypeStable]) + if err != nil { + return reconcile.Result{}, fmt.Errorf("determining instance type, %w", err) + } + if !r.isInitialized(n, provisioner, instanceType) { + return reconcile.Result{}, nil + } + + n.Labels[v1alpha5.LabelNodeInitialized] = "true" + return reconcile.Result{}, nil +} + +func (r *Initialization) getInstanceType(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypeName string) (*cloudprovider.InstanceType, error) { + instanceTypes, err := r.cloudProvider.GetInstanceTypes(ctx, provisioner) + if err != nil { + return nil, err + } + // The instance type may not be found which can occur if the instance type label was removed/edited. This shouldn't occur, + // but if it does we only lose the ability to check for extended resources. + return lo.FindOrElse(instanceTypes, nil, func(it *cloudprovider.InstanceType) bool { return it.Name == instanceTypeName }), nil +} + +// isInitialized returns true if the node has: +// a) its current status is set to Ready +// b) all the startup taints have been removed from the node +// c) all extended resources have been registered +// This method handles both nil provisioners and nodes without extended resources gracefully. 
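Note: with this subreconciler in place, initialization is observable from the node object alone; the FailedInit, NodeShape, and Emptiness code elsewhere in this diff keys off the same label. A caller that previously inspected the MachineInitialized status condition can simply read the label, roughly as below (the helper name is illustrative).

package node

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// isNodeInitialized is an illustrative helper: after this change, a
// Karpenter-managed node records its initialization state as a node label.
func isNodeInitialized(n *v1.Node) bool {
	return n.Labels[v1alpha5.LabelNodeInitialized] == "true"
}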
+func (r *Initialization) isInitialized(n *v1.Node, provisioner *v1alpha5.Provisioner, instanceType *cloudprovider.InstanceType) bool { + // fast checks first + if node.GetCondition(n, v1.NodeReady).Status != v1.ConditionTrue { + return false + } + if _, ok := IsStartupTaintRemoved(n, provisioner); !ok { + return false + } + + if _, ok := IsExtendedResourceRegistered(n, instanceType); !ok { + return false + } + return true +} + +// IsStartupTaintRemoved returns true if there are no startup taints registered for the provisioner, or if all startup +// taints have been removed from the node +func IsStartupTaintRemoved(node *v1.Node, provisioner *v1alpha5.Provisioner) (*v1.Taint, bool) { + if provisioner != nil { + for _, startupTaint := range provisioner.Spec.StartupTaints { + for i := 0; i < len(node.Spec.Taints); i++ { + // if the node still has a startup taint applied, it's not ready + if startupTaint.MatchTaint(&node.Spec.Taints[i]) { + return &node.Spec.Taints[i], false + } + } + } + } + return nil, true +} + +// IsExtendedResourceRegistered returns true if there are no extended resources on the node, or they have all been +// registered by device plugins +func IsExtendedResourceRegistered(node *v1.Node, instanceType *cloudprovider.InstanceType) (v1.ResourceName, bool) { + if instanceType == nil { + // no way to know, so assume they're registered + return "", true + } + for resourceName, quantity := range instanceType.Capacity { + if quantity.IsZero() { + continue + } + // kubelet will zero out both the capacity and allocatable for an extended resource on startup, so if our + // annotation says the resource should be there, but it's zero'd in both then the device plugin hasn't + // registered it yet. + // We wait on allocatable since this is the value that is used in scheduling + if resources.IsZero(node.Status.Allocatable[resourceName]) { + return resourceName, false + } + } + return "", true +} diff --git a/pkg/controllers/node/suite_test.go b/pkg/controllers/node/suite_test.go index 8190de34ff..7ed54dbbdb 100644 --- a/pkg/controllers/node/suite_test.go +++ b/pkg/controllers/node/suite_test.go @@ -20,6 +20,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clock "k8s.io/utils/clock/testing" @@ -119,26 +120,220 @@ var _ = Describe("Controller", func() { }) It("should annotate the node when it has drifted in the cloud provider", func() { cp.Drifted = true - machine, node := test.MachineAndNode(v1alpha5.Machine{ + node := test.Node(test.NodeOptions{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ v1alpha5.ProvisionerNameLabelKey: provisioner.Name, v1.LabelInstanceTypeStable: test.RandomName(), }, }, - Status: v1alpha5.MachineStatus{ - ProviderID: test.RandomProviderID(), - }, }) - ExpectApplied(ctx, env.Client, provisioner, machine, node) - ExpectMakeMachinesReady(ctx, env.Client, machine) + ExpectApplied(ctx, env.Client, provisioner, node) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Annotations).To(HaveKeyWithValue(v1alpha5.VoluntaryDisruptionAnnotationKey, v1alpha5.VoluntaryDisruptionDriftedAnnotationValue)) }) }) + + Context("Initialization", func() { + It("should initialize the node when ready", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + }, + }, 
+ ReadyStatus: v1.ConditionTrue, + }) + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) + }) + It("should not initialize the node when not ready", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + }, + }, + ReadyStatus: v1.ConditionFalse, + }) + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) + }) + It("should initialize the node when extended resources are registered", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", + }, + }, + ReadyStatus: v1.ConditionTrue, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("4Gi"), + v1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("2"), + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("4Gi"), + v1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("2"), + }, + }) + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) + }) + It("should not initialize the node when extended resource isn't registered", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", + }, + }, + ReadyStatus: v1.ConditionTrue, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("4Gi"), + v1.ResourcePods: resource.MustParse("5"), + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("4Gi"), + v1.ResourcePods: resource.MustParse("5"), + }, + }) + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) + }) + It("should not initialize the node when capacity is filled but allocatable isn't set", func() { + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", + }, + }, + ReadyStatus: v1.ConditionTrue, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("4Gi"), + v1.ResourcePods: resource.MustParse("5"), + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("4Gi"), + v1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: 
resource.MustParse("2"), + }, + }) + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) + }) + It("should initialize the node when startup taints are removed", func() { + provisioner.Spec.StartupTaints = []v1.Taint{ + { + Key: "example.com/startup-taint1", + Value: "true", + Effect: v1.TaintEffectNoExecute, + }, + { + Key: "example.com/startup-taint1", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + { + Key: "example.com/startup-taint2", + Value: "true", + Effect: v1.TaintEffectNoExecute, + }, + } + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + }, + }, + ReadyStatus: v1.ConditionTrue, + }) + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) + }) + It("should not initialize the node when startup taints aren't removed", func() { + provisioner.Spec.StartupTaints = []v1.Taint{ + { + Key: "example.com/startup-taint1", + Value: "true", + Effect: v1.TaintEffectNoExecute, + }, + { + Key: "example.com/startup-taint1", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + { + Key: "example.com/startup-taint2", + Value: "true", + Effect: v1.TaintEffectNoExecute, + }, + } + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + }, + }, + Taints: []v1.Taint{ + { + Key: "example.com/startup-taint1", + Value: "true", + Effect: v1.TaintEffectNoExecute, + }, + }, + ReadyStatus: v1.ConditionTrue, + }) + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) + }) + }) Context("Emptiness", func() { - It("should not TTL nodes that are not initialized", func() { + It("should not TTL nodes that have ready status unknown", func() { + provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) + node := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ReadyStatus: v1.ConditionUnknown, + }) + + ExpectApplied(ctx, env.Client, provisioner, node) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) + + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Annotations).ToNot(HaveKey(v1alpha5.EmptinessTimestampAnnotationKey)) + }) + It("should not TTL nodes that have ready status false", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, @@ -154,10 +349,7 @@ var _ = Describe("Controller", func() { It("should label nodes as underutilized and add TTL", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1alpha5.LabelNodeInitialized: "true", - }, + Labels: 
map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, }}) ExpectApplied(ctx, env.Client, provisioner, node) @@ -175,10 +367,7 @@ var _ = Describe("Controller", func() { It("should remove labels from non-empty nodes", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1alpha5.LabelNodeInitialized: "true", - }, + Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, Annotations: map[string]string{ v1alpha5.EmptinessTimestampAnnotationKey: fakeClock.Now().Add(100 * time.Second).Format(time.RFC3339), }}, @@ -195,6 +384,57 @@ var _ = Describe("Controller", func() { Expect(node.Annotations).ToNot(HaveKey(v1alpha5.EmptinessTimestampAnnotationKey)) }) }) + Context("Finalizer", func() { + It("should add the termination finalizer if missing", func() { + n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + Finalizers: []string{"fake.com/finalizer"}, + }}) + ExpectApplied(ctx, env.Client, provisioner, n) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) + + n = ExpectNodeExists(ctx, env.Client, n.Name) + Expect(n.Finalizers).To(ConsistOf(n.Finalizers[0], v1alpha5.TerminationFinalizer)) + }) + It("should do nothing if terminating", func() { + n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + Finalizers: []string{"fake.com/finalizer"}, + }}) + ExpectApplied(ctx, env.Client, provisioner, n) + Expect(env.Client.Delete(ctx, n)).To(Succeed()) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) + + n = ExpectNodeExists(ctx, env.Client, n.Name) + Expect(n.Finalizers).To(Equal(n.Finalizers)) + }) + It("should do nothing if the termination finalizer already exists", func() { + n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + Finalizers: []string{v1alpha5.TerminationFinalizer, "fake.com/finalizer"}, + }}) + ExpectApplied(ctx, env.Client, provisioner, n) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) + + n = ExpectNodeExists(ctx, env.Client, n.Name) + Expect(n.Finalizers).To(Equal(n.Finalizers)) + }) + It("should add an owner reference to the node", func() { + n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + }}) + ExpectApplied(ctx, env.Client, provisioner, n) + ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) + n = ExpectNodeExists(ctx, env.Client, n.Name) + Expect(n.OwnerReferences).To(Equal([]metav1.OwnerReference{{ + APIVersion: v1alpha5.SchemeGroupVersion.String(), + Kind: "Provisioner", + Name: provisioner.Name, + UID: provisioner.UID, + BlockOwnerDeletion: ptr.Bool(true), + }})) + }) + }) Context("Filters", func() { BeforeEach(func() { innerCtx, cancel := context.WithCancel(ctx) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 34daea1ace..e8071ba91e 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -18,11 +18,14 @@ import ( "context" "fmt" + "github.com/imdario/mergo" 
"github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "go.uber.org/multierr" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -310,7 +313,6 @@ func (p *Provisioner) Schedule(ctx context.Context) ([]*scheduler.Machine, []*sc func (p *Provisioner) Launch(ctx context.Context, m *scheduler.Machine, opts ...functional.Option[LaunchOptions]) (string, error) { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provisioner", m.Labels[v1alpha5.ProvisionerNameLabelKey])) - // Check limits latest := &v1alpha5.Provisioner{} if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: m.ProvisionerName}, latest); err != nil { @@ -322,21 +324,56 @@ func (p *Provisioner) Launch(ctx context.Context, m *scheduler.Machine, opts ... options := functional.ResolveOptions(opts...) logging.FromContext(ctx).Infof("launching %s", m) - machine := m.ToMachine(latest) - if err := p.kubeClient.Create(ctx, machine); err != nil { - return "", err + created, err := p.cloudProvider.Create( + logging.WithLogger(ctx, logging.FromContext(ctx).Named("cloudprovider")), + m.ToMachine(latest), + ) + if err != nil { + return "", fmt.Errorf("creating cloud provider instance, %w", err) + } + k8sNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: created.Name, + Labels: created.Labels, + }, + Spec: v1.NodeSpec{ + ProviderID: created.Status.ProviderID, + }, + } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", k8sNode.Name)) + + if err := mergo.Merge(k8sNode, m.ToNode()); err != nil { + return "", fmt.Errorf("merging cloud provider node, %w", err) + } + // ensure we clear out the status + k8sNode.Status = v1.NodeStatus{} + + // Idempotently create a node. In rare cases, nodes can come online and + // self register before the controller is able to register a node object + // with the API server. In the common case, we create the node object + // ourselves to enforce the binding decision and enable images to be pulled + // before the node is fully Ready. + if _, err := p.coreV1Client.Nodes().Create(ctx, k8sNode, metav1.CreateOptions{}); err != nil { + if errors.IsAlreadyExists(err) { + logging.FromContext(ctx).Debugf("node already registered") + } else { + return "", fmt.Errorf("creating node %s, %w", k8sNode.Name, err) + } + } + if err := p.cluster.UpdateNode(ctx, k8sNode); err != nil { + return "", fmt.Errorf("updating cluster state, %w", err) } - p.cluster.NominateNodeForPod(ctx, machine.Name) - metrics.MachinesCreatedCounter.With(prometheus.Labels{ + metrics.NodesCreatedCounter.With(prometheus.Labels{ metrics.ReasonLabel: options.Reason, - metrics.ProvisionerLabel: machine.Labels[v1alpha5.ProvisionerNameLabelKey], + metrics.ProvisionerLabel: k8sNode.Labels[v1alpha5.ProvisionerNameLabelKey], }).Inc() - if functional.ResolveOptions(opts...).RecordPodNomination { + p.cluster.NominateNodeForPod(ctx, k8sNode.Name) + if options.RecordPodNomination { for _, pod := range m.Pods { - p.recorder.Publish(schedulingevents.NominatePod(pod, nil, machine)...) + p.recorder.Publish(schedulingevents.NominatePod(pod, k8sNode)...) 
} } - return machine.Name, nil + return k8sNode.Name, nil } func (p *Provisioner) getDaemonSetPods(ctx context.Context) ([]*v1.Pod, error) { diff --git a/pkg/controllers/provisioning/scheduling/events/events.go b/pkg/controllers/provisioning/scheduling/events/events.go index 90e90011a7..35d3120019 100644 --- a/pkg/controllers/provisioning/scheduling/events/events.go +++ b/pkg/controllers/provisioning/scheduling/events/events.go @@ -20,39 +20,23 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/flowcontrol" - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) // PodNominationRateLimiter is a pointer so it rate-limits across events var PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) -// PodNominationRateLimiterForMachine is a pointer so it rate-limits across events -var PodNominationRateLimiterForMachine = flowcontrol.NewTokenBucketRateLimiter(5, 10) - -func NominatePod(pod *v1.Pod, node *v1.Node, machine *v1alpha5.Machine) []events.Event { - var evts []events.Event - if node != nil { - evts = append(evts, events.Event{ +func NominatePod(pod *v1.Pod, node *v1.Node) []events.Event { + return []events.Event{ + { InvolvedObject: pod, Type: v1.EventTypeNormal, Reason: "Nominated", Message: fmt.Sprintf("Pod should schedule on node: %s", node.Name), DedupeValues: []string{string(pod.UID)}, RateLimiter: PodNominationRateLimiter, - }) - } - if machine != nil { - evts = append(evts, events.Event{ - InvolvedObject: pod, - Type: v1.EventTypeNormal, - Reason: "NominatedMachine", - Message: fmt.Sprintf("Pod should schedule on node associated with machine: %s", machine.Name), - DedupeValues: []string{string(pod.UID)}, - RateLimiter: PodNominationRateLimiterForMachine, - }) + }, } - return evts } func PodFailedToSchedule(pod *v1.Pod, err error) events.Event { diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index d051b39151..539d5e6ede 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -145,7 +145,7 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, s.cluster.NominateNodeForPod(ctx, existing.Name()) } for _, pod := range existing.Pods { - s.recorder.Publish(schedulingevents.NominatePod(pod, existing.Node, existing.Machine)...) + s.recorder.Publish(schedulingevents.NominatePod(pod, existing.Node)...) 
} } diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 4f5256a5cd..84a9b767be 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -1800,15 +1800,12 @@ var _ = Describe("In-Flight Nodes", func() { bindings := ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, initialPod) ExpectScheduled(ctx, env.Client, initialPod) - // delete the node/machine - machine1 := bindings.Get(initialPod).Machine + // delete the node node1 := bindings.Get(initialPod).Node - machine1.Finalizers = nil node1.Finalizers = nil - ExpectApplied(ctx, env.Client, machine1, node1) - ExpectDeleted(ctx, env.Client, machine1, node1) + ExpectApplied(ctx, env.Client, node1) + ExpectDeleted(ctx, env.Client, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) - ExpectReconcileSucceeded(ctx, machineStateController, client.ObjectKeyFromObject(machine1)) secondPod := test.UnschedulablePod(opts) ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, secondPod) @@ -1922,9 +1919,7 @@ var _ = Describe("In-Flight Nodes", func() { bindings := ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, initialPod) ExpectScheduled(ctx, env.Client, initialPod) - machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node - machine1.StatusConditions().MarkTrue(v1alpha5.MachineInitialized) // delete the pod so that the node is empty ExpectDeleted(ctx, env.Client, initialPod) @@ -1934,7 +1929,7 @@ var _ = Describe("In-Flight Nodes", func() { Value: "tainted", Effect: v1.TaintEffectNoSchedule, }) - ExpectApplied(ctx, env.Client, machine1, node1) + ExpectApplied(ctx, env.Client, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) secondPod := test.UnschedulablePod() @@ -1988,13 +1983,11 @@ var _ = Describe("In-Flight Nodes", func() { // Mark it initialized which only occurs once the startup taint was removed and re-apply only the startup taint. // We also need to add resource capacity as after initialization we assume that kubelet has recorded them. 
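Note on the new Launch path shown earlier in this diff: mergo.Merge with default options only fills destination fields that are still empty, which is why the cloud-provider-assigned name and provider ID survive the merge with the scheduler's in-memory node from m.ToNode(), while empty fields such as labels and taints are filled in from it. A standalone sketch of that behavior, with illustrative values:

package main

import (
	"fmt"

	"github.com/imdario/mergo"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// dst mirrors the node built from the cloud provider's Create response.
	dst := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "provider-assigned-name"},
		Spec:       v1.NodeSpec{ProviderID: "fake:///provider-id"},
	}
	// src mirrors the scheduler's in-memory node (m.ToNode()).
	src := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "scheduler-name",
			Labels: map[string]string{"example.com/label": "value"},
		},
		Spec: v1.NodeSpec{Taints: []v1.Taint{{Key: "example.com/taint", Effect: v1.TaintEffectNoSchedule}}},
	}
	if err := mergo.Merge(dst, src); err != nil {
		panic(err)
	}
	// Name and ProviderID keep the provider-assigned values; Labels and Taints
	// come from src because they were empty on dst.
	fmt.Println(dst.Name, dst.Spec.ProviderID, dst.Labels, dst.Spec.Taints)
}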
- - machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node - machine1.StatusConditions().MarkTrue(v1alpha5.MachineInitialized) + node1.Labels[v1alpha5.LabelNodeInitialized] = "true" node1.Spec.Taints = []v1.Taint{startupTaint} node1.Status.Capacity = v1.ResourceList{v1.ResourcePods: resource.MustParse("10")} - ExpectApplied(ctx, env.Client, machine1, node1) + ExpectApplied(ctx, env.Client, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index 0e27aac08e..faa309f926 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -80,11 +80,6 @@ func NewCluster(clk clock.Clock, client client.Client, cp cloudprovider.CloudPro // of the cluster is as close to correct as it can be when we begin to perform operations // utilizing the cluster state as our source of truth func (c *Cluster) Synced(ctx context.Context) bool { - machineList := &v1alpha5.MachineList{} - if err := c.kubeClient.List(ctx, machineList); err != nil { - logging.FromContext(ctx).Errorf("checking cluster state sync, %v", err) - return false - } nodeList := &v1.NodeList{} if err := c.kubeClient.List(ctx, nodeList); err != nil { logging.FromContext(ctx).Errorf("checking cluster state sync, %v", err) @@ -95,13 +90,6 @@ func (c *Cluster) Synced(ctx context.Context) bool { c.mu.RUnlock() providerIDs := sets.New[string]() - for _, machine := range machineList.Items { - // If the machine hasn't resolved its provider id, then it hasn't resolved its status - if machine.Status.ProviderID == "" { - return false - } - providerIDs.Insert(machine.Status.ProviderID) - } for _, node := range nodeList.Items { if node.Spec.ProviderID == "" { node.Spec.ProviderID = node.Name @@ -110,7 +98,7 @@ func (c *Cluster) Synced(ctx context.Context) bool { } // The provider ids tracked in-memory should at least have all the data that is in the api-server // This doesn't ensure that the two states are exactly aligned (we could still not be tracking a node - // that exists on the apiserver but not in the cluster state) but it ensures that we have a state + // that exists in the cluster state but not in the apiserver) but it ensures that we have a state // representation for every node/machine that exists on the apiserver return stateProviderIDs.IsSuperset(providerIDs) } diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 6ae08afe8c..2aa44fb395 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -1331,6 +1331,7 @@ var _ = Describe("Cluster State Sync", func() { Expect(cluster.Synced(ctx)).To(BeTrue()) }) It("shouldn't consider the cluster state synced if a machine hasn't resolved its provider id", func() { + Skip("enable this test when cluster state sync relies on machines") // Deploy 1000 machines and sync them all with the cluster for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ @@ -1348,6 +1349,7 @@ var _ = Describe("Cluster State Sync", func() { Expect(cluster.Synced(ctx)).To(BeFalse()) }) It("shouldn't consider the cluster state synced if a machine isn't tracked", func() { + Skip("enable this test when cluster state sync relies on machines") // Deploy 1000 machines and sync them all with the cluster for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ diff --git a/pkg/controllers/termination/controller.go b/pkg/controllers/termination/controller.go index 
c39021b1e4..213064c6b1 100644 --- a/pkg/controllers/termination/controller.go +++ b/pkg/controllers/termination/controller.go @@ -19,7 +19,6 @@ import ( "fmt" "time" - "go.uber.org/multierr" "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -31,14 +30,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" terminatorevents "github.com/aws/karpenter-core/pkg/controllers/machine/terminator/events" "github.com/aws/karpenter-core/pkg/events" - "github.com/aws/karpenter-core/pkg/metrics" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" machineutil "github.com/aws/karpenter-core/pkg/utils/machine" ) @@ -76,17 +73,6 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res if !controllerutil.ContainsFinalizer(node, v1alpha5.TerminationFinalizer) { return reconcile.Result{}, nil } - allRemoved, err := c.ensureMachinesRemoved(ctx, node) - if err != nil { - return reconcile.Result{}, fmt.Errorf("removing machines, %w", err) - } - if !allRemoved { - return reconcile.Result{}, nil - } - // TODO @joinnis: Remove this section after v1beta1 migration is completed - // We need to keep the full termination flow in here during the migration timeframe - // This is because there is a short time where a node with the karpenter.sh/termination finalizer - // may not have a machine owner and we should still terminate gracefully if err := c.terminator.Cordon(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("cordoning node, %w", err) } @@ -106,34 +92,15 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res if err := c.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { return reconcile.Result{}, client.IgnoreNotFound(err) } - metrics.NodesTerminatedCounter.Inc() logging.FromContext(ctx).Infof("deleted node") } return reconcile.Result{}, nil } -func (c *Controller) ensureMachinesRemoved(ctx context.Context, n *v1.Node) (allRemoved bool, err error) { - machineList := &v1alpha5.MachineList{} - if err = c.kubeClient.List(ctx, machineList, client.MatchingFields{"status.providerID": n.Spec.ProviderID}); err != nil { - return false, err - } - if len(machineList.Items) == 0 { - return true, nil - } - for i := range machineList.Items { - err = multierr.Append(err, client.IgnoreNotFound(c.kubeClient.Delete(ctx, &machineList.Items[i]))) - } - return false, err -} - func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder { return corecontroller.Adapt(controllerruntime. NewControllerManagedBy(m). For(&v1.Node{}). - Watches( - &source.Kind{Type: &v1alpha5.Machine{}}, - machineutil.EventHandler(ctx, c.kubeClient), - ). WithOptions( controller.Options{ RateLimiter: workqueue.NewMaxOfRateLimiter( diff --git a/pkg/controllers/termination/suite_test.go b/pkg/controllers/termination/suite_test.go index 658b9285a2..c272d1689a 100644 --- a/pkg/controllers/termination/suite_test.go +++ b/pkg/controllers/termination/suite_test.go @@ -16,19 +16,15 @@ package termination_test import ( "context" + "fmt" "sync" "testing" "time" - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - "github.com/samber/lo" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" - . "knative.dev/pkg/logging/testing" - "sigs.k8s.io/controller-runtime/pkg/cache" + + "github.com/samber/lo" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter-core/pkg/apis" @@ -40,12 +36,22 @@ import ( "github.com/aws/karpenter-core/pkg/operator/controller" "github.com/aws/karpenter-core/pkg/operator/scheme" "github.com/aws/karpenter-core/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + . "knative.dev/pkg/logging/testing" + . "github.com/aws/karpenter-core/pkg/test/expectations" ) var ctx context.Context var terminationController controller.Controller +var evictionQueue *terminator.EvictionQueue var env *test.Environment +var defaultOwnerRefs = []metav1.OwnerReference{{Kind: "ReplicaSet", APIVersion: "appsv1", Name: "rs", UID: "1234567890"}} var fakeClock *clock.FakeClock func TestAPIs(t *testing.T) { @@ -56,14 +62,11 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { fakeClock = clock.NewFakeClock(time.Now()) - env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...), test.WithFieldIndexers(func(c cache.Cache) error { - return c.IndexField(ctx, &v1alpha5.Machine{}, "status.providerID", func(obj client.Object) []string { - return []string{obj.(*v1alpha5.Machine).Status.ProviderID} - }) - })) - evictionQueue := terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{})) - terminator := terminator.NewTerminator(fakeClock, env.Client, evictionQueue) - terminationController = termination.NewController(env.Client, fake.NewCloudProvider(), terminator, events.NewRecorder(&record.FakeRecorder{})) + env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...)) + + cloudProvider := fake.NewCloudProvider() + evictionQueue = terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{})) + terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, evictionQueue), events.NewRecorder(&record.FakeRecorder{})) }) var _ = AfterSuite(func() { @@ -71,33 +74,26 @@ var _ = AfterSuite(func() { }) var _ = Describe("Termination", func() { - var machine *v1alpha5.Machine var node *v1.Node BeforeEach(func() { - machine = test.Machine(v1alpha5.Machine{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1alpha5.TerminationFinalizer}}, Status: v1alpha5.MachineStatus{ProviderID: test.RandomProviderID()}}) - node = test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1alpha5.TerminationFinalizer}}, ProviderID: machine.Status.ProviderID}) + node = test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1alpha5.TerminationFinalizer}}}) }) + AfterEach(func() { ExpectCleanedUp(ctx, env.Client) fakeClock.SetTime(time.Now()) }) + Context("Reconciliation", func() { - It("should not delete node if machine still exists", func() { - ExpectApplied(ctx, env.Client, machine, node) - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectExists(ctx, env.Client, node) - }) - It("should delete nodes if 
machine doesn't exist", func() { + It("should delete nodes", func() { ExpectApplied(ctx, env.Client, node) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) ExpectNotFound(ctx, env.Client, node) }) - It("should not race if deleting nodes in parallel if machines don't exist", func() { + It("should not race if deleting nodes in parallel", func() { var nodes []*v1.Node for i := 0; i < 10; i++ { node = test.Node(test.NodeOptions{ @@ -124,5 +120,289 @@ var _ = Describe("Termination", func() { wg.Wait() ExpectNotFound(ctx, env.Client, lo.Map(nodes, func(n *v1.Node, _ int) client.Object { return n })...) }) + It("should exclude nodes from load balancers when terminating", func() { + // This is a kludge to prevent the node from being deleted before we can + // inspect its labels + podNoEvict := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{v1alpha5.DoNotEvictPodAnnotationKey: "true"}, + OwnerReferences: defaultOwnerRefs, + }, + }) + + ExpectApplied(ctx, env.Client, node, podNoEvict) + + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + node = ExpectNodeExists(ctx, env.Client, node.Name) + Expect(node.Labels[v1.LabelNodeExcludeBalancers]).Should(Equal("karpenter")) + }) + It("should not evict pods that tolerate unschedulable taint", func() { + podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) + podSkip := test.Pod(test.PodOptions{ + NodeName: node.Name, + Tolerations: []v1.Toleration{{Key: v1.TaintNodeUnschedulable, Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule}}, + ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}, + }) + ExpectApplied(ctx, env.Client, node, podEvict, podSkip) + + // Trigger Termination Controller + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Expect node to exist and be draining + ExpectNodeDraining(env.Client, node.Name) + + // Expect podEvict to be evicting, and delete it + ExpectEvicted(env.Client, podEvict) + ExpectDeleted(ctx, env.Client, podEvict) + + // Reconcile to delete node + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + }) + It("should delete nodes that have pods without an ownerRef", func() { + pod := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: nil, + }, + }) + + ExpectApplied(ctx, env.Client, node, pod) + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Expect pod with no owner ref to be enqueued for eviction + ExpectEvicted(env.Client, pod) + + // Expect node to exist and be draining + ExpectNodeDraining(env.Client, node.Name) + + // Delete no owner refs pod to simulate successful eviction + ExpectDeleted(ctx, env.Client, pod) + + // Reconcile node to evict pod + node = ExpectNodeExists(ctx, env.Client, node.Name) + 
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Reconcile to delete node + ExpectNotFound(ctx, env.Client, node) + }) + It("should delete nodes with terminal pods", func() { + podEvictPhaseSucceeded := test.Pod(test.PodOptions{ + NodeName: node.Name, + Phase: v1.PodSucceeded, + }) + podEvictPhaseFailed := test.Pod(test.PodOptions{ + NodeName: node.Name, + Phase: v1.PodFailed, + }) + + ExpectApplied(ctx, env.Client, node, podEvictPhaseSucceeded, podEvictPhaseFailed) + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + // Trigger Termination Controller, which should ignore these pods and delete the node + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + }) + It("should fail to evict pods that violate a PDB", func() { + minAvailable := intstr.FromInt(1) + labelSelector := map[string]string{test.RandomName(): test.RandomName()} + pdb := test.PodDisruptionBudget(test.PDBOptions{ + Labels: labelSelector, + // Don't let any pod evict + MinAvailable: &minAvailable, + }) + podNoEvict := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + Labels: labelSelector, + OwnerReferences: defaultOwnerRefs, + }, + Phase: v1.PodRunning, + }) + + ExpectApplied(ctx, env.Client, node, podNoEvict, pdb) + + // Trigger Termination Controller + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Expect node to exist and be draining + ExpectNodeDraining(env.Client, node.Name) + + // Expect podNoEvict to fail eviction due to PDB, and be retried + Eventually(func() int { + return evictionQueue.NumRequeues(client.ObjectKeyFromObject(podNoEvict)) + }).Should(BeNumerically(">=", 1)) + + // Delete pod to simulate successful eviction + ExpectDeleted(ctx, env.Client, podNoEvict) + ExpectNotFound(ctx, env.Client, podNoEvict) + + // Reconcile to delete node + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + }) + It("should evict non-critical pods first", func() { + podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) + podNodeCritical := test.Pod(test.PodOptions{NodeName: node.Name, PriorityClassName: "system-node-critical", ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) + podClusterCritical := test.Pod(test.PodOptions{NodeName: node.Name, PriorityClassName: "system-cluster-critical", ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) + + ExpectApplied(ctx, env.Client, node, podEvict, podNodeCritical, podClusterCritical) + + // Trigger Termination Controller + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Expect node to exist and be draining + ExpectNodeDraining(env.Client, node.Name) + + // Expect podEvict to be evicting, and delete it + ExpectEvicted(env.Client, podEvict) + ExpectDeleted(ctx, env.Client, podEvict) + + // Expect the critical pods to be evicted and deleted + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, 
client.ObjectKeyFromObject(node)) + ExpectEvicted(env.Client, podNodeCritical) + ExpectDeleted(ctx, env.Client, podNodeCritical) + ExpectEvicted(env.Client, podClusterCritical) + ExpectDeleted(ctx, env.Client, podClusterCritical) + + // Reconcile to delete node + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + }) + It("should not evict static pods", func() { + ExpectApplied(ctx, env.Client, node) + podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) + ExpectApplied(ctx, env.Client, node, podEvict) + + podNoEvict := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "v1", + Kind: "Node", + Name: node.Name, + UID: node.UID, + }}, + }, + }) + ExpectApplied(ctx, env.Client, podNoEvict) + + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Expect mirror pod to not be queued for eviction + ExpectNotEnqueuedForEviction(evictionQueue, podNoEvict) + + // Expect podEvict to be enqueued for eviction then be successful + ExpectEvicted(env.Client, podEvict) + + // Expect node to exist and be draining + ExpectNodeDraining(env.Client, node.Name) + + // Reconcile node to evict pod + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Delete pod to simulate successful eviction + ExpectDeleted(ctx, env.Client, podEvict) + + // Reconcile to delete node + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + + }) + It("should not delete nodes until all pods are deleted", func() { + pods := []*v1.Pod{ + test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}), + test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}), + } + ExpectApplied(ctx, env.Client, node, pods[0], pods[1]) + + // Trigger Termination Controller + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Expect the pods to be evicted + ExpectEvicted(env.Client, pods[0], pods[1]) + + // Expect node to exist and be draining, but not deleted + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNodeDraining(env.Client, node.Name) + + ExpectDeleted(ctx, env.Client, pods[1]) + + // Expect node to exist and be draining, but not deleted + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNodeDraining(env.Client, node.Name) + + ExpectDeleted(ctx, env.Client, pods[0]) + + // Reconcile to delete node + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + }) + It("should wait for pods to terminate", func() { + pod := 
test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) + fakeClock.SetTime(time.Now()) // make our fake clock match the pod creation time + ExpectApplied(ctx, env.Client, node, pod) + + // Before grace period, node should not delete + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNodeExists(ctx, env.Client, node.Name) + ExpectEvicted(env.Client, pod) + + // After grace period, node should delete. The deletion timestamps are from etcd which we can't control, so + // to eliminate test-flakiness we reset the time to current time + 90 seconds instead of just advancing + // the clock by 90 seconds. + fakeClock.SetTime(time.Now().Add(90 * time.Second)) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + }) }) }) + +func ExpectNotEnqueuedForEviction(e *terminator.EvictionQueue, pods ...*v1.Pod) { + for _, pod := range pods { + ExpectWithOffset(1, e.Contains(client.ObjectKeyFromObject(pod))).To(BeFalse()) + } +} + +func ExpectEvicted(c client.Client, pods ...*v1.Pod) { + for _, pod := range pods { + EventuallyWithOffset(1, func() bool { + return ExpectPodExists(ctx, c, pod.Name, pod.Namespace).GetDeletionTimestamp().IsZero() + }, ReconcilerPropagationTime, RequestInterval).Should(BeFalse(), func() string { + return fmt.Sprintf("expected %s/%s to be evicting, but it isn't", pod.Namespace, pod.Name) + }) + } +} + +func ExpectNodeDraining(c client.Client, nodeName string) *v1.Node { + node := ExpectNodeExistsWithOffset(1, ctx, c, nodeName) + ExpectWithOffset(1, node.Spec.Unschedulable).To(BeTrue()) + ExpectWithOffset(1, lo.Contains(node.Finalizers, v1alpha5.TerminationFinalizer)).To(BeTrue()) + ExpectWithOffset(1, node.DeletionTimestamp.IsZero()).To(BeFalse()) + return node +} diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go index 95c8eb6d4d..f7d234bd17 100644 --- a/pkg/events/suite_test.go +++ b/pkg/events/suite_test.go @@ -77,14 +77,12 @@ var _ = BeforeEach(func() { internalRecorder = NewInternalRecorder() eventRecorder = events.NewRecorder(internalRecorder) schedulingevents.PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) - schedulingevents.PodNominationRateLimiterForMachine = flowcontrol.NewTokenBucketRateLimiter(5, 10) }) var _ = Describe("Event Creation", func() { It("should create a NominatePod event", func() { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(1)) - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(1)) + eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(1)) }) It("should create a EvictPod event", func() { eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID())) @@ -135,20 +133,18 @@ var _ = Describe("Dedupe", func() { var _ = Describe("Rate Limiting", func() { It("should only create max-burst when many events are created quickly", func() { for i := 0; i < 100; i++ { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) 
+ eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) } - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(10)) - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(10)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(10)) }) It("should allow many events over time due to smoothed rate limiting", func() { for i := 0; i < 3; i++ { for j := 0; j < 5; j++ { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) + eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) } time.Sleep(time.Second) } - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(15)) - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(15)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(15)) }) }) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 32e56ada26..b5e1d32570 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -25,12 +25,36 @@ const ( ) var ( + NodesCreatedCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: nodeSubsystem, + Name: "created", + Help: "Number of nodes created in total by Karpenter. Labeled by reason the node was created and the owning provisioner.", + }, + []string{ + ReasonLabel, + ProvisionerLabel, + }, + ) + NodesTerminatedCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: nodeSubsystem, + Name: "terminated", + Help: "Number of nodes terminated in total by Karpenter. Labeled by reason the node was terminated and the owning provisioner.", + }, + []string{ + ReasonLabel, + ProvisionerLabel, + }, + ) MachinesCreatedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, Subsystem: machineSubsystem, Name: "created", - Help: "Number of machines created in total by Karpenter. Labeled by reason the machine was created.", + Help: "Number of machines created in total by Karpenter. Labeled by reason the machine was created and the owning provisioner.", }, []string{ ReasonLabel, @@ -42,21 +66,15 @@ var ( Namespace: Namespace, Subsystem: machineSubsystem, Name: "terminated", - Help: "Number of machines terminated in total by Karpenter. 
Labeled by reason the machine was terminated and the owning provisioner.", }, []string{ ReasonLabel, ProvisionerLabel, }, ) - NodesTerminatedCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: Namespace, - Subsystem: nodeSubsystem, - Name: "terminated", - Help: "Number of nodes terminated in total by Karpenter.", - }) ) func MustRegister() { - crmetrics.Registry.MustRegister(MachinesCreatedCounter, MachinesTerminatedCounter, NodesTerminatedCounter) + crmetrics.Registry.MustRegister(NodesCreatedCounter, NodesTerminatedCounter) } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 40de0b3204..596669524c 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -41,7 +41,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/aws/karpenter-core/pkg/apis" - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" "github.com/aws/karpenter-core/pkg/operator/injection" @@ -128,12 +127,6 @@ func NewOperator() (context.Context, *Operator) { lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string { return []string{o.(*v1.Pod).Spec.NodeName} }), "failed to setup pod indexer") - lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1.Node{}, "spec.providerID", func(o client.Object) []string { - return []string{o.(*v1.Node).Spec.ProviderID} - }), "failed to setup node provider id indexer") - lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1alpha5.Machine{}, "status.providerID", func(o client.Object) []string { - return []string{o.(*v1alpha5.Machine).Status.ProviderID} - }), "failed to setup machine provider id indexer") return ctx, &Operator{ Manager: manager, diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index f7d0cb83f6..5f4351cccb 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -240,7 +240,6 @@ func ExpectFinalizersRemoved(ctx context.Context, c client.Client, objs ...clien ExpectWithOffset(1, client.IgnoreNotFound(c.Patch(ctx, obj, client.MergeFrom(stored)))).To(Succeed()) } } - func ExpectProvisioned(ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { bindings := ExpectProvisionedNoBindingWithOffset(1, ctx, c, cluster, cloudProvider, provisioner, pods...) podKeys := sets.NewString(lo.Map(pods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() })...) @@ -258,7 +257,7 @@ func ExpectProvisionedNoBinding(ctx context.Context, c client.Client, cluster *s return ExpectProvisionedNoBindingWithOffset(1, ctx, c, cluster, cloudProvider, provisioner, pods...) 
} -func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { +func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c client.Client, _ *state.Cluster, _ cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { // Persist objects for _, pod := range pods { ExpectAppliedWithOffset(offset+1, ctx, c, pod) @@ -272,24 +271,13 @@ func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c cli if err != nil { return bindings } - machine := &v1alpha5.Machine{} - ExpectWithOffset(offset+1, c.Get(ctx, types.NamespacedName{Name: name}, machine)).To(Succeed()) - machine, node := ExpectMachineDeployedWithOffset(offset+1, ctx, c, cluster, cloudProvider, machine) - if machine != nil && node != nil { - for _, pod := range m.Pods { - bindings[pod] = &Binding{ - Machine: machine, - Node: node, - } - } + for _, pod := range m.Pods { + bindings[pod] = &Binding{Node: ExpectNodeExistsWithOffset(offset+1, ctx, c, name)} } } for _, node := range nodes { for _, pod := range node.Pods { - bindings[pod] = &Binding{ - Node: node.Node, - Machine: node.Machine, - } + bindings[pod] = &Binding{Node: node.Node} } } return bindings diff --git a/pkg/test/nodes.go b/pkg/test/nodes.go index dce019738e..eba25e448f 100644 --- a/pkg/test/nodes.go +++ b/pkg/test/nodes.go @@ -71,6 +71,7 @@ func MachineLinkedNode(machine *v1alpha5.Machine) *v1.Node { ObjectMeta: metav1.ObjectMeta{ Labels: machine.Labels, Annotations: machine.Annotations, + Finalizers: machine.Finalizers, }, Taints: append(machine.Spec.Taints, machine.Spec.StartupTaints...), Capacity: machine.Status.Capacity,
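A minimal sketch of how the labeled node counters introduced in the pkg/metrics/metrics.go hunk would be incremented at a call site. The actual increment sites are not part of this patch (the bare metrics.NodesTerminatedCounter.Inc() call was removed from the termination controller above), and the literal namespace, subsystem, and label strings ("karpenter", "nodes", "reason", "provisioner") as well as the sample label values are assumptions for illustration only.

package main

import "github.com/prometheus/client_golang/prometheus"

// nodesTerminatedCounter mirrors the CounterVec added in pkg/metrics/metrics.go;
// the namespace, subsystem, and label names here are assumed, not taken from the diff.
var nodesTerminatedCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "karpenter",
		Subsystem: "nodes",
		Name:      "terminated",
		Help:      "Number of nodes terminated in total by Karpenter. Labeled by reason the node was terminated and the owning provisioner.",
	},
	[]string{"reason", "provisioner"},
)

func main() {
	prometheus.MustRegister(nodesTerminatedCounter)

	// Unlike the previous plain Counter, a CounterVec cannot be incremented
	// without label values, so every call site must supply both labels
	// (hypothetical values shown).
	nodesTerminatedCounter.With(prometheus.Labels{
		"reason":      "termination",
		"provisioner": "default",
	}).Inc()
}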