Commit 9d26cd8

Revert "Revert machine migration changes (kubernetes-sigs#176) (kuber…
Browse files Browse the repository at this point in the history
…netes-sigs#241)"

This reverts commit 9973eac.
jonathan-innis committed Mar 16, 2023
1 parent 0b437c2 commit 9d26cd8
Showing 41 changed files with 561 additions and 1,128 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -25,7 +25,7 @@ require (
 	k8s.io/client-go v0.25.4
 	k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
 	knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a
-	sigs.k8s.io/controller-runtime v0.13.1
+	sigs.k8s.io/controller-runtime v0.13.0
 )

 require (
4 changes: 2 additions & 2 deletions go.sum
@@ -848,8 +848,8 @@ knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a/go.mod h1:fckNBPf9bu5/p1RbnOh
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
-sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE3xSlg=
-sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
+sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRMc2WIQ=
+sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
 sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
 sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
 sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
3 changes: 1 addition & 2 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -111,10 +111,9 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
 			break
 		}
 	}
-	name := test.RandomName()
 	created := &v1alpha5.Machine{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:        name,
+			Name:        machine.Name,
 			Labels:      lo.Assign(labels, machine.Labels),
 			Annotations: machine.Annotations,
 		},
3 changes: 3 additions & 0 deletions pkg/controllers/controllers.go
@@ -25,6 +25,7 @@ import (
 	"github.com/aws/karpenter-core/pkg/controllers/counter"
 	"github.com/aws/karpenter-core/pkg/controllers/deprovisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/inflightchecks"
+	"github.com/aws/karpenter-core/pkg/controllers/machine"
 	"github.com/aws/karpenter-core/pkg/controllers/machine/terminator"
 	metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod"
 	metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner"
@@ -64,11 +65,13 @@ func NewControllers(
 		informer.NewNodeController(kubeClient, cluster),
 		informer.NewPodController(kubeClient, cluster),
 		informer.NewProvisionerController(kubeClient, cluster),
+		informer.NewMachineController(kubeClient, cluster),
 		node.NewController(clock, kubeClient, cloudProvider, cluster),
 		termination.NewController(kubeClient, cloudProvider, terminator, recorder),
 		metricspod.NewController(kubeClient),
 		metricsprovisioner.NewController(kubeClient),
 		counter.NewController(kubeClient, cluster),
 		inflightchecks.NewController(clock, kubeClient, recorder, cloudProvider),
+		machine.NewController(clock, kubeClient, cloudProvider, terminator, recorder),
 	}
 }
22 changes: 11 additions & 11 deletions pkg/controllers/deprovisioning/consolidation.go
@@ -80,16 +80,16 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca

 	// filter out nodes that can't be terminated
 	nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool {
-		if !cn.Node.DeletionTimestamp.IsZero() {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "in the process of deletion")...)
+		if !cn.Machine.DeletionTimestamp.IsZero() {
+			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "in the process of deletion")...)
 			return false
 		}
 		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
+			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
 			return false
 		}
 		if p, ok := hasDoNotEvictPod(cn); ok {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
+			c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
 			return false
 		}
 		return true
@@ -104,15 +104,15 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca
 // ShouldDeprovision is a predicate used to filter deprovisionable nodes
 func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool {
 	if val, ok := cn.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
-		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
+		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
 		return val != "true"
 	}
 	if cn.provisioner == nil {
-		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "provisioner is unknown")...)
+		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "provisioner is unknown")...)
 		return false
 	}
 	if cn.provisioner.Spec.Consolidation == nil || !ptr.BoolValue(cn.provisioner.Spec.Consolidation.Enabled) {
-		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...)
+		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...)
 		return false
 	}
 	return true
@@ -201,7 +201,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	if !allPodsScheduled {
 		// This method is used by multi-node consolidation as well, so we'll only report in the single node case
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "not all pods would schedule")...)
+			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "not all pods would schedule")...)
 		}
 		return Command{action: actionDoNothing}, nil
 	}
@@ -217,7 +217,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	// we're not going to turn a single node into multiple candidates
 	if len(newMachines) != 1 {
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...)
+			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...)
 		}
 		return Command{action: actionDoNothing}, nil
 	}
@@ -231,7 +231,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	newMachines[0].InstanceTypeOptions = filterByPrice(newMachines[0].InstanceTypeOptions, newMachines[0].Requirements, nodesPrice)
 	if len(newMachines[0].InstanceTypeOptions) == 0 {
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace with a cheaper node")...)
+			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace with a cheaper node")...)
 		}
 		// no instance types remain after filtering by price
 		return Command{action: actionDoNothing}, nil
@@ -250,7 +250,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	if allExistingAreSpot &&
 		newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) {
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace a spot node with a spot node")...)
+			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace a spot node with a spot node")...)
 		}
 		return Command{action: actionDoNothing}, nil
 	}
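
Throughout this file the Unconsolidatable event helper gains a Machine argument next to the Node, so the "why not consolidated" reason can be surfaced on both objects. A minimal sketch of that call shape follows; the Event type and constructor below are pared-down stand-ins for illustration, not the package's real definitions.

// Sketch only: Event is a stand-in for the package's event type.
package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// Event pairs an involved object with a human-readable message.
type Event struct {
	InvolvedObject runtime.Object
	Message        string
}

// Unconsolidatable mirrors the call sites above: it returns one event per
// involved object, so the reason is visible on the Node and on the Machine.
func Unconsolidatable(node *v1.Node, machine *v1alpha5.Machine, reason string) []Event {
	return []Event{
		{InvolvedObject: node, Message: "Unconsolidatable: " + reason},
		{InvolvedObject: machine, Message: "Unconsolidatable: " + reason},
	}
}
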
36 changes: 18 additions & 18 deletions pkg/controllers/deprovisioning/controller.go
@@ -117,7 +117,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
 	for _, d := range c.deprovisioners {
 		candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision)
 		if err != nil {
-			return reconcile.Result{}, fmt.Errorf("determining candidates, %w", err)
+			return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err)
 		}
 		// If there are no candidate nodes, move to the next deprovisioner
 		if len(candidates) == 0 {
@@ -138,7 +138,7 @@

 		// Attempt to deprovision
 		if err := c.executeCommand(ctx, d, cmd); err != nil {
-			return reconcile.Result{}, fmt.Errorf("deprovisioning candidates, %w", err)
+			return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err)
 		}
 		return reconcile.Result{Requeue: true}, nil
 	}
@@ -165,15 +165,15 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman
 	}

 	for _, candidate := range command.candidates {
-		c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, command.String())...)
+		c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, candidate.Machine, command.String())...)

-		if err := c.kubeClient.Delete(ctx, candidate.Node); err != nil {
+		if err := c.kubeClient.Delete(ctx, candidate.Machine); err != nil {
 			if errors.IsNotFound(err) {
 				continue
 			}
 			logging.FromContext(ctx).Errorf("terminating machine, %s", err)
 		} else {
-			metrics.NodesTerminatedCounter.With(prometheus.Labels{
+			metrics.MachinesTerminatedCounter.With(prometheus.Labels{
 				metrics.ReasonLabel:      reason,
 				metrics.ProvisionerLabel: candidate.provisioner.Name,
 			}).Inc()
@@ -183,7 +183,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman
 	// We wait for nodes to delete to ensure we don't start another round of deprovisioning until this node is fully
 	// deleted.
 	for _, oldCandidate := range command.candidates {
-		c.waitForDeletion(ctx, oldCandidate.Node)
+		c.waitForDeletion(ctx, oldCandidate.Machine)
 	}
 	return nil
 }
@@ -233,8 +233,8 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name
 	var once sync.Once
 	pollStart := time.Now()
 	return retry.Do(func() error {
-		node := &v1.Node{}
-		if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, node); err != nil {
+		machine := &v1alpha5.Machine{}
+		if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, machine); err != nil {
 			// If the machine was deleted after a few seconds (to give the cache time to update), then we assume
 			// that the machine was deleted due to an Insufficient Capacity error
 			if errors.IsNotFound(err) && c.clock.Since(pollStart) > time.Second*5 {
@@ -243,12 +243,12 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name
 			return fmt.Errorf("getting machine, %w", err)
 		}
 		once.Do(func() {
-			c.recorder.Publish(deprovisioningevents.Launching(node, action.String()))
+			c.recorder.Publish(deprovisioningevents.Launching(machine, action.String()))
 		})
-		if _, ok := node.Labels[v1alpha5.LabelNodeInitialized]; !ok {
+		if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
 			// make the user aware of why deprovisioning is paused
-			c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(node))
-			return fmt.Errorf("node is not initialized")
+			c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(machine))
+			return fmt.Errorf("machine is not initialized")
 		}
 		return nil
 	}, waitRetryOptions...)
@@ -257,21 +257,21 @@
 // waitForDeletion waits for the specified machine to be removed from the API server. This deletion can take some period
 // of time if there are PDBs that govern pods on the machine as we need to wait until the node drains before
 // it's actually deleted.
-func (c *Controller) waitForDeletion(ctx context.Context, node *v1.Node) {
+func (c *Controller) waitForDeletion(ctx context.Context, machine *v1alpha5.Machine) {
 	if err := retry.Do(func() error {
-		m := &v1.Node{}
-		nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(node), m)
+		m := &v1alpha5.Machine{}
+		nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(machine), m)
 		// We expect the not machine found error, at which point we know the machine is deleted.
 		if errors.IsNotFound(nerr) {
 			return nil
 		}
 		// make the user aware of why deprovisioning is paused
-		c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(node))
+		c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(machine))
 		if nerr != nil {
-			return fmt.Errorf("expected node to be not found, %w", nerr)
+			return fmt.Errorf("expected machine to be not found, %w", nerr)
 		}
 		// the machine still exists
-		return fmt.Errorf("expected node to be not found")
+		return fmt.Errorf("expected machine to be not found")
 	}, waitRetryOptions...,
 	); err != nil {
 		logging.FromContext(ctx).Errorf("Waiting on machine deletion, %s", err)
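
The new waitForDeletion above polls until the Machine disappears from the API server rather than assuming the delete call is final. A minimal standalone sketch of that poll-until-NotFound pattern, assuming the avast/retry-go library (consistent with the retry.Do call sites above) plus controller-runtime and apimachinery; the helper name is illustrative, not part of the commit.

// Sketch only: the poll-until-deleted pattern waitForDeletion implements.
package sketch

import (
	"context"
	"fmt"

	"github.com/avast/retry-go"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// waitUntilGone blocks until the Machine is NotFound, retrying on any other
// outcome so a slow drain simply extends the wait instead of failing it.
func waitUntilGone(ctx context.Context, c client.Client, machine *v1alpha5.Machine, opts ...retry.Option) error {
	return retry.Do(func() error {
		m := &v1alpha5.Machine{}
		err := c.Get(ctx, client.ObjectKeyFromObject(machine), m)
		if errors.IsNotFound(err) {
			return nil // the machine is gone; stop polling
		}
		if err != nil {
			return fmt.Errorf("expected machine to be not found, %w", err)
		}
		return fmt.Errorf("machine %s still exists", m.Name) // retry
	}, opts...)
}
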
8 changes: 4 additions & 4 deletions pkg/controllers/deprovisioning/drift.go
@@ -66,15 +66,15 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C
 	}
 	// filter out machines that can't be terminated
 	candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool {
-		if !cn.Node.DeletionTimestamp.IsZero() {
+		if !cn.Machine.DeletionTimestamp.IsZero() {
 			return false
 		}
 		if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
-			d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
+			d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
 			return false
 		}
 		if p, ok := hasDoNotEvictPod(cn); ok {
-			d.recorder.Publish(deprovisioningevents.Blocked(cn.Node,
+			d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine,
 				fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
 			return false
 		}
@@ -93,7 +93,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C
 	}
 	// Log when all pods can't schedule, as the command will get executed immediately.
 	if !allPodsScheduled {
-		logging.FromContext(ctx).With("node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods")
+		logging.FromContext(ctx).With("machine", candidate.Machine.Name, "node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods")
 	}
 	if len(newMachines) == 0 {
 		return Command{
40 changes: 29 additions & 11 deletions pkg/controllers/deprovisioning/drift_test.go
@@ -76,8 +76,8 @@ var _ = Describe("Drift", func() {
 		wg.Wait()

 		// Expect to not create or delete more machines
-		Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
-		ExpectExists(ctx, env.Client, node)
+		Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
+		ExpectExists(ctx, env.Client, machine)
 	})
 	It("should ignore nodes with the drift label, but not the drifted value", func() {
 		node.Annotations = lo.Assign(node.Annotations, map[string]string{
@@ -96,8 +96,8 @@
 		wg.Wait()

 		// Expect to not create or delete more machines
-		Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
-		ExpectExists(ctx, env.Client, node)
+		Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
+		ExpectExists(ctx, env.Client, machine)
 	})
 	It("should ignore nodes without the drift label", func() {
 		ExpectApplied(ctx, env.Client, machine, node, prov)
@@ -110,8 +110,8 @@
 		ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

 		// Expect to not create or delete more machines
-		Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
-		ExpectExists(ctx, env.Client, node)
+		Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
+		ExpectExists(ctx, env.Client, machine)
 	})
 	It("can delete drifted nodes", func() {
 		node.Annotations = lo.Assign(node.Annotations, map[string]string{
@@ -129,9 +129,13 @@
 		ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
 		wg.Wait()

+		// Cascade any deletion of the machine to the node
+		ExpectMachinesCascadeDeletion(ctx, env.Client, machine)
+
 		// We should delete the machine that has drifted
+		Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0))
 		Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
-		ExpectNotFound(ctx, env.Client, node)
+		ExpectNotFound(ctx, env.Client, machine, node)
 	})
 	It("can replace drifted nodes", func() {
 		labels := map[string]string{
@@ -171,15 +175,21 @@
 		// deprovisioning won't delete the old machine until the new machine is ready
 		var wg sync.WaitGroup
 		ExpectTriggerVerifyAction(&wg)
-		ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1)
+		ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 		ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
 		wg.Wait()

-		ExpectNotFound(ctx, env.Client, node)
+		// Cascade any deletion of the machine to the node
+		ExpectMachinesCascadeDeletion(ctx, env.Client, machine)
+
+		ExpectNotFound(ctx, env.Client, machine, node)

 		// Expect that the new machine was created and its different than the original
+		machines := ExpectMachines(ctx, env.Client)
 		nodes := ExpectNodes(ctx, env.Client)
+		Expect(machines).To(HaveLen(1))
 		Expect(nodes).To(HaveLen(1))
+		Expect(machines[0].Name).ToNot(Equal(machine.Name))
 		Expect(nodes[0].Name).ToNot(Equal(node.Name))
 	})
 	It("can replace drifted nodes with multiple nodes", func() {
@@ -269,12 +279,16 @@
 		// deprovisioning won't delete the old node until the new node is ready
 		var wg sync.WaitGroup
 		ExpectTriggerVerifyAction(&wg)
-		ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3)
+		ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3)
 		ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
 		wg.Wait()

+		// Cascade any deletion of the machine to the node
+		ExpectMachinesCascadeDeletion(ctx, env.Client, machine)
+
 		// expect that drift provisioned three nodes, one for each pod
-		ExpectNotFound(ctx, env.Client, node)
+		ExpectNotFound(ctx, env.Client, machine, node)
+		Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3))
 		Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3))
 	})
 	It("should delete one drifted node at a time", func() {
@@ -324,7 +338,11 @@
 		ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
 		wg.Wait()

+		// Cascade any deletion of the machine to the node
+		ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2)
+
 		// Expect one of the nodes to be deleted
+		Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
 		Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
 	})
 })
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/emptiness.go
@@ -64,7 +64,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
 // ComputeCommand generates a deprovisioning command given deprovisionable machines
 func (e *Emptiness) ComputeCommand(_ context.Context, candidates ...*Candidate) (Command, error) {
 	emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool {
-		return cn.Node.DeletionTimestamp.IsZero() && len(cn.pods) == 0
+		return cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0
 	})

 	if len(emptyCandidates) == 0 {
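
The emptiness predicate above is a single lo.Filter pass that now checks the Machine's deletion timestamp instead of the Node's. A self-contained sketch of that filtering shape, with a pared-down stand-in for this package's Candidate type (only the two fields the predicate reads are modeled):

// Sketch only: candidate is a stand-in for the package's Candidate type.
package sketch

import (
	"github.com/samber/lo"
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// candidate models only the fields the emptiness predicate touches.
type candidate struct {
	Machine *v1alpha5.Machine
	pods    []*v1.Pod
}

// emptyCandidates keeps machines that are not already deleting and host no
// pods, mirroring the predicate above.
func emptyCandidates(candidates []*candidate) []*candidate {
	return lo.Filter(candidates, func(cn *candidate, _ int) bool {
		return cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0
	})
}
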
(Diff truncated; the remaining changed files are not shown.)
