Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: optimize performance on transformer extension and Skip status #2211

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 53 additions & 24 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package frameworkext
import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
schedconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
Expand All @@ -51,9 +54,12 @@ type frameworkExtenderImpl struct {
koordinatorClientSet koordinatorclientset.Interface
koordinatorSharedInformerFactory koordinatorinformers.SharedInformerFactory

preFilterTransformers map[string]PreFilterTransformer
filterTransformers map[string]FilterTransformer
scoreTransformers map[string]ScoreTransformer
preFilterTransformers map[string]PreFilterTransformer
filterTransformers map[string]FilterTransformer
scoreTransformers map[string]ScoreTransformer
preFilterTransformersEnabled []PreFilterTransformer
filterTransformersEnabled []FilterTransformer
scoreTransformersEnabled []ScoreTransformer

reservationNominator ReservationNominator
reservationFilterPlugins []ReservationFilterPlugin
Expand All @@ -66,6 +72,8 @@ type frameworkExtenderImpl struct {

numaTopologyHintProviders []topologymanager.NUMATopologyHintProvider
topologyManager topologymanager.Interface

metricsRecorder *metrics.MetricAsyncRecorder
}

func NewFrameworkExtender(f *FrameworkExtenderFactory, fw framework.Framework) FrameworkExtender {
Expand All @@ -85,6 +93,7 @@ func NewFrameworkExtender(f *FrameworkExtenderFactory, fw framework.Framework) F
filterTransformers: map[string]FilterTransformer{},
scoreTransformers: map[string]ScoreTransformer{},
preBindExtensionsPlugins: map[string]PreBindExtensions{},
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, wait.NeverStop),
}
frameworkExtender.topologyManager = topologymanager.New(frameworkExtender)
return frameworkExtender
Expand Down Expand Up @@ -143,6 +152,29 @@ func (ext *frameworkExtenderImpl) updatePlugins(pl framework.Plugin) {

func (ext *frameworkExtenderImpl) SetConfiguredPlugins(plugins *schedconfig.Plugins) {
ext.configuredPlugins = plugins

for _, pl := range ext.configuredPlugins.PreFilter.Enabled {
transformer := ext.preFilterTransformers[pl.Name]
if transformer != nil {
ext.preFilterTransformersEnabled = append(ext.preFilterTransformersEnabled, transformer)
}
}
for _, pl := range ext.configuredPlugins.Filter.Enabled {
transformer := ext.filterTransformers[pl.Name]
if transformer != nil {
ext.filterTransformersEnabled = append(ext.filterTransformersEnabled, transformer)
}
}
for _, pl := range ext.configuredPlugins.Score.Enabled {
transformer := ext.scoreTransformers[pl.Name]
if transformer != nil {
ext.scoreTransformersEnabled = append(ext.scoreTransformersEnabled, transformer)
}
}
klog.V(5).InfoS("Set configured transformer plugins",
"PreFilterTransformer", len(ext.preFilterTransformersEnabled),
"FilterTransformer", len(ext.filterTransformersEnabled),
"ScoreTransformer", len(ext.scoreTransformersEnabled))
}

func (ext *frameworkExtenderImpl) KoordinatorClientSet() koordinatorclientset.Interface {
Expand All @@ -166,12 +198,10 @@ func (ext *frameworkExtenderImpl) GetReservationNominator() ReservationNominator

// RunPreFilterPlugins transforms the PreFilter phase of framework with pre-filter transformers.
func (ext *frameworkExtenderImpl) RunPreFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod) (*framework.PreFilterResult, *framework.Status) {
for _, pl := range ext.configuredPlugins.PreFilter.Enabled {
transformer := ext.preFilterTransformers[pl.Name]
if transformer == nil {
continue
}
for _, transformer := range ext.preFilterTransformersEnabled {
startTime := time.Now()
newPod, transformed, status := transformer.BeforePreFilter(ctx, cycleState, pod)
ext.metricsRecorder.ObservePluginDurationAsync("BeforePreFilter", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run BeforePreFilter", "pod", klog.KObj(pod), "plugin", transformer.Name())
return nil, status
Expand All @@ -187,12 +217,11 @@ func (ext *frameworkExtenderImpl) RunPreFilterPlugins(ctx context.Context, cycle
return result, status
}

for _, pl := range ext.configuredPlugins.PreFilter.Enabled {
transformer := ext.preFilterTransformers[pl.Name]
if transformer == nil {
continue
}
if status := transformer.AfterPreFilter(ctx, cycleState, pod); !status.IsSuccess() {
for _, transformer := range ext.preFilterTransformersEnabled {
startTime := time.Now()
status = transformer.AfterPreFilter(ctx, cycleState, pod)
ext.metricsRecorder.ObservePluginDurationAsync("AfterPreFilter", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run AfterPreFilter", "pod", klog.KObj(pod), "plugin", transformer.Name())
return nil, status
}
Expand All @@ -203,12 +232,10 @@ func (ext *frameworkExtenderImpl) RunPreFilterPlugins(ctx context.Context, cycle
// RunFilterPluginsWithNominatedPods transforms the Filter phase of framework with filter transformers.
// We don't transform RunFilterPlugins since framework's RunFilterPluginsWithNominatedPods just calls its RunFilterPlugins.
func (ext *frameworkExtenderImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
for _, pl := range ext.configuredPlugins.Filter.Enabled {
transformer := ext.filterTransformers[pl.Name]
if transformer == nil {
continue
}
for _, transformer := range ext.filterTransformersEnabled {
startTime := time.Now()
newPod, newNodeInfo, transformed, status := transformer.BeforeFilter(ctx, cycleState, pod, nodeInfo)
ext.metricsRecorder.ObservePluginDurationAsync("BeforeFilter", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run BeforeFilter", "pod", klog.KObj(pod), "plugin", transformer.Name())
return status
Expand All @@ -227,12 +254,10 @@ func (ext *frameworkExtenderImpl) RunFilterPluginsWithNominatedPods(ctx context.
}

func (ext *frameworkExtenderImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodes []*corev1.Node) ([]framework.NodePluginScores, *framework.Status) {
for _, pl := range ext.configuredPlugins.Score.Enabled {
transformer := ext.scoreTransformers[pl.Name]
if transformer == nil {
continue
}
for _, transformer := range ext.scoreTransformersEnabled {
startTime := time.Now()
newPod, newNodes, transformed, status := transformer.BeforeScore(ctx, state, pod, nodes)
ext.metricsRecorder.ObservePluginDurationAsync("BeforeScore", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run BeforeScore", "pod", klog.KObj(pod), "plugin", transformer.Name())
return nil, status
Expand Down Expand Up @@ -278,7 +303,9 @@ func (ext *frameworkExtenderImpl) RunPreBindPlugins(ctx context.Context, state *
reservation = reservation.DeepCopy()
reservation.Status.NodeName = nodeName
for _, pl := range ext.reservationPreBindPlugins {
startTime := time.Now()
status := pl.PreBindReservation(ctx, state, reservation, nodeName)
ext.metricsRecorder.ObservePluginDurationAsync("PreBindReservation", pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
err := status.AsError()
klog.ErrorS(err, "Failed running ReservationPreBindPlugin plugin", "plugin", pl.Name(), "reservation", klog.KObj(reservation))
Expand Down Expand Up @@ -465,7 +492,9 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context,

func (ext *frameworkExtenderImpl) RunResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
for _, pl := range ext.resizePodPlugins {
startTime := time.Now()
status := pl.ResizePod(ctx, cycleState, pod, nodeName)
ext.metricsRecorder.ObservePluginDurationAsync("ResizePod", pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
return status
}
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type ReservationScoreExtensions interface {
// ResizePodPlugin is an interface that resize the pod resource spec after reserve.
// If you want to use the feature, must enable the feature gate ResizePod=true
type ResizePodPlugin interface {
framework.Plugin
ResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/elasticquota/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (g *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState
quotaName, treeID := g.getPodAssociateQuotaNameAndTreeID(pod)
if quotaName == "" {
g.skipPostFilterState(cycleState)
return nil, framework.NewStatus(framework.Success, "")
return nil, framework.NewStatus(framework.Skip)
}

mgr := g.GetGroupQuotaManagerForTree(treeID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ func (pl *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleStat
for nodeName := range state.nodeReservationStates {
preResult.NodeNames.Insert(nodeName)
}
} else if len(state.nodeReservationStates) <= 0 { // nor available reservation neither a reserve pod
return nil, framework.NewStatus(framework.Skip)
}

return preResult, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestPreFilter(t *testing.T) {
Name: "not-reserve",
},
},
wantStatus: nil,
wantStatus: framework.NewStatus(framework.Skip),
wantPreRes: nil,
},
{
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/plugins/reservation/scoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ const (

func (pl *Plugin) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodes []*corev1.Node) *framework.Status {
if reservationutil.IsReservePod(pod) {
return nil
return framework.NewStatus(framework.Skip)
}

// if the pod is reservation-ignored, it does not want a nominated reservation
if apiext.IsReservationIgnored(pod) {
return nil
return framework.NewStatus(framework.Skip)
}

state := getStateData(cycleState)
if len(state.nodeReservationStates) == 0 {
return nil
return framework.NewStatus(framework.Skip)
}

ctx, cancel := context.WithCancel(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/reservation/scoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestScore(t *testing.T) {
suit.start()

status := pl.PreScore(context.TODO(), cycleState, tt.pod, []*corev1.Node{node})
assert.True(t, status.IsSuccess())
assert.True(t, status.IsSuccess() || status.IsSkip())

score, status := pl.Score(context.TODO(), cycleState, tt.pod, node.Name)
assert.True(t, status.IsSuccess())
Expand Down Expand Up @@ -843,7 +843,7 @@ func TestPreScoreWithNominateReservation(t *testing.T) {
suit.start()

status := pl.PreScore(context.TODO(), cycleState, tt.pod, nodes)
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantStatus, status.IsSuccess() || status.IsSkip())

for nodeName, wantReservationInfo := range tt.wantReservation {
sort.Slice(wantReservationInfo.ResourceNames, func(i, j int) bool {
Expand Down