From cab63769133b408e8d1a412755723891136fe236 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Wed, 27 Jun 2018 22:28:28 +0800 Subject: [PATCH] Corrected 'node.releasing' counter error. Signed-off-by: Da K. Ma --- pkg/scheduler/actions/allocate/allocate.go | 4 +-- pkg/scheduler/api/helpers.go | 2 +- pkg/scheduler/api/job_info.go | 4 +-- pkg/scheduler/api/node_info.go | 16 ++++++--- pkg/scheduler/framework/event.go | 3 -- pkg/scheduler/framework/session.go | 17 +++------- pkg/scheduler/plugins/drf/drf.go | 38 ++++++++-------------- pkg/scheduler/plugins/gang/gang.go | 2 +- 8 files changed, 36 insertions(+), 50 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 021b47fee2..15a94d4d45 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -104,8 +104,8 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { // Allocate releasing resource to the task if any. if task.Resreq.LessEqual(node.Releasing) { - glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v>", - task.Job, task.UID, node.Name) + glog.V(3).Infof("Pipelining Task <%v:%v/%v> to node <%v> for <%v> on <%v>", + task.UID, task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing) if err := ssn.Pipeline(task, node.Name); err != nil { glog.Errorf("Failed to pipeline Task %v on %v in Session %v", task.UID, node.Name, ssn.ID) diff --git a/pkg/scheduler/api/helpers.go b/pkg/scheduler/api/helpers.go index 0042353d1d..a9415e6cdd 100644 --- a/pkg/scheduler/api/helpers.go +++ b/pkg/scheduler/api/helpers.go @@ -60,7 +60,7 @@ func getTaskStatus(pod *v1.Pod) TaskStatus { return Unknown } -func OccupiedResources(status TaskStatus) bool { +func AllocatedStatus(status TaskStatus) bool { switch status { case Bound, Binding, Running, Allocated: return true diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 67cc1d3faa..fb866ab9f3 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -186,7 +186,7 @@ func (ps *JobInfo) AddTaskInfo(pi *TaskInfo) { ps.TotalRequest.Add(pi.Resreq) - if OccupiedResources(pi.Status) { + if AllocatedStatus(pi.Status) { ps.Allocated.Add(pi.Resreq) } } @@ -220,7 +220,7 @@ func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) { if task, found := ps.Tasks[pi.UID]; found { ps.TotalRequest.Sub(task.Resreq) - if OccupiedResources(task.Status) { + if AllocatedStatus(task.Status) { ps.Allocated.Sub(task.Resreq) } diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index 66faac4dca..b0bf99263f 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -142,12 +142,17 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) { ni.Used.Add(task.Resreq) } + glog.V(3).Infof("After added Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>", + key, ni.Name, ni.Idle, ni.Used, ni.Releasing) + ni.Tasks[key] = task } -func (ni *NodeInfo) RemoveTask(task *TaskInfo) { - key := PodKey(task.Pod) - if _, found := ni.Tasks[key]; !found { +func (ni *NodeInfo) RemoveTask(ti *TaskInfo) { + key := PodKey(ti.Pod) + + task, found := ni.Tasks[key] + if !found { return } @@ -160,5 +165,8 @@ func (ni *NodeInfo) RemoveTask(task *TaskInfo) { ni.Used.Sub(task.Resreq) } - delete(ni.Tasks, PodKey(task.Pod)) + glog.V(3).Infof("After removed Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>", + key, ni.Name, ni.Idle, ni.Used, ni.Releasing) + + delete(ni.Tasks, key) } diff --git a/pkg/scheduler/framework/event.go b/pkg/scheduler/framework/event.go index 78848edd4e..e27e7e33f8 100644 --- a/pkg/scheduler/framework/event.go +++ b/pkg/scheduler/framework/event.go @@ -26,8 +26,5 @@ type Event struct { type EventHandler struct { AllocateFunc func(event *Event) - PipelineFunc func(event *Event) - BindFunc func(event *Event) EvictFunc func(event *Event) - PreemptFunc func(event *Event) } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 3197f39b37..ff399fd9e5 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -99,8 +99,8 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { } for _, eh := range ssn.eventHandlers { - if eh.PipelineFunc != nil { - eh.PipelineFunc(&Event{ + if eh.AllocateFunc != nil { + eh.AllocateFunc(&Event{ Task: task, }) } @@ -159,15 +159,6 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { task.Job, ssn.ID) } - // Callbacks - for _, eh := range ssn.eventHandlers { - if eh.BindFunc != nil { - eh.BindFunc(&Event{ - Task: task, - }) - } - } - return nil } @@ -191,8 +182,8 @@ func (ssn *Session) Preempt(preemptor, preemptee *api.TaskInfo) error { } for _, eh := range ssn.eventHandlers { - if eh.PreemptFunc != nil { - eh.PreemptFunc(&Event{ + if eh.AllocateFunc != nil { + eh.AllocateFunc(&Event{ Task: preemptor, }) } diff --git a/pkg/scheduler/plugins/drf/drf.go b/pkg/scheduler/plugins/drf/drf.go index 2a00eb8a64..03934ec46f 100644 --- a/pkg/scheduler/plugins/drf/drf.go +++ b/pkg/scheduler/plugins/drf/drf.go @@ -17,16 +17,19 @@ limitations under the License. package drf import ( + "math" + "github.com/golang/glog" "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api" "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/framework" ) +var shareDelta = 0.000001 + type drfAttr struct { share float64 dominantResource string allocated *api.Resource - preempting *api.Resource } type drfPlugin struct { @@ -55,12 +58,11 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { for _, job := range ssn.Jobs { attr := &drfAttr{ - allocated: api.EmptyResource(), - preempting: api.EmptyResource(), + allocated: api.EmptyResource(), } for status, tasks := range job.TaskStatusIndex { - if api.OccupiedResources(status) { + if api.AllocatedStatus(status) { for _, t := range tasks { attr.allocated.Add(t.Resreq) } @@ -81,8 +83,8 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { ratt := drf.jobOpts[rv.Job] // Also includes preempting resources. - lalloc := latt.allocated.Clone().Add(lv.Resreq).Add(latt.preempting) - ralloc := ratt.allocated.Clone().Sub(rv.Resreq).Add(ratt.preempting) + lalloc := latt.allocated.Clone().Add(lv.Resreq) + ralloc := ratt.allocated.Clone().Sub(rv.Resreq) ls := drf.calculateShare(lalloc, drf.totalResource) rs := drf.calculateShare(ralloc, drf.totalResource) @@ -90,7 +92,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { glog.V(3).Infof("DRF PreemptableFn: preemptor <%v:%v/%v>, alloc <%v>, share <%v>; preemptee <%v:%v/%v>, alloc <%v>, share <%v>", lv.UID, lv.Namespace, lv.Name, lalloc, ls, rv.UID, rv.Namespace, rv.Name, ralloc, rs) - return ls < rs + return ls < rs || math.Abs(ls-rs) <= shareDelta }) // Add Job Order function. @@ -115,31 +117,19 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { attr := drf.jobOpts[event.Task.Job] attr.allocated.Add(event.Task.Resreq) - if event.Task.Resreq.LessEqual(attr.preempting) { - attr.preempting.Sub(event.Task.Resreq) - } - drf.updateShare(attr) - }, - PipelineFunc: func(event *framework.Event) { - attr := drf.jobOpts[event.Task.Job] - attr.allocated.Add(event.Task.Resreq) - if event.Task.Resreq.LessEqual(attr.preempting) { - attr.preempting.Sub(event.Task.Resreq) - } - - drf.updateShare(attr) + glog.V(3).Infof("DRF AllocateFunc: task <%v:%v/%v>, resreq <%v>, share <%v>", + event.Task.UID, event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) }, EvictFunc: func(event *framework.Event) { attr := drf.jobOpts[event.Task.Job] attr.allocated.Sub(event.Task.Resreq) drf.updateShare(attr) - }, - PreemptFunc: func(event *framework.Event) { - attr := drf.jobOpts[event.Task.Job] - attr.preempting.Add(event.Task.Resreq) + + glog.V(3).Infof("DRF EvictFunc: task <%v:%v/%v>, resreq <%v>, share <%v>", + event.Task.UID, event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) }, }) } diff --git a/pkg/scheduler/plugins/gang/gang.go b/pkg/scheduler/plugins/gang/gang.go index 11589b3c66..523aad8f90 100644 --- a/pkg/scheduler/plugins/gang/gang.go +++ b/pkg/scheduler/plugins/gang/gang.go @@ -33,7 +33,7 @@ func New() framework.Plugin { func readyTaskNum(job *api.JobInfo) int { occupid := 0 for status, tasks := range job.TaskStatusIndex { - if api.OccupiedResources(status) || status == api.Succeeded { + if api.AllocatedStatus(status) || status == api.Succeeded { occupid = occupid + len(tasks) } }