Prioritize scheduling to machines whose current resources satisfy the task, and then consider machines whose future resources would satisfy it

Signed-off-by: wangyang <wangyang8126@gmail.com>
wangyang0616 committed Aug 3, 2023
1 parent 9eaa272 commit 5e48157
Showing 1 changed file with 50 additions and 24 deletions: pkg/scheduler/actions/allocate/allocate.go
@@ -195,50 +195,76 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 				break
 			}
 
-			var candidateNodes []*api.NodeInfo
+			// Candidate nodes are divided into two gradients:
+			// - the first gradient node: a list of free nodes that satisfy the task resource request;
+			// - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
+			// Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
+			// otherwise, score the second gradient node and select the appropriate node.
+			var candidateNodes [][]*api.NodeInfo
+			var idleCandidateNodes []*api.NodeInfo
+			var futureIdleCandidateNodes []*api.NodeInfo
 			for _, n := range predicateNodes {
-				if task.InitResreq.LessEqual(n.Idle, api.Zero) || task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
-					candidateNodes = append(candidateNodes, n)
+				if task.InitResreq.LessEqual(n.Idle, api.Zero) {
+					idleCandidateNodes = append(idleCandidateNodes, n)
+				} else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
+					futureIdleCandidateNodes = append(futureIdleCandidateNodes, n)
 				} else {
 					klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v",
 						n.Name, n.Idle, n.FutureIdle(), task.Name)
 				}
 			}
+			candidateNodes = append(candidateNodes, idleCandidateNodes)
+			candidateNodes = append(candidateNodes, futureIdleCandidateNodes)
 
-			var node *api.NodeInfo
-			switch {
-			case len(candidateNodes) == 0: // If not candidate nodes for this task, skip it.
-				continue
-			case len(candidateNodes) == 1: // If only one node after predicate, just use it.
-				node = candidateNodes[0]
-			case len(candidateNodes) > 1: // If more than one node after predicate, using "the best" one
-				nodeScores := util.PrioritizeNodes(task, candidateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
-
-				node = ssn.BestNodeFn(task, nodeScores)
-				if node == nil {
-					node = util.SelectBestNode(nodeScores)
+			var bestNode *api.NodeInfo
+			for index, nodes := range candidateNodes {
+				if klog.V(5).Enabled() {
+					for _, node := range nodes {
+						klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle())
+					}
+				}
+				switch {
+				case len(nodes) == 0:
+					klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index)
+				case len(nodes) == 1: // If only one node after predicate, just use it.
+					bestNode = nodes[0]
+				case len(nodes) > 1: // If more than one node after predicate, using "the best" one
+					nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
+
+					bestNode = ssn.BestNodeFn(task, nodeScores)
+					if bestNode == nil {
+						bestNode = util.SelectBestNode(nodeScores)
+					}
+				}
+
+				// If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
+				if bestNode != nil {
+					break
 				}
 			}
 
 			// Allocate idle resource to the task.
-			if task.InitResreq.LessEqual(node.Idle, api.Zero) {
+			if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
 				klog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
-					task.Namespace, task.Name, node.Name)
-				if err := stmt.Allocate(task, node); err != nil {
+					task.Namespace, task.Name, bestNode.Name)
+				if err := stmt.Allocate(task, bestNode); err != nil {
 					klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
-						task.UID, node.Name, ssn.UID, err)
+						task.UID, bestNode.Name, ssn.UID, err)
 				} else {
 					metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
 					metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
 				}
 			} else {
 				klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
-					task.Namespace, task.Name, node.Name)
+					task.Namespace, task.Name, bestNode.Name)
 
 				// Allocate releasing resource to the task if any.
-				if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
+				if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) {
 					klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
-						task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
-					if err := stmt.Pipeline(task, node.Name); err != nil {
+						task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing)
+					if err := stmt.Pipeline(task, bestNode.Name); err != nil {
 						klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
-							task.UID, node.Name, ssn.UID, err)
+							task.UID, bestNode.Name, ssn.UID, err)
 					} else {
 						metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
 						metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
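In plain terms, the change replaces the single flat candidate list with a two-gradient search: nodes whose current idle resources already satisfy the task are scored first, and nodes that would only fit once releasing resources come back (the FutureIdle case) are scored only when the first gradient produces no node. The following minimal sketch illustrates that selection order; the node type, integer resources, and score function are simplified stand-ins for illustration, not Volcano's api.NodeInfo or the session's ordering functions.

package main

import "fmt"

// node is a simplified stand-in for Volcano's api.NodeInfo: idle is the
// resource currently free on the node, futureIdle additionally counts
// resources that are expected to be released.
type node struct {
	name       string
	idle       int
	futureIdle int
}

// pickNode mirrors the two-gradient idea of the commit: nodes whose current
// idle resources satisfy the request form the first gradient, nodes whose
// future idle resources satisfy it form the second, and the second gradient
// is scored only when the first is empty.
func pickNode(nodes []node, request int, score func(node) int) *node {
	var idleFit, futureFit []node
	for _, n := range nodes {
		switch {
		case request <= n.idle:
			idleFit = append(idleFit, n)
		case request <= n.futureIdle:
			futureFit = append(futureFit, n)
		}
	}

	for _, gradient := range [][]node{idleFit, futureFit} {
		var best *node
		bestScore := -1
		for i := range gradient {
			if s := score(gradient[i]); s > bestScore {
				best, bestScore = &gradient[i], s
			}
		}
		// A hit in the first gradient ends the search; the second
		// gradient is never considered.
		if best != nil {
			return best
		}
	}
	return nil
}

func main() {
	nodes := []node{
		{name: "n1", idle: 2, futureIdle: 8}, // fits only after releases
		{name: "n2", idle: 4, futureIdle: 4}, // fits right now
	}
	// n1 would score higher here, but n2 wins because it satisfies the
	// request from current idle resources (first gradient).
	best := pickNode(nodes, 3, func(n node) int { return n.futureIdle })
	fmt.Println(best.name) // prints: n2
}

Even when a future-idle node would score higher, a node that fits with current idle resources wins, because the second gradient is never scored once the first yields a candidate.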
