Skip to content

Commit

Permalink
feature: mesos方案优化scheduler metrics数据 TencentBlueKing#532
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg committed Jul 21, 2020
1 parent 2eb7db4 commit f962666
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 34 deletions.
16 changes: 14 additions & 2 deletions bcs-mesos/bcs-scheduler/src/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store/etcd"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store/zk"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/pluginManager"
"strconv"
"strings"
//"sync"
Expand Down Expand Up @@ -46,8 +47,19 @@ func New(config util.SchedConfig) (*Manager, error) {

var s store.Store
var err error
var pm *pluginManager.PluginManager
if config.Scheduler.Plugins != "" {
blog.Infof("start init plugin manager")
plugins := strings.Split(config.Scheduler.Plugins, ",")

pm, err = pluginManager.NewPluginManager(plugins, config.Scheduler.PluginDir)
if err != nil {
blog.Errorf("NewPluginManager error %s", err.Error())
}
}

if config.Scheduler.StoreDriver == "etcd" {
s, err = etcd.NewEtcdStore(config.Scheduler.Kubeconfig)
s, err = etcd.NewEtcdStore(config.Scheduler.Kubeconfig, pm, config.Scheduler.Cluster)
if err != nil {
blog.Errorf("new etcd store failed: %s", err.Error())
return nil, err
Expand All @@ -60,7 +72,7 @@ func New(config util.SchedConfig) (*Manager, error) {
blog.Errorf("connect zookeeper %s failed: %s", config.ZkHost, err.Error())
return nil, err
}
s = zk.NewManagerStore(dbzk)
s = zk.NewManagerStore(dbzk, pm, config.Scheduler.Cluster)
config.Scheduler.UseCache = false
}

Expand Down
50 changes: 46 additions & 4 deletions bcs-mesos/bcs-scheduler/src/manager/store/etcd/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store"
"github.com/Tencent/bk-bcs/bcs-mesos/pkg/client/internalclientset"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/pluginManager"
typesplugin "github.com/Tencent/bk-bcs/bcs-common/common/plugin"
bkbcsv2 "github.com/Tencent/bk-bcs/bcs-mesos/pkg/client/internalclientset/typed/bkbcs/v2"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -84,6 +86,10 @@ type managerStore struct {
//wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

//plugin manager, ip-resources
pm *pluginManager.PluginManager
clusterId string
}

//init bcs mesos custom resources
Expand Down Expand Up @@ -174,9 +180,12 @@ func (s *managerStore) StartStoreObjectMetrics() {
store.AgentCpuResourceTotal.Reset()
store.AgentMemoryResourceRemain.Reset()
store.AgentMemoryResourceTotal.Reset()
store.AgentIpResourceRemain.Reset()
store.StorageOperatorFailedTotal.Reset()
store.StorageOperatorLatencyMs.Reset()
store.StorageOperatorTotal.Reset()
store.ClusterMemoryResouceRemain.Reset()
store.ClusterCpuResouceRemain.Reset()

// handle service metrics
services, err := s.ListAllServices()
Expand Down Expand Up @@ -237,22 +246,53 @@ func (s *managerStore) StartStoreObjectMetrics() {
if err != nil {
blog.Errorf("list all agent error %s", err.Error())
}

var clusterCpu float64
var clusterMem float64
for _, agent := range agents {
info := agent.GetAgentInfo()
if info.IP == "" {
blog.Errorf("agent %s don't have InnerIP attribute", agent.Key)
continue
}

store.ReportAgentInfoMetrics(info.IP, info.CpuTotal, info.CpuTotal-info.CpuUsed,
info.MemTotal, info.MemTotal-info.MemUsed)
var ipValue float64
if s.pm!=nil {
//request netservice to node container ip
para := &typesplugin.HostPluginParameter{
Ips: []string{info.IP},
ClusterId: s.clusterId,
}

outerAttri, err := s.pm.GetHostAttributes(para)
if err != nil {
blog.Errorf("Get host(%s) ip-resources failed: %s", info.IP, err.Error())
continue
}
attr, ok := outerAttri[info.IP]
if !ok {
blog.Errorf("host(%s) don't have ip-resources attributes", info.IP)
continue
}
ipAttr := attr.Attributes[0]
blog.Infof("Host(%s) %s Scalar(%f)", info.IP, ipAttr.Name, ipAttr.Scalar.Value)
ipValue = ipAttr.Scalar.Value
}

//if ip-resources is zero, then ignore it
if s.pm==nil || ipValue>0{
clusterCpu += info.CpuTotal-info.CpuUsed
clusterMem += info.MemTotal-info.MemUsed
}

store.ReportAgentInfoMetrics(info.IP, s.clusterId, info.CpuTotal, info.CpuTotal-info.CpuUsed,
info.MemTotal, info.MemTotal-info.MemUsed, ipValue)
}
store.ReportClusterInfoMetrics(s.clusterId, clusterCpu, clusterMem)
}
}

//etcd store, based on kube-apiserver
func NewEtcdStore(kubeconfig string) (store.Store, error) {
func NewEtcdStore(kubeconfig string, pm *pluginManager.PluginManager, clusterId string) (store.Store, error) {
//build kube-apiserver config
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
Expand Down Expand Up @@ -286,6 +326,8 @@ func NewEtcdStore(kubeconfig string) (store.Store, error) {
BkbcsClient: clientset.BkbcsV2(),
k8sClient: k8sClientset,
extensionClient: extensionClient,
pm: pm,
clusterId: clusterId,
}

//fetch application
Expand Down
48 changes: 38 additions & 10 deletions bcs-mesos/bcs-scheduler/src/manager/store/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,49 @@ var (
Subsystem: types.MetricsSubsystemScheduler,
Name: "agent_cpu_resource_total",
Help: "Agent cpu resource total",
}, []string{"InnerIP"})
}, []string{"InnerIP", "clusterId"})

AgentMemoryResourceTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespaceScheduler,
Subsystem: types.MetricsSubsystemScheduler,
Name: "agent_memory_resource_total",
Help: "Agent memory resource total",
}, []string{"InnerIP"})
}, []string{"InnerIP", "clusterId"})

