fix: update metric when there are zero disruption candidates (#1187)
jmdeal committed May 7, 2024
1 parent 11d9fd2 commit 6197752
Showing 17 changed files with 256 additions and 89 deletions.
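In short: each disruption method previously set EligibleNodesGauge inside its own ComputeCommand, which the controller only reaches when at least one candidate exists — so after a pass with zero candidates the gauge silently kept its last nonzero value. This commit sets the gauge once in the controller's disrupt loop, immediately after candidates are determined, and deletes the per-method updates. The tests reference the gauge as disruption.EligibleNodesGauge with method and consolidation_type labels; a minimal sketch of such a definition (the metric's real name, namespace, and help text are not shown in this diff, so those fields are placeholders):

package disruption

import "github.com/prometheus/client_golang/prometheus"

const (
	methodLabel            = "method"
	consolidationTypeLabel = "consolidation_type"
)

// EligibleNodesGauge reports how many nodes are currently eligible for each
// disruption method. Namespace/Name/Help are illustrative placeholders; the
// label names match what the tests below assert on.
var EligibleNodesGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "karpenter",
		Subsystem: "disruption",
		Name:      "eligible_nodes",
		Help:      "Number of nodes eligible for disruption, by method and consolidation type.",
	},
	[]string{methodLabel, consolidationTypeLabel},
)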
47 changes: 47 additions & 0 deletions pkg/controllers/disruption/consolidation_test.go
@@ -134,6 +134,53 @@ var _ = Describe("Consolidation", func() {
 			Expect(recorder.Calls("Unconsolidatable")).To(Equal(6))
 		})
 	})
+	Context("Metrics", func() {
+		var consolidationTypes = []string{"empty", "single", "multi"}
+		It("should correctly report eligible nodes", func() {
+			pod := test.Pod(test.PodOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						v1beta1.DoNotDisruptAnnotationKey: "true",
+					},
+				},
+			})
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
+			ExpectManualBinding(ctx, env.Client, pod, node)
+
+			// inform cluster state about nodes and nodeclaims
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			wg := sync.WaitGroup{}
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			for _, ct := range consolidationTypes {
+				ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 0, map[string]string{
+					"method":             "consolidation",
+					"consolidation_type": ct,
+				})
+			}
+
+			// remove the do-not-disrupt annotation to make the node eligible for consolidation and update cluster state
+			pod.SetAnnotations(map[string]string{})
+			ExpectApplied(ctx, env.Client, pod)
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			for _, ct := range consolidationTypes {
+				ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 1, map[string]string{
+					"method":             "consolidation",
+					"consolidation_type": ct,
+				})
+			}
+		})
+	})
 	Context("Budgets", func() {
 		var numNodes = 10
 		var nodeClaims []*v1beta1.NodeClaim
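Each new test drives the disruption controller twice and asserts on the gauge through ExpectMetricGaugeValue, one of the suite's expectation helpers. Built directly on prometheus's testutil package, an equivalent assertion might look like the sketch below (expectGaugeValue is a hypothetical analogue, not Karpenter's actual helper):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// expectGaugeValue reads the child gauge selected by the given labels and
// compares it to want — roughly what ExpectMetricGaugeValue is used for above.
func expectGaugeValue(g *prometheus.GaugeVec, want float64, labels map[string]string) error {
	if got := testutil.ToFloat64(g.With(labels)); got != want {
		return fmt.Errorf("gauge %v: got %v, want %v", labels, got, want)
	}
	return nil
}

func main() {
	gauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "eligible_nodes_demo", Help: "demo gauge"},
		[]string{"method", "consolidation_type"},
	)
	gauge.With(prometheus.Labels{"method": "consolidation", "consolidation_type": "empty"}).Set(0)
	fmt.Println(expectGaugeValue(gauge, 0, map[string]string{"method": "consolidation", "consolidation_type": "empty"})) // <nil>
}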
5 changes: 5 additions & 0 deletions pkg/controllers/disruption/controller.go
@@ -150,6 +150,11 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) {
 	if err != nil {
 		return false, fmt.Errorf("determining candidates, %w", err)
 	}
+	EligibleNodesGauge.With(map[string]string{
+		methodLabel:            disruption.Type(),
+		consolidationTypeLabel: disruption.ConsolidationType(),
+	}).Set(float64(len(candidates)))
+
 	// If there are no candidates, move to the next disruption
 	if len(candidates) == 0 {
 		return false, nil
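This hunk is the heart of the fix: the gauge write now happens before the zero-candidate early return, so a method that reported N eligible nodes on one pass drops back to 0 on the next instead of going stale. A self-contained sketch of the fixed ordering (the gauge and disrupt function below are simplified stand-ins, not Karpenter's actual types):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var eligible = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "eligible_nodes_demo", Help: "demo gauge"},
	[]string{"method", "consolidation_type"},
)

// disrupt mirrors the fixed control flow: record eligibility first, then bail
// out. With the old placement (inside each method's ComputeCommand, i.e. past
// this early return), a zero-candidate pass never touched the gauge.
func disrupt(method string, candidates []string) bool {
	eligible.With(prometheus.Labels{"method": method, "consolidation_type": ""}).Set(float64(len(candidates)))
	if len(candidates) == 0 {
		return false
	}
	// ... compute and execute a disruption command ...
	return true
}

func main() {
	disrupt("drift", []string{"node-a", "node-b"}) // gauge reads 2
	disrupt("drift", nil)                          // second pass, no candidates
	g := eligible.With(prometheus.Labels{"method": "drift", "consolidation_type": ""})
	fmt.Println(testutil.ToFloat64(g)) // 0 — not a stale 2
}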
4 changes: 0 additions & 4 deletions pkg/controllers/disruption/drift.go
@@ -62,10 +62,6 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
 		return candidates[i].NodeClaim.StatusConditions().GetCondition(v1beta1.Drifted).LastTransitionTime.Inner.Time.Before(
 			candidates[j].NodeClaim.StatusConditions().GetCondition(v1beta1.Drifted).LastTransitionTime.Inner.Time)
 	})
-	EligibleNodesGauge.With(map[string]string{
-		methodLabel:            d.Type(),
-		consolidationTypeLabel: d.ConsolidationType(),
-	}).Set(float64(len(candidates)))
 
 	// Do a quick check through the candidates to see if they're empty.
 	// For each candidate that is empty with a nodePool allowing its disruption
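The same four-line gauge update is removed from each of the remaining methods — emptiness, empty-node consolidation, expiration, multi-node consolidation, and single-node consolidation below — since the controller now records the metric once on behalf of all of them.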
41 changes: 41 additions & 0 deletions pkg/controllers/disruption/drift_test.go
@@ -36,6 +36,7 @@ import (
 	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
 	"sigs.k8s.io/karpenter/pkg/cloudprovider"
 	"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
+	"sigs.k8s.io/karpenter/pkg/controllers/disruption"
 	"sigs.k8s.io/karpenter/pkg/operator/options"
 	"sigs.k8s.io/karpenter/pkg/test"
 	. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -78,6 +79,46 @@ var _ = Describe("Drift", func() {
 		})
 		nodeClaim.StatusConditions().MarkTrue(v1beta1.Drifted)
 	})
+	Context("Metrics", func() {
+		var eligibleNodesLabels = map[string]string{
+			"method":             "drift",
+			"consolidation_type": "",
+		}
+		It("should correctly report eligible nodes", func() {
+			pod := test.Pod(test.PodOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						v1beta1.DoNotDisruptAnnotationKey: "true",
+					},
+				},
+			})
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
+			ExpectManualBinding(ctx, env.Client, pod, node)
+
+			// inform cluster state about nodes and nodeclaims
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			wg := sync.WaitGroup{}
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 0, eligibleNodesLabels)
+
+			// remove the do-not-disrupt annotation to make the node eligible for drift and update cluster state
+			pod.SetAnnotations(map[string]string{})
+			ExpectApplied(ctx, env.Client, pod)
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 1, eligibleNodesLabels)
+		})
+	})
 	Context("Budgets", func() {
 		var numNodes = 10
 		var nodeClaims []*v1beta1.NodeClaim
34 changes: 11 additions & 23 deletions pkg/controllers/disruption/emptiness.go
@@ -57,36 +57,24 @@ func (e *Emptiness) ShouldDisrupt(_ context.Context, c *Candidate) bool {
 		e.recorder.Publish(disruptionevents.Unconsolidatable(c.Node, c.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", c.nodePool.Name))...)
 		return false
 	}
+	if len(c.reschedulablePods) != 0 {
+		return false
+	}
 	return c.NodeClaim.StatusConditions().GetCondition(v1beta1.Empty).IsTrue() &&
 		!e.clock.Now().Before(c.NodeClaim.StatusConditions().GetCondition(v1beta1.Empty).LastTransitionTime.Inner.Add(*c.nodePool.Spec.Disruption.ConsolidateAfter.Duration))
 }
 
 // ComputeCommand generates a disruption command given candidates
 func (e *Emptiness) ComputeCommand(_ context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
-	// First check how many nodes are empty so that we can emit a metric on how many nodes are eligible
-	emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool {
-		return cn.NodeClaim.DeletionTimestamp.IsZero() && len(cn.reschedulablePods) == 0
-	})
-
-	EligibleNodesGauge.With(map[string]string{
-		methodLabel:            e.Type(),
-		consolidationTypeLabel: e.ConsolidationType(),
-	}).Set(float64(len(candidates)))
-
-	empty := make([]*Candidate, 0, len(emptyCandidates))
-	for _, candidate := range emptyCandidates {
-		if len(candidate.reschedulablePods) > 0 {
-			continue
-		}
-		// If there's disruptions allowed for the candidate's nodepool,
-		// add it to the list of candidates, and decrement the budget.
-		if disruptionBudgetMapping[candidate.nodePool.Name] > 0 {
-			empty = append(empty, candidate)
-			disruptionBudgetMapping[candidate.nodePool.Name]--
-		}
-	}
 	return Command{
-		candidates: empty,
+		candidates: lo.Filter(candidates, func(c *Candidate, _ int) bool {
+			// Include candidate iff disruptions are allowed for its nodepool.
+			if disruptionBudgetMapping[c.nodePool.Name] > 0 {
+				disruptionBudgetMapping[c.nodePool.Name]--
+				return true
+			}
+			return false
+		}),
 	}, scheduling.Results{}, nil
 }
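Beyond relocating the metric, emptiness.go gets a refactor: the emptiness screen (no reschedulable pods) moves up into ShouldDisrupt, and ComputeCommand collapses a pre-filter plus an explicit loop into one lo.Filter call whose predicate consumes the per-nodepool disruption budget as a side effect. A self-contained sketch of that stateful-filter idiom, with hypothetical candidate and budget values standing in for Karpenter's types:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// candidate is a hypothetical stand-in for Karpenter's disruption *Candidate.
type candidate struct {
	node     string
	nodePool string
}

func main() {
	candidates := []candidate{{"node-a", "default"}, {"node-b", "default"}, {"node-c", "spot"}}
	// Remaining allowed disruptions per nodepool, as in disruptionBudgetMapping.
	budget := map[string]int{"default": 1, "spot": 1}

	// The predicate both selects and spends budget: each accepted candidate
	// decrements its pool's allowance, so later candidates from an exhausted
	// pool are dropped — mirroring the new ComputeCommand body above.
	allowed := lo.Filter(candidates, func(c candidate, _ int) bool {
		if budget[c.nodePool] > 0 {
			budget[c.nodePool]--
			return true
		}
		return false
	})
	fmt.Println(allowed) // [{node-a default} {node-c spot}]: node-b exceeded the default pool's budget
}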

34 changes: 34 additions & 0 deletions pkg/controllers/disruption/emptiness_test.go
@@ -32,6 +32,7 @@ import (
 
 	"sigs.k8s.io/karpenter/pkg/apis/v1alpha5"
 	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
+	"sigs.k8s.io/karpenter/pkg/controllers/disruption"
 	"sigs.k8s.io/karpenter/pkg/test"
 	. "sigs.k8s.io/karpenter/pkg/test/expectations"
 )
@@ -105,6 +106,39 @@ var _ = Describe("Emptiness", func() {
 			Expect(recorder.Calls("Unconsolidatable")).To(Equal(2))
 		})
 	})
+	Context("Metrics", func() {
+		var eligibleNodesEmptinessLabels = map[string]string{
+			"method":             "emptiness",
+			"consolidation_type": "",
+		}
+		It("should correctly report eligible nodes", func() {
+			pod := test.Pod()
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
+			ExpectManualBinding(ctx, env.Client, pod, node)
+
+			// inform cluster state about nodes and nodeclaims
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			wg := sync.WaitGroup{}
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 0, eligibleNodesEmptinessLabels)
+
+			// delete pod and update cluster state, node should now be disruptable
+			ExpectDeleted(ctx, env.Client, pod)
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 1, eligibleNodesEmptinessLabels)
+		})
+	})
 	Context("Budgets", func() {
 		var numNodes = 10
 		var nodeClaims []*v1beta1.NodeClaim
4 changes: 0 additions & 4 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -44,10 +44,6 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
 		return Command{}, scheduling.Results{}, nil
 	}
 	candidates = c.sortCandidates(candidates)
-	EligibleNodesGauge.With(map[string]string{
-		methodLabel:            c.Type(),
-		consolidationTypeLabel: c.ConsolidationType(),
-	}).Set(float64(len(candidates)))
 
 	empty := make([]*Candidate, 0, len(candidates))
 	constrainedByBudgets := false
4 changes: 0 additions & 4 deletions pkg/controllers/disruption/expiration.go
@@ -67,10 +67,6 @@ func (e *Expiration) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
 		return candidates[i].NodeClaim.StatusConditions().GetCondition(v1beta1.Expired).LastTransitionTime.Inner.Time.Before(
 			candidates[j].NodeClaim.StatusConditions().GetCondition(v1beta1.Expired).LastTransitionTime.Inner.Time)
 	})
-	EligibleNodesGauge.With(map[string]string{
-		methodLabel:            e.Type(),
-		consolidationTypeLabel: e.ConsolidationType(),
-	}).Set(float64(len(candidates)))
 
 	// Do a quick check through the candidates to see if they're empty.
 	// For each candidate that is empty with a nodePool allowing its disruption
41 changes: 41 additions & 0 deletions pkg/controllers/disruption/expiration_test.go
@@ -36,6 +36,7 @@ import (
 	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
 	"sigs.k8s.io/karpenter/pkg/cloudprovider"
 	"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
+	"sigs.k8s.io/karpenter/pkg/controllers/disruption"
 	"sigs.k8s.io/karpenter/pkg/test"
 	. "sigs.k8s.io/karpenter/pkg/test/expectations"
 )
@@ -76,6 +77,46 @@ var _ = Describe("Expiration", func() {
 		})
 		nodeClaim.StatusConditions().MarkTrue(v1beta1.Expired)
 	})
+	Context("Metrics", func() {
+		var eligibleNodesLabels = map[string]string{
+			"method":             "expiration",
+			"consolidation_type": "",
+		}
+		It("should correctly report eligible nodes", func() {
+			pod := test.Pod(test.PodOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						v1beta1.DoNotDisruptAnnotationKey: "true",
+					},
+				},
+			})
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
+			ExpectManualBinding(ctx, env.Client, pod, node)
+
+			// inform cluster state about nodes and nodeclaims
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			wg := sync.WaitGroup{}
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 0, eligibleNodesLabels)
+
+			// remove the do-not-disrupt annotation to make the node eligible for expiration and update cluster state
+			pod.SetAnnotations(map[string]string{})
+			ExpectApplied(ctx, env.Client, pod)
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node}, []*v1beta1.NodeClaim{nodeClaim})
+
+			fakeClock.Step(10 * time.Minute)
+			ExpectTriggerVerifyAction(&wg)
+			ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
+			wg.Wait()
+
+			ExpectMetricGaugeValue(disruption.EligibleNodesGauge, 1, eligibleNodesLabels)
+		})
+	})
 	Context("Budgets", func() {
 		var numNodes = 10
 		var nodeClaims []*v1beta1.NodeClaim
4 changes: 0 additions & 4 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -46,10 +46,6 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
 		return Command{}, scheduling.Results{}, nil
 	}
 	candidates = m.sortCandidates(candidates)
-	EligibleNodesGauge.With(map[string]string{
-		methodLabel:            m.Type(),
-		consolidationTypeLabel: m.ConsolidationType(),
-	}).Set(float64(len(candidates)))
 
 	// In order, filter out all candidates that would violate the budget.
 	// Since multi-node consolidation relies on the ordering of
4 changes: 0 additions & 4 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -45,10 +45,6 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
 		return Command{}, scheduling.Results{}, nil
 	}
 	candidates = s.sortCandidates(candidates)
-	EligibleNodesGauge.With(map[string]string{
-		methodLabel:            s.Type(),
-		consolidationTypeLabel: s.ConsolidationType(),
-	}).Set(float64(len(candidates)))
 
 	v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)
 