diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 59092452369f..ae1bc09b673e 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -18,6 +18,8 @@ package config import ( "time" + + scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config" ) // GpuLimits define lower and upper bound on GPU instances of given type in cluster @@ -166,6 +168,9 @@ type AutoscalingOptions struct { // ScaleDownSimulationTimeout defines the maximum time that can be // spent on scale down simulation. ScaleDownSimulationTimeout time.Duration + // SchedulerConfig allows changing configuration of in-tree + // scheduler plugins acting on PreFilter and Filter extension points + SchedulerConfig *scheduler_config.KubeSchedulerConfiguration // NodeDeletionDelayTimeout is maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node. NodeDeletionDelayTimeout time.Duration // WriteStatusConfigMap tells if the status information should be written to a ConfigMap diff --git a/cluster-autoscaler/config/const.go b/cluster-autoscaler/config/const.go index b87c7638ebcb..1025ac9059d7 100644 --- a/cluster-autoscaler/config/const.go +++ b/cluster-autoscaler/config/const.go @@ -19,6 +19,10 @@ package config import "time" const ( + // SchedulerConfigFileFlag is the name of the flag + // for passing in custom scheduler config for in-tree scheduelr plugins + SchedulerConfigFileFlag = "scheduler-config-file" + // DefaultMaxClusterCores is the default maximum number of cores in the cluster. DefaultMaxClusterCores = 5000 * 64 // DefaultMaxClusterMemory is the default maximum number of gigabytes of memory in cluster. diff --git a/cluster-autoscaler/config/test/config.go b/cluster-autoscaler/config/test/config.go new file mode 100644 index 000000000000..c5708ac54efd --- /dev/null +++ b/cluster-autoscaler/config/test/config.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +const ( + // Custom scheduler configs for testing + + // SchedulerConfigNodeResourcesFitDisabled is scheduler config + // with `NodeResourcesFit` plugin disabled + SchedulerConfigNodeResourcesFitDisabled = ` +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration +profiles: +- pluginConfig: + plugins: + multiPoint: + disabled: + - name: NodeResourcesFit + weight: 1 + schedulerName: custom-scheduler` + + // SchedulerConfigTaintTolerationDisabled is scheduler config + // with `TaintToleration` plugin disabled + SchedulerConfigTaintTolerationDisabled = ` +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration +profiles: +- pluginConfig: + plugins: + multiPoint: + disabled: + - name: TaintToleration + weight: 1 + schedulerName: custom-scheduler` + + // SchedulerConfigMinimalCorrect is the minimal + // correct scheduler config + SchedulerConfigMinimalCorrect = ` +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration` + + // SchedulerConfigDecodeErr is the scheduler config + // which throws decoding error when we try to load it + SchedulerConfigDecodeErr = ` +kind: KubeSchedulerConfiguration` + + // SchedulerConfigInvalid is invalid scheduler config + // because we specify percentageOfNodesToScore > 100 + SchedulerConfigInvalid = ` +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration +# percentageOfNodesToScore has to be between 0 and 100 +percentageOfNodesToScore: 130` +) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 274908d4f6b0..225b94743c0e 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -56,6 +56,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" "k8s.io/autoscaler/cluster-autoscaler/utils/units" "k8s.io/autoscaler/cluster-autoscaler/version" kube_client "k8s.io/client-go/kubernetes" @@ -68,6 +69,7 @@ import ( "k8s.io/component-base/config/options" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" + scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config" ) // MultiStringFlag is a flag for passing multiple parameters using same flag @@ -133,6 +135,7 @@ var ( "for scale down when some candidates from previous iteration are no longer valid."+ "When calculating the pool size for additional candidates we take"+ "max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).") + schedulerConfigFile = flag.String(config.SchedulerConfigFileFlag, "", "scheduler-config allows changing configuration of in-tree scheduler plugins acting on PreFilter and Filter extension points") nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.") nodeDeletionBatcherInterval = flag.Duration("node-deletion-batcher-interval", 0*time.Second, "How long CA ScaleDown gather nodes to delete them in batch.") scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") @@ -274,6 +277,15 @@ func createAutoscalingOptions() config.AutoscalingOptions { *maxEmptyBulkDeleteFlag = *maxScaleDownParallelismFlag } + var parsedSchedConfig *scheduler_config.KubeSchedulerConfiguration + // if scheduler config flag was set by the user + if pflag.CommandLine.Changed(config.SchedulerConfigFileFlag) { + parsedSchedConfig, err = scheduler_util.ConfigFromPath(*schedulerConfigFile) + } + if err != nil { + klog.Fatalf("Failed to get scheduler config: %v", err) + } + return config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold, @@ -316,6 +328,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { ScaleDownNonEmptyCandidatesCount: *scaleDownNonEmptyCandidatesCount, ScaleDownCandidatesPoolRatio: *scaleDownCandidatesPoolRatio, ScaleDownCandidatesPoolMinCount: *scaleDownCandidatesPoolMinCount, + SchedulerConfig: parsedSchedConfig, WriteStatusConfigMap: *writeStatusConfigMapFlag, StatusConfigMapName: *statusConfigMapName, BalanceSimilarNodeGroups: *balanceSimilarNodeGroupsFlag, @@ -422,7 +435,8 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter eventsKubeClient := createKubeClient(getKubeConfig()) - predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(kubeClient, make(chan struct{})) + predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(kubeClient, + autoscalingOptions.SchedulerConfig, make(chan struct{})) if err != nil { return nil, err } diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go index fde1d2758a78..9b7075323c14 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go +++ b/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go @@ -27,6 +27,7 @@ import ( kube_client "k8s.io/client-go/kubernetes" v1listers "k8s.io/client-go/listers/core/v1" klog "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/apis/config" scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" scheduler_plugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" @@ -44,21 +45,26 @@ type SchedulerBasedPredicateChecker struct { } // NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker. -func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) { +func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, schedConfig *config.KubeSchedulerConfiguration, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) { informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) - config, err := scheduler_config.Default() - if err != nil { - return nil, fmt.Errorf("couldn't create scheduler config: %v", err) + + if schedConfig == nil { + var err error + schedConfig, err = scheduler_config.Default() + if err != nil { + return nil, fmt.Errorf("couldn't create scheduler config: %v", err) + } } - if len(config.Profiles) != 1 || config.Profiles[0].SchedulerName != apiv1.DefaultSchedulerName { - return nil, fmt.Errorf("unexpected scheduler config: expected default scheduler profile only (found %d profiles)", len(config.Profiles)) + + if len(schedConfig.Profiles) != 1 { + return nil, fmt.Errorf("unexpected scheduler config: expected one scheduler profile only (found %d profiles)", len(schedConfig.Profiles)) } sharedLister := NewDelegatingSchedulerSharedLister() framework, err := schedulerframeworkruntime.NewFramework( context.TODO(), scheduler_plugins.NewInTreeRegistry(), - &config.Profiles[0], + &schedConfig.Profiles[0], schedulerframeworkruntime.WithInformerFactory(informerFactory), schedulerframeworkruntime.WithSnapshotSharedLister(sharedLister), ) @@ -181,6 +187,7 @@ func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot cluster filterReasons, p.buildDebugInfo(filterName, nodeInfo)) } + return nil } diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go index 4405490880b8..b9d6f8be7cbe 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go +++ b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go @@ -17,10 +17,14 @@ limitations under the License. package predicatechecker import ( + "os" + "path/filepath" "testing" "time" + testconfig "k8s.io/autoscaler/cluster-autoscaler/config/test" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + scheduler "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "github.com/stretchr/testify/assert" @@ -36,52 +40,114 @@ func TestCheckPredicate(t *testing.T) { n1000 := BuildTestNode("n1000", 1000, 2000000) SetNodeReadyState(n1000, true, time.Time{}) + n1000Unschedulable := BuildTestNode("n1000", 1000, 2000000) + SetNodeReadyState(n1000Unschedulable, true, time.Time{}) + + defaultPredicateChecker, err := NewTestPredicateChecker() + assert.NoError(t, err) + + // temp dir + tmpDir, err := os.MkdirTemp("", "scheduler-configs") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + customConfigFile := filepath.Join(tmpDir, "custom_config.yaml") + if err := os.WriteFile(customConfigFile, + []byte(testconfig.SchedulerConfigNodeResourcesFitDisabled), + os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + customConfig, err := scheduler.ConfigFromPath(customConfigFile) + assert.NoError(t, err) + customPredicateChecker, err := NewTestPredicateCheckerWithCustomConfig(customConfig) + assert.NoError(t, err) tests := []struct { - name string - node *apiv1.Node - scheduledPods []*apiv1.Pod - testPod *apiv1.Pod - expectError bool + name string + node *apiv1.Node + scheduledPods []*apiv1.Pod + testPod *apiv1.Pod + predicateChecker PredicateChecker + expectError bool }{ + // default predicate checker test cases + { + name: "default - other pod - insuficient cpu", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p600, + expectError: true, + predicateChecker: defaultPredicateChecker, + }, + { + name: "default - other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p500, + expectError: false, + predicateChecker: defaultPredicateChecker, + }, + { + name: "default - empty - insuficient cpu", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p8000, + expectError: true, + predicateChecker: defaultPredicateChecker, + }, { - name: "other pod - insuficient cpu", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p600, - expectError: true, + name: "default - empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p600, + expectError: false, + predicateChecker: defaultPredicateChecker, }, + // custom predicate checker test cases { - name: "other pod - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p500, - expectError: false, + name: "custom - other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p600, + expectError: false, + predicateChecker: customPredicateChecker, }, { - name: "empty - insuficient cpu", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p8000, - expectError: true, + name: "custom -other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p500, + expectError: false, + predicateChecker: customPredicateChecker, }, { - name: "empty - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p600, - expectError: false, + name: "custom -empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p8000, + expectError: false, + predicateChecker: customPredicateChecker, + }, + { + name: "custom -empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p600, + expectError: false, + predicateChecker: customPredicateChecker, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var err error - predicateChecker, err := NewTestPredicateChecker() clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() err = clusterSnapshot.AddNodeWithPods(tt.node, tt.scheduledPods) assert.NoError(t, err) - predicateError := predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name) + predicateError := tt.predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name) if tt.expectError { assert.NotNil(t, predicateError) assert.Equal(t, NotSchedulablePredicateError, predicateError.ErrorType()) @@ -102,27 +168,99 @@ func TestFitsAnyNode(t *testing.T) { n1000 := BuildTestNode("n1000", 1000, 2000000) n2000 := BuildTestNode("n2000", 2000, 2000000) - var err error + defaultPredicateChecker, err := NewTestPredicateChecker() + assert.NoError(t, err) - clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNode(n1000) + // temp dir + tmpDir, err := os.MkdirTemp("", "scheduler-configs") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + customConfigFile := filepath.Join(tmpDir, "custom_config.yaml") + if err := os.WriteFile(customConfigFile, + []byte(testconfig.SchedulerConfigNodeResourcesFitDisabled), + os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - err = clusterSnapshot.AddNode(n2000) + customPredicateChecker, err := NewTestPredicateCheckerWithCustomConfig(customConfig) assert.NoError(t, err) - predicateChecker, err := NewTestPredicateChecker() - assert.NoError(t, err) + testCases := []struct { + name string + predicateChecker PredicateChecker + pod *apiv1.Pod + expectedNodes []string + expectError bool + }{ + // default predicate checker test cases + { + name: "default - small pod - no error", + predicateChecker: defaultPredicateChecker, + pod: p900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, + }, + { + name: "default - medium pod - no error", + predicateChecker: defaultPredicateChecker, + pod: p1900, + expectedNodes: []string{"n2000"}, + expectError: false, + }, + { + name: "default - large pod - insufficient cpu", + predicateChecker: defaultPredicateChecker, + pod: p2100, + expectError: true, + }, - nodeName, err := predicateChecker.FitsAnyNode(clusterSnapshot, p900) - assert.NoError(t, err) - assert.True(t, nodeName == "n1000" || nodeName == "n2000") + // custom predicate checker test cases + { + name: "custom - small pod - no error", + predicateChecker: customPredicateChecker, + pod: p900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, + }, + { + name: "custom - medium pod - no error", + predicateChecker: customPredicateChecker, + pod: p1900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, + }, + { + name: "custom - large pod - insufficient cpu", + predicateChecker: customPredicateChecker, + pod: p2100, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, + }, + } - nodeName, err = predicateChecker.FitsAnyNode(clusterSnapshot, p1900) + clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() + err = clusterSnapshot.AddNode(n1000) assert.NoError(t, err) - assert.Equal(t, "n2000", nodeName) + err = clusterSnapshot.AddNode(n2000) + assert.NoError(t, err) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nodeName, err := tc.predicateChecker.FitsAnyNode(clusterSnapshot, tc.pod) + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Contains(t, tc.expectedNodes, nodeName) + } + }) + } - nodeName, err = predicateChecker.FitsAnyNode(clusterSnapshot, p2100) - assert.Error(t, err) } func TestDebugInfo(t *testing.T) { @@ -142,16 +280,39 @@ func TestDebugInfo(t *testing.T) { } SetNodeReadyState(node1, true, time.Time{}) - predicateChecker, err := NewTestPredicateChecker() - assert.NoError(t, err) - clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNode(node1) + err := clusterSnapshot.AddNode(node1) assert.NoError(t, err) - predicateErr := predicateChecker.CheckPredicates(clusterSnapshot, p1, "n1") + // with default predicate checker + defaultPredicateChecker, err := NewTestPredicateChecker() + assert.NoError(t, err) + predicateErr := defaultPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1") assert.NotNil(t, predicateErr) assert.Equal(t, "node(s) had untolerated taint {SomeTaint: WhyNot?}", predicateErr.Message()) assert.Contains(t, predicateErr.VerboseMessage(), "RandomTaint") + + // with custom predicate checker + + // temp dir + tmpDir, err := os.MkdirTemp("", "scheduler-configs") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + customConfigFile := filepath.Join(tmpDir, "custom_config.yaml") + if err := os.WriteFile(customConfigFile, + []byte(testconfig.SchedulerConfigTaintTolerationDisabled), + os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + customConfig, err := scheduler.ConfigFromPath(customConfigFile) + assert.NoError(t, err) + customPredicateChecker, err := NewTestPredicateCheckerWithCustomConfig(customConfig) + assert.NoError(t, err) + predicateErr = customPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1") + assert.Nil(t, predicateErr) } diff --git a/cluster-autoscaler/simulator/predicatechecker/testchecker.go b/cluster-autoscaler/simulator/predicatechecker/testchecker.go index ba7730dabd2e..81c3459c88ba 100644 --- a/cluster-autoscaler/simulator/predicatechecker/testchecker.go +++ b/cluster-autoscaler/simulator/predicatechecker/testchecker.go @@ -18,10 +18,27 @@ package predicatechecker import ( clientsetfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/scheduler/apis/config" + scheduler_config_latest "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" ) // NewTestPredicateChecker builds test version of PredicateChecker. func NewTestPredicateChecker() (PredicateChecker, error) { + schedConfig, err := scheduler_config_latest.Default() + if err != nil { + return nil, err + } + // just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient - return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), make(chan struct{})) + return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), schedConfig, make(chan struct{})) +} + +// NewTestPredicateCheckerWithCustomConfig builds test version of PredicateChecker with custom scheduler config. +func NewTestPredicateCheckerWithCustomConfig(schedConfig *config.KubeSchedulerConfiguration) (PredicateChecker, error) { + if schedConfig != nil { + // just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient + return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), schedConfig, make(chan struct{})) + } + + return NewTestPredicateChecker() } diff --git a/cluster-autoscaler/utils/scheduler/scheduler.go b/cluster-autoscaler/utils/scheduler/scheduler.go index 1b9e61259e2d..59d008db64d8 100644 --- a/cluster-autoscaler/utils/scheduler/scheduler.go +++ b/cluster-autoscaler/utils/scheduler/scheduler.go @@ -18,14 +18,25 @@ package scheduler import ( "fmt" + "os" "strings" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/uuid" + scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config" + scheduler_scheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" + scheduler_validation "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) +const ( + schedulerConfigDecodeErr = "couldn't decode scheduler config" + schedulerConfigLoadErr = "couldn't load scheduler config" + schedulerConfigTypeCastErr = "couldn't assert type as KubeSchedulerConfiguration" + schedulerConfigInvalidErr = "invalid KubeSchedulerConfiguration" +) + // CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names // and the values are the aggregated information for that node. Pods waiting lower priority pods preemption // (pod.Status.NominatedNodeName is set) are also added to list of pods for a node. @@ -106,3 +117,33 @@ func ResourceToResourceList(r *schedulerframework.Resource) apiv1.ResourceList { } return result } + +// ConfigFromPath loads scheduler config from a path. +// TODO(vadasambar): replace code to parse scheduler config with upstream function +// once https://github.com/kubernetes/kubernetes/pull/119057 is merged +func ConfigFromPath(path string) (*scheduler_config.KubeSchedulerConfiguration, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("%s: %v", schedulerConfigLoadErr, err) + } + + obj, gvk, err := scheduler_scheme.Codecs.UniversalDecoder().Decode(data, nil, nil) + if err != nil { + return nil, fmt.Errorf("%s: %v", schedulerConfigDecodeErr, err) + } + + cfgObj, ok := obj.(*scheduler_config.KubeSchedulerConfiguration) + if !ok { + return nil, fmt.Errorf("%s, gvk: %s", schedulerConfigTypeCastErr, gvk) + } + + // this needs to be set explicitly because config's api version is empty after decoding + // check kubernetes/cmd/kube-scheduler/app/options/configfile.go for more info + cfgObj.TypeMeta.APIVersion = gvk.GroupVersion().String() + + if err := scheduler_validation.ValidateKubeSchedulerConfiguration(cfgObj); err != nil { + return nil, fmt.Errorf("%s: %v", schedulerConfigInvalidErr, err) + } + + return cfgObj, nil +} diff --git a/cluster-autoscaler/utils/scheduler/scheduler_test.go b/cluster-autoscaler/utils/scheduler/scheduler_test.go index 54989cb6fe16..59f1aa52d92a 100644 --- a/cluster-autoscaler/utils/scheduler/scheduler_test.go +++ b/cluster-autoscaler/utils/scheduler/scheduler_test.go @@ -18,11 +18,15 @@ package scheduler import ( "fmt" + "os" + "path/filepath" "reflect" "testing" "k8s.io/apimachinery/pkg/api/resource" + testconfig "k8s.io/autoscaler/cluster-autoscaler/config/test" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" apiv1 "k8s.io/api/core/v1" @@ -102,3 +106,86 @@ func TestResourceList(t *testing.T) { }) } } + +func TestConfigFromPath(t *testing.T) { + // temp dir + tmpDir, err := os.MkdirTemp("", "scheduler-configs") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // Note that even if we are passing minimal config like below + // `ConfigFromPath` will set the rest of the default fields + // on its own (including default profile and default plugins) + correctConfigFile := filepath.Join(tmpDir, "correct_config.yaml") + if err := os.WriteFile(correctConfigFile, + []byte(testconfig.SchedulerConfigMinimalCorrect), + os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + decodeErrConfigFile := filepath.Join(tmpDir, "decode_err_no_version_config.yaml") + if err := os.WriteFile(decodeErrConfigFile, + []byte(testconfig.SchedulerConfigDecodeErr), + os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + validationErrConfigFile := filepath.Join(tmpDir, "invalid_percent_node_score_config.yaml") + if err := os.WriteFile(validationErrConfigFile, + []byte(testconfig.SchedulerConfigInvalid), + os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + path string + expectedErr error + expectedConfig *config.KubeSchedulerConfiguration + }{ + { + name: "Empty scheduler config file path", + path: "", + expectedErr: fmt.Errorf(schedulerConfigLoadErr), + expectedConfig: nil, + }, + { + name: "Correct scheduler config", + path: correctConfigFile, + expectedErr: nil, + expectedConfig: &config.KubeSchedulerConfiguration{}, + }, + { + name: "Scheduler config with decode error", + path: decodeErrConfigFile, + expectedErr: fmt.Errorf(schedulerConfigDecodeErr), + expectedConfig: nil, + }, + { + name: "Invalid scheduler config", + path: validationErrConfigFile, + expectedErr: fmt.Errorf(schedulerConfigInvalidErr), + expectedConfig: nil, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("case_%d: %s", i, test.name), func(t *testing.T) { + cfg, err := ConfigFromPath(test.path) + if test.expectedConfig == nil { + assert.Nil(t, cfg) + } else { + assert.NotNil(t, cfg) + } + + if test.expectedErr == nil { + assert.NoError(t, err) + } else { + assert.ErrorContains(t, err, test.expectedErr.Error()) + } + }) + + } +}