AgentCpuResourceRemain = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespaceScheduler,
Subsystem: types.MetricsSubsystemScheduler,
Name: "agent_cpu_resource_remain",
Help: "Agent cpu resource remain",
}, []string{"InnerIP"})
}, []string{"InnerIP", "clusterId"})

AgentMemoryResourceRemain = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespaceScheduler,
Subsystem: types.MetricsSubsystemScheduler,
Name: "agent_memory_resource_remain",
Help: "Agent memory resource remain",
}, []string{"InnerIP"})
}, []string{"InnerIP", "clusterId"})

AgentIpResourceRemain = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespaceScheduler,
Subsystem: types.MetricsSubsystemScheduler,
Name: "agent_ip_resource_remain",
Help: "Agent ip resource remain",
}, []string{"InnerIP", "clusterId"})

ClusterCpuResouceRemain = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespaceScheduler,
Subsystem: types.MetricsSubsystemScheduler,
Name: "cluster_cpu_resource_remain",
Help: "Cluster cpu resource remain",
}, []string{"clusterId"})

ClusterMemoryResouceRemain = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespaceScheduler,
Subsystem: types.MetricsSubsystemScheduler,
Name: "cluster_memory_resource_remain",
Help: "Cluster memory resource remain",
}, []string{"clusterId"})

StorageOperatorTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespaceScheduler,
Expand All @@ -112,7 +133,8 @@ var (

func init() {
prometheus.MustRegister(ObjectResourceInfo, TaskgroupInfo, AgentCpuResourceTotal, AgentMemoryResourceTotal,
StorageOperatorTotal, StorageOperatorLatencyMs, StorageOperatorFailedTotal, AgentCpuResourceRemain, AgentMemoryResourceRemain)
StorageOperatorTotal, StorageOperatorLatencyMs, StorageOperatorFailedTotal, AgentCpuResourceRemain, AgentMemoryResourceRemain,
AgentIpResourceRemain, ClusterCpuResouceRemain, ClusterMemoryResouceRemain)
}

