feat: set IgnoreDaemonSetsUtilization per nodegroup
- WIP
- this is a first quick and dirty implementation
Signed-off-by: vadasambar <surajrbanakar@gmail.com>
vadasambar committed Apr 10, 2023
1 parent 88bc588 commit 7fa229d
Showing 10 changed files with 73 additions and 25 deletions.
9 changes: 9 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager.go
@@ -245,6 +245,15 @@ func (m *AwsManager) GetAsgOptions(asg asg, defaults config.NodeGroupAutoscaling
}
}

if stringOpt, found := options[config.DefaultIgnoreDaemonSetsUtilizationKey]; found {
if opt, err := strconv.ParseBool(stringOpt); err != nil {
klog.Warningf("failed to convert asg %s %s tag to bool: %v",
asg.Name, config.DefaultIgnoreDaemonSetsUtilizationKey, err)
} else {
defaults.IgnoreDaemonSetsUtilization = opt
}
}

return &defaults
}

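A side note on the hunk above: the new option follows the same pattern as the existing per-ASG options — a string value (typically supplied as an ASG tag, e.g. under the k8s.io/cluster-autoscaler/node-template/autoscaling-options/ prefix the AWS provider uses for these keys) overrides the global default only when it parses cleanly as a bool. Below is a minimal, runnable sketch of that pattern; the helper name and the plain map input are hypothetical and not part of this commit.

package main

import (
	"fmt"
	"strconv"
)

// resolveIgnoreDaemonSetsUtilization is a hypothetical helper mirroring the
// hunk above: a string option found in the per-ASG options map overrides the
// global default only when it parses cleanly as a bool.
func resolveIgnoreDaemonSetsUtilization(options map[string]string, def bool) bool {
	const key = "ignoredaemonsetsutilization" // mirrors config.DefaultIgnoreDaemonSetsUtilizationKey
	raw, found := options[key]
	if !found {
		return def
	}
	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		fmt.Printf("failed to convert %q to bool, keeping default %v\n", raw, def)
		return def
	}
	return parsed
}

func main() {
	perAsg := map[string]string{"ignoredaemonsetsutilization": "true"}
	fmt.Println(resolveIgnoreDaemonSetsUtilization(perAsg, false)) // true: override applies
	fmt.Println(resolveIgnoreDaemonSetsUtilization(nil, false))    // false: no override, default kept
	fmt.Println(resolveIgnoreDaemonSetsUtilization(map[string]string{"ignoredaemonsetsutilization": "yes"}, false)) // false: "yes" is not a valid bool
}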
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -44,6 +44,8 @@ type NodeGroupAutoscalingOptions struct {
ScaleDownUnneededTime time.Duration
// ScaleDownUnreadyTime represents how long an unready node should be unneeded before it is eligible for scale down
ScaleDownUnreadyTime time.Duration
// IgnoreDaemonSetsUtilization sets whether DaemonSet pods' utilization should be ignored when calculating node utilization for scale-down
IgnoreDaemonSetsUtilization bool
}

const (
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/const.go
@@ -30,4 +30,6 @@ const (
DefaultScaleDownUnneededTimeKey = "scaledownunneededtime"
// DefaultScaleDownUnreadyTimeKey identifies ScaleDownUnreadyTime autoscaling option
DefaultScaleDownUnreadyTimeKey = "scaledownunreadytime"
// DefaultIgnoreDaemonSetsUtilizationKey identifies IgnoreDaemonSetsUtilization autoscaling option
DefaultIgnoreDaemonSetsUtilizationKey = "ignoredaemonsetsutilization"
)
14 changes: 12 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -33,6 +33,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -51,10 +53,11 @@ type Actuator struct {
nodeDeletionBatcher *NodeDeletionBatcher
evictor Evictor
deleteOptions simulator.NodeDeleteOptions
nodeConfigProcessor nodegroupconfig.NodeGroupConfigProcessor
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, processors *processors.AutoscalingProcessors) *Actuator {
nbd := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
Expand All @@ -63,6 +66,7 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
nodeDeletionBatcher: nbd,
evictor: NewDefaultEvictor(deleteOptions, ndt),
deleteOptions: deleteOptions,
nodeConfigProcessor: processors.NodeGroupConfigProcessor,
}
}

@@ -306,8 +310,14 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.
if err != nil {
return nil, err
}

ignoreDaemonSetsUtilization, err := a.nodeConfigProcessor.GetIgnoreDaemonSetsUtilization(a.ctx, nodeGroup)
if err != nil {
return nil, err
}

gpuConfig := a.ctx.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, a.ctx.IgnoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
if err != nil {
return nil, err
}
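For context on why the per-group value is threaded into utilization.Calculate here: node utilization is, roughly, the sum of pod resource requests divided by the node's allocatable, so ignoring DaemonSet pods lowers the ratio and makes such nodes easier to qualify for scale down. The following is a simplified, hypothetical illustration of that effect using plain structs, not the real utilization package.

package main

import "fmt"

type pod struct {
	cpuRequestMillis int64
	ownedByDaemonSet bool
}

// cpuUtilization is a simplified stand-in for what utilization.Calculate does
// for one resource: sum requests over allocatable, optionally skipping
// DaemonSet-owned pods.
func cpuUtilization(pods []pod, allocatableMillis int64, ignoreDaemonSets bool) float64 {
	var requested int64
	for _, p := range pods {
		if ignoreDaemonSets && p.ownedByDaemonSet {
			continue // DaemonSet pods do not count toward utilization
		}
		requested += p.cpuRequestMillis
	}
	return float64(requested) / float64(allocatableMillis)
}

func main() {
	pods := []pod{
		{cpuRequestMillis: 200, ownedByDaemonSet: true},  // e.g. a log shipper
		{cpuRequestMillis: 300, ownedByDaemonSet: false}, // regular workload
	}
	fmt.Printf("counting DaemonSets: %.2f\n", cpuUtilization(pods, 1000, false)) // 0.50
	fmt.Printf("ignoring DaemonSets: %.2f\n", cpuUtilization(pods, 1000, true))  // 0.30
}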
43 changes: 26 additions & 17 deletions cluster-autoscaler/core/scaledown/eligibility/eligibility.go
@@ -24,6 +24,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/klogx"
@@ -41,20 +42,22 @@ const (

// Checker is responsible for deciding which nodes pass the criteria for scale down.
type Checker struct {
thresholdGetter utilizationThresholdGetter
// thresholdGetter utilizationThresholdGetter
nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor
}

type utilizationThresholdGetter interface {
// GetScaleDownUtilizationThreshold returns ScaleDownUtilizationThreshold value that should be used for a given NodeGroup.
GetScaleDownUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetScaleDownGpuUtilizationThreshold returns ScaleDownGpuUtilizationThreshold value that should be used for a given NodeGroup.
GetScaleDownGpuUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
}
// type utilizationThresholdGetter interface {
// // GetScaleDownUtilizationThreshold returns ScaleDownUtilizationThreshold value that should be used for a given NodeGroup.
// GetScaleDownUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// // GetScaleDownGpuUtilizationThreshold returns ScaleDownGpuUtilizationThreshold value that should be used for a given NodeGroup.
// GetScaleDownGpuUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// }

// NewChecker creates a new Checker object.
func NewChecker(thresholdGetter utilizationThresholdGetter) *Checker {
func NewChecker(nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor) *Checker {
return &Checker{
thresholdGetter: thresholdGetter,
// thresholdGetter: thresholdGetter,
nodeGroupConfigProcessor: nodeGroupConfigProcessor,
}
}

@@ -118,12 +121,6 @@ func (c *Checker) unremovableReasonAndNodeUtilization(context *context.Autoscali
return simulator.ScaleDownDisabledAnnotation, nil
}

gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, context.IgnoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, gpuConfig, timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
}

nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Warning("Node group not found for node %v: %v", node.Name, err)
@@ -136,6 +133,18 @@ return simulator.NotAutoscaled, nil
return simulator.NotAutoscaled, nil
}

ignoreDaemonSetsUtilization, err := c.nodeGroupConfigProcessor.GetIgnoreDaemonSetsUtilization(context, nodeGroup)
if err != nil {
klog.Warningf("Couldn't retrieve `IgnoreDaemonSetsUtilization` option for node %v: %v", node.Name, err)
return simulator.UnexpectedError, nil
}

gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, gpuConfig, timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
}

// If scale down of unready nodes is disabled, skip the node if it is unready
if !context.ScaleDownUnreadyEnabled {
ready, _, _ := kube_util.GetReadinessState(node)
@@ -166,12 +175,12 @@ func (c *Checker) isNodeBelowUtilizationThreshold(context *context.AutoscalingCo
var err error
gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
if gpuConfig != nil {
threshold, err = c.thresholdGetter.GetScaleDownGpuUtilizationThreshold(context, nodeGroup)
threshold, err = c.nodeGroupConfigProcessor.GetScaleDownGpuUtilizationThreshold(context, nodeGroup)
if err != nil {
return false, err
}
} else {
threshold, err = c.thresholdGetter.GetScaleDownUtilizationThreshold(context, nodeGroup)
threshold, err = c.nodeGroupConfigProcessor.GetScaleDownUtilizationThreshold(context, nodeGroup)
if err != nil {
return false, err
}
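The other half of the eligibility change above is that the per-node-group thresholds now come from the same NodeGroupConfigProcessor. A small, hypothetical sketch of the selection logic in isNodeBelowUtilizationThreshold — GPU nodes are compared against the GPU threshold, all other nodes against the regular one (stand-in types and example values, not the real signatures or defaults):

package main

import "fmt"

// groupThresholds is a hypothetical stand-in for the per-node-group values the
// NodeGroupConfigProcessor returns.
type groupThresholds struct {
	scaleDownUtilization    float64
	scaleDownGpuUtilization float64
}

// belowThreshold mirrors the shape of isNodeBelowUtilizationThreshold: GPU
// nodes are compared against the GPU threshold, everything else against the
// regular scale-down utilization threshold.
func belowThreshold(utilization float64, hasGpu bool, t groupThresholds) bool {
	threshold := t.scaleDownUtilization
	if hasGpu {
		threshold = t.scaleDownGpuUtilization
	}
	return utilization < threshold
}

func main() {
	t := groupThresholds{scaleDownUtilization: 0.5, scaleDownGpuUtilization: 0.1}
	fmt.Println(belowThreshold(0.3, false, t)) // true: 0.3 < 0.5
	fmt.Println(belowThreshold(0.3, true, t))  // false: 0.3 >= 0.1
}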
5 changes: 3 additions & 2 deletions cluster-autoscaler/core/scaledown/legacy/legacy_test.go
@@ -1292,7 +1292,8 @@ func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry
MinReplicaCount: 0,
SkipNodesWithCustomControllerPods: true,
}
sd := NewScaleDown(ctx, NewTestProcessors(ctx), ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions)
processors := NewTestProcessors(ctx)
sd := NewScaleDown(ctx, processors, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions, processors)
return NewScaleDownWrapper(sd, actuator)
}
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -176,7 +176,7 @@ func NewStaticAutoscaler(
// during the struct creation rather than here.
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, processors)
autoscalingContext.ScaleDownActuator = actuator

var scaleDownPlanner scaledown.Planner
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/static_autoscaler_test.go
@@ -149,7 +149,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {

func setUpScaleDownActuator(ctx *context.AutoscalingContext, options config.AutoscalingOptions) {
deleteOptions := simulator.NewNodeDeleteOptions(options)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, NewTestProcessors(ctx))
}

func TestStaticAutoscalerRunOnce(t *testing.T) {
@@ -1410,7 +1410,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff())

// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{})
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{}, NewTestProcessors(&ctx))
ctx.ScaleDownActuator = actuator

// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -1730,7 +1730,7 @@ func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContex
}
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
sd := legacy.NewScaleDown(ctx, p, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions, p)
wrapper := legacy.NewScaleDownWrapper(sd, actuator)
return wrapper, wrapper
}
1 change: 1 addition & 0 deletions cluster-autoscaler/main.go
@@ -253,6 +253,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ScaleDownGpuUtilizationThreshold: *scaleDownGpuUtilizationThreshold,
ScaleDownUnneededTime: *scaleDownUnneededTime,
ScaleDownUnreadyTime: *scaleDownUnreadyTime,
IgnoreDaemonSetsUtilization: *ignoreDaemonSetsUtilization,
},
CloudConfig: *cloudConfig,
CloudProviderName: *cloudProviderFlag,
@@ -33,6 +33,8 @@ type NodeGroupConfigProcessor interface {
GetScaleDownUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetScaleDownGpuUtilizationThreshold returns ScaleDownGpuUtilizationThreshold value that should be used for a given NodeGroup.
GetScaleDownGpuUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetIgnoreDaemonSetsUtilization returns IgnoreDaemonSetsUtilization value that should be used for a given NodeGroup.
GetIgnoreDaemonSetsUtilization(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (bool, error)
// CleanUp cleans up processor's internal structures.
CleanUp()
}
@@ -91,6 +93,18 @@ func (p *DelegatingNodeGroupConfigProcessor) GetScaleDownGpuUtilizationThreshold
return ngConfig.ScaleDownGpuUtilizationThreshold, nil
}

// GetIgnoreDaemonSetsUtilization returns IgnoreDaemonSetsUtilization value that should be used for a given NodeGroup.
func (p *DelegatingNodeGroupConfigProcessor) GetIgnoreDaemonSetsUtilization(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (bool, error) {
ngConfig, err := nodeGroup.GetOptions(context.NodeGroupDefaults)
if err != nil && err != cloudprovider.ErrNotImplemented {
return false, err
}
if ngConfig == nil || err == cloudprovider.ErrNotImplemented {
return context.NodeGroupDefaults.IgnoreDaemonSetsUtilization, nil
}
return ngConfig.IgnoreDaemonSetsUtilization, nil
}

// CleanUp cleans up processor's internal structures.
func (p *DelegatingNodeGroupConfigProcessor) CleanUp() {
}
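The delegating processor above (in the nodegroupconfig processors package) resolves the option per node group: options returned by the cloud provider's NodeGroup.GetOptions win, and the global NodeGroupDefaults apply when GetOptions returns nil or ErrNotImplemented. Below is a self-contained sketch of that precedence using hypothetical stand-in types instead of cloudprovider.NodeGroup and config.NodeGroupAutoscalingOptions.

package main

import (
	"errors"
	"fmt"
)

// errNotImplemented stands in for cloudprovider.ErrNotImplemented.
var errNotImplemented = errors.New("not implemented")

type groupOptions struct{ IgnoreDaemonSetsUtilization bool }

// nodeGroup is a hypothetical stand-in for the cloudprovider.NodeGroup method used here.
type nodeGroup interface {
	GetOptions(defaults groupOptions) (*groupOptions, error)
}

// ignoreDaemonSetsUtilization mirrors the precedence implemented above:
// per-group options first, global defaults as the fallback.
func ignoreDaemonSetsUtilization(ng nodeGroup, defaults groupOptions) (bool, error) {
	opts, err := ng.GetOptions(defaults)
	if err != nil && !errors.Is(err, errNotImplemented) {
		return false, err
	}
	if opts == nil || errors.Is(err, errNotImplemented) {
		return defaults.IgnoreDaemonSetsUtilization, nil
	}
	return opts.IgnoreDaemonSetsUtilization, nil
}

// staticGroup is a toy node group whose options are fixed at construction time.
type staticGroup struct{ opts *groupOptions }

func (g staticGroup) GetOptions(groupOptions) (*groupOptions, error) { return g.opts, nil }

func main() {
	defaults := groupOptions{IgnoreDaemonSetsUtilization: false}
	withOverride := staticGroup{opts: &groupOptions{IgnoreDaemonSetsUtilization: true}}
	withoutOverride := staticGroup{opts: nil}

	fmt.Println(ignoreDaemonSetsUtilization(withOverride, defaults))    // true <nil>: per-group value wins
	fmt.Println(ignoreDaemonSetsUtilization(withoutOverride, defaults)) // false <nil>: falls back to defaults
}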
