Revert "Revert machine migration changes (kubernetes-sigs#176) (kuber…
Browse files Browse the repository at this point in the history
…netes-sigs#241)"

This reverts commit 9973eac.
jonathan-innis committed Apr 5, 2023
1 parent c67f74b commit 394579a
Showing 40 changed files with 492 additions and 799 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -25,7 +25,7 @@ require (
k8s.io/client-go v0.25.4
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a
sigs.k8s.io/controller-runtime v0.13.1
sigs.k8s.io/controller-runtime v0.13.0
)

require (
4 changes: 2 additions & 2 deletions go.sum
@@ -848,8 +848,8 @@ knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a/go.mod h1:fckNBPf9bu5/p1RbnOh
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE3xSlg=
sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRMc2WIQ=
sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
3 changes: 1 addition & 2 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -115,10 +115,9 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
break
}
}
name := test.RandomName()
created := &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: machine.Name,
Labels: lo.Assign(labels, machine.Labels),
Annotations: machine.Annotations,
},
6 changes: 5 additions & 1 deletion pkg/controllers/controllers.go
@@ -25,6 +25,7 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/counter"
"github.com/aws/karpenter-core/pkg/controllers/deprovisioning"
"github.com/aws/karpenter-core/pkg/controllers/inflightchecks"
"github.com/aws/karpenter-core/pkg/controllers/machine"
metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner"
metricsstate "github.com/aws/karpenter-core/pkg/controllers/metrics/state"
@@ -55,7 +56,7 @@ func NewControllers(

provisioner := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
terminator := terminator.NewTerminator(clock, kubeClient, terminator.NewEvictionQueue(ctx, kubernetesInterface.CoreV1(), recorder))
return []controller.Controller{
controllers := []controller.Controller{
provisioner,
metricsstate.NewController(cluster),
deprovisioning.NewController(clock, kubeClient, provisioner, cloudProvider, recorder, cluster),
@@ -64,11 +65,14 @@
informer.NewNodeController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
informer.NewProvisionerController(kubeClient, cluster),
informer.NewMachineController(kubeClient, cluster),
node.NewController(clock, kubeClient, cloudProvider, cluster),
termination.NewController(kubeClient, cloudProvider, terminator, recorder),
metricspod.NewController(kubeClient),
metricsprovisioner.NewController(kubeClient),
counter.NewController(kubeClient, cluster),
inflightchecks.NewController(clock, kubeClient, recorder, cloudProvider),
}
controllers = append(controllers, machine.NewControllers(clock, kubeClient, cloudProvider)...)
return controllers
}
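
The controllers.go hunk above switches NewControllers from returning a fixed slice literal to building the slice and splicing in the machine package's sub-controllers with a variadic append before returning. The following is a minimal, self-contained sketch of that registration pattern only; the interface and names are placeholders, not karpenter-core's real controller.Controller API.

package main

import "fmt"

// Reconciler stands in for karpenter-core's controller.Controller interface in
// this illustration; the names below are placeholders, not the real API.
type Reconciler interface{ Name() string }

type named string

func (n named) Name() string { return string(n) }

// newMachineControllers mimics machine.NewControllers above: a factory that
// returns several related sub-controllers at once.
func newMachineControllers() []Reconciler {
	return []Reconciler{named("machine.lifecycle"), named("machine.termination")}
}

func newControllers() []Reconciler {
	controllers := []Reconciler{named("provisioner"), named("counter")}
	// The variadic append splices the factory's sub-controllers into the fixed
	// list, which is the pattern this commit introduces in NewControllers.
	controllers = append(controllers, newMachineControllers()...)
	return controllers
}

func main() {
	for _, c := range newControllers() {
		fmt.Println(c.Name())
	}
}
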
14 changes: 7 additions & 7 deletions pkg/controllers/deprovisioning/consolidation.go
@@ -86,15 +86,15 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca
// ShouldDeprovision is a predicate used to filter deprovisionable nodes
func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool {
if val, ok := cn.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
return val != "true"
}
if cn.provisioner == nil {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "provisioner is unknown")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "provisioner is unknown")...)
return false
}
if cn.provisioner.Spec.Consolidation == nil || !ptr.BoolValue(cn.provisioner.Spec.Consolidation.Enabled) {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...)
return false
}
return true
@@ -119,7 +119,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if !allPodsScheduled {
// This method is used by multi-node consolidation as well, so we'll only report in the single node case
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "not all pods would schedule")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "not all pods would schedule")...)
}
return Command{action: actionDoNothing}, nil
}
@@ -135,7 +135,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
// we're not going to turn a single node into multiple candidates
if len(newMachines) != 1 {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...)
}
return Command{action: actionDoNothing}, nil
}
@@ -149,7 +149,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
newMachines[0].InstanceTypeOptions = filterByPrice(newMachines[0].InstanceTypeOptions, newMachines[0].Requirements, nodesPrice)
if len(newMachines[0].InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace with a cheaper node")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace with a cheaper node")...)
}
// no instance types remain after filtering by price
return Command{action: actionDoNothing}, nil
@@ -168,7 +168,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if allExistingAreSpot &&
newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace a spot node with a spot node")...)
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace a spot node with a spot node")...)
}
return Command{action: actionDoNothing}, nil
}
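
Each Unconsolidatable event now receives the candidate's Machine alongside its Node, so the same reason can be recorded against both objects. karpenter-core ships its own events package, so the sketch below only illustrates that dual-object pattern using client-go's standard record.EventRecorder; the function name and import paths are assumptions, not the project's actual helper.

package deprovisioning

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// publishUnconsolidatable records the same message on both the Node and the
// Machine, mirroring the intent of passing cn.Machine into the event helper.
// Illustrative only: karpenter-core's events API differs from this shape.
func publishUnconsolidatable(rec record.EventRecorder, node *v1.Node, machine *v1alpha5.Machine, msg string) {
	if node != nil {
		rec.Event(node, v1.EventTypeNormal, "Unconsolidatable", msg)
	}
	if machine != nil {
		rec.Event(machine, v1.EventTypeNormal, "Unconsolidatable", msg)
	}
}
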
36 changes: 18 additions & 18 deletions pkg/controllers/deprovisioning/controller.go
@@ -117,7 +117,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
for _, d := range c.deprovisioners {
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, d.ShouldDeprovision)
if err != nil {
return reconcile.Result{}, fmt.Errorf("determining candidates, %w", err)
return reconcile.Result{}, fmt.Errorf("determining candidate nodes, %w", err)
}
// If there are no candidate nodes, move to the next deprovisioner
if len(candidates) == 0 {
@@ -138,7 +138,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc

// Attempt to deprovision
if err := c.executeCommand(ctx, d, cmd); err != nil {
return reconcile.Result{}, fmt.Errorf("deprovisioning candidates, %w", err)
return reconcile.Result{}, fmt.Errorf("deprovisioning nodes, %w", err)
}
return reconcile.Result{Requeue: true}, nil
}
@@ -165,15 +165,15 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman
}

for _, candidate := range command.candidates {
c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, command.String())...)
c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, candidate.Machine, command.String())...)

if err := c.kubeClient.Delete(ctx, candidate.Node); err != nil {
if err := c.kubeClient.Delete(ctx, candidate.Machine); err != nil {
if errors.IsNotFound(err) {
continue
}
logging.FromContext(ctx).Errorf("terminating machine, %s", err)
} else {
metrics.NodesTerminatedCounter.With(prometheus.Labels{
metrics.MachinesTerminatedCounter.With(prometheus.Labels{
metrics.ReasonLabel: reason,
metrics.ProvisionerLabel: candidate.provisioner.Name,
}).Inc()
Expand All @@ -183,7 +183,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman
// We wait for nodes to delete to ensure we don't start another round of deprovisioning until this node is fully
// deleted.
for _, oldCandidate := range command.candidates {
c.waitForDeletion(ctx, oldCandidate.Node)
c.waitForDeletion(ctx, oldCandidate.Machine)
}
return nil
}
@@ -233,8 +233,8 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name
var once sync.Once
pollStart := time.Now()
return retry.Do(func() error {
node := &v1.Node{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, node); err != nil {
machine := &v1alpha5.Machine{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, machine); err != nil {
// If the machine was deleted after a few seconds (to give the cache time to update), then we assume
// that the machine was deleted due to an Insufficient Capacity error
if errors.IsNotFound(err) && c.clock.Since(pollStart) > time.Second*5 {
@@ -243,12 +243,12 @@
return fmt.Errorf("getting machine, %w", err)
}
once.Do(func() {
c.recorder.Publish(deprovisioningevents.Launching(node, action.String()))
c.recorder.Publish(deprovisioningevents.Launching(machine, action.String()))
})
if _, ok := node.Labels[v1alpha5.LabelNodeInitialized]; !ok {
if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
// make the user aware of why deprovisioning is paused
c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(node))
return fmt.Errorf("node is not initialized")
c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(machine))
return fmt.Errorf("machine is not initialized")
}
return nil
}, waitRetryOptions...)
@@ -257,21 +257,21 @@
// waitForDeletion waits for the specified machine to be removed from the API server. This deletion can take some period
// of time if there are PDBs that govern pods on the machine as we need to wait until the node drains before
// it's actually deleted.
func (c *Controller) waitForDeletion(ctx context.Context, node *v1.Node) {
func (c *Controller) waitForDeletion(ctx context.Context, machine *v1alpha5.Machine) {
if err := retry.Do(func() error {
m := &v1.Node{}
nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(node), m)
m := &v1alpha5.Machine{}
nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(machine), m)
// We expect the machine not found error, at which point we know the machine is deleted.
if errors.IsNotFound(nerr) {
return nil
}
// make the user aware of why deprovisioning is paused
c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(node))
c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(machine))
if nerr != nil {
return fmt.Errorf("expected node to be not found, %w", nerr)
return fmt.Errorf("expected machine to be not found, %w", nerr)
}
// the machine still exists
return fmt.Errorf("expected node to be not found")
return fmt.Errorf("expected machine to be not found")
}, waitRetryOptions...,
); err != nil {
logging.FromContext(ctx).Errorf("Waiting on machine deletion, %s", err)
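
waitForDeletion above now polls the Machine rather than the Node, retrying until the API server returns NotFound. The same "wait until the object is gone" loop can also be written with apimachinery's wait helpers; this is a sketch under assumptions (the karpenter-core v1alpha5 import path, and arbitrary interval/timeout values), not the controller's actual implementation, which uses retry.Do.

package deprovisioning

import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// waitForMachineGone is an illustrative stand-in for waitForDeletion: poll the
// API server until the Machine is NotFound, treating any other result as
// "still deleting" (for example, the node is still draining behind a PDB).
func waitForMachineGone(ctx context.Context, c client.Client, machine *v1alpha5.Machine) error {
	return wait.PollImmediateWithContext(ctx, 2*time.Second, 2*time.Minute, func(ctx context.Context) (bool, error) {
		m := &v1alpha5.Machine{}
		if err := c.Get(ctx, client.ObjectKeyFromObject(machine), m); err != nil {
			if apierrors.IsNotFound(err) {
				return true, nil // machine is fully deleted
			}
			return false, err // unexpected API error; stop retrying
		}
		return false, nil // machine still exists; keep polling
	})
}
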
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/drift.go
@@ -75,7 +75,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Comman
}
// Log when all pods can't schedule, as the command will get executed immediately.
if !allPodsScheduled {
logging.FromContext(ctx).With("node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods")
logging.FromContext(ctx).With("machine", candidate.Machine.Name, "node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods")
}
if len(newMachines) == 0 {
return Command{
40 changes: 29 additions & 11 deletions pkg/controllers/deprovisioning/drift_test.go
@@ -79,8 +79,8 @@ var _ = Describe("Drift", func() {
wg.Wait()

// Expect to not create or delete more machines
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node)
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine)
})
It("should ignore nodes with the disrupted annotation key, but not the drifted value", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
@@ -99,8 +99,8 @@
wg.Wait()

// Expect to not create or delete more machines
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node)
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine)
})
It("should ignore nodes without the disrupted annotation key", func() {
delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey)
@@ -114,8 +114,8 @@
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Expect to not create or delete more machines
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node)
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine)
})
It("can delete drifted nodes", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
@@ -133,9 +133,13 @@
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

// Cascade any deletion of the machine to the node
ExpectMachinesCascadeDeletion(ctx, env.Client, machine)

// We should delete the machine that has drifted
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
ExpectNotFound(ctx, env.Client, node)
ExpectNotFound(ctx, env.Client, machine, node)
})
It("can replace drifted nodes", func() {
labels := map[string]string{
@@ -175,15 +179,21 @@
// deprovisioning won't delete the old machine until the new machine is ready
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1)
ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

ExpectNotFound(ctx, env.Client, node)
// Cascade any deletion of the machine to the node
ExpectMachinesCascadeDeletion(ctx, env.Client, machine)

ExpectNotFound(ctx, env.Client, machine, node)

// Expect that the new machine was created and its different than the original
machines := ExpectMachines(ctx, env.Client)
nodes := ExpectNodes(ctx, env.Client)
Expect(machines).To(HaveLen(1))
Expect(nodes).To(HaveLen(1))
Expect(machines[0].Name).ToNot(Equal(machine.Name))
Expect(nodes[0].Name).ToNot(Equal(node.Name))
})
It("can replace drifted nodes with multiple nodes", func() {
@@ -273,12 +283,16 @@
// deprovisioning won't delete the old node until the new node is ready
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3)
ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3)
ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
wg.Wait()

// Cascade any deletion of the machine to the node
ExpectMachinesCascadeDeletion(ctx, env.Client, machine)

// expect that drift provisioned three nodes, one for each pod
ExpectNotFound(ctx, env.Client, node)
ExpectNotFound(ctx, env.Client, machine, node)
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3))
})
It("should delete one drifted node at a time", func() {
@@ -328,7 +342,11 @@
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

// Cascade any deletion of the machine to the node
ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2)

// Expect one of the nodes to be deleted
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
})
})
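
The drift tests above now call ExpectMachinesCascadeDeletion after reconciling, because the fake test client has no garbage collector: deleting a Machine does not remove its Node, so the test must propagate that deletion itself. The helper's real implementation is not part of this diff; the sketch below is a plausible shape only, with the provider-ID linkage, field access, and function name all assumptions.

package deprovisioning_test

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// expectMachinesCascadeDeletion sketches what a cascade helper could do: for
// each given Machine, delete any Node whose spec.providerID matches the
// Machine's status.providerID, mimicking what the termination flow would do
// against a real cluster. Hypothetical helper, not the repo's actual code.
func expectMachinesCascadeDeletion(ctx context.Context, c client.Client, machines ...*v1alpha5.Machine) error {
	nodes := &v1.NodeList{}
	if err := c.List(ctx, nodes); err != nil {
		return err
	}
	for _, m := range machines {
		for i := range nodes.Items {
			node := &nodes.Items[i]
			if node.Spec.ProviderID != "" && node.Spec.ProviderID == m.Status.ProviderID {
				// Ignore NotFound so the helper is safe to call more than once.
				if err := client.IgnoreNotFound(c.Delete(ctx, node)); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
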
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/emptiness.go
@@ -64,7 +64,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
// ComputeCommand generates a deprovisioning command given deprovisionable machines
func (e *Emptiness) ComputeCommand(_ context.Context, candidates ...*Candidate) (Command, error) {
emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool {
return cn.Node.DeletionTimestamp.IsZero() && len(cn.pods) == 0
return cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0
})

if len(emptyCandidates) == 0 {