From 7c0c79673e16df3f60e5b1649e1e2cdd0e4228c9 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Wed, 10 Jul 2019 18:16:41 +0800 Subject: [PATCH] set drf dominant resource name --- pkg/scheduler/plugins/drf/drf.go | 39 ++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/pkg/scheduler/plugins/drf/drf.go b/pkg/scheduler/plugins/drf/drf.go index 7d81ca43f3..94b7ab5e7d 100644 --- a/pkg/scheduler/plugins/drf/drf.go +++ b/pkg/scheduler/plugins/drf/drf.go @@ -26,6 +26,9 @@ import ( "volcano.sh/volcano/pkg/scheduler/framework" ) +// PluginName indicates name of volcano scheduler plugin. +const PluginName = "drf" + var shareDelta = 0.000001 type drfAttr struct { @@ -38,7 +41,7 @@ type drfPlugin struct { totalResource *api.Resource // Key is Job ID - jobOpts map[api.JobID]*drfAttr + jobAttrs map[api.JobID]*drfAttr // Arguments given for the plugin pluginArguments framework.Arguments @@ -48,13 +51,13 @@ type drfPlugin struct { func New(arguments framework.Arguments) framework.Plugin { return &drfPlugin{ totalResource: api.EmptyResource(), - jobOpts: map[api.JobID]*drfAttr{}, + jobAttrs: map[api.JobID]*drfAttr{}, pluginArguments: arguments, } } func (drf *drfPlugin) Name() string { - return "drf" + return PluginName } func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { @@ -79,25 +82,25 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { // Calculate the init share of Job drf.updateShare(attr) - drf.jobOpts[job.UID] = attr + drf.jobAttrs[job.UID] = attr } preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo - latt := drf.jobOpts[preemptor.Job] + latt := drf.jobAttrs[preemptor.Job] lalloc := latt.allocated.Clone().Add(preemptor.Resreq) - ls := drf.calculateShare(lalloc, drf.totalResource) + _, ls := drf.calculateShare(lalloc, drf.totalResource) allocations := map[api.JobID]*api.Resource{} for _, preemptee := range preemptees { if _, found := allocations[preemptee.Job]; !found { - ratt := drf.jobOpts[preemptee.Job] + ratt := drf.jobAttrs[preemptee.Job] allocations[preemptee.Job] = ratt.allocated.Clone() } ralloc := allocations[preemptee.Job].Sub(preemptee.Resreq) - rs := drf.calculateShare(ralloc, drf.totalResource) + _, rs := drf.calculateShare(ralloc, drf.totalResource) if ls < rs || math.Abs(ls-rs) <= shareDelta { victims = append(victims, preemptee) @@ -116,13 +119,13 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { rv := r.(*api.JobInfo) glog.V(4).Infof("DRF JobOrderFn: <%v/%v> share state: %v, <%v/%v> share state: %v", - lv.Namespace, lv.Name, drf.jobOpts[lv.UID].share, rv.Namespace, rv.Name, drf.jobOpts[rv.UID].share) + lv.Namespace, lv.Name, drf.jobAttrs[lv.UID].share, rv.Namespace, rv.Name, drf.jobAttrs[rv.UID].share) - if drf.jobOpts[lv.UID].share == drf.jobOpts[rv.UID].share { + if drf.jobAttrs[lv.UID].share == drf.jobAttrs[rv.UID].share { return 0 } - if drf.jobOpts[lv.UID].share < drf.jobOpts[rv.UID].share { + if drf.jobAttrs[lv.UID].share < drf.jobAttrs[rv.UID].share { return -1 } @@ -134,7 +137,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { // Register event handlers. ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { - attr := drf.jobOpts[event.Task.Job] + attr := drf.jobAttrs[event.Task.Job] attr.allocated.Add(event.Task.Resreq) drf.updateShare(attr) @@ -143,7 +146,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) }, DeallocateFunc: func(event *framework.Event) { - attr := drf.jobOpts[event.Task.Job] + attr := drf.jobAttrs[event.Task.Job] attr.allocated.Sub(event.Task.Resreq) drf.updateShare(attr) @@ -155,23 +158,25 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { } func (drf *drfPlugin) updateShare(attr *drfAttr) { - attr.share = drf.calculateShare(attr.allocated, drf.totalResource) + attr.dominantResource, attr.share = drf.calculateShare(attr.allocated, drf.totalResource) } -func (drf *drfPlugin) calculateShare(allocated, totalResource *api.Resource) float64 { +func (drf *drfPlugin) calculateShare(allocated, totalResource *api.Resource) (string, float64) { res := float64(0) + dominantResource := "" for _, rn := range totalResource.ResourceNames() { share := helpers.Share(allocated.Get(rn), totalResource.Get(rn)) if share > res { res = share + dominantResource = string(rn) } } - return res + return dominantResource, res } func (drf *drfPlugin) OnSessionClose(session *framework.Session) { // Clean schedule data. drf.totalResource = api.EmptyResource() - drf.jobOpts = map[api.JobID]*drfAttr{} + drf.jobAttrs = map[api.JobID]*drfAttr{} }