Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Automated cherry pick of #592: Update prometheus vendor libs #629

Merged
merged 2 commits into from
Mar 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/prometheus/client_golang"
version = "0.9.2"
2 changes: 2 additions & 0 deletions cmd/kube-batch/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ServerOption struct {
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
ListenAddress string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -56,6 +57,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"executing the main loop. Enable this when running replicated kube-batch for high availability")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object")
fs.StringVar(&s.ListenAddress, "listen-address", ":8080", "The address to listen on for HTTP requests.")
}

func (s *ServerOption) CheckOptionOrDie() error {
Expand Down
7 changes: 7 additions & 0 deletions cmd/kube-batch/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package app
import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/golang/glog"
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler"
"github.com/kubernetes-sigs/kube-batch/pkg/version"
"github.com/prometheus/client_golang/prometheus/promhttp"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
Expand Down Expand Up @@ -75,6 +77,11 @@ func Run(opt *options.ServerOption) error {
panic(err)
}

go func() {
http.Handle("/metrics", promhttp.Handler())
glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()

run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
for !tasks.Empty() {
predicateNodes := []*api.NodeInfo{}
nodeScores := map[int][]*api.NodeInfo{}

task := tasks.Pop().(*api.TaskInfo)
assigned := false

Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
)

Expand Down Expand Up @@ -212,6 +213,7 @@ func preempt(
}
}
victims := ssn.Preemptable(preemptor, preemptees)
metrics.UpdatePreemptionVictimsCount(len(victims))

if err := validateVictims(victims, resreq); err != nil {
glog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
Expand All @@ -235,6 +237,7 @@ func preempt(
resreq.Sub(preemptee.Resreq)
}

metrics.RegisterPreemptionAttempts()
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)

Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package framework

import (
"time"

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
Expand All @@ -39,15 +42,19 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
}

for _, plugin := range ssn.plugins {
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}

return ssn
}

func CloseSession(ssn *Session) {
for _, plugin := range ssn.plugins {
onSessionCloseStart := time.Now()
plugin.OnSessionClose(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionClose, metrics.Duration(onSessionCloseStart))
}

closeSession(ssn)
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
Expand All @@ -30,6 +30,7 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type Session struct {
Expand Down Expand Up @@ -295,6 +296,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {
task.Job, ssn.UID)
}

metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
return nil
}

Expand Down
191 changes: 191 additions & 0 deletions pkg/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" // auto-registry collectors in default registry
)

// KubeBatchNamespace is the name prefix used for kube-batch metrics in
// Prometheus; every collector in this file sets it as its Subsystem.
const KubeBatchNamespace = "kube_batch"

// Values reported in the "OnSession" label of the plugin latency histogram.
const (
	// OnSessionOpen labels time spent in a plugin's OnSessionOpen callback.
	OnSessionOpen = "OnSessionOpen"

	// OnSessionClose labels time spent in a plugin's OnSessionClose callback.
	OnSessionClose = "OnSessionClose"
)

// Package-level Prometheus collectors for kube-batch.
//
// Every collector is created through promauto (see the import comment: it
// auto-registers collectors in the default registry) and sets its Subsystem
// to KubeBatchNamespace. All histograms share the same bucket layout,
// prometheus.ExponentialBuckets(5, 2, 10): ten buckets starting at 5 and
// doubling each step, expressed in the unit named by the metric.
var (
// e2eSchedulingLatency is a histogram of end-to-end scheduling latency,
// observed in milliseconds by UpdateE2eDuration.
e2eSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "e2e_scheduling_latency_milliseconds",
Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

// pluginSchedulingLatency is a histogram of plugin callback latency in
// microseconds, partitioned by the "plugin" and "OnSession" labels.
// It is fed by UpdatePluginDuration.
pluginSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "plugin_scheduling_latency_microseconds",
Help: "Plugin scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"plugin", "OnSession"},
)

// actionSchedulingLatency is a histogram of action latency in microseconds,
// partitioned by the "action" label. It is fed by UpdateActionDuration.
actionSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "action_scheduling_latency_microseconds",
Help: "Action scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"action"},
)

// taskSchedulingLatency is a histogram of single-task scheduling latency in
// microseconds. It is fed by UpdateTaskScheduleDuration.
// NOTE(review): with these buckets the largest finite bound is presumably
// 5*2^9 = 2560 microseconds; if callers pass the time elapsed since pod
// creation, most samples would land in the +Inf bucket. Confirm the
// intended bucket range.
taskSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "task_scheduling_latency_microseconds",
Help: "Task scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

// scheduleAttempts counts scheduling attempts, partitioned by the "result"
// label. It is incremented by UpdatePodScheduleStatus.
scheduleAttempts = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "schedule_attempts_total",
Help: "Number of attempts to schedule pods, by the result. 'unschedulable' means a pod could not be scheduled, while 'error' means an internal scheduler problem.",
}, []string{"result"},
)

// preemptionVictims is a gauge holding the most recently reported number
// of selected preemption victims (set by UpdatePreemptionVictimsCount).
preemptionVictims = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "pod_preemption_victims",
Help: "Number of selected preemption victims",
},
)