func ReportObjectResourceInfoMetrics(resource, ns, name, status string) {
Expand Down Expand Up @@ -153,11 +175,17 @@ func ReportTaskgroupInfoMetrics(ns, name, taskgroupId, status string) {
TaskgroupInfo.WithLabelValues(ns, name, taskgroupId).Set(val)
}

func ReportAgentInfoMetrics(ip string, totalCpu, remainCpu, totalMem, remainMem float64) {
AgentCpuResourceTotal.WithLabelValues(ip).Set(totalCpu)
AgentCpuResourceRemain.WithLabelValues(ip).Set(remainCpu)
AgentMemoryResourceTotal.WithLabelValues(ip).Set(totalMem)
AgentMemoryResourceRemain.WithLabelValues(ip).Set(remainMem)
func ReportAgentInfoMetrics(ip, clusterId string, totalCpu, remainCpu, totalMem, remainMem, remainIp float64) {
AgentCpuResourceTotal.WithLabelValues(ip, clusterId).Set(totalCpu)
AgentCpuResourceRemain.WithLabelValues(ip, clusterId).Set(remainCpu)
AgentMemoryResourceTotal.WithLabelValues(ip, clusterId).Set(totalMem)
AgentMemoryResourceRemain.WithLabelValues(ip, clusterId).Set(remainMem)
AgentIpResourceRemain.WithLabelValues(ip, clusterId).Set(remainIp)
}

func ReportClusterInfoMetrics(clusterId string, remainCpu, remainMem float64) {
ClusterCpuResouceRemain.WithLabelValues(clusterId).Set(remainCpu)
ClusterMemoryResouceRemain.WithLabelValues(clusterId).Set(remainMem)
}

func ReportStorageOperatorMetrics(operator string, started time.Time, failed bool) {
Expand Down
77 changes: 59 additions & 18 deletions bcs-mesos/bcs-scheduler/src/manager/store/zk/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,32 @@ package zk

import (
"context"
"sync"
"time"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
typesplugin "github.com/Tencent/bk-bcs/bcs-common/common/plugin"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/pluginManager"
)

// Store Manager
type managerStore struct {
Db store.Dbdrvier

wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

//plugin manager, ip-resources
pm *pluginManager.PluginManager
clusterId string
}

// Create a store manager by a db driver
func NewManagerStore(dbDriver store.Dbdrvier) store.Store {
func NewManagerStore(dbDriver store.Dbdrvier, pm *pluginManager.PluginManager, clusterId string) store.Store {
s := &managerStore{
Db: dbDriver,
pm: pm,
clusterId: clusterId,
}

return s
Expand All @@ -45,26 +51,31 @@ func (s *managerStore) StopStoreMetrics() {
return
}
s.cancel()

time.Sleep(time.Second)
s.wg.Wait()
//s.wg.Wait()
}

func (s *managerStore) StartStoreObjectMetrics() {
s.ctx, s.cancel = context.WithCancel(context.Background())

for {
time.Sleep(time.Minute)

select {
case <-s.ctx.Done():
blog.Infof("stop scheduler store metrics")
return

default:
s.wg.Add(1)
store.ObjectResourceInfo.Reset()
if cacheMgr==nil {
continue
}
blog.Infof("start produce metrics")
store.ObjectResourceInfo.Reset()
store.TaskgroupInfo.Reset()
store.AgentCpuResourceRemain.Reset()
store.AgentCpuResourceTotal.Reset()
store.AgentMemoryResourceRemain.Reset()
store.AgentMemoryResourceTotal.Reset()
store.AgentIpResourceRemain.Reset()
store.StorageOperatorFailedTotal.Reset()
store.StorageOperatorLatencyMs.Reset()
store.StorageOperatorTotal.Reset()
store.ClusterMemoryResouceRemain.Reset()
store.ClusterCpuResouceRemain.Reset()

// handle service metrics
services, err := s.ListAllServices()
Expand Down Expand Up @@ -120,6 +131,8 @@ func (s *managerStore) StartStoreObjectMetrics() {
store.ReportObjectResourceInfoMetrics(store.ObjectResourceSecret, secret.NameSpace, secret.Name, "")
}

var clusterCpu float64
var clusterMem float64
// handle agents metrics
agents, err := s.ListAllAgents()
if err != nil {
Expand All @@ -132,11 +145,39 @@ func (s *managerStore) StartStoreObjectMetrics() {
continue
}

store.ReportAgentInfoMetrics(info.IP, info.CpuTotal, info.CpuTotal-info.CpuUsed,
info.MemTotal, info.MemTotal-info.MemUsed)
}
var ipValue float64
if s.pm!=nil {
//request netservice to node container ip
para := &typesplugin.HostPluginParameter{
Ips: []string{info.IP},
ClusterId: s.clusterId,
}

outerAttri, err := s.pm.GetHostAttributes(para)
if err != nil {
blog.Errorf("Get host(%s) ip-resources failed: %s", info.IP, err.Error())
continue
}
attr, ok := outerAttri[info.IP]
if !ok {
blog.Errorf("host(%s) don't have ip-resources attributes", info.IP)
continue
}
ipAttr := attr.Attributes[0]
blog.Infof("Host(%s) %s Scalar(%f)", info.IP, ipAttr.Name, ipAttr.Scalar.Value)
ipValue = ipAttr.Scalar.Value
}

//if ip-resources is zero, then ignore it
if s.pm==nil || ipValue>0{
clusterCpu += info.CpuTotal-info.CpuUsed
clusterMem += info.MemTotal-info.MemUsed
}

s.wg.Done()
store.ReportAgentInfoMetrics(info.IP, s.clusterId, info.CpuTotal, info.CpuTotal-info.CpuUsed,
info.MemTotal, info.MemTotal-info.MemUsed, ipValue)
}
store.ReportClusterInfoMetrics(s.clusterId, clusterCpu, clusterMem)
}
}

Expand Down

0 comments on commit f962666

Please sign in to comment.