chore: Revert "feat: Machine Migration (#176)" #241

Merged

2 changes: 1 addition & 1 deletion go.mod
@@ -25,7 +25,7 @@ require (
k8s.io/client-go v0.25.4
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a
-sigs.k8s.io/controller-runtime v0.13.0
+sigs.k8s.io/controller-runtime v0.13.1
)

require (
4 changes: 2 additions & 2 deletions go.sum
@@ -848,8 +848,8 @@ knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a/go.mod h1:fckNBPf9bu5/p1RbnOh
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
-sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRMc2WIQ=
-sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
+sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE3xSlg=
+sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
3 changes: 2 additions & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
@@ -111,9 +111,10 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
break
}
}
+name := test.RandomName()
created := &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
-Name: machine.Name,
+Name: name,
Labels: lo.Assign(labels, machine.Labels),
Annotations: machine.Annotations,
},
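Note on the hunk above: after this change the fake cloud provider names created Machines with test.RandomName() instead of reusing the incoming machine's name, so repeated Create calls in tests cannot collide. A minimal sketch of what such a helper could look like; the real test.RandomName in this repository may be implemented differently:

```go
// Hypothetical RandomName helper: returns a short, DNS-1123-friendly random
// name so objects created by the fake provider never collide across tests.
package test

import (
	"fmt"
	"math/rand"
)

const nameAlphabet = "bcdfghjklmnpqrstvwxz2456789"

func RandomName() string {
	suffix := make([]byte, 10)
	for i := range suffix {
		suffix[i] = nameAlphabet[rand.Intn(len(nameAlphabet))]
	}
	return fmt.Sprintf("fake-%s", suffix)
}
```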
3 changes: 0 additions & 3 deletions pkg/controllers/controllers.go
@@ -25,7 +25,6 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/counter"
"github.com/aws/karpenter-core/pkg/controllers/deprovisioning"
"github.com/aws/karpenter-core/pkg/controllers/inflightchecks"
"github.com/aws/karpenter-core/pkg/controllers/machine"
"github.com/aws/karpenter-core/pkg/controllers/machine/terminator"
metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner"
@@ -65,13 +64,11 @@ func NewControllers(
informer.NewNodeController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
informer.NewProvisionerController(kubeClient, cluster),
-informer.NewMachineController(kubeClient, cluster),
node.NewController(clock, kubeClient, cloudProvider, cluster),
termination.NewController(kubeClient, cloudProvider, terminator, recorder),
metricspod.NewController(kubeClient),
metricsprovisioner.NewController(kubeClient),
counter.NewController(kubeClient, cluster),
inflightchecks.NewController(clock, kubeClient, recorder, cloudProvider),
-machine.NewController(clock, kubeClient, cloudProvider, terminator, recorder),
}
}
22 changes: 11 additions & 11 deletions pkg/controllers/deprovisioning/consolidation.go
@@ -80,16 +80,16 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca

// filter out nodes that can't be terminated
nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool {
-if !cn.Machine.DeletionTimestamp.IsZero() {
-c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "in the process of deletion")...)
+if !cn.Node.DeletionTimestamp.IsZero() {
+c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "in the process of deletion")...)
return false
}
if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
return false
}
if p, ok := hasDoNotEvictPod(cn); ok {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
return false
}
return true
@@ -104,15 +104,15 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca
// ShouldDeprovision is a predicate used to filter deprovisionable nodes
func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool {
if val, ok := cn.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
return val != "true"
}
if cn.provisioner == nil {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "provisioner is unknown")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "provisioner is unknown")...)
return false
}
if cn.provisioner.Spec.Consolidation == nil || !ptr.BoolValue(cn.provisioner.Spec.Consolidation.Enabled) {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...)
return false
}
return true
@@ -201,7 +201,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if !allPodsScheduled {
// This method is used by multi-node consolidation as well, so we'll only report in the single node case
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "not all pods would schedule")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "not all pods would schedule")...)
}
return Command{action: actionDoNothing}, nil
}
@@ -217,7 +217,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
// we're not going to turn a single node into multiple candidates
if len(newMachines) != 1 {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...)
}
return Command{action: actionDoNothing}, nil
}
@@ -231,7 +231,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
newMachines[0].InstanceTypeOptions = filterByPrice(newMachines[0].InstanceTypeOptions, newMachines[0].Requirements, nodesPrice)
if len(newMachines[0].InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace with a cheaper node")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace with a cheaper node")...)
}
// no instance types remain after filtering by price
return Command{action: actionDoNothing}, nil
@@ -250,7 +250,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if allExistingAreSpot &&
newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace a spot node with a spot node")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace a spot node with a spot node")...)
}
return Command{action: actionDoNothing}, nil
}
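The consolidation hunks above all make the same mechanical change: Unconsolidatable is now called with the node and a reason only, without the Machine. A hedged sketch of an event builder with that signature; the concrete events.Event fields in this repository are assumptions here and may differ:

```go
package deprovisioningevents

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/events"
)

// Unconsolidatable is a hypothetical builder matching the call sites above: it
// records why a node was skipped for consolidation, keyed off the node alone.
func Unconsolidatable(node *v1.Node, reason string) []events.Event {
	return []events.Event{
		{
			InvolvedObject: node,
			Type:           v1.EventTypeNormal,
			Reason:         "Unconsolidatable",
			Message:        reason,
			DedupeValues:   []string{node.Name, reason},
		},
	}
}
```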
36 changes: 18 additions & 18 deletions pkg/controllers/deprovisioning/controller.go
@@ -114,7 +114,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
for _, d := range c.deprovisioners {
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision)
if err != nil {
return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err)
return reconcile.Result{}, fmt.Errorf("determining candidates, %w", err)
}
// If there are no candidate nodes, move to the next deprovisioner
if len(candidates) == 0 {
@@ -135,7 +135,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc

// Attempt to deprovision
if err := c.executeCommand(ctx, d, cmd); err != nil {
return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err)
return reconcile.Result{}, fmt.Errorf("deprovisioning candidates, %w", err)
}
return reconcile.Result{Requeue: true}, nil
}
@@ -162,15 +162,15 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman
}

for _, candidate := range command.candidates {
-c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, candidate.Machine, command.String())...)
+c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, command.String())...)

-if err := c.kubeClient.Delete(ctx, candidate.Machine); err != nil {
+if err := c.kubeClient.Delete(ctx, candidate.Node); err != nil {
if errors.IsNotFound(err) {
continue
}
logging.FromContext(ctx).Errorf("terminating machine, %s", err)
} else {
-metrics.MachinesTerminatedCounter.With(prometheus.Labels{
+metrics.NodesTerminatedCounter.With(prometheus.Labels{
metrics.ReasonLabel: reason,
metrics.ProvisionerLabel: candidate.provisioner.Name,
}).Inc()
@@ -180,7 +180,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman
// We wait for nodes to delete to ensure we don't start another round of deprovisioning until this node is fully
// deleted.
for _, oldCandidate := range command.candidates {
-c.waitForDeletion(ctx, oldCandidate.Machine)
+c.waitForDeletion(ctx, oldCandidate.Node)
}
return nil
}
@@ -230,8 +230,8 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name
var once sync.Once
pollStart := time.Now()
return retry.Do(func() error {
machine := &v1alpha5.Machine{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, machine); err != nil {
node := &v1.Node{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, node); err != nil {
// If the machine was deleted after a few seconds (to give the cache time to update), then we assume
// that the machine was deleted due to an Insufficient Capacity error
if errors.IsNotFound(err) && c.clock.Since(pollStart) > time.Second*5 {
@@ -240,12 +240,12 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name
return fmt.Errorf("getting machine, %w", err)
}
once.Do(func() {
-c.recorder.Publish(deprovisioningevents.Launching(machine, action.String()))
+c.recorder.Publish(deprovisioningevents.Launching(node, action.String()))
})
-if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
+if _, ok := node.Labels[v1alpha5.LabelNodeInitialized]; !ok {
// make the user aware of why deprovisioning is paused
-c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(machine))
-return fmt.Errorf("machine is not initialized")
+c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(node))
+return fmt.Errorf("node is not initialized")
}
return nil
}, waitRetryOptions...)
@@ -254,21 +254,21 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name
// waitForDeletion waits for the specified machine to be removed from the API server. This deletion can take some period
// of time if there are PDBs that govern pods on the machine as we need to wait until the node drains before
// it's actually deleted.
-func (c *Controller) waitForDeletion(ctx context.Context, machine *v1alpha5.Machine) {
+func (c *Controller) waitForDeletion(ctx context.Context, node *v1.Node) {
if err := retry.Do(func() error {
m := &v1alpha5.Machine{}
nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(machine), m)
m := &v1.Node{}
nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(node), m)
// We expect the not machine found error, at which point we know the machine is deleted.
if errors.IsNotFound(nerr) {
return nil
}
// make the user aware of why deprovisioning is paused
-c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(machine))
+c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(node))
if nerr != nil {
return fmt.Errorf("expected machine to be not found, %w", nerr)
return fmt.Errorf("expected node to be not found, %w", nerr)
}
// the machine still exists
return fmt.Errorf("expected machine to be not found")
return fmt.Errorf("expected node to be not found")
}, waitRetryOptions...,
); err != nil {
logging.FromContext(ctx).Errorf("Waiting on machine deletion, %s", err)
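Taken together, these controller.go hunks move the wait loops from Machine objects to Node objects: readiness is inferred from the v1alpha5.LabelNodeInitialized node label rather than the MachineInitialized status condition, and deletion is confirmed by polling until the Node returns NotFound. A self-contained sketch of that deletion wait, with the controller's plumbing (client, retry options) passed in explicitly as assumptions:

```go
package deprovisioning

import (
	"context"
	"fmt"

	"github.com/avast/retry-go"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitForNodeDeletion polls until the node no longer exists, mirroring the
// waitForDeletion flow above; retryOpts stands in for waitRetryOptions.
func waitForNodeDeletion(ctx context.Context, kubeClient client.Client, node *v1.Node, retryOpts ...retry.Option) error {
	return retry.Do(func() error {
		got := &v1.Node{}
		err := kubeClient.Get(ctx, client.ObjectKeyFromObject(node), got)
		if errors.IsNotFound(err) {
			return nil // the node is gone, deprovisioning can proceed
		}
		if err != nil {
			return fmt.Errorf("expected node to be not found, %w", err)
		}
		return fmt.Errorf("expected node to be not found") // still exists, retry
	}, retryOpts...)
}
```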
8 changes: 4 additions & 4 deletions pkg/controllers/deprovisioning/drift.go
@@ -66,15 +66,15 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C
}
// filter out machines that can't be terminated
candidates = lo.Filter(candidates, func(cn *Candidate, _ int) bool {
-if !cn.Machine.DeletionTimestamp.IsZero() {
+if !cn.Node.DeletionTimestamp.IsZero() {
return false
}
if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok {
-d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
+d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...)
return false
}
if p, ok := hasDoNotEvictPod(cn); ok {
-d.recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine,
+d.recorder.Publish(deprovisioningevents.Blocked(cn.Node,
fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...)
return false
}
@@ -93,7 +93,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, candidates ...*Candidate) (C
}
// Log when all pods can't schedule, as the command will get executed immediately.
if !allPodsScheduled {
logging.FromContext(ctx).With("machine", candidate.Machine.Name, "node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods")
logging.FromContext(ctx).With("node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods")
}
if len(newMachines) == 0 {
return Command{
40 changes: 11 additions & 29 deletions pkg/controllers/deprovisioning/drift_test.go
@@ -76,8 +76,8 @@ var _ = Describe("Drift", func() {
wg.Wait()

// Expect to not create or delete more machines
-Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
-ExpectExists(ctx, env.Client, machine)
+Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
+ExpectExists(ctx, env.Client, node)
})
It("should ignore nodes with the drift label, but not the drifted value", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
@@ -96,8 +96,8 @@
wg.Wait()

// Expect to not create or delete more machines
-Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
-ExpectExists(ctx, env.Client, machine)
+Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
+ExpectExists(ctx, env.Client, node)
})
It("should ignore nodes without the drift label", func() {
ExpectApplied(ctx, env.Client, machine, node, prov)
@@ -110,8 +110,8 @@
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Expect to not create or delete more machines
-Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
-ExpectExists(ctx, env.Client, machine)
+Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
+ExpectExists(ctx, env.Client, node)
})
It("can delete drifted nodes", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
@@ -129,13 +129,9 @@
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

-// Cascade any deletion of the machine to the node
-ExpectMachinesCascadeDeletion(ctx, env.Client, machine)
-
// We should delete the machine that has drifted
-Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
-ExpectNotFound(ctx, env.Client, machine, node)
+ExpectNotFound(ctx, env.Client, node)
})
It("can replace drifted nodes", func() {
labels := map[string]string{
@@ -175,21 +171,15 @@
// deprovisioning won't delete the old machine until the new machine is ready
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

-// Cascade any deletion of the machine to the node
-ExpectMachinesCascadeDeletion(ctx, env.Client, machine)
-
-ExpectNotFound(ctx, env.Client, machine, node)
+ExpectNotFound(ctx, env.Client, node)

// Expect that the new machine was created and its different than the original
-machines := ExpectMachines(ctx, env.Client)
nodes := ExpectNodes(ctx, env.Client)
-Expect(machines).To(HaveLen(1))
Expect(nodes).To(HaveLen(1))
-Expect(machines[0].Name).ToNot(Equal(machine.Name))
Expect(nodes[0].Name).ToNot(Equal(node.Name))
})
It("can replace drifted nodes with multiple nodes", func() {
@@ -279,16 +269,12 @@
// deprovisioning won't delete the old node until the new node is ready
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3)
ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3)
ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
wg.Wait()

-// Cascade any deletion of the machine to the node
-ExpectMachinesCascadeDeletion(ctx, env.Client, machine)
-
// expect that drift provisioned three nodes, one for each pod
-ExpectNotFound(ctx, env.Client, machine, node)
-Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3))
+ExpectNotFound(ctx, env.Client, node)
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3))
})
It("should delete one drifted node at a time", func() {
@@ -338,11 +324,7 @@
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

-// Cascade any deletion of the machine to the node
-ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2)
-
// Expect one of the nodes to be deleted
-Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
})
})
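The test updates above swap the Machine-based expectation helpers for Node-based ones (ExpectNodes, ExpectMakeNewNodesReady). A hedged sketch of what an ExpectNodes helper could look like; the real helpers live in the project's test utilities and may differ:

```go
package test

import (
	"context"

	. "github.com/onsi/gomega"
	"github.com/samber/lo"
	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ExpectNodes is a hypothetical helper matching the assertions above: list
// every v1.Node through the test client and return pointers for inspection.
func ExpectNodes(ctx context.Context, c client.Client) []*v1.Node {
	nodeList := &v1.NodeList{}
	Expect(c.List(ctx, nodeList)).To(Succeed())
	return lo.ToSlicePtr(nodeList.Items)
}
```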
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/emptiness.go
@@ -64,7 +64,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
// ComputeCommand generates a deprovisioning command given deprovisionable machines
func (e *Emptiness) ComputeCommand(_ context.Context, candidates ...*Candidate) (Command, error) {
emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool {
-return cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0
+return cn.Node.DeletionTimestamp.IsZero() && len(cn.pods) == 0
})

if len(emptyCandidates) == 0 {