// preemptionAttempts counts preemption attempts; it is incremented by
// RegisterPreemptionAttempts.
preemptionAttempts = promauto.NewCounter(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "total_preemption_attempts",
Help: "Total preemption attempts in the cluster till now",
},
)

// unscheduleTaskCount is a gauge of tasks that could not be scheduled,
// partitioned by the "job_id" label (set by UpdateUnscheduleTaskCount).
unscheduleTaskCount = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_task_count",
Help: "Number of tasks could not be scheduled",
}, []string{"job_id"},
)

// unscheduleJobCount is a gauge of jobs that could not be scheduled
// (set by UpdateUnscheduleJobCount).
unscheduleJobCount = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_job_count",
Help: "Number of jobs could not be scheduled",
},
)

// jobRetryCount counts retries per job, partitioned by the "job_id" label
// (incremented by RegisterJobRetries).
jobRetryCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "job_retry_counts",
Help: "Number of retry counts for one job",
}, []string{"job_id"},
)
)

// UpdatePluginDuration records, in microseconds, how long one plugin callback
// took. pluginName fills the "plugin" label and OnSessionStatus fills the
// "OnSession" label of the plugin latency histogram.
func UpdatePluginDuration(pluginName, OnSessionStatus string, duration time.Duration) {
	observer := pluginSchedulingLatency.WithLabelValues(pluginName, OnSessionStatus)
	observer.Observe(DurationInMicroseconds(duration))
}

// UpdateActionDuration records, in microseconds, how long one action took.
// actionName fills the "action" label of the action latency histogram.
func UpdateActionDuration(actionName string, duration time.Duration) {
	observer := actionSchedulingLatency.WithLabelValues(actionName)
	observer.Observe(DurationInMicroseconds(duration))
}

// UpdateE2eDuration updates entire end to end scheduling latency
func UpdateE2eDuration(duration time.Duration) {
e2eSchedulingLatency.Observe(DurationInMilliseconds(duration))
}

// UpdateTaskScheduleDuration updates single task scheduling latency
func UpdateTaskScheduleDuration(duration time.Duration) {
taskSchedulingLatency.Observe(DurationInMicroseconds(duration))
}

// UpdatePodScheduleStatus adds count to the schedule-attempts counter for the
// given scheduling result. label is used as the value of the counter's
// "result" label.
//
// A negative count is ignored: Prometheus counters are monotonic and
// Counter.Add panics when given a negative value, and a bad metric delta
// should not be able to crash the scheduler. A zero count still creates the
// labelled series, exactly as before.
func UpdatePodScheduleStatus(label string, count int) {
	if count < 0 {
		return
	}
	scheduleAttempts.WithLabelValues(label).Add(float64(count))
}

// UpdatePreemptionVictimsCount overwrites the preemption-victims gauge with
// the given number of victims.
func UpdatePreemptionVictimsCount(victimsCount int) {
	victims := float64(victimsCount)
	preemptionVictims.Set(victims)
}

// RegisterPreemptionAttempts records one more preemption attempt by
// incrementing the preemption-attempts counter.
func RegisterPreemptionAttempts() {
preemptionAttempts.Inc()
}

// UpdateUnscheduleTaskCount sets the unschedulable-task gauge for one job.
// jobID fills the gauge's "job_id" label and taskCount becomes its value.
func UpdateUnscheduleTaskCount(jobID string, taskCount int) {
	gauge := unscheduleTaskCount.WithLabelValues(jobID)
	gauge.Set(float64(taskCount))
}

// UpdateUnscheduleJobCount overwrites the unschedulable-job gauge with the
// given number of jobs.
func UpdateUnscheduleJobCount(jobCount int) {
	jobs := float64(jobCount)
	unscheduleJobCount.Set(jobs)
}

// RegisterJobRetries records one more retry for the given job by incrementing
// the job-retry counter whose "job_id" label equals jobID.
func RegisterJobRetries(jobID string) {
	retries := jobRetryCount.WithLabelValues(jobID)
	retries.Inc()
}

// DurationInMicroseconds converts duration to a floating-point number of
// microseconds, keeping the sub-microsecond fraction.
func DurationInMicroseconds(duration time.Duration) float64 {
	// A Duration is an integer nanosecond count, so dividing by the
	// nanoseconds in one microsecond yields fractional microseconds.
	return float64(duration) / float64(time.Microsecond)
}

// DurationInMilliseconds converts duration to a floating-point number of
// milliseconds, keeping the sub-millisecond fraction.
func DurationInMilliseconds(duration time.Duration) float64 {
	// A Duration is an integer nanosecond count, so dividing by the
	// nanoseconds in one millisecond yields fractional milliseconds.
	return float64(duration) / float64(time.Millisecond)
}

// DurationInSeconds converts duration to a floating-point number of seconds.
func DurationInSeconds(duration time.Duration) float64 {
	seconds := duration.Seconds()
	return seconds
}

// Duration returns the time elapsed between start and now.
func Duration(start time.Time) time.Duration {
	elapsed := time.Since(start)
	return elapsed
}
Loading