Skip to content

Commit

Permalink
Merge pull request volcano-sh#7 from hzxuzhonghu/master
Browse files Browse the repository at this point in the history
Implement queue Capability
  • Loading branch information
volcano-sh-bot authored May 15, 2019
2 parents 8c5fc7d + ca395c6 commit 3696b73
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 38 deletions.
14 changes: 5 additions & 9 deletions pkg/scheduler/actions/enqueue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,14 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
job := jobs.Pop().(*api.JobInfo)

inqueue := false
if len(job.TaskStatusIndex[api.Pending]) != 0 {

if job.PodGroup.Spec.MinResources == nil {
inqueue = true
} else {
if job.PodGroup.Spec.MinResources == nil {
pgResource := api.NewResource(*job.PodGroup.Spec.MinResources)
if ssn.JobEnqueueable(job) && pgResource.LessEqual(nodesIdleRes) {
nodesIdleRes.Sub(pgResource)
inqueue = true
} else {
pgResource := api.NewResource(*job.PodGroup.Spec.MinResources)

if pgResource.LessEqual(nodesIdleRes) {
nodesIdleRes.Sub(pgResource)
inqueue = true
}
}
}

Expand Down
60 changes: 31 additions & 29 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,22 @@ type Session struct {
Backlog []*api.JobInfo
Tiers []conf.Tier

plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
nodeOrderFns map[string]api.NodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.ValidateFn
jobValidFns map[string]api.ValidateExFn
plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
nodeOrderFns map[string]api.NodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.ValidateFn
jobValidFns map[string]api.ValidateExFn
jobEnqueueableFns map[string]api.ValidateFn
}

func openSession(cache cache.Cache) *Session {
Expand All @@ -71,20 +72,21 @@ func openSession(cache cache.Cache) *Session {
Nodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.ValidateFn{},
jobValidFns: map[string]api.ValidateExFn{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.ValidateFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.ValidateFn{},
}

snapshot := cache.Snapshot()
Expand Down
23 changes: 23 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (ssn *Session) AddJobValidFn(name string, fn api.ValidateExFn) {
ssn.jobValidFns[name] = fn
}

// AddJobEnqueueableFn add jobenqueueable function
func (ssn *Session) AddJobEnqueueableFn(name string, fn api.ValidateFn) {
ssn.jobEnqueueableFns[name] = fn
}

// Reclaimable invoke reclaimable function of the plugins
func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
Expand Down Expand Up @@ -249,6 +254,24 @@ func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult {
return nil
}

// JobEnqueueable invoke jobEnqueueableFns function of the plugins
func (ssn *Session) JobEnqueueable(obj interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
fn, found := ssn.jobEnqueueableFns[plugin.Name]
if !found {
continue
}

if res := fn(obj); !res {
return res
}
}
}

return true
}

// JobOrderFn invoke joborder function of the plugins
func (ssn *Session) JobOrderFn(l, r interface{}) bool {
for _, tier := range ssn.Tiers {
Expand Down
13 changes: 13 additions & 0 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
return overused
})

ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) bool {
job := obj.(*api.JobInfo)
queueID := job.Queue
attr := pp.queueOpts[queueID]
queue := ssn.Queues[queueID]
pgResource := api.NewResource(*job.PodGroup.Spec.MinResources)
// The queue resource quota limit has not reached
if pgResource.Clone().Add(attr.allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) {
return true
}
return false
})

// Register event handlers.
ssn.AddEventHandler(&framework.EventHandler{
AllocateFunc: func(event *framework.Event) {
Expand Down

0 comments on commit 3696b73

Please sign in to comment.