Skip to content

Commit

Permalink
feat: scheduler plugins (2/): add topology spread constraints plugin …
Browse files Browse the repository at this point in the history
…logic (#430)
  • Loading branch information
michaelawyu authored Jul 19, 2023
1 parent 36b3987 commit 18e9459
Show file tree
Hide file tree
Showing 10 changed files with 696 additions and 25 deletions.
44 changes: 43 additions & 1 deletion pkg/scheduler/framework/cyclestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"

"k8s.io/apimachinery/pkg/util/sets"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// StateKey is the key for a state value stored in a CycleState.
Expand All @@ -26,6 +28,9 @@ type CycleStatePluginReadWriter interface {
Read(key StateKey) (StateValue, error)
Write(key StateKey, val StateValue)
Delete(key StateKey)

ListClusters() []fleetv1beta1.MemberCluster
IsClusterScheduledOrBound(name string) bool
}

// CycleState is, similar to its namesake in kube-scheduler, provides a way for plugins to
Expand All @@ -38,6 +43,15 @@ type CycleState struct {
// store is a concurrency-safe store (a map).
store sync.Map

// clusters is the list of clusters that the scheduler will inspect and evaluate
// in the current scheduling cycle.
clusters []fleetv1beta1.MemberCluster

// scheduledOrBound is a map between the name of a cluster and its scheduling status,
i.e., whether there is already a binding of the scheduled or bound state, relevant to
// the current scheduling cycle in presence for the cluster.
scheduledOrBound map[string]bool

// skippedFilterPlugins is a set of Filter plugins that should be skipped in the current scheduling cycle.
//
// TO-DO (chenyu1): the sets package has added support for Go generic types in 1.26, and
Expand All @@ -64,10 +78,38 @@ func (c *CycleState) Delete(key StateKey) {
c.store.Delete(key)
}

// ListClusters returns the list of clusters that the scheduler will inspect and evaluate
// in the current scheduling cycle.
//
// This helps maintain consistency in a scheduling run and improve performance, i.e., the
// scheduler and all plugins can have the same view of clusters being evaluated, and any plugin
// which requires the view no longer needs to list clusters on its own.
//
// Note that this is a relatively expensive op, as it returns a deep copy of the cluster list.
func (c *CycleState) ListClusters() []fleetv1beta1.MemberCluster {
	// Return deep copies so that a modification made by one plugin cannot affect the
	// scheduler itself or other plugins. A plain copy() of the slice would only shallow-copy
	// each struct, leaving nested reference fields (e.g., labels/annotations maps in the
	// object metadata) shared with the scheduler's own list.
	clusters := make([]fleetv1beta1.MemberCluster, 0, len(c.clusters))
	for idx := range c.clusters {
		clusters = append(clusters, *c.clusters[idx].DeepCopy())
	}
	return clusters
}

// IsClusterScheduledOrBound returns whether a cluster already has a scheduled or bound binding
// associated.
//
// This helps maintain consistency in a scheduling run and improve performance, i.e., the
// scheduler and all plugins can have the same view of the current spread of bindings, and any
// plugin which requires the view no longer needs to list bindings on its own.
func (c *CycleState) IsClusterScheduledOrBound(name string) bool {
	// A missing entry yields the zero value (false), i.e., no scheduled or bound binding.
	scheduledOrBound, found := c.scheduledOrBound[name]
	return found && scheduledOrBound
}

// NewCycleState creates a CycleState.
func NewCycleState() *CycleState {
func NewCycleState(clusters []fleetv1beta1.MemberCluster, scheduledOrBoundBindings ...[]*fleetv1beta1.ClusterResourceBinding) *CycleState {
return &CycleState{
store: sync.Map{},
clusters: clusters,
scheduledOrBound: prepareScheduledOrBoundMap(scheduledOrBoundBindings...),
skippedFilterPlugins: sets.NewString(),
}
}
86 changes: 83 additions & 3 deletions pkg/scheduler/framework/cyclestate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,46 @@ Licensed under the MIT license.

package framework

import "testing"
import (
"testing"

// TestCycleStateBasicOps tests the basic ops (Read, Write, and Delete) of a CycleState.
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// TestCycleStateBasicOps tests the basic ops of a CycleState.
func TestCycleStateBasicOps(t *testing.T) {
cs := NewCycleState()
clusters := []fleetv1beta1.MemberCluster{
{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
},
},
}
scheduledOrBoundBindings := []*fleetv1beta1.ClusterResourceBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: bindingName,
},
Spec: fleetv1beta1.ResourceBindingSpec{
TargetCluster: clusterName,
State: fleetv1beta1.BindingStateBound,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: altBindingName,
},
Spec: fleetv1beta1.ResourceBindingSpec{
TargetCluster: altClusterName,
State: fleetv1beta1.BindingStateScheduled,
},
},
}

cs := NewCycleState(clusters, scheduledOrBoundBindings)

k, v := "key", "value"
cs.Write(StateKey(k), StateValue(v))
Expand All @@ -20,4 +55,49 @@ func TestCycleStateBasicOps(t *testing.T) {
if out, err := cs.Read("key"); out != nil || err == nil {
t.Fatalf("Read(%v) = %v, %v, want nil, not found error", k, out, err)
}

clustersInState := cs.ListClusters()
if diff := cmp.Diff(clustersInState, clusters); diff != "" {
t.Fatalf("ListClusters() diff (-got, +want): %s", diff)
}

for _, binding := range scheduledOrBoundBindings {
if !cs.IsClusterScheduledOrBound(binding.Spec.TargetCluster) {
t.Fatalf("IsClusterScheduledOrBound(%v) = false, want true", binding.Spec.TargetCluster)
}
}
}

// TestPrepareScheduledOrBoundMap tests the prepareScheduledOrBoundMap function.
func TestPrepareScheduledOrBoundMap(t *testing.T) {
	// Bindings passed as the first variadic slice; targets clusterName.
	scheduled := []*fleetv1beta1.ClusterResourceBinding{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: bindingName,
			},
			Spec: fleetv1beta1.ResourceBindingSpec{
				TargetCluster: clusterName,
			},
		},
	}
	// Bindings passed as the second variadic slice; targets altClusterName.
	bound := []*fleetv1beta1.ClusterResourceBinding{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: altBindingName,
			},
			Spec: fleetv1beta1.ResourceBindingSpec{
				TargetCluster: altClusterName,
			},
		},
	}

	// Both target clusters should be present in the map, regardless of which variadic
	// slice their bindings arrived in.
	want := map[string]bool{
		clusterName:    true,
		altClusterName: true,
	}

	scheduledOrBoundMap := prepareScheduledOrBoundMap(scheduled, bound)
	if diff := cmp.Diff(scheduledOrBoundMap, want); diff != "" {
		t.Errorf("prepareScheduledOrBoundMap() diff (-got, +want): %s", diff)
	}
}
25 changes: 25 additions & 0 deletions pkg/scheduler/framework/cyclestateutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package framework

