From 27857a6a0c0a6ec159364c50a55cde539cbabd87 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Thu, 23 Aug 2018 13:54:26 +0800 Subject: [PATCH] Reclaim between Queues. Signed-off-by: Da K. Ma --- pkg/scheduler/actions/allocate/allocate.go | 15 +- .../actions/allocate/allocate_test.go | 9 +- pkg/scheduler/actions/preempt/preempt.go | 11 +- pkg/scheduler/actions/reclaim/reclaim.go | 142 +++++++++++++++++- pkg/scheduler/api/cluster_info.go | 2 + pkg/scheduler/api/helpers/helpers.go | 12 +- pkg/scheduler/api/types.go | 6 + pkg/scheduler/cache/cache.go | 14 ++ pkg/scheduler/factory.go | 2 + pkg/scheduler/framework/session.go | 88 +++++++++++ .../plugins/proportion/proportion.go | 122 ++++++++++++--- pkg/scheduler/util.go | 4 +- test/e2e.go | 33 ++++ test/util.go | 70 +++++++-- 14 files changed, 477 insertions(+), 53 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 70abe20a4b..978583b729 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -43,17 +43,17 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { defer glog.V(3).Infof("Leaving Allocate ...") queues := util.NewPriorityQueue(ssn.QueueOrderFn) - - for _, queue := range ssn.Queues { - queues.Push(queue) - } - jobsMap := map[api.QueueID]*util.PriorityQueue{} for _, job := range ssn.Jobs { if _, found := jobsMap[job.Queue]; !found { jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) } + + if queue, found := ssn.QueueIndex[job.Queue]; found { + queues.Push(queue) + } + glog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) jobsMap[job.Queue].Push(job) } @@ -68,6 +68,11 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { } queue := queues.Pop().(*api.QueueInfo) + if ssn.Overused(queue) { + glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name) + continue + } + jobs, found := jobsMap[queue.UID] glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name) diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index f5a4632c65..374947cf9a 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -19,6 +19,7 @@ package allocate import ( "fmt" "reflect" + "sync" "testing" "time" @@ -101,13 +102,16 @@ func buildOwnerReference(owner string) metav1.OwnerReference { } type fakeBinder struct { + sync.Mutex binds map[string]string c chan string } func (fb *fakeBinder) Bind(p *v1.Pod, hostname string) error { - key := fmt.Sprintf("%v/%v", p.Namespace, p.Name) + fb.Lock() + defer fb.Unlock() + key := fmt.Sprintf("%v/%v", p.Namespace, p.Name) fb.binds[key] = hostname fb.c <- key @@ -150,6 +154,9 @@ func TestAllocate(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "c1", }, + Spec: arbcorev1.QueueSpec{ + Weight: 1, + }, }, }, expected: map[string]string{ diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 5dae2e584e..e67091fef7 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -57,7 +57,16 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { preempteeTasks := map[api.JobID]*util.PriorityQueue{} var underRequest []*api.JobInfo + var queues []*api.QueueInfo for _, job := range ssn.Jobs { + if queue, found := ssn.QueueIndex[job.Queue]; !found { + continue + } else { + glog.V(3).Infof("Added Queue <%s> for Job 
<%s/%s>", + queue.Name, job.Namespace, job.Name) + queues = append(queues, queue) + } + if len(job.TaskStatusIndex[api.Pending]) != 0 { if _, found := preemptorsMap[job.Queue]; !found { preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) @@ -89,7 +98,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { } // Preemption between Jobs within Queue. - for _, queue := range ssn.Queues { + for _, queue := range queues { for { preemptors := preemptorsMap[queue.UID] preemptees := preempteesMap[queue.UID] diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 5744167758..19c01dfb59 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -16,7 +16,13 @@ limitations under the License. package reclaim -import "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/framework" +import ( + "github.com/golang/glog" + + "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api" + "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/framework" + "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/util" +) type reclaimAction struct { ssn *framework.Session @@ -33,6 +39,138 @@ func (alloc *reclaimAction) Name() string { func (alloc *reclaimAction) Initialize() {} func (alloc *reclaimAction) Execute(ssn *framework.Session) { + glog.V(3).Infof("Enter Reclaim ...") + defer glog.V(3).Infof("Leaving Reclaim ...") + + queues := util.NewPriorityQueue(ssn.QueueOrderFn) + + preemptorsMap := map[api.QueueID]*util.PriorityQueue{} + preemptorTasks := map[api.JobID]*util.PriorityQueue{} + + var underRequest []*api.JobInfo + for _, job := range ssn.Jobs { + if queue, found := ssn.QueueIndex[job.Queue]; !found { + continue + } else { + glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>", + queue.Name, job.Namespace, job.Name) + queues.Push(queue) + } + + if len(job.TaskStatusIndex[api.Pending]) != 0 { + if _, found := preemptorsMap[job.Queue]; !found { + preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) + } + preemptorsMap[job.Queue].Push(job) + underRequest = append(underRequest, job) + preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn) + for _, task := range job.TaskStatusIndex[api.Pending] { + preemptorTasks[job.UID].Push(task) + } + } + } + + for { + // If no queues, break + if queues.Empty() { + break + } + + var job *api.JobInfo + var task *api.TaskInfo + + // TODO (k82cn): we should check whether queue deserved more resources. + queue := queues.Pop().(*api.QueueInfo) + if ssn.Overused(queue) { + glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name) + continue + } + + // Found "high" priority job + if jobs, found := preemptorsMap[queue.UID]; !found || jobs.Empty() { + continue + } else { + job = jobs.Pop().(*api.JobInfo) + } + + // Found "high" priority task to reclaim others + if tasks, found := preemptorTasks[job.UID]; !found || tasks.Empty() { + continue + } else { + task = tasks.Pop().(*api.TaskInfo) + } + + resreq := task.Resreq.Clone() + reclaimed := api.EmptyResource() + + assigned := false + + for _, n := range ssn.Nodes { + // If predicates failed, next node. 
+ if err := ssn.PredicateFn(task, n); err != nil { + continue + } + + glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", + task.Namespace, task.Name, n.Name) + + var reclaimees []*api.TaskInfo + for _, task := range n.Tasks { + if j, found := ssn.JobIndex[task.Job]; !found { + continue + } else if j.Queue != job.Queue { + // Clone task to avoid modify Task's status on node. + reclaimees = append(reclaimees, task.Clone()) + } + } + victims := ssn.Reclaimable(task, reclaimees) + + if len(victims) == 0 { + glog.V(3).Infof("No victims on Node <%s>.", n.Name) + continue + } + + // If not enough resource, continue + allRes := api.EmptyResource() + for _, v := range victims { + allRes.Add(v.Resreq) + } + if allRes.Less(resreq) { + glog.V(3).Infof("Not enough resource from victims on Node <%s>.", n.Name) + continue + } + + // Reclaim victims for tasks. + for _, reclaimee := range victims { + glog.Errorf("Try to reclaim Task <%s/%s> for Tasks <%s/%s>", + reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name) + if err := ssn.Reclaim(task, reclaimee); err != nil { + glog.Errorf("Failed to reclaim Task <%s/%s> for Tasks <%s/%s>: %v", + reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name, err) + continue + } + reclaimed.Add(reclaimee.Resreq) + // If reclaimed enough resources, break loop to avoid Sub panic. + if resreq.LessEqual(reclaimee.Resreq) { + break + } + resreq.Sub(reclaimee.Resreq) + } + + glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.", + reclaimed, task.Namespace, task.Name, task.Resreq) + + assigned = true + + break + } + + if assigned { + queues.Push(queue) + } + } + } -func (alloc *reclaimAction) UnInitialize() {} +func (alloc *reclaimAction) UnInitialize() { +} diff --git a/pkg/scheduler/api/cluster_info.go b/pkg/scheduler/api/cluster_info.go index 9f86020dac..8a3355e0c2 100644 --- a/pkg/scheduler/api/cluster_info.go +++ b/pkg/scheduler/api/cluster_info.go @@ -24,6 +24,8 @@ type ClusterInfo struct { Nodes []*NodeInfo Queues []*QueueInfo + + Others []*TaskInfo } func (ci ClusterInfo) String() string { diff --git a/pkg/scheduler/api/helpers/helpers.go b/pkg/scheduler/api/helpers/helpers.go index ece6fd8487..5be80b8f45 100644 --- a/pkg/scheduler/api/helpers/helpers.go +++ b/pkg/scheduler/api/helpers/helpers.go @@ -22,14 +22,14 @@ import ( "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api" ) -func Min(l, v *api.Resource) *api.Resource { - r := &api.Resource{} +func Min(l, r *api.Resource) *api.Resource { + res := &api.Resource{} - r.MilliCPU = math.Min(l.MilliCPU, r.MilliCPU) - r.MilliGPU = math.Min(l.MilliGPU, r.MilliGPU) - r.Memory = math.Min(l.Memory, r.Memory) + res.MilliCPU = math.Min(l.MilliCPU, r.MilliCPU) + res.MilliGPU = math.Min(l.MilliGPU, r.MilliGPU) + res.Memory = math.Min(l.Memory, r.Memory) - return r + return res } func Share(l, r float64) float64 { diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 3fc6279c0c..cc22f8ec5e 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -100,3 +100,9 @@ type Reason struct { Event arbcorev1.Event Message string } + +// ReclaimableFn is the func declaration used to reclaim tasks. +type ReclaimableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo + +// PreemptableFn is the func declaration used to reclaim tasks. 
+type PreemptableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index d4100abc50..1964c34174 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -416,20 +416,34 @@ func (sc *SchedulerCache) Snapshot() *arbapi.ClusterInfo { Nodes: make([]*arbapi.NodeInfo, 0, len(sc.Nodes)), Jobs: make([]*arbapi.JobInfo, 0, len(sc.Jobs)), Queues: make([]*arbapi.QueueInfo, 0, len(sc.Queues)), + Others: make([]*arbapi.TaskInfo, 0, 10), } for _, value := range sc.Nodes { snapshot.Nodes = append(snapshot.Nodes, value.Clone()) } + queues := map[arbapi.QueueID]struct{}{} for _, value := range sc.Queues { snapshot.Queues = append(snapshot.Queues, value.Clone()) + queues[value.UID] = struct{}{} } for _, value := range sc.Jobs { // If no scheduling spec, does not handle it. if value.PodGroup == nil && value.PDB == nil { glog.V(3).Infof("The scheduling spec of Job <%v> is nil, ignore it.", value.UID) + + // Also tracing the running task assigned by other scheduler. + for _, task := range value.TaskStatusIndex[arbapi.Running] { + snapshot.Others = append(snapshot.Others, task.Clone()) + } + + continue + } + + if _, found := queues[value.Queue]; !found { + glog.V(3).Infof("The Queue of Job <%v> does not exist, ignore it.", value.UID) continue } diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 71a946a002..6f98576a4b 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -19,6 +19,7 @@ package scheduler import ( "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/actions/allocate" "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/actions/preempt" + "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/actions/reclaim" "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/drf" "github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/gang" @@ -40,6 +41,7 @@ func init() { framework.RegisterPluginBuilder("proportion", proportion.New) // Actions + framework.RegisterAction(reclaim.New()) framework.RegisterAction(allocate.New()) framework.RegisterAction(preempt.New()) } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 09f93f7666..93f7690029 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -39,6 +39,7 @@ type Session struct { NodeIndex map[string]*api.NodeInfo Queues []*api.QueueInfo QueueIndex map[api.QueueID]*api.QueueInfo + Others []*api.TaskInfo Backlog []*api.JobInfo plugins []Plugin @@ -48,6 +49,8 @@ type Session struct { taskOrderFns []api.CompareFn predicateFns []api.PredicateFn preemptableFns []api.LessFn + reclaimableFns []api.ReclaimableFn + overusedFns []api.ValidateFn jobReadyFns []api.ValidateFn } @@ -79,6 +82,8 @@ func openSession(cache cache.Cache) *Session { ssn.QueueIndex[queue.UID] = queue } + ssn.Others = snapshot.Others + return ssn } @@ -198,6 +203,71 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { return nil } +func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { + var victims []*api.TaskInfo + + for _, rf := range ssn.reclaimableFns { + candidates := rf(reclaimer, reclaimees) + if victims == nil { + victims = candidates + } else { + intersection := []*api.TaskInfo{} + // Get intersection of victims and candidates. 
+ for _, v := range victims { + for _, c := range candidates { + if v.UID == c.UID { + intersection = append(intersection, v) + } + } + } + + // Update victims to intersection + victims = intersection + } + } + + return victims +} + +func (ssn *Session) Reclaim(reclaimer, reclaimee *api.TaskInfo) error { + if err := ssn.cache.Evict(reclaimee); err != nil { + return err + } + + // Update status in session + job, found := ssn.JobIndex[reclaimee.Job] + if found { + if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil { + glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", + reclaimee.Namespace, reclaimee.Name, api.Releasing, ssn.UID, err) + } + } else { + glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", + reclaimee.Job, ssn.UID) + } + + // Update task in node. + if node, found := ssn.NodeIndex[reclaimee.NodeName]; found { + node.UpdateTask(reclaimee) + } + + for _, eh := range ssn.eventHandlers { + if eh.AllocateFunc != nil { + eh.AllocateFunc(&Event{ + Task: reclaimer, + }) + } + + if eh.EvictFunc != nil { + eh.EvictFunc(&Event{ + Task: reclaimee, + }) + } + } + + return nil +} + func (ssn *Session) Preemptable(preemptor, preemptee *api.TaskInfo) bool { if len(ssn.preemptableFns) == 0 { return false @@ -292,6 +362,10 @@ func (ssn *Session) AddPreemptableFn(cf api.LessFn) { ssn.preemptableFns = append(ssn.preemptableFns, cf) } +func (ssn *Session) AddReclaimableFn(rf api.ReclaimableFn) { + ssn.reclaimableFns = append(ssn.reclaimableFns, rf) +} + func (ssn *Session) AddJobReadyFn(vf api.ValidateFn) { ssn.jobReadyFns = append(ssn.jobReadyFns, vf) } @@ -300,6 +374,20 @@ func (ssn *Session) AddPredicateFn(pf api.PredicateFn) { ssn.predicateFns = append(ssn.predicateFns, pf) } +func (ssn *Session) AddOverusedFn(fn api.ValidateFn) { + ssn.overusedFns = append(ssn.overusedFns, fn) +} + +func (ssn *Session) Overused(queue *api.QueueInfo) bool { + for _, of := range ssn.overusedFns { + if of(queue) { + return true + } + } + + return false +} + func (ssn *Session) JobReady(obj interface{}) bool { for _, jrf := range ssn.jobReadyFns { if !jrf(obj) { diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index e5dc7a3214..dddda820f3 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -31,9 +31,10 @@ type proportionPlugin struct { } type queueAttr struct { - name string - weight int32 - share float64 + queueID api.QueueID + name string + weight int32 + share float64 deserved *api.Resource allocated *api.Resource @@ -54,21 +55,32 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { pp.totalResource.Add(n.Allocatable) } - totalWeight := int32(0) - for _, queue := range ssn.Queues { - attr := &queueAttr{ - name: queue.Name, - weight: queue.Weight, - - deserved: api.EmptyResource(), - allocated: api.EmptyResource(), - request: api.EmptyResource(), - } - totalWeight += queue.Weight - pp.queueOpts[queue.UID] = attr + // Also remove the resource used by other scheduler. + for _, task := range ssn.Others { + pp.totalResource.Sub(task.Resreq) } + glog.V(3).Infof("The total resource is <%v>", pp.totalResource) + + // Build attributes for Queues. 
for _, job := range ssn.Jobs { + glog.V(3).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name) + + if _, found := pp.queueOpts[job.Queue]; !found { + queue := ssn.QueueIndex[job.Queue] + attr := &queueAttr{ + queueID: queue.UID, + name: queue.Name, + weight: queue.Weight, + + deserved: api.EmptyResource(), + allocated: api.EmptyResource(), + request: api.EmptyResource(), + } + pp.queueOpts[job.Queue] = attr + glog.V(3).Infof("Added Queue <%s> attributes.", job.Queue) + } + for status, tasks := range job.TaskStatusIndex { if api.AllocatedStatus(status) { for _, t := range tasks { @@ -85,12 +97,48 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { } } - for _, attr := range pp.queueOpts { - attr.deserved = pp.totalResource.Clone().Multi(float64(attr.weight) / float64(totalWeight)) - pp.updateShare(attr) + remaining := pp.totalResource.Clone() + meet := map[api.QueueID]struct{}{} + for { + totalWeight := int32(0) + for _, attr := range pp.queueOpts { + if _, found := meet[attr.queueID]; found { + continue + } + totalWeight += attr.weight + } - glog.V(3).Infof("The proportion attributes of Queue <%s>: share: %0.2f, deserved: %v, allocated: %v, request: %v", - attr.name, attr.share, attr.deserved, attr.allocated, attr.request) + // If no queues, break + if totalWeight == 0 { + break + } + + // Calculates the deserved of each Queue. + deserved := api.EmptyResource() + for _, attr := range pp.queueOpts { + glog.V(3).Infof("Considering Queue <%s>: weight <%d>, total weight <%d>.", + attr.name, attr.weight, totalWeight) + if _, found := meet[attr.queueID]; found { + continue + } + + attr.deserved.Add(remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight))) + if !attr.deserved.LessEqual(attr.request) { + attr.deserved = helpers.Min(attr.deserved, attr.request) + meet[attr.queueID] = struct{}{} + } + pp.updateShare(attr) + + glog.V(3).Infof("The attributes of queue <%s> in proportion: deserved <%v>, allocate <%v>, request <%v>, share <%0.2f>", + attr.name, attr.deserved, attr.allocated, attr.request, attr.share) + + deserved.Add(attr.deserved) + } + + remaining.Sub(deserved) + if remaining.IsEmpty() { + break + } } ssn.AddQueueOrderFn(func(l, r interface{}) int { @@ -108,6 +156,36 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { return 1 }) + ssn.AddReclaimableFn(func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { + var victims []*api.TaskInfo + + for _, reclaimee := range reclaimees { + job := ssn.JobIndex[reclaimee.Job] + attr := pp.queueOpts[job.Queue] + + allocated := attr.allocated.Clone() + if allocated.Less(reclaimee.Resreq) { + glog.Errorf("Failed to calculate the allocation of Task <%s/%s> in Queue <%s>.", + reclaimee.Namespace, reclaimee.Name, job.Queue) + continue + } + + allocated.Sub(reclaimee.Resreq) + if attr.deserved.LessEqual(allocated) { + victims = append(victims, reclaimee) + } + } + + return victims + }) + + ssn.AddOverusedFn(func(obj interface{}) bool { + queue := obj.(*api.QueueInfo) + attr := pp.queueOpts[queue.UID] + + return attr.deserved.LessEqual(attr.allocated) + }) + // Register event handlers. ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { @@ -141,11 +219,9 @@ func (pp *proportionPlugin) OnSessionClose(ssn *framework.Session) { func (pp *proportionPlugin) updateShare(attr *queueAttr) { res := float64(0) - deserved := helpers.Min(attr.deserved, attr.request) - // TODO(k82cn): how to handle fragement issues? 
for _, rn := range api.ResourceNames() { - share := helpers.Share(attr.allocated.Get(rn), deserved.Get(rn)) + share := helpers.Share(attr.allocated.Get(rn), attr.deserved.Get(rn)) if share > res { res = share } diff --git a/pkg/scheduler/util.go b/pkg/scheduler/util.go index fe4c315cd6..259389a28b 100644 --- a/pkg/scheduler/util.go +++ b/pkg/scheduler/util.go @@ -27,8 +27,8 @@ import ( ) var defaultSchedulerConf = map[string]string{ - "actions": "allocate, preempt", - "plugins": "gang, priority, drf, nodeaffinity", + "actions": "reclaim, allocate, preempt", + "plugins": "gang, priority, drf, nodeaffinity, proportion", "plugin.gang.jobready": "true", "plugin.gang.joborder": "true", "plugin.gang.preemptable": "true", diff --git a/test/e2e.go b/test/e2e.go index c5b0a15369..c24ff3b5d3 100644 --- a/test/e2e.go +++ b/test/e2e.go @@ -17,6 +17,8 @@ limitations under the License. package test import ( + "fmt" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -200,4 +202,35 @@ var _ = Describe("E2E Test", func() { Expect(pod.Spec.NodeName).To(Equal(nodeName)) } }) + + It("Reclaim", func() { + context := initTestContext() + defer cleanupTestContext(context) + + jobName1 := "q1/qj" + jobName2 := "q2/qj" + + slot := oneCPU + rep := clusterSize(context, slot) + + createJob(context, jobName1, 1, rep, "nginx", slot, nil) + err := waitJobReady(context, jobName1) + Expect(err).NotTo(HaveOccurred()) + + expected := int(rep) / 2 + // Reduce one pod to tolerate decimal fraction. + if expected > 1 { + expected-- + } else { + err := fmt.Errorf("expected replica <%d> is too small", expected) + Expect(err).NotTo(HaveOccurred()) + } + + createJob(context, jobName2, 1, rep, "nginx", slot, nil) + err = waitTasksReady(context, jobName2, expected) + Expect(err).NotTo(HaveOccurred()) + + err = waitTasksReady(context, jobName1, expected) + Expect(err).NotTo(HaveOccurred()) + }) }) diff --git a/test/util.go b/test/util.go index 8519811dc7..c8067ca9ed 100644 --- a/test/util.go +++ b/test/util.go @@ -19,6 +19,7 @@ package test import ( "os" "path/filepath" + "strings" "time" . 
"github.com/onsi/gomega" @@ -62,11 +63,22 @@ type context struct { karclient *versioned.Clientset namespace string + queues []string +} + +func splictJobName(cxt *context, jn string) (string, string) { + nss := strings.Split(jn, "/") + if len(nss) == 1 { + return cxt.namespace, nss[0] + } + + return nss[0], nss[1] } func initTestContext() *context { cxt := &context{ namespace: "test", + queues: []string{"q1", "q2"}, } home := homeDir() @@ -86,6 +98,16 @@ func initTestContext() *context { }) Expect(err).NotTo(HaveOccurred()) + for _, q := range cxt.queues { + _, err = cxt.kubeclient.CoreV1().Namespaces().Create(&v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: q, + Namespace: q, + }, + }) + Expect(err).NotTo(HaveOccurred()) + } + _, err = cxt.kubeclient.SchedulingV1beta1().PriorityClasses().Create(&schedv1.PriorityClass{ ObjectMeta: metav1.ObjectMeta{ Name: masterPriority, @@ -110,18 +132,36 @@ func initTestContext() *context { func namespaceNotExist(ctx *context) wait.ConditionFunc { return func() (bool, error) { _, err := ctx.kubeclient.CoreV1().Namespaces().Get(ctx.namespace, metav1.GetOptions{}) - if err != nil && errors.IsNotFound(err) { - return true, nil + if !(err != nil && errors.IsNotFound(err)) { + return false, err } - return false, err + + for _, q := range ctx.queues { + _, err := ctx.kubeclient.CoreV1().Namespaces().Get(q, metav1.GetOptions{}) + if !(err != nil && errors.IsNotFound(err)) { + return false, err + } + } + + return true, nil } } func cleanupTestContext(cxt *context) { - err := cxt.kubeclient.CoreV1().Namespaces().Delete(cxt.namespace, &metav1.DeleteOptions{}) + foreground := metav1.DeletePropagationForeground + + err := cxt.kubeclient.CoreV1().Namespaces().Delete(cxt.namespace, &metav1.DeleteOptions{ + PropagationPolicy: &foreground, + }) Expect(err).NotTo(HaveOccurred()) - foreground := metav1.DeletePropagationForeground + for _, q := range cxt.queues { + err := cxt.kubeclient.CoreV1().Namespaces().Delete(q, &metav1.DeleteOptions{ + PropagationPolicy: &foreground, + }) + Expect(err).NotTo(HaveOccurred()) + } + err = cxt.kubeclient.SchedulingV1beta1().PriorityClasses().Delete(masterPriority, &metav1.DeleteOptions{ PropagationPolicy: &foreground, }) @@ -209,10 +249,12 @@ func createJob( ) *arbextv1.Job { queueJobName := "queuejob.k8s.io" + jns, jn := splictJobName(context, name) + queueJob := &arbextv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: context.namespace, + Name: jn, + Namespace: jns, }, Spec: arbextv1.JobSpec{ MinAvailable: min, @@ -220,13 +262,13 @@ func createJob( { Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - queueJobName: name, + queueJobName: jn, }, }, Replicas: rep, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{queueJobName: name}, + Labels: map[string]string{queueJobName: jn}, }, Spec: v1.PodSpec{ SchedulerName: "kar-scheduler", @@ -235,7 +277,7 @@ func createJob( Containers: []v1.Container{ { Image: img, - Name: name, + Name: jn, ImagePullPolicy: v1.PullIfNotPresent, Resources: v1.ResourceRequirements{ Requests: req, @@ -249,7 +291,7 @@ func createJob( }, } - queueJob, err := context.karclient.Extensions().Jobs(context.namespace).Create(queueJob) + queueJob, err := context.karclient.Extensions().Jobs(jns).Create(queueJob) Expect(err).NotTo(HaveOccurred()) return queueJob @@ -353,11 +395,13 @@ func deleteReplicaSet(ctx *context, name string) error { } func taskReady(ctx *context, jobName string, taskNum int) wait.ConditionFunc { + jns, jn := 
splictJobName(ctx, jobName) + return func() (bool, error) { - queueJob, err := ctx.karclient.Extensions().Jobs(ctx.namespace).Get(jobName, metav1.GetOptions{}) + queueJob, err := ctx.karclient.Extensions().Jobs(jns).Get(jn, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred()) - pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespace).List(metav1.ListOptions{}) + pods, err := ctx.kubeclient.CoreV1().Pods(jns).List(metav1.ListOptions{}) Expect(err).NotTo(HaveOccurred()) readyTaskNum := 0
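
For readers of this patch, the proportion plugin now drives two decisions: each queue's deserved share is computed by repeatedly splitting the leftover cluster resource among still-unsatisfied queues by weight and capping every queue at its request, and a running task from another queue is only reclaimable while that queue stays at or above its deserved share after the eviction. Below is a rough standalone sketch of those two ideas, not part of the patch, reduced to a single scalar resource; the names (queueSketch, divideDeserved, reclaimable) are made up for illustration, while the plugin itself operates on multi-dimensional api.Resource values via helpers.Min and LessEqual.

// Standalone sketch, not part of the patch: the proportion plugin's two new
// decisions reduced to a single scalar resource instead of api.Resource.
// queueSketch, divideDeserved and reclaimable are illustrative names only.
package main

import "fmt"

type queueSketch struct {
	name     string
	weight   int32
	request  float64 // total resource requested by the queue's jobs
	deserved float64 // fair share computed below
}

// divideDeserved illustrates the idea behind the loop OnSessionOpen now runs:
// split what is left of the cluster among queues that have not yet met their
// request, proportionally to weight, cap each queue at its request, and
// repeat until nothing remains or every queue is satisfied.
func divideDeserved(total float64, queues []*queueSketch) {
	remaining := total
	met := map[string]bool{}

	for remaining > 1e-9 {
		var totalWeight int32
		for _, q := range queues {
			if !met[q.name] {
				totalWeight += q.weight
			}
		}
		if totalWeight == 0 {
			break // every queue already has what it asked for
		}

		granted := 0.0
		for _, q := range queues {
			if met[q.name] {
				continue
			}
			grant := remaining * float64(q.weight) / float64(totalWeight)
			if q.deserved+grant >= q.request {
				grant = q.request - q.deserved // cap at request, like helpers.Min
				met[q.name] = true
			}
			q.deserved += grant
			granted += grant
		}
		remaining -= granted
	}
}

// reclaimable mirrors the check in the ReclaimableFn the plugin registers:
// a task from another queue is a victim only if, after giving its resources
// back, the victim queue still holds at least its deserved share.
func reclaimable(victimQueueAllocated, victimQueueDeserved, taskRequest float64) bool {
	if victimQueueAllocated < taskRequest {
		return false // inconsistent accounting; the plugin logs an error and skips
	}
	return victimQueueAllocated-taskRequest >= victimQueueDeserved
}

func main() {
	q1 := &queueSketch{name: "q1", weight: 1, request: 12}
	q2 := &queueSketch{name: "q2", weight: 1, request: 4}

	// 10 units of cluster resource: q2 is capped at its request (4) and the
	// leftover unit is redistributed to q1, which ends up with 6.
	divideDeserved(10, []*queueSketch{q1, q2})
	fmt.Printf("q1 deserved=%.1f q2 deserved=%.1f\n", q1.deserved, q2.deserved)

	// q1 currently holds all 10 units; a 1-unit task of q1 can be reclaimed
	// because 10-1 is still at least q1's deserved share of 6.
	fmt.Println("reclaim 1 unit from q1:", reclaimable(10, q1.deserved, 1))
}

Together with the new OverusedFn (a queue is overused once deserved.LessEqual(allocated)), this is what lets the reclaim action, now first in the default "reclaim, allocate, preempt" action list with the proportion plugin enabled, evict workloads from queues that hold more than their weighted share, as the q1/q2 e2e test above exercises.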