From cdde8e3508e198b1aa79e45fafdce97e8f0aad07 Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Tue, 5 Feb 2019 12:18:31 -0800 Subject: [PATCH] Add metrics for kube-batch --- pkg/scheduler/actions/allocate/allocate.go | 1 + pkg/scheduler/actions/preempt/preempt.go | 3 + pkg/scheduler/framework/framework.go | 7 + pkg/scheduler/framework/session.go | 4 +- pkg/scheduler/metrics/metrics.go | 191 +++++++++++++++++++++ pkg/scheduler/plugins/gang/gang.go | 12 +- pkg/scheduler/scheduler.go | 5 + 7 files changed, 221 insertions(+), 2 deletions(-) create mode 100644 pkg/scheduler/metrics/metrics.go diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 11ec5baf1..753ad8bbb 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -105,6 +105,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { for !tasks.Empty() { predicateNodes := []*api.NodeInfo{} nodeScores := map[int][]*api.NodeInfo{} + task := tasks.Pop().(*api.TaskInfo) assigned := false diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index b0e63a974..9b2132549 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -23,6 +23,7 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util" ) @@ -212,6 +213,7 @@ func preempt( } } victims := ssn.Preemptable(preemptor, preemptees) + metrics.UpdatePreemptionVictimsCount(len(victims)) if err := validateVictims(victims, resreq); err != nil { glog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err) @@ -235,6 +237,7 @@ func preempt( resreq.Sub(preemptee.Resreq) } + metrics.RegisterPreemptionAttempts() glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested 
<%v>.", preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq) diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index 2362ff752..a25f1c6e0 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -17,10 +17,13 @@ limitations under the License. package framework import ( + "time" + "github.com/golang/glog" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" ) func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session { @@ -39,7 +42,9 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session { } for _, plugin := range ssn.plugins { + onSessionOpenStart := time.Now() plugin.OnSessionOpen(ssn) + metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart)) } return ssn @@ -47,7 +52,9 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session { func CloseSession(ssn *Session) { for _, plugin := range ssn.plugins { + onSessionCloseStart := time.Now() plugin.OnSessionClose(ssn) + metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionClose, metrics.Duration(onSessionCloseStart)) } closeSession(ssn) diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index ca606db4a..c8dbd6e49 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -21,7 +21,7 @@ import ( "github.com/golang/glog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" @@ -30,6 +30,7 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" ) type Session 
struct { @@ -295,6 +296,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { task.Job, ssn.UID) } + metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time)) return nil } diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go new file mode 100644 index 000000000..4e634c65f --- /dev/null +++ b/pkg/scheduler/metrics/metrics.go @@ -0,0 +1,191 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" // auto-registers collectors in the default registry +) + +const ( + // KubeBatchNamespace is the kube-batch metric prefix; it is set as the Subsystem of every collector declared in this file. + KubeBatchNamespace = "kube_batch" + + // OnSessionOpen is the "OnSession" label value passed to UpdatePluginDuration for a plugin's OnSessionOpen callback. + OnSessionOpen = "OnSessionOpen" + + // OnSessionClose is the "OnSession" label value passed to UpdatePluginDuration for a plugin's OnSessionClose callback. + OnSessionClose = "OnSessionClose" +) + +var ( + e2eSchedulingLatency = promauto.NewHistogram( + prometheus.HistogramOpts{ + Subsystem: KubeBatchNamespace, + Name: "e2e_scheduling_latency_milliseconds", + Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)", + Buckets: 
prometheus.ExponentialBuckets(5, 2, 10), + }, []string{"plugin", "OnSession"}, + ) + + actionSchedulingLatency = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: KubeBatchNamespace, + Name: "action_scheduling_latency_microseconds", + Help: "Action scheduling latency in microseconds", + Buckets: prometheus.ExponentialBuckets(5, 2, 10), + }, []string{"action"}, + ) + + taskSchedulingLatency = promauto.NewHistogram( + prometheus.HistogramOpts{ + Subsystem: KubeBatchNamespace, + Name: "task_scheduling_latency_microseconds", + Help: "Task scheduling latency in microseconds", + Buckets: prometheus.ExponentialBuckets(5, 2, 10), + }, + ) + + scheduleAttempts = promauto.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: KubeBatchNamespace, + Name: "schedule_attempts_total", + Help: "Number of attempts to schedule pods, by the result. 'unschedulable' means a pod could not be scheduled, while 'error' means an internal scheduler problem.", + }, []string{"result"}, + ) + + preemptionVictims = promauto.NewGauge( + prometheus.GaugeOpts{ + Subsystem: KubeBatchNamespace, + Name: "pod_preemption_victims", + Help: "Number of selected preemption victims", + }, + ) + + preemptionAttempts = promauto.NewCounter( + prometheus.CounterOpts{ + Subsystem: KubeBatchNamespace, + Name: "total_preemption_attempts", + Help: "Total preemption attempts in the cluster till now", + }, + ) + + unscheduleTaskCount = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: KubeBatchNamespace, + Name: "unschedule_task_count", + Help: "Number of tasks could not be scheduled", + }, []string{"job_id"}, + ) + + unscheduleJobCount = promauto.NewGauge( + prometheus.GaugeOpts{ + Subsystem: KubeBatchNamespace, + Name: "unschedule_job_count", + Help: "Number of jobs could not be scheduled", + }, + ) + + jobRetryCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: KubeBatchNamespace, + Name: "job_retry_counts", + Help: "Number of retry counts for one job", + }, 
[]string{"job_id"}, + ) +) + +// UpdatePluginDuration records, in microseconds, how long the given plugin's session callback (OnSessionStatus) took. +func UpdatePluginDuration(pluginName, OnSessionStatus string, duration time.Duration) { + pluginSchedulingLatency.WithLabelValues(pluginName, OnSessionStatus).Observe(DurationInMicroseconds(duration)) +} + +// UpdateActionDuration records, in microseconds, how long the named action took. +func UpdateActionDuration(actionName string, duration time.Duration) { + actionSchedulingLatency.WithLabelValues(actionName).Observe(DurationInMicroseconds(duration)) +} + +// UpdateE2eDuration records the end-to-end scheduling latency in milliseconds. +func UpdateE2eDuration(duration time.Duration) { + e2eSchedulingLatency.Observe(DurationInMilliseconds(duration)) +} + +// UpdateTaskScheduleDuration records the scheduling latency of a single task in microseconds. +func UpdateTaskScheduleDuration(duration time.Duration) { + taskSchedulingLatency.Observe(DurationInMicroseconds(duration)) +} + +// UpdatePodScheduleStatus adds count to the schedule-attempts counter for the given result label, which could be Success, Failure, Error. +func UpdatePodScheduleStatus(label string, count int) { + scheduleAttempts.WithLabelValues(label).Add(float64(count)) +} + +// UpdatePreemptionVictimsCount sets the gauge of selected preemption victims to victimsCount. +func UpdatePreemptionVictimsCount(victimsCount int) { + preemptionVictims.Set(float64(victimsCount)) +} + +// RegisterPreemptionAttempts increments the total number of preemption attempts. +func RegisterPreemptionAttempts() { + preemptionAttempts.Inc() +} + +// UpdateUnscheduleTaskCount sets the number of unschedulable tasks for the job identified by jobID. +func UpdateUnscheduleTaskCount(jobID string, taskCount int) { + unscheduleTaskCount.WithLabelValues(jobID).Set(float64(taskCount)) +} + +// UpdateUnscheduleJobCount sets the number of unschedulable jobs. +func UpdateUnscheduleJobCount(jobCount int) { + unscheduleJobCount.Set(float64(jobCount)) +} + +// RegisterJobRetries increments the retry counter of the job identified by jobID.
+func RegisterJobRetries(jobID string) { + jobRetryCount.WithLabelValues(jobID).Inc() +} + +// DurationInMicroseconds converts duration to a (possibly fractional) number of microseconds. +func DurationInMicroseconds(duration time.Duration) float64 { + return float64(duration.Nanoseconds()) / float64(time.Microsecond.Nanoseconds()) +} + +// DurationInMilliseconds converts duration to a (possibly fractional) number of milliseconds. +func DurationInMilliseconds(duration time.Duration) float64 { + return float64(duration.Nanoseconds()) / float64(time.Millisecond.Nanoseconds()) +} + +// DurationInSeconds converts duration to a (possibly fractional) number of seconds. +func DurationInSeconds(duration time.Duration) float64 { + return duration.Seconds() +} + +// Duration returns the time elapsed since start. +func Duration(start time.Time) time.Duration { + return time.Since(start) +} diff --git a/pkg/scheduler/plugins/gang/gang.go b/pkg/scheduler/plugins/gang/gang.go index 098083e01..7c85f9aef 100644 --- a/pkg/scheduler/plugins/gang/gang.go +++ b/pkg/scheduler/plugins/gang/gang.go @@ -21,12 +21,13 @@ import ( "github.com/golang/glog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" ) type gangPlugin struct { @@ -156,11 +157,18 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { } func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { + var unreadyTaskCount int32 + var unScheduleJobCount int for _, job := range ssn.Jobs { if !jobReady(job) { + unreadyTaskCount = job.MinAvailable - readyTaskNum(job) msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", job.MinAvailable-readyTaskNum(job), len(job.Tasks), job.FitError()) + unScheduleJobCount += 1 + metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount)) + metrics.RegisterJobRetries(job.Name) + jc := 
&v1alpha1.PodGroupCondition{ Type: v1alpha1.PodGroupUnschedulableType, Status: v1.ConditionTrue, @@ -176,4 +184,6 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { } } } + + metrics.UpdateUnscheduleJobCount(unScheduleJobCount) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d822df5d3..5542308d9 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( schedcache "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" ) type Scheduler struct { @@ -82,12 +83,17 @@ func (pc *Scheduler) Run(stopCh <-chan struct{}) { func (pc *Scheduler) runOnce() { glog.V(4).Infof("Start scheduling ...") + scheduleStartTime := time.Now() defer glog.V(4).Infof("End scheduling ...") + // Arguments of a deferred call are evaluated at the defer statement, so measure inside a closure that runs when runOnce returns (after CloseSession); otherwise the recorded e2e latency is always ~0. + defer func() { metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime)) }() ssn := framework.OpenSession(pc.cache, pc.plugins) defer framework.CloseSession(ssn) for _, action := range pc.actions { + actionStartTime := time.Now() action.Execute(ssn) + metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime)) } }