Reclaim between Queues.
Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>
k82cn committed Aug 24, 2018
1 parent 0efa982 commit 27857a6
Showing 14 changed files with 477 additions and 53 deletions.
15 changes: 10 additions & 5 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -43,17 +43,17 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
    defer glog.V(3).Infof("Leaving Allocate ...")

    queues := util.NewPriorityQueue(ssn.QueueOrderFn)

    for _, queue := range ssn.Queues {
        queues.Push(queue)
    }

    jobsMap := map[api.QueueID]*util.PriorityQueue{}

    for _, job := range ssn.Jobs {
        if _, found := jobsMap[job.Queue]; !found {
            jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
        }

        if queue, found := ssn.QueueIndex[job.Queue]; found {
            queues.Push(queue)
        }

        glog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
        jobsMap[job.Queue].Push(job)
    }
@@ -68,6 +68,11 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
        }

        queue := queues.Pop().(*api.QueueInfo)
        if ssn.Overused(queue) {
            glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
            continue
        }

        jobs, found := jobsMap[queue.UID]

        glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)
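Note on the new ssn.Overused guard above: allocate now skips a queue whose consumption already exceeds what it deserves, instead of pouring more resources into it. The hook's implementation is not part of this diff; the following is a minimal sketch of the kind of check a proportion-style plugin might contribute (the queueAttr type and its deserved/allocated fields are assumptions for illustration, not the repository's actual code):

package example

import (
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api"
)

// queueAttr is a hypothetical per-queue record: what the queue has been
// allocated so far versus what its weight says it deserves.
type queueAttr struct {
    deserved  *api.Resource
    allocated *api.Resource
}

// overused reports whether a queue already holds at least as much as it
// deserves; allocate and reclaim would then skip it. This mirrors the intent
// of ssn.Overused but is not the actual plugin implementation.
func overused(attrs map[api.QueueID]*queueAttr, queue *api.QueueInfo) bool {
    attr, found := attrs[queue.UID]
    if !found {
        // Unknown queue: treat it as not overused so it still gets considered.
        return false
    }
    return attr.deserved.LessEqual(attr.allocated)
}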
9 changes: 8 additions & 1 deletion pkg/scheduler/actions/allocate/allocate_test.go
@@ -19,6 +19,7 @@ package allocate
import (
    "fmt"
    "reflect"
    "sync"
    "testing"
    "time"

@@ -101,13 +102,16 @@ func buildOwnerReference(owner string) metav1.OwnerReference {
}

type fakeBinder struct {
    sync.Mutex
    binds map[string]string
    c chan string
}

func (fb *fakeBinder) Bind(p *v1.Pod, hostname string) error {
    key := fmt.Sprintf("%v/%v", p.Namespace, p.Name)
    fb.Lock()
    defer fb.Unlock()

    key := fmt.Sprintf("%v/%v", p.Namespace, p.Name)
    fb.binds[key] = hostname

    fb.c <- key
@@ -150,6 +154,9 @@ func TestAllocate(t *testing.T) {
                    ObjectMeta: metav1.ObjectMeta{
                        Name: "c1",
                    },
                    Spec: arbcorev1.QueueSpec{
                        Weight: 1,
                    },
                },
            },
            expected: map[string]string{
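The embedded sync.Mutex above guards fakeBinder.binds, presumably because Bind can be invoked from the scheduler's worker goroutines while the test goroutine is still running; without the lock the concurrent map access would show up as a data race under go test -race. The same pattern in isolation (hypothetical recorder type, not the test's code):

package example

import "sync"

// recorder is a minimal thread-safe map wrapper, mirroring how fakeBinder
// guards its binds map in the test above.
type recorder struct {
    sync.Mutex
    binds map[string]string
}

func (r *recorder) record(key, host string) {
    r.Lock()
    defer r.Unlock()
    r.binds[key] = host
}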
11 changes: 10 additions & 1 deletion pkg/scheduler/actions/preempt/preempt.go
@@ -57,7 +57,16 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
    preempteeTasks := map[api.JobID]*util.PriorityQueue{}

    var underRequest []*api.JobInfo
    var queues []*api.QueueInfo
    for _, job := range ssn.Jobs {
        if queue, found := ssn.QueueIndex[job.Queue]; !found {
            continue
        } else {
            glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
                queue.Name, job.Namespace, job.Name)
            queues = append(queues, queue)
        }

        if len(job.TaskStatusIndex[api.Pending]) != 0 {
            if _, found := preemptorsMap[job.Queue]; !found {
                preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
@@ -89,7 +98,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
    }

    // Preemption between Jobs within Queue.
    for _, queue := range ssn.Queues {
    for _, queue := range queues {
        for {
            preemptors := preemptorsMap[queue.UID]
            preemptees := preempteesMap[queue.UID]
142 changes: 140 additions & 2 deletions pkg/scheduler/actions/reclaim/reclaim.go
@@ -16,7 +16,13 @@ limitations under the License.

package reclaim

import "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/framework"
import (
    "github.com/golang/glog"

    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api"
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/framework"
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/util"
)

type reclaimAction struct {
    ssn *framework.Session
@@ -33,6 +39,138 @@ func (alloc *reclaimAction) Name() string {
func (alloc *reclaimAction) Initialize() {}

func (alloc *reclaimAction) Execute(ssn *framework.Session) {
    glog.V(3).Infof("Enter Reclaim ...")
    defer glog.V(3).Infof("Leaving Reclaim ...")

    queues := util.NewPriorityQueue(ssn.QueueOrderFn)

    preemptorsMap := map[api.QueueID]*util.PriorityQueue{}
    preemptorTasks := map[api.JobID]*util.PriorityQueue{}

    var underRequest []*api.JobInfo
    for _, job := range ssn.Jobs {
        if queue, found := ssn.QueueIndex[job.Queue]; !found {
            continue
        } else {
            glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
                queue.Name, job.Namespace, job.Name)
            queues.Push(queue)
        }

        if len(job.TaskStatusIndex[api.Pending]) != 0 {
            if _, found := preemptorsMap[job.Queue]; !found {
                preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
            }
            preemptorsMap[job.Queue].Push(job)
            underRequest = append(underRequest, job)
            preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
            for _, task := range job.TaskStatusIndex[api.Pending] {
                preemptorTasks[job.UID].Push(task)
            }
        }
    }

    for {
        // If no queues, break
        if queues.Empty() {
            break
        }

        var job *api.JobInfo
        var task *api.TaskInfo

        // TODO (k82cn): we should check whether queue deserved more resources.
        queue := queues.Pop().(*api.QueueInfo)
        if ssn.Overused(queue) {
            glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
            continue
        }

        // Found "high" priority job
        if jobs, found := preemptorsMap[queue.UID]; !found || jobs.Empty() {
            continue
        } else {
            job = jobs.Pop().(*api.JobInfo)
        }

        // Found "high" priority task to reclaim others
        if tasks, found := preemptorTasks[job.UID]; !found || tasks.Empty() {
            continue
        } else {
            task = tasks.Pop().(*api.TaskInfo)
        }

        resreq := task.Resreq.Clone()
        reclaimed := api.EmptyResource()

        assigned := false

        for _, n := range ssn.Nodes {
            // If predicates failed, next node.
            if err := ssn.PredicateFn(task, n); err != nil {
                continue
            }

            glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
                task.Namespace, task.Name, n.Name)

            var reclaimees []*api.TaskInfo
            for _, task := range n.Tasks {
                if j, found := ssn.JobIndex[task.Job]; !found {
                    continue
                } else if j.Queue != job.Queue {
                    // Clone task to avoid modify Task's status on node.
                    reclaimees = append(reclaimees, task.Clone())
                }
            }
            victims := ssn.Reclaimable(task, reclaimees)

            if len(victims) == 0 {
                glog.V(3).Infof("No victims on Node <%s>.", n.Name)
                continue
            }

            // If not enough resource, continue
            allRes := api.EmptyResource()
            for _, v := range victims {
                allRes.Add(v.Resreq)
            }
            if allRes.Less(resreq) {
                glog.V(3).Infof("Not enough resource from victims on Node <%s>.", n.Name)
                continue
            }

            // Reclaim victims for tasks.
            for _, reclaimee := range victims {
                glog.Errorf("Try to reclaim Task <%s/%s> for Tasks <%s/%s>",
                    reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name)
                if err := ssn.Reclaim(task, reclaimee); err != nil {
                    glog.Errorf("Failed to reclaim Task <%s/%s> for Tasks <%s/%s>: %v",
                        reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name, err)
                    continue
                }
                reclaimed.Add(reclaimee.Resreq)
                // If reclaimed enough resources, break loop to avoid Sub panic.
                if resreq.LessEqual(reclaimee.Resreq) {
                    break
                }
                resreq.Sub(reclaimee.Resreq)
            }

            glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",
                reclaimed, task.Namespace, task.Name, task.Resreq)

            assigned = true

            break
        }

        if assigned {
            queues.Push(queue)
        }
    }

}

func (alloc *reclaimAction) UnInitialize() {}
func (alloc *reclaimAction) UnInitialize() {
}
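A detail worth calling out in the victim loop above: the outstanding request resreq is only decremented while it is still strictly larger than the current victim, and the loop breaks as soon as the remainder fits into that victim, so Resource.Sub is never asked to go below zero (the in-code comment notes Sub would panic otherwise). A simplified, self-contained sketch of that accounting pattern, using an illustrative res type rather than the real api.Resource:

package example

// res is a deliberately simplified stand-in for api.Resource, just enough to
// show the accounting used in the reclaim loop.
type res struct{ milliCPU, memory float64 }

func (r res) add(o res) res { return res{r.milliCPU + o.milliCPU, r.memory + o.memory} }
func (r res) sub(o res) res { return res{r.milliCPU - o.milliCPU, r.memory - o.memory} }
func (r res) lessEqual(o res) bool {
    return r.milliCPU <= o.milliCPU && r.memory <= o.memory
}

// reclaimUntilSatisfied walks victims in order, accumulating what has been
// reclaimed and shrinking the outstanding request. It stops once the remaining
// request fits inside the current victim, so the request is never subtracted
// below zero.
func reclaimUntilSatisfied(request res, victims []res) res {
    reclaimed := res{}
    for _, v := range victims {
        reclaimed = reclaimed.add(v)
        if request.lessEqual(v) {
            break
        }
        request = request.sub(v)
    }
    return reclaimed
}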
2 changes: 2 additions & 0 deletions pkg/scheduler/api/cluster_info.go
@@ -24,6 +24,8 @@ type ClusterInfo struct {
    Nodes []*NodeInfo

    Queues []*QueueInfo

    Others []*TaskInfo
}

func (ci ClusterInfo) String() string {
12 changes: 6 additions & 6 deletions pkg/scheduler/api/helpers/helpers.go
@@ -22,14 +22,14 @@ import (
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api"
)

func Min(l, v *api.Resource) *api.Resource {
    r := &api.Resource{}
func Min(l, r *api.Resource) *api.Resource {
    res := &api.Resource{}

    r.MilliCPU = math.Min(l.MilliCPU, r.MilliCPU)
    r.MilliGPU = math.Min(l.MilliGPU, r.MilliGPU)
    r.Memory = math.Min(l.Memory, r.Memory)
    res.MilliCPU = math.Min(l.MilliCPU, r.MilliCPU)
    res.MilliGPU = math.Min(l.MilliGPU, r.MilliGPU)
    res.Memory = math.Min(l.Memory, r.Memory)

    return r
    return res
}

func Share(l, r float64) float64 {
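For context on the Min change above: the old version allocated its result as r and then took math.Min(l.X, r.X) against that freshly zeroed r, so the second parameter v was never read and the helper effectively returned an empty resource. A small usage sketch of the fixed helper (the field values are illustrative only):

package main

import (
    "fmt"

    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api"
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api/helpers"
)

func main() {
    // Two resource vectors; Min takes the component-wise minimum.
    l := &api.Resource{MilliCPU: 2000, Memory: 4e9, MilliGPU: 1000}
    r := &api.Resource{MilliCPU: 1000, Memory: 8e9, MilliGPU: 0}

    m := helpers.Min(l, r)
    fmt.Printf("min: cpu=%v mem=%v gpu=%v\n", m.MilliCPU, m.Memory, m.MilliGPU)
}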
6 changes: 6 additions & 0 deletions pkg/scheduler/api/types.go
@@ -100,3 +100,9 @@ type Reason struct {
    Event arbcorev1.Event
    Message string
}

// ReclaimableFn is the func declaration used to reclaim tasks.
type ReclaimableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo

// PreemptableFn is the func declaration used to preempt tasks.
type PreemptableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo
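Both new function types share the same shape: candidate victims in, approved victims out. A minimal illustrative ReclaimableFn is sketched below; it simply approves every candidate, whereas a real plugin would filter the slice according to its policy (for example, keeping a victim's job above its minimum available members or comparing queue shares). The package name and the registration hook mentioned in the comment are assumptions, not part of this commit:

package example

import (
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api"
)

// allowAllReclaim approves every candidate victim handed to it by the reclaim
// action. It exists only to show the shape of api.ReclaimableFn; a real plugin
// would apply an actual policy before returning the victims.
func allowAllReclaim(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
    victims := make([]*api.TaskInfo, 0, len(reclaimees))
    victims = append(victims, reclaimees...)
    return victims
}

// Compile-time check that the sketch matches the declared function type.
var _ api.ReclaimableFn = allowAllReclaim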
14 changes: 14 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -416,20 +416,34 @@ func (sc *SchedulerCache) Snapshot() *arbapi.ClusterInfo {
        Nodes: make([]*arbapi.NodeInfo, 0, len(sc.Nodes)),
        Jobs: make([]*arbapi.JobInfo, 0, len(sc.Jobs)),
        Queues: make([]*arbapi.QueueInfo, 0, len(sc.Queues)),
        Others: make([]*arbapi.TaskInfo, 0, 10),
    }

    for _, value := range sc.Nodes {
        snapshot.Nodes = append(snapshot.Nodes, value.Clone())
    }

    queues := map[arbapi.QueueID]struct{}{}
    for _, value := range sc.Queues {
        snapshot.Queues = append(snapshot.Queues, value.Clone())
        queues[value.UID] = struct{}{}
    }

    for _, value := range sc.Jobs {
        // If no scheduling spec, does not handle it.
        if value.PodGroup == nil && value.PDB == nil {
            glog.V(3).Infof("The scheduling spec of Job <%v> is nil, ignore it.", value.UID)

            // Also tracing the running task assigned by other scheduler.
            for _, task := range value.TaskStatusIndex[arbapi.Running] {
                snapshot.Others = append(snapshot.Others, task.Clone())
            }

            continue
        }

        if _, found := queues[value.Queue]; !found {
            glog.V(3).Infof("The Queue of Job <%v> does not exist, ignore it.", value.UID)
            continue
        }

2 changes: 2 additions & 0 deletions pkg/scheduler/factory.go
@@ -19,6 +19,7 @@ package scheduler
import (
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/actions/allocate"
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/actions/preempt"
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/actions/reclaim"

    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/drf"
    "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/gang"
@@ -40,6 +41,7 @@ func init() {
    framework.RegisterPluginBuilder("proportion", proportion.New)

    // Actions
    framework.RegisterAction(reclaim.New())
    framework.RegisterAction(allocate.New())
    framework.RegisterAction(preempt.New())
}