import (
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// prepareScheduledOrBoundMap returns a map that allows quick lookup of whether a cluster
// already has a binding of the scheduled or bound state relevant to the current scheduling
// cycle.
func prepareScheduledOrBoundMap(scheduledOrBoundBindings ...[]*fleetv1beta1.ClusterResourceBinding) map[string]bool {
	res := make(map[string]bool)

	// Flatten all provided binding slices into a single lookup keyed by target cluster.
	for _, bindings := range scheduledOrBoundBindings {
		for _, b := range bindings {
			res[b.Spec.TargetCluster] = true
		}
	}

	return res
}
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
// Note that this state is shared between all plugins and the scheduler framework itself (though some fields are reserved by
the framework). These reserved fields are never accessed concurrently, as each scheduling run has its own cycle and a run
// is always executed in one single goroutine; plugin access to the state is guarded by sync.Map.
state := NewCycleState()
state := NewCycleState(clusters, bound, scheduled)

switch policy.Spec.Policy.PlacementType {
case fleetv1beta1.PickAllPlacementType:
Expand Down
8 changes: 4 additions & 4 deletions pkg/scheduler/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func TestRunPreFilterPlugins(t *testing.T) {
}

ctx := context.Background()
state := NewCycleState()
state := NewCycleState([]fleetv1beta1.MemberCluster{}, []*fleetv1beta1.ClusterResourceBinding{})
policy := &fleetv1beta1.ClusterSchedulingPolicySnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: policyName,
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestRunFilterPluginsFor(t *testing.T) {
}

ctx := context.Background()
state := NewCycleState()
state := NewCycleState([]fleetv1beta1.MemberCluster{}, []*fleetv1beta1.ClusterResourceBinding{})
for _, name := range tc.skippedPluginNames {
state.skippedFilterPlugins.Insert(name)
}
Expand Down Expand Up @@ -747,7 +747,7 @@ func TestRunFilterPlugins(t *testing.T) {
}

ctx := context.Background()
state := NewCycleState()
state := NewCycleState([]fleetv1beta1.MemberCluster{}, []*fleetv1beta1.ClusterResourceBinding{})
policy := &fleetv1beta1.ClusterSchedulingPolicySnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: policyName,
Expand Down Expand Up @@ -1050,7 +1050,7 @@ func TestRunAllPluginsForPickAllPlacementType(t *testing.T) {
}

ctx := context.Background()
state := NewCycleState()
state := NewCycleState([]fleetv1beta1.MemberCluster{}, []*fleetv1beta1.ClusterResourceBinding{})
scored, filtered, err := f.runAllPluginsForPickAllPlacementType(ctx, state, policy, clusters)
if tc.expectedToFail {
if err == nil {
Expand Down
41 changes: 30 additions & 11 deletions pkg/scheduler/framework/plugins/topologyspreadconstraints/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@ import (
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

const (
	// skewChangeScoreFactor is the factor applied to topology spread score for every unit
	// of skew change.
	//
	// Note that it should be a negative value, so that any provisional placement that reduces
	// skewing will be assigned a positive topology spread score.
	skewChangeScoreFactor = -1
	// maxSkewViolationPenality is the penalty applied to topology spread score when a
	// provisional placement violates a topology spread constraint.
	//
	// Note that it should be a positive value, as the plugin will subtract this value from
	// the total score.
	//
	// NOTE(review): the identifier misspells "Penalty"; renaming it would touch every
	// call site, so it is intentionally left as is here.
	maxSkewViolationPenality = 1000
)

var (
	// doNotScheduleConstraintViolationReasonTemplate is the format string used to report
	// a DoNotSchedule topology spread constraint violation; per its verbs, it formats a
	// constraint identifier (%q) and a max skew value (%d).
	doNotScheduleConstraintViolationReasonTemplate = "violated doNotSchedule topology spread constraint %q (max skew %d)"
)

// Plugin is the scheduler plugin that enforces the
// topology spread constraints (if any) defined on a CRP.
type Plugin struct {
Expand Down Expand Up @@ -97,11 +116,11 @@ func (p *Plugin) readPluginState(state framework.CycleStatePluginReadWriter) (*p
}

// Cast the value to the right type.
pluginState, ok := val.(*pluginState)
ps, ok := val.(*pluginState)
if !ok {
return nil, fmt.Errorf("failed to cast value %v to the right type", val)
}
return pluginState, nil
return ps, nil
}

// PostBatch allows the plugin to connect to the PostBatch extension point in the scheduling
Expand Down Expand Up @@ -163,15 +182,15 @@ func (p *Plugin) PreFilter(
//
// Note that this will happen as long as there is one or more topology spread constraints
// in presence in the scheduling policy, regardless of its settings.
pluginState, err := prepareTopologySpreadConstraintsPluginState(state, policy)
ps, err := prepareTopologySpreadConstraintsPluginState(state, policy)
if err != nil {
return framework.FromError(err, p.Name(), "failed to prepare plugin state")
}

// Save the plugin state.
state.Write(framework.StateKey(p.Name()), pluginState)
state.Write(framework.StateKey(p.Name()), ps)

if len(pluginState.doNotScheduleConstraints) == 0 {
if len(ps.doNotScheduleConstraints) == 0 {
// There are no DoNotSchedule topology spread constraints to enforce; skip.
//
// Note that this will lead the scheduler to skip this plugin in the next stage
Expand All @@ -191,15 +210,15 @@ func (p *Plugin) Filter(
cluster *fleetv1beta1.MemberCluster,
) (status *framework.Status) {
// Read the plugin state.
pluginState, err := p.readPluginState(state)
ps, err := p.readPluginState(state)
if err != nil {
// This branch should never be reached, as for any policy with present topology spread
// constraints, a common plugin state has been set at the PreFilter extension point.
return framework.FromError(err, p.Name(), "failed to read plugin state")
}

// The state is safe for concurrent reads.
reasons, ok := pluginState.violations[clusterName(cluster.Name)]
reasons, ok := ps.violations[clusterName(cluster.Name)]
if ok {
// Violation is found; filter this cluster out.
return framework.NewNonErrorStatus(framework.ClusterUnschedulable, p.Name(), reasons...)
Expand Down Expand Up @@ -232,14 +251,14 @@ func (p *Plugin) PreScore(
}

// Read the plugin state.
pluginState, err := p.readPluginState(state)
ps, err := p.readPluginState(state)
if err != nil {
// This branch should never be reached, as for any policy with present topology spread
// constraints, a common plugin state has been set at the PreFilter extension point.
return framework.FromError(err, p.Name(), "failed to read plugin state")
}

if len(pluginState.scheduleAnywayConstraints) == 0 && len(pluginState.doNotScheduleConstraints) == 0 {
if len(ps.scheduleAnywayConstraints) == 0 && len(ps.doNotScheduleConstraints) == 0 {
// There are no topology spread constraints to enforce; skip.
//
// Note that this will lead the scheduler to skip this plugin in the next stage
Expand All @@ -262,15 +281,15 @@ func (p *Plugin) Score(
cluster *fleetv1beta1.MemberCluster,
) (score *framework.ClusterScore, status *framework.Status) {
// Read the plugin state.
pluginState, err := p.readPluginState(state)
ps, err := p.readPluginState(state)
if err != nil {
// This branch should never be reached, as for any policy with present topology spread
// constraints, a common plugin state has been set at the PreFilter extension point.
return nil, framework.FromError(err, p.Name(), "failed to read plugin state")
}

// The state is safe for concurrent reads.
topologySpreadScore, ok := pluginState.scores[clusterName(cluster.Name)]
topologySpreadScore, ok := ps.scores[clusterName(cluster.Name)]
if !ok {
// No score is found; normally this should never happen, as the state is consistent,
// and each cluster that does not violate DoNotSchedule topology spread constraints
Expand Down
Loading

0 comments on commit 18e9459

Please sign in to comment.