feat: scheduler (10/): add more scheduler logic #409

Merged
merged 5 commits on Jul 3, 2023
17 changes: 16 additions & 1 deletion pkg/scheduler/framework/cyclestate.go
@@ -8,6 +8,8 @@ package framework
import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/util/sets"
)

// StateKey is the key for a state value stored in a CycleState.
@@ -17,6 +19,9 @@ type StateKey string
type StateValue interface{}

// CycleStatePluginReadWriter is an interface through which plugins can store and retrieve data.
//
// TO-DO (chenyu1): Add methods which allow plugins to query for bindings of different types being
// evaluated in the current scheduling cycle.
type CycleStatePluginReadWriter interface {
Read(key StateKey) (StateValue, error)
Write(key StateKey, val StateValue)
@@ -32,6 +37,13 @@ type CycleStatePluginReadWriter interface {
type CycleState struct {
// store is a concurrency-safe store (a map).
store sync.Map

// skippedFilterPlugins is a set of Filter plugins that should be skipped in the current scheduling cycle.
//
// TO-DO (chenyu1): the sets package has added support for Go generic types in 1.26, and
// the String set has been deprecated; transition to the generic set when the new version
// becomes available.
skippedFilterPlugins sets.String
}

// Read retrieves a value from CycleState by a key.
@@ -54,5 +66,8 @@ func (c *CycleState) Delete(key StateKey) {

// NewCycleState creates a CycleState.
func NewCycleState() *CycleState {
return &CycleState{}
return &CycleState{
store: sync.Map{},
skippedFilterPlugins: sets.NewString(),
}
}
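
For reference, a minimal sketch (not part of this PR) of how a plugin might use the CycleStatePluginReadWriter interface above to pass data between extension points. It assumes the code lives in the framework package and that fmt is imported; the key name, stored value, and helper functions below are hypothetical.

const exampleDataKey StateKey = "examplePlugin/precomputedData"

// storeExampleData would typically be called from a plugin's PreFilter extension point.
func storeExampleData(state CycleStatePluginReadWriter) {
	state.Write(exampleDataKey, 42)
}

// loadExampleData would typically be called from the same plugin's Filter extension point.
func loadExampleData(state CycleStatePluginReadWriter) (int, error) {
	val, err := state.Read(exampleDataKey)
	if err != nil {
		return 0, fmt.Errorf("failed to read state %q: %w", exampleDataKey, err)
	}
	count, ok := val.(int)
	if !ok {
		return 0, fmt.Errorf("state %q has unexpected type %T", exampleDataKey, val)
	}
	return count, nil
}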
4 changes: 4 additions & 0 deletions pkg/scheduler/framework/dummyplugin_test.go
@@ -11,6 +11,10 @@ import (
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

const (
dummyAllPurposePluginNameFormat = "dummyAllPurposePlugin-%d"
)

// A no-op, dummy plugin which connects to all extension points.
type DummyAllPurposePlugin struct {
name string
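
As an illustration (not shown in this diff), the name format constant above could be used inside a test in the framework package to generate uniquely named dummy plugins; fmt is assumed to be imported, and any fields of DummyAllPurposePlugin other than name are omitted here.

plugins := make([]*DummyAllPurposePlugin, 0, 3)
for i := 0; i < 3; i++ {
	plugins = append(plugins, &DummyAllPurposePlugin{
		name: fmt.Sprintf(dummyAllPurposePluginNameFormat, i), // e.g. "dummyAllPurposePlugin-0"
	})
}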
178 changes: 174 additions & 4 deletions pkg/scheduler/framework/framework.go
@@ -10,6 +10,7 @@ package framework
import (
"context"
"fmt"
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/labels"
@@ -296,17 +297,186 @@ func (f *framework) markAsUnscheduledFor(ctx context.Context, bindings []*fleetv
//
// TO-DO (chenyu1): remove the nolint directives once the function is implemented.
func (f *framework) runSchedulingCycleForPickAllPlacementType(
ctx context.Context, //nolint: revive
state *CycleState, //nolint: revive
ctx context.Context,
state *CycleState,
crpName string, //nolint: revive
policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, //nolint: revive
clusters []fleetv1beta1.MemberCluster, //nolint: revive
policy *fleetv1beta1.ClusterSchedulingPolicySnapshot,
clusters []fleetv1beta1.MemberCluster,
bound, scheduled, obsolete []*fleetv1beta1.ClusterResourceBinding, //nolint: revive
) (result ctrl.Result, err error) {
policyRef := klog.KObj(policy)

// The scheduler always needs to take action when processing scheduling policies of the PickAll
// placement type; enter the actual scheduling stages right away.
klog.V(2).InfoS("Scheduling is always needed for CRPs of the PickAll placement type; entering scheduling stages", "schedulingPolicySnapshot", policyRef)

// Run all plugins needed.
//
// TO-DO (chenyu1): assign variables when needed.
_, _, err = f.runAllPluginsForPickAllPlacementType(ctx, state, policy, clusters)
if err != nil {
klog.ErrorS(err, "Failed to run all plugins (pickAll placement type)", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}

// Not yet implemented.
return ctrl.Result{}, nil
}

// runAllPluginsForPickAllPlacementType runs all plugins in each stage of the scheduling cycle for a
// scheduling policy of the PickAll placement type.
//
// Note that for policies of the PickAll placement type, only the following stages are needed:
// * PreFilter
// * Filter
func (f *framework) runAllPluginsForPickAllPlacementType(
ctx context.Context,
state *CycleState,
policy *fleetv1beta1.ClusterSchedulingPolicySnapshot,
clusters []fleetv1beta1.MemberCluster,
) (scored ScoredClusters, filtered []*filteredClusterWithStatus, err error) {
policyRef := klog.KObj(policy)

// Run pre-filter plugins.
//
// Each plugin can:
// * set up some common state for future calls (on different extension points) in the scheduling cycle; and/or
// * check if it needs to run at the Filter stage.
// Any plugin that would like to be skipped is listed in the cycle state for future reference.
//
// Note that any failure would lead to the cancellation of the scheduling cycle.
if status := f.runPreFilterPlugins(ctx, state, policy); status.IsInteralError() {
klog.ErrorS(status.AsError(), "Failed to run pre filter plugins", "schedulingPolicySnapshot", policyRef)
return nil, nil, controller.NewUnexpectedBehaviorError(status.AsError())
}

// Run filter plugins.
//
// The scheduler checks each cluster candidate by calling the chain of filter plugins; if any plugin suggests
// that the cluster should not be bound, the cluster is ignored for the rest of the cycle. Note that clusters
// are inspected in parallel.
//
// Note that any failure would lead to the cancellation of the scheduling cycle.
passed, filtered, err := f.runFilterPlugins(ctx, state, policy, clusters)
if err != nil {
klog.ErrorS(err, "Failed to run filter plugins", "schedulingPolicySnapshot", policyRef)
return nil, nil, controller.NewUnexpectedBehaviorError(err)
}

// Wrap all clusters that have passed the Filter stage as scored clusters.
scored = make(ScoredClusters, 0, len(passed))
for _, cluster := range passed {
scored = append(scored, &ScoredCluster{
Cluster: cluster,
Score: &ClusterScore{},
})
}
return scored, filtered, nil
}

// runPreFilterPlugins runs all pre filter plugins sequentially.
func (f *framework) runPreFilterPlugins(ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot) *Status {
for _, pl := range f.profile.preFilterPlugins {
status := pl.PreFilter(ctx, state, policy)
switch {
case status.IsSuccess(): // Do nothing.
case status.IsInteralError():
return status
case status.IsSkip():
state.skippedFilterPlugins.Insert(pl.Name())
default:
// Any status that is not Success, InternalError, or Skip is considered an error.
return FromError(fmt.Errorf("prefilter plugin returned an unknown status %s", status), pl.Name())
}
}

return nil
}

// runFilterPluginsFor runs filter plugins for a single cluster.
func (f *framework) runFilterPluginsFor(ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) *Status {
for _, pl := range f.profile.filterPlugins {
// Skip the plugin if it is not needed.
if state.skippedFilterPlugins.Has(pl.Name()) {
continue
}
status := pl.Filter(ctx, state, policy, cluster)
switch {
case status.IsSuccess(): // Do nothing.
case status.IsInteralError():
return status
case status.IsClusterUnschedulable():
return status
default:
// Any status that is not Success, InternalError, or ClusterUnschedulable is considered an error.
return FromError(fmt.Errorf("filter plugin returned an unknown status %s", status), pl.Name())
}
}

return nil
}
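
The skip bookkeeping above relies only on the sets.String helpers from k8s.io/apimachinery (Insert in runPreFilterPlugins, Has in runFilterPluginsFor). A self-contained illustration of the mechanism, with made-up plugin names, might look like this:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	skippedFilterPlugins := sets.NewString()

	// A PreFilter plugin that asks to be skipped is recorded by name.
	skippedFilterPlugins.Insert("exampleAffinityPlugin")

	// At the Filter stage, recorded plugins are bypassed for every cluster.
	for _, name := range []string{"exampleAffinityPlugin", "exampleCapacityPlugin"} {
		if skippedFilterPlugins.Has(name) {
			fmt.Printf("skipping filter plugin %s\n", name)
			continue
		}
		fmt.Printf("running filter plugin %s\n", name)
	}
}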

// filteredClusterWithStatus is a struct that documents a cluster filtered out at the Filter stage,
// along with the plugin status that explains why the cluster was filtered out.
//
// This struct is used to keep the reasons behind filtering decisions, so that they can be
// returned to the user as part of the scheduling decision.
type filteredClusterWithStatus struct {
cluster *fleetv1beta1.MemberCluster
status *Status
}

// runFilterPlugins runs filter plugins on clusters in parallel.
func (f *framework) runFilterPlugins(ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, clusters []fleetv1beta1.MemberCluster) (passed []*fleetv1beta1.MemberCluster, filtered []*filteredClusterWithStatus, err error) {
// Create a child context; make sure that it is always canceled to avoid leaking it.
childCtx, cancel := context.WithCancel(ctx)
defer cancel()

// Pre-allocate slices to avoid races.
passed = make([]*fleetv1beta1.MemberCluster, len(clusters))
var passedIdx int32 = -1
filtered = make([]*filteredClusterWithStatus, len(clusters))
var filteredIdx int32 = -1

errFlag := parallelizer.NewErrorFlag()

doWork := func(pieces int) {
cluster := clusters[pieces]
status := f.runFilterPluginsFor(childCtx, state, policy, &cluster)
switch {
case status.IsSuccess():
// Use atomic add to avoid races with minimum overhead.
newPassedIdx := atomic.AddInt32(&passedIdx, 1)
passed[newPassedIdx] = &cluster
case status.IsClusterUnschedulable():
// Use atomic add to avoid races with minimum overhead.
newFilteredIdx := atomic.AddInt32(&filteredIdx, 1)
filtered[newFilteredIdx] = &filteredClusterWithStatus{
cluster: &cluster,
status: status,
}
default: // An error has occurred.
errFlag.Raise(status.AsError())
// Cancel the child context, which signals the parallelizer to stop running tasks.
cancel()
}
}

// Run inspection in parallel.
//
// Note that the parallel run will be stopped immediately upon encountering the first error.
f.parallelizer.ParallelizeUntil(childCtx, len(clusters), doWork, "runFilterPlugins")
// Retrieve the first error from the error flag.
if err := errFlag.Lower(); err != nil {
return nil, nil, err
}

// Trim the slices to the actual size.
passed = passed[:passedIdx+1]
filtered = filtered[:filteredIdx+1]

return passed, filtered, nil
}
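
The pre-allocate-and-atomically-index pattern used in runFilterPlugins avoids both a mutex and racing appends: each goroutine claims a unique slot via atomic.AddInt32, and the slices are trimmed to the claimed length afterwards. Below is a stripped-down, standard-library-only sketch of the same pattern, independent of the scheduler types.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	inputs := []int{1, 2, 3, 4, 5, 6}

	// Pre-allocate the result slice; each goroutine claims a unique slot with an atomic add,
	// so no mutex is needed and no appends can race.
	results := make([]int, len(inputs))
	var idx int32 = -1

	var wg sync.WaitGroup
	for _, in := range inputs {
		in := in
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Keep only even numbers, analogous to clusters that pass all filter plugins.
			if in%2 == 0 {
				slot := atomic.AddInt32(&idx, 1)
				results[slot] = in
			}
		}()
	}
	wg.Wait()

	// Trim the slice to the number of slots actually claimed.
	results = results[:idx+1]
	fmt.Println(results) // order is not deterministic, e.g. [4 2 6]
}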

// runSchedulingCycleForPickNPlacementType runs the scheduling cycle for a scheduling policy of the PickN
// placement type.
//