Skip to content

Commit

Permalink
Merge pull request volcano-sh#255 from k82cn/ka_247_3
Browse files Browse the repository at this point in the history
Corrected 'node.releasing' counter error.
  • Loading branch information
k82cn authored Jun 27, 2018
2 parents 160dec1 + cab6376 commit 2c1f3fd
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 50 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {

// Allocate releasing resource to the task if any.
if task.Resreq.LessEqual(node.Releasing) {
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v>",
task.Job, task.UID, node.Name)
glog.V(3).Infof("Pipelining Task <%v:%v/%v> to node <%v> for <%v> on <%v>",
task.UID, task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
if err := ssn.Pipeline(task, node.Name); err != nil {
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
task.UID, node.Name, ssn.ID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func getTaskStatus(pod *v1.Pod) TaskStatus {
return Unknown
}

func OccupiedResources(status TaskStatus) bool {
func AllocatedStatus(status TaskStatus) bool {
switch status {
case Bound, Binding, Running, Allocated:
return true
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (ps *JobInfo) AddTaskInfo(pi *TaskInfo) {

ps.TotalRequest.Add(pi.Resreq)

if OccupiedResources(pi.Status) {
if AllocatedStatus(pi.Status) {
ps.Allocated.Add(pi.Resreq)
}
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) {
if task, found := ps.Tasks[pi.UID]; found {
ps.TotalRequest.Sub(task.Resreq)

if OccupiedResources(task.Status) {
if AllocatedStatus(task.Status) {
ps.Allocated.Sub(task.Resreq)
}

Expand Down
16 changes: 12 additions & 4 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,17 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) {
ni.Used.Add(task.Resreq)
}

glog.V(3).Infof("After added Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>",
key, ni.Name, ni.Idle, ni.Used, ni.Releasing)

ni.Tasks[key] = task
}

func (ni *NodeInfo) RemoveTask(task *TaskInfo) {
key := PodKey(task.Pod)
if _, found := ni.Tasks[key]; !found {
func (ni *NodeInfo) RemoveTask(ti *TaskInfo) {
key := PodKey(ti.Pod)

task, found := ni.Tasks[key]
if !found {
return
}

Expand All @@ -160,5 +165,8 @@ func (ni *NodeInfo) RemoveTask(task *TaskInfo) {
ni.Used.Sub(task.Resreq)
}

delete(ni.Tasks, PodKey(task.Pod))
glog.V(3).Infof("After removed Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>",
key, ni.Name, ni.Idle, ni.Used, ni.Releasing)

delete(ni.Tasks, key)
}
3 changes: 0 additions & 3 deletions pkg/scheduler/framework/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,5 @@ type Event struct {

type EventHandler struct {
AllocateFunc func(event *Event)
PipelineFunc func(event *Event)
BindFunc func(event *Event)
EvictFunc func(event *Event)
PreemptFunc func(event *Event)
}
17 changes: 4 additions & 13 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error {
}

for _, eh := range ssn.eventHandlers {
if eh.PipelineFunc != nil {
eh.PipelineFunc(&Event{
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
Task: task,
})
}
Expand Down Expand Up @@ -159,15 +159,6 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {
task.Job, ssn.ID)
}

// Callbacks
for _, eh := range ssn.eventHandlers {
if eh.BindFunc != nil {
eh.BindFunc(&Event{
Task: task,
})
}
}

return nil
}

Expand All @@ -191,8 +182,8 @@ func (ssn *Session) Preempt(preemptor, preemptee *api.TaskInfo) error {
}

for _, eh := range ssn.eventHandlers {
if eh.PreemptFunc != nil {
eh.PreemptFunc(&Event{
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
Task: preemptor,
})
}
Expand Down
38 changes: 14 additions & 24 deletions pkg/scheduler/plugins/drf/drf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ limitations under the License.
package drf

import (
"math"

"github.com/golang/glog"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/framework"
)

var shareDelta = 0.000001

type drfAttr struct {
share float64
dominantResource string
allocated *api.Resource
preempting *api.Resource
}

type drfPlugin struct {
Expand Down Expand Up @@ -55,12 +58,11 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {

for _, job := range ssn.Jobs {
attr := &drfAttr{
allocated: api.EmptyResource(),
preempting: api.EmptyResource(),
allocated: api.EmptyResource(),
}

for status, tasks := range job.TaskStatusIndex {
if api.OccupiedResources(status) {
if api.AllocatedStatus(status) {
for _, t := range tasks {
attr.allocated.Add(t.Resreq)
}
Expand All @@ -81,16 +83,16 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
ratt := drf.jobOpts[rv.Job]

// Also includes preempting resources.
lalloc := latt.allocated.Clone().Add(lv.Resreq).Add(latt.preempting)
ralloc := ratt.allocated.Clone().Sub(rv.Resreq).Add(ratt.preempting)
lalloc := latt.allocated.Clone().Add(lv.Resreq)
ralloc := ratt.allocated.Clone().Sub(rv.Resreq)

ls := drf.calculateShare(lalloc, drf.totalResource)
rs := drf.calculateShare(ralloc, drf.totalResource)

glog.V(3).Infof("DRF PreemptableFn: preemptor <%v:%v/%v>, alloc <%v>, share <%v>; preemptee <%v:%v/%v>, alloc <%v>, share <%v>",
lv.UID, lv.Namespace, lv.Name, lalloc, ls, rv.UID, rv.Namespace, rv.Name, ralloc, rs)

return ls < rs
return ls < rs || math.Abs(ls-rs) <= shareDelta
})

// Add Job Order function.
Expand All @@ -115,31 +117,19 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
attr := drf.jobOpts[event.Task.Job]
attr.allocated.Add(event.Task.Resreq)

if event.Task.Resreq.LessEqual(attr.preempting) {
attr.preempting.Sub(event.Task.Resreq)
}

drf.updateShare(attr)
},
PipelineFunc: func(event *framework.Event) {
attr := drf.jobOpts[event.Task.Job]
attr.allocated.Add(event.Task.Resreq)

if event.Task.Resreq.LessEqual(attr.preempting) {
attr.preempting.Sub(event.Task.Resreq)
}

drf.updateShare(attr)
glog.V(3).Infof("DRF AllocateFunc: task <%v:%v/%v>, resreq <%v>, share <%v>",
event.Task.UID, event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share)
},
EvictFunc: func(event *framework.Event) {
attr := drf.jobOpts[event.Task.Job]
attr.allocated.Sub(event.Task.Resreq)

drf.updateShare(attr)
},
PreemptFunc: func(event *framework.Event) {
attr := drf.jobOpts[event.Task.Job]
attr.preempting.Add(event.Task.Resreq)

glog.V(3).Infof("DRF EvictFunc: task <%v:%v/%v>, resreq <%v>, share <%v>",
event.Task.UID, event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share)
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/gang/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func New() framework.Plugin {
func readyTaskNum(job *api.JobInfo) int {
occupid := 0
for status, tasks := range job.TaskStatusIndex {
if api.OccupiedResources(status) || status == api.Succeeded {
if api.AllocatedStatus(status) || status == api.Succeeded {
occupid = occupid + len(tasks)
}
}
Expand Down

0 comments on commit 2c1f3fd

Please sign in to comment.