chore: Machine Disruption Controller (kubernetes-sigs#319)
* Add controller to do disruption on machines

* Remove sub-reconcilers for node disruption

* Break up sub-reconciler testing into separate files

* Update tests to reconcile against machine

* Address PR comment: clarify condition checking

* Re-reconcile disruption controller on node changes

* Create separate status conditions for different deprovisioning conditions

* Fix comments with 'node' naming

* PR comments and updates
jonathan-innis authored Jul 9, 2023
1 parent 9d5d943 commit c8eec72
Showing 29 changed files with 1,160 additions and 905 deletions.
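Taken together, these changes move voluntary-disruption tracking off node annotations and onto Machine status conditions: a new machine disruption controller marks MachineDrifted, MachineEmpty, and MachineExpired, and the deprovisioning sub-reconcilers read those conditions. A minimal sketch of that top-level loop follows; the subReconciler interface, the reconcileMachine helper, and the requeue interval are illustrative assumptions, not the exact code this commit ships.

```go
package disruption

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// subReconciler is an assumed shape for the drift, emptiness, and expiration checks:
// each one inspects a Machine and marks or clears exactly one status condition on it.
type subReconciler interface {
	Reconcile(ctx context.Context, machine *v1alpha5.Machine) error
}

// reconcileMachine sketches the top-level loop: fetch the Machine, run every
// sub-reconciler, persist the status if it changed, and requeue so the
// conditions are periodically refreshed.
func reconcileMachine(ctx context.Context, c client.Client, subs []subReconciler, nn types.NamespacedName) (reconcile.Result, error) {
	machine := &v1alpha5.Machine{}
	if err := c.Get(ctx, nn, machine); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	stored := machine.DeepCopy()
	for _, sub := range subs {
		if err := sub.Reconcile(ctx, machine); err != nil {
			return reconcile.Result{}, err
		}
	}
	if !equality.Semantic.DeepEqual(stored.Status, machine.Status) {
		if err := c.Status().Patch(ctx, machine, client.MergeFrom(stored)); err != nil {
			return reconcile.Result{}, err
		}
	}
	return reconcile.Result{RequeueAfter: time.Minute}, nil // illustrative polling interval
}
```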
3 changes: 1 addition & 2 deletions pkg/apis/crds/karpenter.sh_machines.yaml
@@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.11.3
- creationTimestamp: null
+ controller-gen.kubebuilder.io/version: v0.12.0
name: machines.karpenter.sh
spec:
group: karpenter.sh
3 changes: 1 addition & 2 deletions pkg/apis/crds/karpenter.sh_provisioners.yaml
@@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.11.3
- creationTimestamp: null
+ controller-gen.kubebuilder.io/version: v0.12.0
name: provisioners.karpenter.sh
spec:
group: karpenter.sh
5 changes: 0 additions & 5 deletions pkg/apis/v1alpha5/labels.go
@@ -42,15 +42,10 @@ const (
DoNotEvictPodAnnotationKey = Group + "/do-not-evict"
DoNotConsolidateNodeAnnotationKey = Group + "/do-not-consolidate"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
- VoluntaryDisruptionAnnotationKey = Group + "/voluntary-disruption"
MachineLinkedAnnotationKey = Group + "/linked"
MachineManagedByAnnotationKey = Group + "/managed-by"

ProviderCompatabilityAnnotationKey = CompatabilityGroup + "/provider"

- // Karpenter specific annotation values
- VoluntaryDisruptionDriftedAnnotationValue = "drifted"
- VoluntaryDisruptionExpiredAnnotationValue = "expired"
)

// Karpenter specific finalizers
15 changes: 10 additions & 5 deletions pkg/apis/v1alpha5/machine_status.go
@@ -39,17 +39,22 @@ type MachineStatus struct {
}

func (in *Machine) StatusConditions() apis.ConditionManager {
- return apis.NewLivingConditionSet(
- MachineLaunched,
- MachineRegistered,
- MachineInitialized,
- ).Manage(in)
+ return apis.NewLivingConditionSet(LivingConditions...).Manage(in)
}

+ var LivingConditions = []apis.ConditionType{
+ MachineLaunched,
+ MachineRegistered,
+ MachineInitialized,
+ }

var (
MachineLaunched apis.ConditionType = "MachineLaunched"
MachineRegistered apis.ConditionType = "MachineRegistered"
MachineInitialized apis.ConditionType = "MachineInitialized"
+ MachineDrifted apis.ConditionType = "MachineDrifted"
+ MachineEmpty apis.ConditionType = "MachineEmpty"
+ MachineExpired apis.ConditionType = "MachineExpired"
)

func (in *Machine) GetConditions() apis.Conditions {
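Downstream code in this commit writes and reads these conditions through the knative ConditionManager returned by StatusConditions(). The snippet below condenses the calls that appear in the diffs that follow into a single illustrative helper; the markAndCheck wrapper itself is hypothetical.

```go
package example

import (
	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// markAndCheck strings together the condition calls used throughout this commit.
func markAndCheck(machine *v1alpha5.Machine) bool {
	machine.StatusConditions().MarkTrue(v1alpha5.MachineDrifted)           // disruption controller flags drift
	machine.StatusConditions().MarkFalse(v1alpha5.MachineEmpty, "", "")    // records "not empty" explicitly
	_ = machine.StatusConditions().ClearCondition(v1alpha5.MachineExpired) // drops the condition entirely

	// Consumers nil-check before reading, since a condition may never have been set.
	cond := machine.StatusConditions().GetCondition(v1alpha5.MachineDrifted)
	return cond != nil && cond.IsTrue()
}
```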
2 changes: 1 addition & 1 deletion pkg/apis/v1alpha5/provisioner_validation.go
@@ -104,7 +104,7 @@ func (s *ProvisionerSpec) validateTTLSecondsAfterEmpty() (errs *apis.FieldError)
}

// Validate the constraints
- func (s *ProvisionerSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
+ func (s *ProvisionerSpec) Validate(_ context.Context) (errs *apis.FieldError) {
return errs.Also(
s.validateProvider(),
s.validateLabels(),
2 changes: 1 addition & 1 deletion pkg/controllers/consistency/nodeshape.go
@@ -35,7 +35,7 @@ func NewNodeShape(provider cloudprovider.CloudProvider) Check {
}
}

- func (n *NodeShape) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
+ func (n *NodeShape) Check(_ context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
// ignore machines that are deleting
if !machine.DeletionTimestamp.IsZero() {
return nil, nil
4 changes: 2 additions & 2 deletions pkg/controllers/controllers.go
@@ -25,13 +25,13 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/consistency"
"github.com/aws/karpenter-core/pkg/controllers/counter"
"github.com/aws/karpenter-core/pkg/controllers/deprovisioning"
machinedisruption "github.com/aws/karpenter-core/pkg/controllers/machine/disruption"
machinegarbagecollection "github.com/aws/karpenter-core/pkg/controllers/machine/garbagecollection"
machinelifecycle "github.com/aws/karpenter-core/pkg/controllers/machine/lifecycle"
machinetermination "github.com/aws/karpenter-core/pkg/controllers/machine/termination"
metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner"
metricsstate "github.com/aws/karpenter-core/pkg/controllers/metrics/state"
"github.com/aws/karpenter-core/pkg/controllers/node"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/controllers/state/informer"
@@ -64,7 +64,6 @@ func NewControllers(
informer.NewPodController(kubeClient, cluster),
informer.NewProvisionerController(kubeClient, cluster),
informer.NewMachineController(kubeClient, cluster),
- node.NewController(clock, kubeClient, cloudProvider, cluster),
termination.NewController(kubeClient, cloudProvider, terminator, recorder),
metricspod.NewController(kubeClient),
metricsprovisioner.NewController(kubeClient),
@@ -73,5 +72,6 @@
machinelifecycle.NewController(clock, kubeClient, cloudProvider),
machinegarbagecollection.NewController(clock, kubeClient, cloudProvider),
machinetermination.NewController(kubeClient, cloudProvider),
+ machinedisruption.NewController(clock, kubeClient, cluster, cloudProvider),
}
}
8 changes: 3 additions & 5 deletions pkg/controllers/deprovisioning/drift.go
@@ -51,11 +51,9 @@ func NewDrift(kubeClient client.Client, cluster *state.Cluster, provisioner *pro

// ShouldDeprovision is a predicate used to filter deprovisionable machines
func (d *Drift) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
- // Look up the feature flag to see if we should deprovision the machine because of drift.
- if !settings.FromContext(ctx).DriftEnabled {
- return false
- }
- return c.Annotations()[v1alpha5.VoluntaryDisruptionAnnotationKey] == v1alpha5.VoluntaryDisruptionDriftedAnnotationValue
+ return settings.FromContext(ctx).DriftEnabled &&
+ c.Machine.StatusConditions().GetCondition(v1alpha5.MachineDrifted) != nil &&
+ c.Machine.StatusConditions().GetCondition(v1alpha5.MachineDrifted).IsTrue()
}

// ComputeCommand generates a deprovisioning command given deprovisionable machines
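The new predicate calls GetCondition twice; a single-lookup variant with an explicit nil check (a stylistic alternative, not what the commit ships) would read:

```go
// Single-lookup variant of the Drift predicate above; Drift, Candidate, and
// settings are the types already shown in this file's diff.
func (d *Drift) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
	if !settings.FromContext(ctx).DriftEnabled {
		return false
	}
	cond := c.Machine.StatusConditions().GetCondition(v1alpha5.MachineDrifted)
	return cond != nil && cond.IsTrue()
}
```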
61 changes: 22 additions & 39 deletions pkg/controllers/deprovisioning/drift_test.go
@@ -45,9 +45,6 @@ var _ = Describe("Drift", func() {
prov = test.Provisioner()
machine, node = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
- Annotations: map[string]string{
- v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue,
- },
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
@@ -63,6 +60,7 @@
},
},
})
+ machine.StatusConditions().MarkTrue(v1alpha5.MachineDrifted)
})
It("should ignore drifted nodes if the feature flag is disabled", func() {
ctx = settings.ToContext(ctx, test.Settings(settings.Settings{DriftEnabled: false}))
@@ -82,28 +80,23 @@
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine)
})
It("should ignore nodes with the disrupted annotation key, but not the drifted value", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: "wrong-value",
})
It("should ignore nodes without the drifted status condition", func() {
_ = machine.StatusConditions().ClearCondition(v1alpha5.MachineDrifted)
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

fakeClock.Step(10 * time.Minute)

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

// Expect to not create or delete more machines
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine)
})
It("should ignore nodes without the disrupted annotation key", func() {
delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey)
It("should ignore nodes with the drifted status condition set to false", func() {
machine.StatusConditions().MarkFalse(v1alpha5.MachineDrifted, "", "")
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
@@ -118,9 +111,6 @@
ExpectExists(ctx, env.Client, machine)
})
It("can delete drifted nodes", func() {
- node.Annotations = lo.Assign(node.Annotations, map[string]string{
- v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue,
- })
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
@@ -144,9 +134,6 @@
It("should deprovision all empty drifted nodes in parallel", func() {
machines, nodes := test.MachinesAndNodes(100, v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
- Annotations: map[string]string{
- v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue,
- },
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
@@ -162,6 +149,7 @@
},
})
for _, m := range machines {
+ m.StatusConditions().MarkTrue(v1alpha5.MachineDrifted)
ExpectApplied(ctx, env.Client, m)
}
for _, n := range nodes {
@@ -206,9 +194,6 @@
},
}}})

- node.Annotations = lo.Assign(node.Annotations, map[string]string{
- v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue,
- })
ExpectApplied(ctx, env.Client, rs, pod, machine, node, prov)

// bind the pods to the node
@@ -294,23 +279,19 @@
},
})

machine, node = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: currentInstance.Name,
v1alpha5.LabelCapacityType: currentInstance.Offerings[0].CapacityType,
v1.LabelTopologyZone: currentInstance.Offerings[0].Zone,
},
},
Status: v1alpha5.MachineStatus{
ProviderID: test.RandomProviderID(),
Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("8")},
},
machine.Labels = lo.Assign(machine.Labels, map[string]string{
v1.LabelInstanceTypeStable: currentInstance.Name,
v1alpha5.LabelCapacityType: currentInstance.Offerings[0].CapacityType,
v1.LabelTopologyZone: currentInstance.Offerings[0].Zone,
})
node.Annotations = lo.Assign(node.Annotations, map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue,
machine.Status.Allocatable = map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("8")}
node.Labels = lo.Assign(node.Labels, map[string]string{
v1.LabelInstanceTypeStable: currentInstance.Name,
v1alpha5.LabelCapacityType: currentInstance.Offerings[0].CapacityType,
v1.LabelTopologyZone: currentInstance.Offerings[0].Zone,
})
node.Status.Allocatable = map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("8")}

ExpectApplied(ctx, env.Client, rs, machine, node, prov, pods[0], pods[1], pods[2])

// bind the pods to the node
@@ -360,13 +341,14 @@
},
},
},
// Make each pod request only fit on a single node
ResourceRequirements: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("30")},
},
})

machine2, node2 := test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
- Annotations: map[string]string{
- v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionExpiredAnnotationValue,
- },
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
Expand All @@ -379,6 +361,7 @@ var _ = Describe("Drift", func() {
Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("32")},
},
})
+ machine2.StatusConditions().MarkTrue(v1alpha5.MachineDrifted)

ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], machine, node, machine2, node2, prov)

29 changes: 5 additions & 24 deletions pkg/controllers/deprovisioning/emptiness.go
@@ -16,16 +16,12 @@ package deprovisioning

import (
"context"
"time"

"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"

"github.com/samber/lo"
"k8s.io/utils/clock"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/metrics"
machineutil "github.com/aws/karpenter-core/pkg/utils/machine"
)

// Emptiness is a subreconciler that deletes empty machines.
@@ -41,24 +37,9 @@ func NewEmptiness(clk clock.Clock) *Emptiness {
}

// ShouldDeprovision is a predicate used to filter deprovisionable machines
- func (e *Emptiness) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
- if c.provisioner == nil || c.provisioner.Spec.TTLSecondsAfterEmpty == nil || len(c.pods) != 0 {
- return false
- }
-
- emptinessTimestamp, hasEmptinessTimestamp := c.Node.Annotations[v1alpha5.EmptinessTimestampAnnotationKey]
- if !hasEmptinessTimestamp {
- return false
- }
- ttl := time.Duration(ptr.Int64Value(c.provisioner.Spec.TTLSecondsAfterEmpty)) * time.Second
-
- emptinessTime, err := time.Parse(time.RFC3339, emptinessTimestamp)
- if err != nil {
- logging.FromContext(ctx).With("emptiness-timestamp", emptinessTimestamp).Errorf("unable to parse emptiness timestamp")
- return true
- }
- // Don't deprovision if node's emptiness timestamp is before the emptiness TTL
- return e.clock.Now().After(emptinessTime.Add(ttl))
+ func (e *Emptiness) ShouldDeprovision(_ context.Context, c *Candidate) bool {
+ return c.provisioner.Spec.TTLSecondsAfterEmpty != nil &&
+ machineutil.IsPastEmptinessTTL(c.Machine, e.clock, c.provisioner)
}

// ComputeCommand generates a deprovisioning command given deprovisionable machines
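The timestamp-parsing logic removed above is replaced by machineutil.IsPastEmptinessTTL, whose body is not part of the excerpt shown here. A plausible sketch, assuming it keys off the MachineEmpty condition's last transition time instead of the old emptiness-timestamp annotation:

```go
package machine

import (
	"time"

	"k8s.io/utils/clock"
	"knative.dev/pkg/ptr"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// IsPastEmptinessTTL is a hypothetical reconstruction of the helper referenced
// above; the shipped version in pkg/utils/machine may differ in detail.
func IsPastEmptinessTTL(machine *v1alpha5.Machine, clk clock.Clock, provisioner *v1alpha5.Provisioner) bool {
	cond := machine.StatusConditions().GetCondition(v1alpha5.MachineEmpty)
	if cond == nil || !cond.IsTrue() || provisioner.Spec.TTLSecondsAfterEmpty == nil {
		return false
	}
	ttl := time.Duration(ptr.Int64Value(provisioner.Spec.TTLSecondsAfterEmpty)) * time.Second
	return clk.Now().After(cond.LastTransitionTime.Inner.Add(ttl))
}
```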
17 changes: 7 additions & 10 deletions pkg/controllers/deprovisioning/expiration.go
@@ -34,7 +34,7 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/utils/node"
machineutil "github.com/aws/karpenter-core/pkg/utils/machine"
)

// Expiration is a subreconciler that deletes empty nodes.
@@ -58,13 +58,10 @@ func NewExpiration(clk clock.Clock, kubeClient client.Client, cluster *state.Clu
}

// ShouldDeprovision is a predicate used to filter deprovisionable nodes
- func (e *Expiration) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
- // Filter out nodes without the TTL defined or expired.
- if c.provisioner == nil || c.provisioner.Spec.TTLSecondsUntilExpired == nil {
- return false
- }
-
- return c.Node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey] == v1alpha5.VoluntaryDisruptionExpiredAnnotationValue
+ func (e *Expiration) ShouldDeprovision(_ context.Context, c *Candidate) bool {
+ return c.provisioner.Spec.TTLSecondsUntilExpired != nil &&
+ c.Machine.StatusConditions().GetCondition(v1alpha5.MachineExpired) != nil &&
+ c.Machine.StatusConditions().GetCondition(v1alpha5.MachineExpired).IsTrue()
}

// SortCandidates orders expired nodes by when they've expired
@@ -74,7 +71,7 @@ func (e *Expiration) filterAndSortCandidates(ctx context.Context, nodes []*Candi
return nil, fmt.Errorf("filtering candidates, %w", err)
}
sort.Slice(candidates, func(i int, j int) bool {
- return node.GetExpirationTime(candidates[i].Node, candidates[i].provisioner).Before(node.GetExpirationTime(candidates[j].Node, candidates[j].provisioner))
+ return machineutil.GetExpirationTime(candidates[i].Machine, candidates[i].provisioner).Before(machineutil.GetExpirationTime(candidates[j].Machine, candidates[j].provisioner))
})
return candidates, nil
}
@@ -112,7 +109,7 @@ func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (C
}

logging.FromContext(ctx).With("ttl", time.Duration(ptr.Int64Value(candidates[0].provisioner.Spec.TTLSecondsUntilExpired))*time.Second).
With("delay", time.Since(node.GetExpirationTime(candidates[0].Node, candidates[0].provisioner))).Infof("triggering termination for expired node after TTL")
With("delay", time.Since(machineutil.GetExpirationTime(candidates[0].Machine, candidates[0].provisioner))).Infof("triggering termination for expired node after TTL")
return Command{
candidates: []*Candidate{candidate},
replacements: results.NewMachines,
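machineutil.GetExpirationTime likewise replaces the old node-based helper and is not shown in this excerpt; from its call sites above it takes a Machine and its Provisioner and returns a time.Time. A hedged sketch:

```go
package machine

import (
	"math"
	"time"

	"knative.dev/pkg/ptr"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// GetExpirationTime is a hypothetical reconstruction of the helper used above;
// the shipped version in pkg/utils/machine may handle edge cases differently.
func GetExpirationTime(machine *v1alpha5.Machine, provisioner *v1alpha5.Provisioner) time.Time {
	if machine == nil || provisioner == nil || provisioner.Spec.TTLSecondsUntilExpired == nil {
		// With no TTL configured, return a time that will never be reached: the machine never expires.
		return time.Time{}.Add(math.MaxInt64)
	}
	ttl := time.Duration(ptr.Int64Value(provisioner.Spec.TTLSecondsUntilExpired)) * time.Second
	return machine.CreationTimestamp.Add(ttl)
}
```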