Skip to content

Commit

Permalink
Merge pull request volcano-sh#31 from lminzhw/private_scheduler
Browse files Browse the repository at this point in the history
Performance and some other improvement on scheduler
  • Loading branch information
volcano-sh-bot authored Jun 28, 2019
2 parents 31b04ec + 4de758b commit 0302d83
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 18 deletions.
3 changes: 3 additions & 0 deletions cmd/kube-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
// Import default actions/plugins.
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions"
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins"

// init assert
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/assert"
)

var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type NodeInfo struct {
Tasks map[TaskID]*TaskInfo

// Used to store custom information
Other interface{}
Others map[string]interface{}
}

// NodeState defines the current state of node.
Expand Down Expand Up @@ -98,7 +98,7 @@ func (ni *NodeInfo) Clone() *NodeInfo {
for _, p := range ni.Tasks {
res.AddTask(p)
}
res.Other = ni.Other
res.Others = ni.Others
return res
}

Expand Down
28 changes: 13 additions & 15 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

v1 "k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/assert"
)

// Resource struct defines all the resource type
Expand Down Expand Up @@ -117,9 +119,8 @@ func (r *Resource) IsZero(rn v1.ResourceName) bool {
return true
}

if _, ok := r.ScalarResources[rn]; !ok {
panic("unknown resource")
}
_, found := r.ScalarResources[rn]
assert.Assertf(found, "unknown resource %s", rn)

return r.ScalarResources[rn] < minMilliScalarResources
}
Expand All @@ -142,22 +143,19 @@ func (r *Resource) Add(rr *Resource) *Resource {

//Sub subtracts two Resource objects.
func (r *Resource) Sub(rr *Resource) *Resource {
if rr.LessEqual(r) {
r.MilliCPU -= rr.MilliCPU
r.Memory -= rr.Memory
assert.Assertf(rr.LessEqual(r), "resource is not sufficient to do operation: <%v> sub <%v>", r, rr)

for rrName, rrQuant := range rr.ScalarResources {
if r.ScalarResources == nil {
return r
}
r.ScalarResources[rrName] -= rrQuant
}
r.MilliCPU -= rr.MilliCPU
r.Memory -= rr.Memory

return r
for rrName, rrQuant := range rr.ScalarResources {
if r.ScalarResources == nil {
return r
}
r.ScalarResources[rrName] -= rrQuant
}

panic(fmt.Errorf("resource is not sufficient to do operation: <%v> sub <%v>",
r, rr))
return r
}

// SetMaxResource compares with ResourceList and takes max value for each Resource.
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type defaultBinder struct {
//Bind will send bind request to api server
func (db *defaultBinder) Bind(p *v1.Pod, hostname string) error {
if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: p.Annotations},
Target: v1.ObjectReference{
Kind: "Node",
Name: hostname,
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}

pgResource := api.NewResource(*job.PodGroup.Spec.MinResources)
if len(queue.Queue.Spec.Capability) == 0 {
glog.V(4).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.",
queue.Name, job.Namespace, job.Name)
return true
}
// The queue resource quota limit has not reached
if pgResource.Clone().Add(attr.allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) {
return true
Expand Down
44 changes: 44 additions & 0 deletions pkg/scheduler/util/assert/assert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package assert

import (
"fmt"
"os"
"runtime/debug"

"github.com/golang/glog"
)

const (
// EnvPanicOnError is the name of the environment variable that controls
// whether a failed assertion panics. Setting it to the exact string "false"
// makes failed assertions log an error instead; any other value (or leaving
// it unset) keeps the default of panicking.
EnvPanicOnError = "PANIC_ON_ERROR"
)

var (
// panicOnError reports whether a failed assertion should panic (true, the
// default) or only be logged (false). It is switched off in init when the
// EnvPanicOnError environment variable equals "false".
panicOnError = true
)

// init reads the EnvPanicOnError environment variable once at package load
// time and disables panicking on failed assertions when it is exactly "false".
func init() {
	if os.Getenv(EnvPanicOnError) == "false" {
		panicOnError = false
	}
}

// Assert checks condition and does nothing when it holds. When it does not
// hold, Assert panics with message if panicOnError is set; otherwise it logs
// message together with the current stack trace at error level and returns.
func Assert(condition bool, message string) {
	switch {
	case condition:
		// Assertion satisfied: nothing to report.
		return
	case panicOnError:
		panic(message)
	default:
		glog.Errorf("%s, %s", message, debug.Stack())
	}
}

// Assertf is the formatted variant of Assert. The message is built from
// format and args only when condition is false, so a passing assertion does
// not pay for the formatting; the failure itself is reported through Assert.
func Assertf(condition bool, format string, args ...interface{}) {
	if !condition {
		Assert(false, fmt.Sprintf(format, args...))
	}
}

0 comments on commit 0302d83

Please sign in to comment.