diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index ca6cfd6359..15bb2432a1 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -55,7 +55,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) { queues := util.NewPriorityQueue(ssn.QueueOrderFn) queueMap := map[api.QueueID]*api.QueueInfo{} - jobsMap := map[api.QueueID]*util.PriorityQueue{} for _, job := range ssn.Jobs { @@ -69,6 +68,7 @@ func (enqueue *Action) Execute(ssn *framework.Session) { queueMap[queue.UID] = queue queues.Push(queue) + ssn.InqueueJobResource[queue.UID] = api.EmptyResource() } if job.PodGroup.Status.Phase == scheduling.PodGroupPending { @@ -78,6 +78,11 @@ func (enqueue *Action) Execute(ssn *framework.Session) { klog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) jobsMap[job.Queue].Push(job) } + + if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { + klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name, job.Queue) + ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) + } } klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap)) @@ -124,6 +129,7 @@ func (enqueue *Action) Execute(ssn *framework.Session) { if inqueue { job.PodGroup.Status.Phase = scheduling.PodGroupInqueue ssn.Jobs[job.UID] = job + ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) } // Added Queue back until no job in Queue. diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index c83e8e4757..efc6bff483 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -44,10 +44,11 @@ type Session struct { // This should not be mutated after initiated podGroupStatus map[api.JobID]scheduling.PodGroupStatus - Jobs map[api.JobID]*api.JobInfo - Nodes map[string]*api.NodeInfo - Queues map[api.QueueID]*api.QueueInfo - NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo + Jobs map[api.JobID]*api.JobInfo + Nodes map[string]*api.NodeInfo + Queues map[api.QueueID]*api.QueueInfo + NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo + InqueueJobResource map[api.QueueID]*api.Resource Tiers []conf.Tier Configurations []conf.Configuration @@ -81,9 +82,10 @@ func openSession(cache cache.Cache) *Session { podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, - Jobs: map[api.JobID]*api.JobInfo{}, - Nodes: map[string]*api.NodeInfo{}, - Queues: map[api.QueueID]*api.QueueInfo{}, + Jobs: map[api.JobID]*api.JobInfo{}, + Nodes: map[string]*api.NodeInfo{}, + Queues: map[api.QueueID]*api.QueueInfo{}, + InqueueJobResource: map[api.QueueID]*api.Resource{}, plugins: map[string]Plugin{}, jobOrderFns: map[string]api.CompareFn{}, @@ -155,6 +157,7 @@ func closeSession(ssn *Session) { ssn.jobOrderFns = nil ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil + ssn.InqueueJobResource = nil klog.V(3).Infof("Close Session %v", ssn.UID) } diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 4dd3c72024..f4a5279cde 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -18,7 +18,6 @@ package proportion import ( "k8s.io/klog" - "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/helpers" "volcano.sh/volcano/pkg/scheduler/framework" @@ -243,7 +242,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { minReq := api.NewResource(*job.PodGroup.Spec.MinResources) // The queue resource quota limit has not reached - return minReq.Add(attr.allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) + return minReq.Add(attr.allocated).Add(ssn.InqueueJobResource[job.Queue]).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) }) // Register event handlers.