From 81d8d063a96facfd71a7c882d6b70d91de319c5c Mon Sep 17 00:00:00 2001 From: asifdxtreme Date: Mon, 1 Jul 2019 12:36:27 +0530 Subject: [PATCH 1/3] Bump Volcano-sh/Scheduler --- Gopkg.lock | 6 +- .../cmd/kube-batch/app/options/options.go | 9 +- .../kube-batch/cmd/kube-batch/app/server.go | 21 ++- .../scheduler/actions/allocate/allocate.go | 14 +- .../scheduler/actions/backfill/backfill.go | 15 ++ .../pkg/scheduler/actions/preempt/preempt.go | 8 +- .../pkg/scheduler/actions/reclaim/reclaim.go | 4 + .../kube-batch/pkg/scheduler/api/job_info.go | 34 ++-- .../kube-batch/pkg/scheduler/api/node_info.go | 4 +- .../pkg/scheduler/api/resource_info.go | 28 ++- .../kube-batch/pkg/scheduler/api/types.go | 7 + .../pkg/scheduler/api/unschedule_info.go | 112 ++++++++++++ .../kube-batch/pkg/scheduler/cache/cache.go | 133 ++++++++++---- .../pkg/scheduler/cache/interface.go | 2 +- .../pkg/scheduler/framework/job_updater.go | 122 +++++++++++++ .../pkg/scheduler/framework/session.go | 27 ++- .../scheduler/framework/session_plugins.go | 29 +++ .../pkg/scheduler/plugins/gang/gang.go | 13 ++ .../scheduler/plugins/nodeorder/nodeorder.go | 99 ++++++---- .../plugins/predicates/predicates.go | 99 ++++++---- .../plugins/proportion/proportion.go | 5 + .../pkg/scheduler/plugins/util/util.go | 170 +++++++++++++++--- .../pkg/scheduler/util/assert/assert.go | 44 +++++ .../pkg/scheduler/util/scheduler_helper.go | 28 ++- 24 files changed, 827 insertions(+), 206 deletions(-) create mode 100644 vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/unschedule_info.go create mode 100644 vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/job_updater.go create mode 100644 vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/assert/assert.go diff --git a/Gopkg.lock b/Gopkg.lock index 9560e925f9..21f8ecb3a7 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -204,7 +204,7 @@ [[projects]] branch = "master" - digest = "1:ea789d69109979142a6fbd569095f7678ea62b633b9b5a284f2fd1ff673da48f" + digest = "1:63c04b7effb2d164595f6d4ba30160fafd8cede82c743a2077a65921ae62e06b" name = "github.com/kubernetes-sigs/kube-batch" packages = [ "cmd/kube-batch/app", @@ -244,10 +244,11 @@ "pkg/scheduler/plugins/proportion", "pkg/scheduler/plugins/util", "pkg/scheduler/util", + "pkg/scheduler/util/assert", "pkg/version", ] pruneopts = "UT" - revision = "4b391ab34b53779e47243217006d8772cb86d8d8" + revision = "0302d83f20a6bb9cff8a27f9d95fd69d0d84ab28" source = "https://github.com/volcano-sh/scheduler" [[projects]] @@ -1094,6 +1095,7 @@ "github.com/spf13/cobra", "github.com/spf13/pflag", "golang.org/x/crypto/ssh", + "golang.org/x/time/rate", "k8s.io/api/admission/v1beta1", "k8s.io/api/admissionregistration/v1beta1", "k8s.io/api/apps/v1", diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go index d88e890a8c..85f9e92ffc 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go @@ -24,10 +24,13 @@ import ( ) const ( - defaultSchedulerName = "kube-batch" + defaultSchedulerName = "volcano" defaultSchedulerPeriod = time.Second defaultQueue = "default" defaultListenAddress = ":8080" + + defaultQPS = 50.0 + defaultBurst = 100 ) // ServerOption is the main context object for the controller manager. 
@@ -43,6 +46,8 @@ type ServerOption struct { PrintVersion bool ListenAddress string EnablePriorityClass bool + KubeAPIBurst int + KubeAPIQPS float32 } // ServerOpts server options @@ -71,6 +76,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") fs.BoolVar(&s.EnablePriorityClass, "priority-class", true, "Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false") + fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver") + fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver") } // CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go index 1fbe6f415e..a2d7d493f3 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go @@ -52,11 +52,24 @@ const ( apiVersion = "v1alpha1" ) -func buildConfig(master, kubeconfig string) (*rest.Config, error) { +func buildConfig(opt *options.ServerOption) (*rest.Config, error) { + var cfg *rest.Config + var err error + + master := opt.Master + kubeconfig := opt.Kubeconfig if master != "" || kubeconfig != "" { - return clientcmd.BuildConfigFromFlags(master, kubeconfig) + cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig) + } else { + cfg, err = rest.InClusterConfig() + } + if err != nil { + return nil, err } - return rest.InClusterConfig() + cfg.QPS = opt.KubeAPIQPS + cfg.Burst = opt.KubeAPIBurst + + return cfg, nil } // Run the kubeBatch scheduler @@ -65,7 +78,7 @@ func Run(opt *options.ServerOption) error { version.PrintVersionAndExit(apiVersion) } - config, err := buildConfig(opt.Master, opt.Kubeconfig) + config, err := buildConfig(opt) if err != nil { return err } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go index 18458d7cad..ba3bafa92c 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go @@ -17,8 +17,6 @@ limitations under the License. package allocate import ( - "fmt" - "github.com/golang/glog" "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" @@ -52,6 +50,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } if queue, found := ssn.Queues[job.Queue]; found { queues.Push(queue) @@ -84,8 +86,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { // ... 
// } if !task.InitResreq.LessEqual(node.Idle) && !task.InitResreq.LessEqual(node.Releasing) { - return fmt.Errorf("task <%s/%s> ResourceFit failed on node <%s>", - task.Namespace, task.Name, node.Name) + return api.NewFitError(task, node, api.NodeResourceFitFailed) } return ssn.PredicateFn(task, node) @@ -145,12 +146,13 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { job.NodesFitDelta = make(api.NodeResourceMap) } - predicateNodes := util.PredicateNodes(task, allNodes, predicateFn) + predicateNodes, fitErrors := util.PredicateNodes(task, allNodes, predicateFn) if len(predicateNodes) == 0 { + job.NodesFitErrors[task.UID] = fitErrors break } - nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) node := util.SelectBestNode(nodeScores) // Allocate idle resource to the task. diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill/backfill.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill/backfill.go index 618c8cedaf..6bd970e788 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill/backfill.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill/backfill.go @@ -47,9 +47,16 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip backfill, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } for _, task := range job.TaskStatusIndex[api.Pending] { if task.InitResreq.IsEmpty() { + allocated := false + fe := api.NewFitErrors() + // As task did not request resources, so it only need to meet predicates. // TODO (k82cn): need to prioritize nodes to avoid pod hole. for _, node := range ssn.Nodes { @@ -58,16 +65,24 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { if err := ssn.PredicateFn(task, node); err != nil { glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) + fe.SetNodeError(node.Name, err) continue } glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) if err := ssn.Allocate(task, node.Name); err != nil { glog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) + fe.SetNodeError(node.Name, err) continue } + + allocated = true break } + + if !allocated { + job.NodesFitErrors[task.UID] = fe + } } else { // TODO (k82cn): backfill for other case. 
} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go index 4ca1b5f508..5081c94c09 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go @@ -56,6 +56,10 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip preemption, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } if queue, found := ssn.Queues[job.Queue]; !found { continue @@ -184,9 +188,9 @@ func preempt( allNodes := util.GetNodeList(nodes) - predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn) + predicateNodes, _ := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn) - nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) selectedNodes := util.SortNodes(nodeScores) for _, node := range selectedNodes { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim/reclaim.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim/reclaim.go index 520e8264a8..5545df516f 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim/reclaim.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim/reclaim.go @@ -57,6 +57,10 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip reclaim, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } if queue, found := ssn.Queues[job.Queue]; !found { glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go index 2a9a6bfcf5..3f4ee7e325 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go @@ -139,6 +139,9 @@ type JobInfo struct { NodesFitDelta NodeResourceMap + JobFitErrors string + NodesFitErrors map[TaskID]*FitErrors + // All tasks of the Job. 
TaskStatusIndex map[TaskStatus]tasksMap Tasks tasksMap @@ -164,6 +167,8 @@ func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo { Allocated: EmptyResource(), TotalRequest: EmptyResource(), + NodesFitErrors: make(map[TaskID]*FitErrors), + TaskStatusIndex: map[TaskStatus]tasksMap{}, Tasks: tasksMap{}, } @@ -301,8 +306,10 @@ func (ji *JobInfo) Clone() *JobInfo { TotalRequest: EmptyResource(), NodesFitDelta: make(NodeResourceMap), + NodesFitErrors: make(map[TaskID]*FitErrors), + PDB: ji.PDB, - PodGroup: ji.PodGroup, + PodGroup: ji.PodGroup.DeepCopy(), TaskStatusIndex: map[TaskStatus]tasksMap{}, Tasks: tasksMap{}, @@ -338,36 +345,21 @@ func (ji JobInfo) String() string { // FitError returns detailed information on why a job's task failed to fit on // each available node func (ji *JobInfo) FitError() string { - if len(ji.NodesFitDelta) == 0 { - reasonMsg := fmt.Sprintf("0 nodes are available") - return reasonMsg - } - reasons := make(map[string]int) - for _, v := range ji.NodesFitDelta { - if v.Get(v1.ResourceCPU) < 0 { - reasons["cpu"]++ - } - if v.Get(v1.ResourceMemory) < 0 { - reasons["memory"]++ - } - - for rName, rQuant := range v.ScalarResources { - if rQuant < 0 { - reasons[string(rName)]++ - } - } + for status, taskMap := range ji.TaskStatusIndex { + reasons[fmt.Sprintf("%s", status)] += len(taskMap) } + reasons["minAvailable"] = int(ji.MinAvailable) sortReasonsHistogram := func() []string { reasonStrings := []string{} for k, v := range reasons { - reasonStrings = append(reasonStrings, fmt.Sprintf("%v insufficient %v", v, k)) + reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k)) } sort.Strings(reasonStrings) return reasonStrings } - reasonMsg := fmt.Sprintf("0/%v nodes are available, %v.", len(ji.NodesFitDelta), strings.Join(sortReasonsHistogram(), ", ")) + reasonMsg := fmt.Sprintf("job is not ready, %v.", strings.Join(sortReasonsHistogram(), ", ")) return reasonMsg } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go index 679818db51..77625a41ef 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go @@ -46,7 +46,7 @@ type NodeInfo struct { Tasks map[TaskID]*TaskInfo // Used to store custom information - Other interface{} + Others map[string]interface{} } // NodeState defines the current state of node. 
@@ -98,7 +98,7 @@ func (ni *NodeInfo) Clone() *NodeInfo { for _, p := range ni.Tasks { res.AddTask(p) } - res.Other = ni.Other + res.Others = ni.Others return res } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go index b575e49174..27ec0b94ce 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go @@ -22,6 +22,8 @@ import ( v1 "k8s.io/api/core/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/assert" ) // Resource struct defines all the resource type @@ -117,9 +119,8 @@ func (r *Resource) IsZero(rn v1.ResourceName) bool { return true } - if _, ok := r.ScalarResources[rn]; !ok { - panic("unknown resource") - } + _, found := r.ScalarResources[rn] + assert.Assertf(found, "unknown resource %s", rn) return r.ScalarResources[rn] < minMilliScalarResources } @@ -142,22 +143,19 @@ func (r *Resource) Add(rr *Resource) *Resource { //Sub subtracts two Resource objects. func (r *Resource) Sub(rr *Resource) *Resource { - if rr.LessEqual(r) { - r.MilliCPU -= rr.MilliCPU - r.Memory -= rr.Memory + assert.Assertf(rr.LessEqual(r), "resource is not sufficient to do operation: <%v> sub <%v>", r, rr) - for rrName, rrQuant := range rr.ScalarResources { - if r.ScalarResources == nil { - return r - } - r.ScalarResources[rrName] -= rrQuant - } + r.MilliCPU -= rr.MilliCPU + r.Memory -= rr.Memory - return r + for rrName, rrQuant := range rr.ScalarResources { + if r.ScalarResources == nil { + return r + } + r.ScalarResources[rrName] -= rrQuant } - panic(fmt.Errorf("resource is not sufficient to do operation: <%v> sub <%v>", - r, rr)) + return r } // SetMaxResource compares with ResourceList and takes max value for each Resource. diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go index a88439e451..1cff29aeb2 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go @@ -61,6 +61,10 @@ func (ts TaskStatus) String() string { switch ts { case Pending: return "Pending" + case Allocated: + return "Allocated" + case Pipelined: + return "Pipelined" case Binding: return "Binding" case Bound: @@ -132,6 +136,9 @@ type EvictableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo // NodeOrderFn is the func declaration used to get priority score for a node for a particular task. type NodeOrderFn func(*TaskInfo, *NodeInfo) (float64, error) +// BatchNodeOrderFn is the func declaration used to get priority score for ALL nodes for a particular task. +type BatchNodeOrderFn func(*TaskInfo, []*NodeInfo) (map[string]float64, error) + // NodeMapFn is the func declaration used to get priority score for a node for a particular task. 
type NodeMapFn func(*TaskInfo, *NodeInfo) (float64, error) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/unschedule_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/unschedule_info.go new file mode 100644 index 0000000000..2275f13ecb --- /dev/null +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/unschedule_info.go @@ -0,0 +1,112 @@ +package api + +import ( + "fmt" + "sort" + "strings" + + "k8s.io/kubernetes/pkg/scheduler/algorithm" +) + +const ( + // NodePodNumberExceeded means pods in node exceed the allocatable pod number + NodePodNumberExceeded = "node(s) pod number exceeded" + // NodeResourceFitFailed means node could not fit the request of pod + NodeResourceFitFailed = "node(s) resource fit failed" + + // AllNodeUnavailableMsg is the default error message + AllNodeUnavailableMsg = "all nodes are unavailable" +) + +// FitErrors is set of FitError on many nodes +type FitErrors struct { + nodes map[string]*FitError + err string +} + +// NewFitErrors returns an FitErrors +func NewFitErrors() *FitErrors { + f := new(FitErrors) + f.nodes = make(map[string]*FitError) + return f +} + +// SetError set the common error message in FitErrors +func (f *FitErrors) SetError(err string) { + f.err = err +} + +// SetNodeError set the node error in FitErrors +func (f *FitErrors) SetNodeError(nodeName string, err error) { + var fe *FitError + switch obj := err.(type) { + case *FitError: + obj.NodeName = nodeName + fe = obj + default: + fe = &FitError{ + NodeName: nodeName, + Reasons: []string{obj.Error()}, + } + } + + f.nodes[nodeName] = fe +} + +// Error returns the final error message +func (f *FitErrors) Error() string { + reasons := make(map[string]int) + + for _, node := range f.nodes { + for _, reason := range node.Reasons { + reasons[reason]++ + } + } + + sortReasonsHistogram := func() []string { + reasonStrings := []string{} + for k, v := range reasons { + reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k)) + } + sort.Strings(reasonStrings) + return reasonStrings + } + if f.err == "" { + f.err = AllNodeUnavailableMsg + } + reasonMsg := fmt.Sprintf(f.err+": %v.", strings.Join(sortReasonsHistogram(), ", ")) + return reasonMsg +} + +// FitError describe the reason why task could not fit that node +type FitError struct { + taskNamespace string + taskName string + NodeName string + Reasons []string +} + +// NewFitError return FitError by message +func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { + fe := &FitError{ + taskName: task.Name, + taskNamespace: task.Namespace, + NodeName: node.Name, + Reasons: message, + } + return fe +} + +// NewFitErrorByReasons return FitError by reasons +func NewFitErrorByReasons(task *TaskInfo, node *NodeInfo, reasons ...algorithm.PredicateFailureReason) *FitError { + message := make([]string, 0, len(reasons)) + for _, reason := range reasons { + message = append(message, reason.GetReason()) + } + return NewFitError(task, node, message...) 
+} + +// Error returns the final error message +func (f *FitError) Error() string { + return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons, ", ")) +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go index 322d3b4d02..7d41e29ebe 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go @@ -114,7 +114,7 @@ type defaultBinder struct { //Bind will send bind request to api server func (db *defaultBinder) Bind(p *v1.Pod, hostname string) error { if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{ - ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, + ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: p.Annotations}, Target: v1.ObjectReference{ Kind: "Node", Name: hostname, @@ -147,6 +147,31 @@ type defaultStatusUpdater struct { kbclient *kbver.Clientset } +// following the same logic as podutil.UpdatePodCondition +func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bool { + lastTransitionTime := metav1.Now() + // Try to find this pod condition. + _, oldCondition := podutil.GetPodCondition(status, condition.Type) + + if oldCondition == nil { + // We are adding new pod condition. + return true + } + // We are updating an existing condition, so we need to check if it has changed. + if condition.Status == oldCondition.Status { + lastTransitionTime = oldCondition.LastTransitionTime + } + + isEqual := condition.Status == oldCondition.Status && + condition.Reason == oldCondition.Reason && + condition.Message == oldCondition.Message && + condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) && + lastTransitionTime.Equal(&oldCondition.LastTransitionTime) + + // Return true if one of the fields have changed. 
+ return !isEqual +} + // UpdatePodCondition will Update pod with podCondition func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) { glog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status) @@ -184,6 +209,19 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error { } func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache { + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + panic(fmt.Sprintf("failed init kubeClient, with err: %v", err)) + } + kbClient, err := kbver.NewForConfig(config) + if err != nil { + panic(fmt.Sprintf("failed init kbClient, with err: %v", err)) + } + eventClient, err := kubernetes.NewForConfig(config) + if err != nil { + panic(fmt.Sprintf("failed init eventClient, with err: %v", err)) + } + sc := &SchedulerCache{ Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), Nodes: make(map[string]*kbapi.NodeInfo), @@ -191,15 +229,15 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s PriorityClasses: make(map[string]*v1beta1.PriorityClass), errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - kubeclient: kubernetes.NewForConfigOrDie(config), - kbclient: kbver.NewForConfigOrDie(config), + kubeclient: kubeClient, + kbclient: kbClient, defaultQueue: defaultQueue, schedulerName: schedulerName, } // Prepare event clients. broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: sc.kubeclient.CoreV1().Events("")}) + broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")}) sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName}) sc.Binder = &defaultBinder{ @@ -460,22 +498,27 @@ func (sc *SchedulerCache) BindVolumes(task *api.TaskInfo) error { // taskUnschedulable updates pod status of pending task func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string) error { - sc.Mutex.Lock() - defer sc.Mutex.Unlock() + pod := task.Pod - pod := task.Pod.DeepCopy() - - // The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in - // k8s core, so using the same string here. - // The reason field in PodCondition should be "Unschedulable" - sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message) - if _, err := sc.StatusUpdater.UpdatePodCondition(pod, &v1.PodCondition{ + condition := &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: v1.PodReasonUnschedulable, Message: message, - }); err != nil { - return err + } + + if podConditionHaveUpdate(&pod.Status, condition) { + pod = pod.DeepCopy() + + // The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in + // k8s core, so using the same string here. 
+ // The reason field in PodCondition should be "Unschedulable" + sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message) + if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil { + return err + } + } else { + glog.V(4).Infof("task unscheduleable %s/%s, message: %s, skip by no condition update", pod.Namespace, pod.Name, message) } return nil @@ -560,6 +603,30 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { snapshot.Queues[value.UID] = value.Clone() } + var cloneJobLock sync.Mutex + var wg sync.WaitGroup + + cloneJob := func(value *api.JobInfo) { + if value.PodGroup != nil { + value.Priority = sc.defaultPriority + + priName := value.PodGroup.Spec.PriorityClassName + if priorityClass, found := sc.PriorityClasses[priName]; found { + value.Priority = priorityClass.Value + } + + glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>", + value.Namespace, value.Name, priName, value.Priority) + } + + clonedJob := value.Clone() + + cloneJobLock.Lock() + snapshot.Jobs[value.UID] = clonedJob + cloneJobLock.Unlock() + wg.Done() + } + for _, value := range sc.Jobs { // If no scheduling spec, does not handle it. if value.PodGroup == nil && value.PDB == nil { @@ -575,20 +642,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { continue } - if value.PodGroup != nil { - value.Priority = sc.defaultPriority - - priName := value.PodGroup.Spec.PriorityClassName - if priorityClass, found := sc.PriorityClasses[priName]; found { - value.Priority = priorityClass.Value - } - - glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>", - value.Namespace, value.Name, priName, value.Priority) - } - - snapshot.Jobs[value.UID] = value.Clone() + wg.Add(1) + go cloneJob(value) } + wg.Wait() glog.V(3).Infof("There are <%d> Jobs, <%d> Queues and <%d> Nodes in total for scheduling.", len(snapshot.Jobs), len(snapshot.Queues), len(snapshot.Nodes)) @@ -629,7 +686,10 @@ func (sc *SchedulerCache) String() string { // RecordJobStatusEvent records related events according to job status. func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { - jobErrMsg := job.FitError() + baseErrorMessage := job.JobFitErrors + if baseErrorMessage == "" { + baseErrorMessage = kbapi.AllNodeUnavailableMsg + } if !shadowPodGroup(job.PodGroup) { pgUnschedulable := job.PodGroup != nil && @@ -639,17 +699,20 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { // If pending or unschedulable, record unschedulable event. 
if pgUnschedulable || pdbUnschedulabe { - msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", - len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError()) sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning, - string(v1alpha1.PodGroupUnschedulableType), msg) + string(v1alpha1.PodGroupUnschedulableType), baseErrorMessage) } } // Update podCondition for tasks Allocated and Pending before job discarded for _, status := range []api.TaskStatus{api.Allocated, api.Pending} { for _, taskInfo := range job.TaskStatusIndex[status] { - if err := sc.taskUnschedulable(taskInfo, jobErrMsg); err != nil { + msg := baseErrorMessage + fitError := job.NodesFitErrors[taskInfo.UID] + if fitError != nil { + msg = fitError.Error() + } + if err := sc.taskUnschedulable(taskInfo, msg); err != nil { glog.Errorf("Failed to update unschedulable task status <%s/%s>: %v", taskInfo.Namespace, taskInfo.Name, err) } @@ -658,8 +721,8 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { } // UpdateJobStatus update the status of job and its tasks. -func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, error) { - if !shadowPodGroup(job.PodGroup) { +func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) { + if updatePG && !shadowPodGroup(job.PodGroup) { pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup) if err != nil { return nil, err diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go index b664ea20a6..9a2807d4e2 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go @@ -46,7 +46,7 @@ type Cache interface { RecordJobStatusEvent(job *api.JobInfo) // UpdateJobStatus puts job in backlog for a while. 
- UpdateJobStatus(job *api.JobInfo) (*api.JobInfo, error) + UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error) // AllocateVolumes allocates volume on the host to the task AllocateVolumes(task *api.TaskInfo, hostname string) error diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/job_updater.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/job_updater.go new file mode 100644 index 0000000000..ab03cc992e --- /dev/null +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/job_updater.go @@ -0,0 +1,122 @@ +package framework + +import ( + "context" + "math/rand" + "reflect" + "time" + + "github.com/golang/glog" + + "k8s.io/client-go/util/workqueue" + + "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" +) + +const ( + jobUpdaterWorker = 16 + + jobConditionUpdateTime = time.Minute + jobConditionUpdateTimeJitter = 30 * time.Second +) + +// TimeJitterAfter means: new after old + duration + jitter +func TimeJitterAfter(new, old time.Time, duration, maxJitter time.Duration) bool { + var jitter int64 + if maxJitter > 0 { + jitter = rand.Int63n(int64(maxJitter)) + } + return new.After(old.Add(duration + time.Duration(jitter))) +} + +type jobUpdater struct { + ssn *Session + jobQueue []*api.JobInfo +} + +func newJobUpdater(ssn *Session) *jobUpdater { + queue := make([]*api.JobInfo, 0, len(ssn.Jobs)) + for _, job := range ssn.Jobs { + queue = append(queue, job) + } + + ju := &jobUpdater{ + ssn: ssn, + jobQueue: queue, + } + return ju +} + +func (ju *jobUpdater) UpdateAll() { + workqueue.ParallelizeUntil(context.TODO(), jobUpdaterWorker, len(ju.jobQueue), ju.updateJob) +} + +func isPodGroupConditionsUpdated(newCondition, oldCondition []v1alpha1.PodGroupCondition) bool { + if len(newCondition) != len(oldCondition) { + return true + } + + for index, newCond := range newCondition { + oldCond := oldCondition[index] + + newTime := newCond.LastTransitionTime + oldTime := oldCond.LastTransitionTime + if TimeJitterAfter(newTime.Time, oldTime.Time, jobConditionUpdateTime, jobConditionUpdateTimeJitter) { + return true + } + + // if newCond is not new enough, we treat it the same as the old one + newCond.LastTransitionTime = oldTime + + // comparing should ignore the TransitionID + newTransitionID := newCond.TransitionID + newCond.TransitionID = oldCond.TransitionID + + shouldUpdate := !reflect.DeepEqual(&newCond, &oldCond) + + newCond.LastTransitionTime = newTime + newCond.TransitionID = newTransitionID + if shouldUpdate { + return true + } + } + + return false +} + +func isPodGroupStatusUpdated(newStatus, oldStatus *v1alpha1.PodGroupStatus) bool { + newCondition := newStatus.Conditions + newStatus.Conditions = nil + oldCondition := oldStatus.Conditions + oldStatus.Conditions = nil + + shouldUpdate := !reflect.DeepEqual(newStatus, oldStatus) || isPodGroupConditionsUpdated(newCondition, oldCondition) + + newStatus.Conditions = newCondition + oldStatus.Conditions = oldCondition + + return shouldUpdate +} + +// updateJob update specified job +func (ju *jobUpdater) updateJob(index int) { + job := ju.jobQueue[index] + ssn := ju.ssn + + // If job is using PDB, ignore it. 
+ // TODO(k82cn): remove it when removing PDB support + if job.PodGroup == nil { + ssn.cache.RecordJobStatusEvent(job) + return + } + + job.PodGroup.Status = jobStatus(ssn, job) + oldStatus, found := ssn.podGroupStatus[job.UID] + updatePG := !found || isPodGroupStatusUpdated(&job.PodGroup.Status, oldStatus) + + if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil { + glog.Errorf("Failed to update job <%s/%s>: %v", + job.Namespace, job.Name, err) + } +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go index 27d4ca2a61..311a096d28 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go @@ -39,6 +39,8 @@ type Session struct { cache cache.Cache + podGroupStatus map[api.JobID]*v1alpha1.PodGroupStatus + Jobs map[api.JobID]*api.JobInfo Nodes map[string]*api.NodeInfo Queues map[api.QueueID]*api.QueueInfo @@ -52,6 +54,7 @@ type Session struct { taskOrderFns map[string]api.CompareFn predicateFns map[string]api.PredicateFn nodeOrderFns map[string]api.NodeOrderFn + batchNodeOrderFns map[string]api.BatchNodeOrderFn nodeMapFns map[string]api.NodeMapFn nodeReduceFns map[string]api.NodeReduceFn preemptableFns map[string]api.EvictableFn @@ -68,6 +71,8 @@ func openSession(cache cache.Cache) *Session { UID: uuid.NewUUID(), cache: cache, + podGroupStatus: map[api.JobID]*v1alpha1.PodGroupStatus{}, + Jobs: map[api.JobID]*api.JobInfo{}, Nodes: map[string]*api.NodeInfo{}, Queues: map[api.QueueID]*api.QueueInfo{}, @@ -78,6 +83,7 @@ func openSession(cache cache.Cache) *Session { taskOrderFns: map[string]api.CompareFn{}, predicateFns: map[string]api.PredicateFn{}, nodeOrderFns: map[string]api.NodeOrderFn{}, + batchNodeOrderFns: map[string]api.BatchNodeOrderFn{}, nodeMapFns: map[string]api.NodeMapFn{}, nodeReduceFns: map[string]api.NodeReduceFn{}, preemptableFns: map[string]api.EvictableFn{}, @@ -93,6 +99,11 @@ func openSession(cache cache.Cache) *Session { ssn.Jobs = snapshot.Jobs for _, job := range ssn.Jobs { + // only conditions will be updated periodically + if job.PodGroup != nil && job.PodGroup.Status.Conditions != nil { + ssn.podGroupStatus[job.UID] = job.PodGroup.Status.DeepCopy() + } + if vjr := ssn.JobValid(job); vjr != nil { if !vjr.Pass { jc := &v1alpha1.PodGroupCondition{ @@ -123,20 +134,8 @@ func openSession(cache cache.Cache) *Session { } func closeSession(ssn *Session) { - for _, job := range ssn.Jobs { - // If job is using PDB, ignore it. 
- // TODO(k82cn): remove it when removing PDB support - if job.PodGroup == nil { - ssn.cache.RecordJobStatusEvent(job) - continue - } - - job.PodGroup.Status = jobStatus(ssn, job) - if _, err := ssn.cache.UpdateJobStatus(job); err != nil { - glog.Errorf("Failed to update job <%s/%s>: %v", - job.Namespace, job.Name, err) - } - } + ju := newJobUpdater(ssn) + ju.UpdateAll() ssn.Jobs = nil ssn.Nodes = nil diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go index b7421b3d0c..8754c71a51 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go @@ -66,6 +66,11 @@ func (ssn *Session) AddNodeOrderFn(name string, pf api.NodeOrderFn) { ssn.nodeOrderFns[name] = pf } +// AddBatchNodeOrderFn add Batch Node order function +func (ssn *Session) AddBatchNodeOrderFn(name string, pf api.BatchNodeOrderFn) { + ssn.batchNodeOrderFns[name] = pf +} + // AddNodeMapFn add Node map function func (ssn *Session) AddNodeMapFn(name string, pf api.NodeMapFn) { ssn.nodeMapFns[name] = pf @@ -406,6 +411,30 @@ func (ssn *Session) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (float64 return priorityScore, nil } +// BatchNodeOrderFn invoke node order function of the plugins +func (ssn *Session) BatchNodeOrderFn(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { + priorityScore := make(map[string]float64, len(nodes)) + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if !isEnabled(plugin.EnabledNodeOrder) { + continue + } + pfn, found := ssn.batchNodeOrderFns[plugin.Name] + if !found { + continue + } + score, err := pfn(task, nodes) + if err != nil { + return nil, err + } + for nodeName, score := range score { + priorityScore[nodeName] += score + } + } + } + return priorityScore, nil +} + func isEnabled(enabled *bool) bool { return enabled != nil && *enabled } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go index f1c707b09f..d203517a49 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go @@ -137,6 +137,7 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { unreadyTaskCount = job.MinAvailable - job.ReadyTaskNum() msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", job.MinAvailable-job.ReadyTaskNum(), len(job.Tasks), job.FitError()) + job.JobFitErrors = msg unScheduleJobCount++ metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount)) @@ -155,6 +156,18 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { glog.Errorf("Failed to update job <%s/%s> condition: %v", job.Namespace, job.Name, err) } + + // allocated task should follow the job fit error + for _, taskInfo := range job.TaskStatusIndex[api.Allocated] { + fitError := job.NodesFitErrors[taskInfo.UID] + if fitError != nil { + continue + } + + fitError = api.NewFitErrors() + job.NodesFitErrors[taskInfo.UID] = fitError + fitError.SetError(msg) + } } } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go index 
4441d33119..30a9833529 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -56,19 +56,6 @@ func getInterPodAffinityScore(name string, interPodAffinityScore schedulerapi.Ho return 0 } -func generateNodeMapAndSlice(nodes map[string]*api.NodeInfo) (map[string]*cache.NodeInfo, []*v1.Node) { - var nodeMap map[string]*cache.NodeInfo - var nodeSlice []*v1.Node - nodeMap = make(map[string]*cache.NodeInfo) - for _, node := range nodes { - nodeInfo := cache.NewNodeInfo(node.Pods()...) - nodeInfo.SetNode(node.Node) - nodeMap[node.Name] = nodeInfo - nodeSlice = append(nodeSlice, node.Node) - } - return nodeMap, nodeSlice -} - type cachedNodeInfo struct { session *framework.Session } @@ -153,30 +140,58 @@ func calculateWeight(args framework.Arguments) priorityWeight { } func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { - nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { + var nodeMap map[string]*cache.NodeInfo + var nodeSlice []*v1.Node - weight := calculateWeight(pp.pluginArguments) + weight := calculateWeight(pp.pluginArguments) - pl := &util.PodLister{ - Session: ssn, - } + pl := util.NewPodLister(ssn) - nl := &util.NodeLister{ - Session: ssn, - } + nl := &util.NodeLister{ + Session: ssn, + } - cn := &cachedNodeInfo{ - session: ssn, - } + cn := &cachedNodeInfo{ + session: ssn, + } - var nodeMap map[string]*cache.NodeInfo - var nodeSlice []*v1.Node - var interPodAffinityScore schedulerapi.HostPriorityList + nodeMap, nodeSlice = util.GenerateNodeMapAndSlice(ssn.Nodes) - nodeMap, nodeSlice = generateNodeMapAndSlice(ssn.Nodes) + // Register event handlers to update task info in PodLister & nodeMap + ssn.AddEventHandler(&framework.EventHandler{ + AllocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, event.Task.NodeName) - nodeInfo := cache.NewNodeInfo(node.Pods()...) - nodeInfo.SetNode(node.Node) + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("node order, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.AddPod(pod) + glog.V(4).Infof("node order, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + DeallocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, "") + + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("node order, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.RemovePod(pod) + glog.V(4).Infof("node order, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + }) + + nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { + nodeInfo, found := nodeMap[node.Name] + if !found { + nodeInfo = cache.NewNodeInfo(node.Pods()...) + nodeInfo.SetNode(node.Node) + glog.Warningf("node order, generate node info for %s at NodeOrderFn is unexpected", node.Name) + } var score = 0.0 //TODO: Add ImageLocalityPriority Function once priorityMetadata is published @@ -206,20 +221,30 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If nodeAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. 
score = score + float64(host.Score*weight.nodeAffinityWeight) + glog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, score) + return score, nil + } + ssn.AddNodeOrderFn(pp.Name(), nodeOrderFn) + + batchNodeOrderFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { + var interPodAffinityScore schedulerapi.HostPriorityList + mapFn := priorities.NewInterPodAffinityPriority(cn, nl, pl, v1.DefaultHardPodAffinitySymmetricWeight) - interPodAffinityScore, err = mapFn(task.Pod, nodeMap, nodeSlice) + interPodAffinityScore, err := mapFn(task.Pod, nodeMap, nodeSlice) if err != nil { glog.Warningf("Calculate Inter Pod Affinity Priority Failed because of Error: %v", err) - return 0, err + return nil, err } - hostScore := getInterPodAffinityScore(node.Name, interPodAffinityScore) - // If podAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. - score = score + float64(hostScore*weight.podAffinityWeight) - glog.V(4).Infof("Total Score for that node is: %d", score) + score := make(map[string]float64, len(interPodAffinityScore)) + for _, host := range interPodAffinityScore { + score[host.Host] = float64(host.Score) * float64(weight.podAffinityWeight) + } + + glog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, score) return score, nil } - ssn.AddNodeOrderFn(pp.Name(), nodeOrderFn) + ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn) } func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go index fa307bc3a1..14dfbdab42 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go @@ -111,9 +111,39 @@ func enablePredicate(args framework.Arguments) predicateEnable { } func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { - pl := &util.PodLister{ - Session: ssn, - } + var nodeMap map[string]*cache.NodeInfo + + pl := util.NewPodLister(ssn) + + nodeMap, _ = util.GenerateNodeMapAndSlice(ssn.Nodes) + + // Register event handlers to update task info in PodLister & nodeMap + ssn.AddEventHandler(&framework.EventHandler{ + AllocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, event.Task.NodeName) + + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.AddPod(pod) + glog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + DeallocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, "") + + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.RemovePod(pod) + glog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + }) ni := &util.CachedNodeInfo{ Session: ssn, @@ -122,11 +152,17 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { predicate := enablePredicate(pp.pluginArguments) 
ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { - nodeInfo := cache.NewNodeInfo(node.Pods()...) - nodeInfo.SetNode(node.Node) + nodeInfo, found := nodeMap[node.Name] + if !found { + nodeInfo = cache.NewNodeInfo(node.Pods()...) + nodeInfo.SetNode(node.Node) + glog.Warningf("predicates, generate node info for %s at PredicateFn is unexpected", node.Name) + } if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods()) { - return fmt.Errorf("node <%s> can not allow more task running on it", node.Name) + glog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed", + task.Namespace, task.Name, node.Name) + return api.NewFitError(task, node, api.NodePodNumberExceeded) } // CheckNodeCondition Predicate @@ -139,12 +175,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s>: %s", - node.Name, task.Namespace, task.Name, formatReason(reasons)) + return api.NewFitErrorByReasons(task, node, reasons...) } // CheckNodeUnschedulable Predicate - fit, _, err = predicates.CheckNodeUnschedulablePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodeUnschedulablePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -153,12 +188,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> node <%s> set to unschedulable", - task.Namespace, task.Name, node.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } // NodeSelector Predicate - fit, _, err = predicates.PodMatchNodeSelector(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.PodMatchNodeSelector(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -167,12 +201,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> didn't match task <%s/%s> node selector", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } // HostPorts Predicate - fit, _, err = predicates.PodFitsHostPorts(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.PodFitsHostPorts(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -181,12 +214,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> didn't have available host ports for task <%s/%s>", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } // Toleration/Taint Predicate - fit, _, err = predicates.PodToleratesNodeTaints(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.PodToleratesNodeTaints(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -195,13 +227,12 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> does not tolerate node <%s> taints", - task.Namespace, task.Name, node.Name) + return api.NewFitErrorByReasons(task, node, reasons...) 
} if predicate.memoryPressureEnable { // CheckNodeMemoryPressurePredicate - fit, _, err = predicates.CheckNodeMemoryPressurePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodeMemoryPressurePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -210,14 +241,13 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to Memory Pressure", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } } if predicate.diskPressureEnable { // CheckNodeDiskPressurePredicate - fit, _, err = predicates.CheckNodeDiskPressurePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodeDiskPressurePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -226,14 +256,13 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to Disk Pressure", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } } if predicate.pidPressureEnable { // CheckNodePIDPressurePredicate - fit, _, err = predicates.CheckNodePIDPressurePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodePIDPressurePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -242,14 +271,19 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to PID Pressure", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } } + var lister algorithm.PodLister + lister = pl + if !util.HaveAffinity(task.Pod) { + // pod without affinity will be only affected by pod with affinity + lister = pl.AffinityLister() + } // Pod Affinity/Anti-Affinity Predicate - podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, pl) - fit, _, err = podAffinityPredicate(task.Pod, nil, nodeInfo) + podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, lister) + fit, reasons, err = podAffinityPredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -258,8 +292,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> affinity/anti-affinity failed on node <%s>", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) 
} return nil diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go index 06f4c1c4cc..1f570ce01b 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go @@ -220,6 +220,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { } pgResource := api.NewResource(*job.PodGroup.Spec.MinResources) + if len(queue.Queue.Spec.Capability) == 0 { + glog.V(4).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.", + queue.Name, job.Namespace, job.Name) + return true + } // The queue resource quota limit has not reached if pgResource.Clone().Add(attr.allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) { return true diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go index a8935afa23..6d5700a609 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go @@ -19,9 +19,12 @@ package util import ( "fmt" + "github.com/golang/glog" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" + "k8s.io/kubernetes/pkg/scheduler/cache" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" @@ -30,11 +33,36 @@ import ( // PodLister is used in predicate and nodeorder plugin type PodLister struct { Session *framework.Session + + CachedPods map[api.TaskID]*v1.Pod + Tasks map[api.TaskID]*api.TaskInfo + TaskWithAffinity map[api.TaskID]*api.TaskInfo } -// List method is used to list all the pods -func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) { - var pods []*v1.Pod +// PodAffinityLister is used to list pod with affinity +type PodAffinityLister struct { + pl *PodLister +} + +// HaveAffinity checks pod have affinity or not +func HaveAffinity(pod *v1.Pod) bool { + affinity := pod.Spec.Affinity + return affinity != nil && + (affinity.NodeAffinity != nil || + affinity.PodAffinity != nil || + affinity.PodAntiAffinity != nil) +} + +// NewPodLister returns a PodLister generate from ssn +func NewPodLister(ssn *framework.Session) *PodLister { + pl := &PodLister{ + Session: ssn, + + CachedPods: make(map[api.TaskID]*v1.Pod), + Tasks: make(map[api.TaskID]*api.TaskInfo), + TaskWithAffinity: make(map[api.TaskID]*api.TaskInfo), + } + for _, job := range pl.Session.Jobs { for status, tasks := range job.TaskStatusIndex { if !api.AllocatedStatus(status) { @@ -42,48 +70,132 @@ func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) { } for _, task := range tasks { - if selector.Matches(labels.Set(task.Pod.Labels)) { - if task.NodeName != task.Pod.Spec.NodeName { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } else { - pods = append(pods, task.Pod) - } + pl.Tasks[task.UID] = task + if HaveAffinity(task.Pod) { + pl.TaskWithAffinity[task.UID] = task } } } } + return pl +} + +func (pl *PodLister) copyTaskPod(task *api.TaskInfo) *v1.Pod { + pod := task.Pod.DeepCopy() + pod.Spec.NodeName = task.NodeName + return pod +} + +// GetPod will get pod with proper nodeName, from cache or 
DeepCopy +// keeping this function read only to avoid concurrent panic of map +func (pl *PodLister) GetPod(task *api.TaskInfo) *v1.Pod { + if task.NodeName == task.Pod.Spec.NodeName { + return task.Pod + } + + pod, found := pl.CachedPods[task.UID] + if !found { + // we could not write the copied pod back into cache for read only + pod = pl.copyTaskPod(task) + glog.Warningf("DeepCopy for pod %s/%s at PodLister.GetPod is unexpected", pod.Namespace, pod.Name) + } + return pod +} + +// UpdateTask will update the pod nodeName in cache using nodeName +// NOT thread safe, please ensure UpdateTask is the only called function of PodLister at the same time. +func (pl *PodLister) UpdateTask(task *api.TaskInfo, nodeName string) *v1.Pod { + pod, found := pl.CachedPods[task.UID] + if !found { + pod = pl.copyTaskPod(task) + pl.CachedPods[task.UID] = pod + } + pod.Spec.NodeName = nodeName + + if !api.AllocatedStatus(task.Status) { + delete(pl.Tasks, task.UID) + if HaveAffinity(task.Pod) { + delete(pl.TaskWithAffinity, task.UID) + } + } else { + pl.Tasks[task.UID] = task + if HaveAffinity(task.Pod) { + pl.TaskWithAffinity[task.UID] = task + } + } + + return pod +} + +// List method is used to list all the pods +func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) { + var pods []*v1.Pod + for _, task := range pl.Tasks { + pod := pl.GetPod(task) + if selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, pod) + } + } + return pods, nil } // FilteredList is used to list all the pods under filter condition -func (pl *PodLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { +func (pl *PodLister) filteredListWithTaskSet(taskSet map[api.TaskID]*api.TaskInfo, podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { var pods []*v1.Pod - for _, job := range pl.Session.Jobs { - for status, tasks := range job.TaskStatusIndex { - if !api.AllocatedStatus(status) { - continue - } - - for _, task := range tasks { - if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) { - if task.NodeName != task.Pod.Spec.NodeName { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } else { - pods = append(pods, task.Pod) - } - } - } + for _, task := range taskSet { + pod := pl.GetPod(task) + if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, pod) } } return pods, nil } +// FilteredList is used to list all the pods under filter condition +func (pl *PodLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { + return pl.filteredListWithTaskSet(pl.Tasks, podFilter, selector) +} + +// AffinityFilteredList is used to list all the pods with affinity under filter condition +func (pl *PodLister) AffinityFilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { + return pl.filteredListWithTaskSet(pl.TaskWithAffinity, podFilter, selector) +} + +// AffinityLister generate a PodAffinityLister following current PodLister +func (pl *PodLister) AffinityLister() *PodAffinityLister { + pal := &PodAffinityLister{ + pl: pl, + } + return pal +} + +// List method is used to list all the pods +func (pal *PodAffinityLister) List(selector labels.Selector) ([]*v1.Pod, error) { + return pal.pl.List(selector) +} + +// FilteredList is used to list all the pods with affinity under filter condition +func (pal *PodAffinityLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) 
([]*v1.Pod, error) { + return pal.pl.AffinityFilteredList(podFilter, selector) +} + +// GenerateNodeMapAndSlice returns the nodeMap and nodeSlice generated from ssn +func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) (map[string]*cache.NodeInfo, []*v1.Node) { + var nodeMap map[string]*cache.NodeInfo + var nodeSlice []*v1.Node + nodeMap = make(map[string]*cache.NodeInfo) + for _, node := range nodes { + nodeInfo := cache.NewNodeInfo(node.Pods()...) + nodeInfo.SetNode(node.Node) + nodeMap[node.Name] = nodeInfo + nodeSlice = append(nodeSlice, node.Node) + } + return nodeMap, nodeSlice +} + // CachedNodeInfo is used in nodeorder and predicate plugin type CachedNodeInfo struct { Session *framework.Session diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/assert/assert.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/assert/assert.go new file mode 100644 index 0000000000..0ff84f0db0 --- /dev/null +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/assert/assert.go @@ -0,0 +1,44 @@ +package assert + +import ( + "fmt" + "os" + "runtime/debug" + + "github.com/golang/glog" +) + +const ( + // EnvPanicOnError is the env name to determine panic on assertion failed or not + EnvPanicOnError = "PANIC_ON_ERROR" +) + +var ( + panicOnError = true +) + +func init() { + env := os.Getenv(EnvPanicOnError) + if env == "false" { + panicOnError = false + } +} + +// Assert check condition, if condition is false, print message by log or panic +func Assert(condition bool, message string) { + if condition { + return + } + if panicOnError { + panic(message) + } + glog.Errorf("%s, %s", message, debug.Stack()) +} + +// Assertf check condition, if condition is false, print message using Assert +func Assertf(condition bool, format string, args ...interface{}) { + if condition { + return + } + Assert(condition, fmt.Sprintf(format, args...)) +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go index 7429dd8a51..3932082b4c 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go @@ -31,10 +31,14 @@ import ( ) // PredicateNodes returns nodes that fit task -func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) []*api.NodeInfo { +func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) { var predicateNodes []*api.NodeInfo var workerLock sync.Mutex + + var errorLock sync.Mutex + fe := api.NewFitErrors() + checkNode := func(index int) { node := nodes[index] glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>", @@ -42,8 +46,11 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF // TODO (k82cn): Enable eCache for performance improvement. 
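		// checkNode is run concurrently for every candidate node by
		// workqueue.ParallelizeUntil (16 workers), so the lines added below record
		// each predicate failure in the shared FitErrors map only while errorLock
		// is held; PredicateNodes then returns those per-node reasons together
		// with the list of feasible nodes.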
if err := fn(task, node); err != nil { - glog.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) + errorLock.Lock() + fe.SetNodeError(node.Name, err) + errorLock.Unlock() return } @@ -53,11 +60,11 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF } workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode) - return predicateNodes + return predicateNodes, fe } // PrioritizeNodes returns a map whose key is node's score and value are corresponding nodes -func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo { +func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.BatchNodeOrderFn, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo { pluginNodeScoreMap := map[string]schedulerapi.HostPriorityList{} nodeOrderScoreMap := map[string]float64{} nodeScores := map[float64][]*api.NodeInfo{} @@ -90,11 +97,21 @@ func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, mapFn api.NodeOr glog.Errorf("Error in Calculating Priority for the node:%v", err) return nodeScores } + + batchNodeScore, err := batchFn(task, nodes) + if err != nil { + glog.Errorf("Error in Calculating batch Priority for the node, err %v", err) + return nodeScores + } + for _, node := range nodes { if score, found := reduceScores[node.Name]; found { if orderScore, ok := nodeOrderScoreMap[node.Name]; ok { score = score + orderScore } + if batchScore, ok := batchNodeScore[node.Name]; ok { + score = score + batchScore + } nodeScores[score] = append(nodeScores[score], node) } else { // If no plugin is applied to this node, the default is 0.0 @@ -102,6 +119,9 @@ func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, mapFn api.NodeOr if orderScore, ok := nodeOrderScoreMap[node.Name]; ok { score = score + orderScore } + if batchScore, ok := batchNodeScore[node.Name]; ok { + score = score + batchScore + } nodeScores[score] = append(nodeScores[score], node) } } From 4d315f47f598f8b457389c4740e38c080ccf2144 Mon Sep 17 00:00:00 2001 From: asifdxtreme Date: Mon, 1 Jul 2019 14:05:32 +0530 Subject: [PATCH 2/3] Update e2e test to use volcano as scheduler name --- test/e2e/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/util.go b/test/e2e/util.go index f2b4e8a8ce..1eb1c939c0 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -338,7 +338,7 @@ func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) { Labels: task.labels, }, Spec: v1.PodSpec{ - SchedulerName: "kube-batch", + SchedulerName: "volcano", RestartPolicy: restartPolicy, Containers: createContainers(task.img, task.command, task.workingDir, task.req, task.limit, task.hostport), Affinity: task.affinity, From 35f0312c7dd2f1b4e224995c3bccc3417d2dc70a Mon Sep 17 00:00:00 2001 From: asifdxtreme Date: Mon, 1 Jul 2019 14:23:35 +0530 Subject: [PATCH 3/3] Update UT to use SchedulerName: volcano --- pkg/controllers/job/job_controller_handler_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controllers/job/job_controller_handler_test.go b/pkg/controllers/job/job_controller_handler_test.go index b96efcefef..12b5802a7c 100644 --- a/pkg/controllers/job/job_controller_handler_test.go +++ b/pkg/controllers/job/job_controller_handler_test.go @@ -183,7 +183,7 @@ func TestUpdateJobFunc(t 
*testing.T) { Namespace: namespace, }, Spec: vkbatchv1.JobSpec{ - SchedulerName: "kube-batch", + SchedulerName: "volcano", MinAvailable: 5, }, Status: vkbatchv1.JobStatus{ @@ -198,7 +198,7 @@ func TestUpdateJobFunc(t *testing.T) { Namespace: namespace, }, Spec: vkbatchv1.JobSpec{ - SchedulerName: "kube-batch", + SchedulerName: "volcano", MinAvailable: 5, }, Status: vkbatchv1.JobStatus{ @@ -216,7 +216,7 @@ func TestUpdateJobFunc(t *testing.T) { Namespace: namespace, }, Spec: vkbatchv1.JobSpec{ - SchedulerName: "kube-batch", + SchedulerName: "volcano", MinAvailable: 5, }, Status: vkbatchv1.JobStatus{ @@ -231,7 +231,7 @@ func TestUpdateJobFunc(t *testing.T) { Namespace: namespace, }, Spec: vkbatchv1.JobSpec{ - SchedulerName: "kube-batch", + SchedulerName: "volcano", MinAvailable: 5, }, Status: vkbatchv1.JobStatus{
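
The four hunks in this test file change the same two fields in near-identical
Job fixtures. As a minimal sketch (not part of this patch), a helper along the
lines below would keep the scheduler name in a single place for these tests;
the helper name newTestJob, the package clause, and the vkbatchv1 import path
are assumptions for illustration and may not match the actual test file.

	package job

	import (
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

		vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
	)

	// newTestJob builds the minimal Job fixture repeated above, so a future
	// scheduler rename is a one-line change instead of four parallel edits.
	func newTestJob(name, namespace string) *vkbatchv1.Job {
		return &vkbatchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:      name,
				Namespace: namespace,
			},
			Spec: vkbatchv1.JobSpec{
				SchedulerName: "volcano",
				MinAvailable:  5,
			},
		}
	}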