Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize metrics collector #1935

Merged
merged 2 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ var (
WatchNamespace string
LeaderElection LeaderElectionConfiguration
MetricsExportInterval time.Duration
NegMetricsExportInterval time.Duration

// Feature flags should be named Enablexxx.
EnableASMConfigMapBasedConfig bool
Expand Down Expand Up @@ -257,6 +258,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.BoolVar(&F.EnableMultipleIGs, "enable-multiple-igs", false, "Enable using multiple unmanaged instance groups")
flag.IntVar(&F.MaxIGSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
flag.DurationVar(&F.MetricsExportInterval, "metrics-export-interval", 10*time.Minute, `Period for calculating and exporting metrics related to state of managed objects.`)
flag.DurationVar(&F.NegMetricsExportInterval, "neg-metrics-export-interval", 5*time.Second, `Period for calculating and exporting internal neg controller metrics, not usage.`)
}

type RateLimitSpecs struct {
Expand Down
32 changes: 20 additions & 12 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
svcnegv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/controller/translator"
usage "k8s.io/ingress-gce/pkg/metrics"
"k8s.io/ingress-gce/pkg/flags"
usageMetrics "k8s.io/ingress-gce/pkg/metrics"
"k8s.io/ingress-gce/pkg/neg/metrics"
syncMetrics "k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
Expand Down Expand Up @@ -99,8 +101,11 @@ type Controller struct {
// reflector handles NEG readiness gate and conditions for pods in NEG.
reflector readiness.Reflector

// collector collects NEG usage metrics
collector usage.NegMetricsCollector
// usageCollector collects NEG usage metrics
usageCollector usageMetrics.NegMetricsCollector

// syncerMetrics collects NEG controller metrics
syncerMetrics *syncMetrics.SyncerMetrics

// runL4 indicates whether to run NEG controller that processes L4 ILB services
runL4 bool
Expand All @@ -123,7 +128,7 @@ func NewController(
destinationRuleInformer cache.SharedIndexInformer,
svcNegInformer cache.SharedIndexInformer,
hasSynced func() bool,
controllerMetrics *usage.ControllerMetrics,
controllerMetrics *usageMetrics.ControllerMetrics,
l4Namer namer2.L4ResourcesNamer,
defaultBackendService utils.ServicePort,
cloud negtypes.NetworkEndpointGroupCloud,
Expand Down Expand Up @@ -168,6 +173,7 @@ func NewController(
endpointIndexer = endpointInformer.GetIndexer()
}

syncerMetrics := metrics.NewNegMetricsCollector(flags.F.NegMetricsExportInterval, logger)
manager := newSyncerManager(
namer,
recorder,
Expand All @@ -181,6 +187,7 @@ func NewController(
endpointSliceIndexer,
nodeInformer.GetIndexer(),
svcNegInformer.GetIndexer(),
syncerMetrics,
enableNonGcpMode,
enableEndpointSlices,
logger)
Expand Down Expand Up @@ -217,7 +224,8 @@ func NewController(
nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
collector: controllerMetrics,
usageCollector: controllerMetrics,
syncerMetrics: syncerMetrics,
runL4: runL4Controller,
logger: logger,
}
Expand Down Expand Up @@ -462,15 +470,15 @@ func (c *Controller) processService(key string) error {
return err
}
if !exists {
c.collector.DeleteNegService(key)
c.usageCollector.DeleteNegService(key)
c.manager.StopSyncer(namespace, name)
return nil
}
service := obj.(*apiv1.Service)
if service == nil {
return fmt.Errorf("cannot convert to Service (%T)", obj)
}
negUsage := usage.NegServiceState{}
negUsage := usageMetrics.NegServiceState{}
svcPortInfoMap := make(negtypes.PortInfoMap)
if err := c.mergeDefaultBackendServicePortInfoMap(key, service, svcPortInfoMap); err != nil {
return err
Expand Down Expand Up @@ -510,12 +518,12 @@ func (c *Controller) processService(key string) error {
return fmt.Errorf("failed to merge service ports referenced by Istio:DestinationRule (%v): %w", destinationRulesPortInfoMap, err)
}
negUsage.SuccessfulNeg, negUsage.ErrorNeg, err = c.manager.EnsureSyncers(namespace, name, svcPortInfoMap)
c.collector.SetNegService(key, negUsage)
c.usageCollector.SetNegService(key, negUsage)
return err
}
// do not need Neg
c.logger.V(3).Info("Service does not need any NEG. Skipping", "service", key)
c.collector.DeleteNegService(key)
c.usageCollector.DeleteNegService(key)
// neg annotation is not found or NEG is not enabled
c.manager.StopSyncer(namespace, name)

Expand Down Expand Up @@ -547,7 +555,7 @@ func (c *Controller) mergeIngressPortInfo(service *apiv1.Service, name types.Nam
}

// mergeStandaloneNEGsPortInfo merge Standalone NEG PortInfo into portInfoMap
func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap, negUsage *usage.NegServiceState) error {
func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap, negUsage *usageMetrics.NegServiceState) error {
negAnnotation, foundNEGAnnotation, err := annotations.FromService(service).NEGAnnotation()
if err != nil {
return err
Expand Down Expand Up @@ -587,7 +595,7 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty
}

// mergeVmIpNEGsPortInfo merges the PortInfo for ILB services using GCE_VM_IP NEGs into portInfoMap
func (c *Controller) mergeVmIpNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap, negUsage *usage.NegServiceState) error {
func (c *Controller) mergeVmIpNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap, negUsage *usageMetrics.NegServiceState) error {
if wantsILB, _ := annotations.WantsL4ILB(service); !wantsILB {
return nil
}
Expand All @@ -601,7 +609,7 @@ func (c *Controller) mergeVmIpNEGsPortInfo(service *apiv1.Service, name types.Na

onlyLocal := helpers.RequestsOnlyLocalTraffic(service)
// Update usage metrics.
negUsage.VmIpNeg = usage.NewVmIpNegType(onlyLocal)
negUsage.VmIpNeg = usageMetrics.NewVmIpNegType(onlyLocal)

return portInfoMap.Merge(negtypes.NewPortInfoMapForVMIPNEG(name.Namespace, name.Name, c.l4Namer, onlyLocal))
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type syncerManager struct {
// syncerMap stores the NEG syncer
// key consists of service namespace, name and targetPort. Value is the corresponding syncer.
syncerMap map[negtypes.NegSyncerKey]negtypes.NegSyncer
// syncerMetrics collects sync related metrics
syncerMetrics *metrics.SyncerMetrics

// reflector handles NEG readiness gate and conditions for pods in NEG.
reflector readiness.Reflector
//svcNegClient handles lifecycle operations for NEG CRs
Expand Down Expand Up @@ -114,6 +117,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
endpointSliceLister cache.Indexer,
nodeLister cache.Indexer,
svcNegLister cache.Indexer,
syncerMetrics *metrics.SyncerMetrics,
enableNonGcpMode bool,
enableEndpointSlices bool,
logger klog.Logger) *syncerManager {
Expand All @@ -135,6 +139,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
svcNegLister: svcNegLister,
svcPortMap: make(map[serviceKey]negtypes.PortInfoMap),
syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer),
syncerMetrics: syncerMetrics,
svcNegClient: svcNegClient,
kubeSystemUID: kubeSystemUID,
enableNonGcpMode: enableNonGcpMode,
Expand Down Expand Up @@ -227,6 +232,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
epc,
string(manager.kubeSystemUID),
manager.svcNegClient,
manager.syncerMetrics,
!manager.namer.IsNEG(portInfo.NegName),
manager.enableEndpointSlices,
manager.logger,
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/types"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
Expand Down Expand Up @@ -91,6 +92,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce
testContext.EndpointSliceInformer.GetIndexer(),
testContext.NodeInformer.GetIndexer(),
testContext.SvcNegInformer.GetIndexer(),
metrics.FakeSyncerMetrics(),
false, //enableNonGcpMode
false, //enableEndpointSlices
klog.TODO(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func RegisterMetrics() {
prometheus.MustRegister(SyncerSyncLatency)
prometheus.MustRegister(LastSyncTimestamp)
prometheus.MustRegister(InitializationLatency)

RegisterSyncerMetrics()
})
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

// SyncerMetricsCollector is the abstraction syncers use to refer to a metrics
// collector. It declares no methods yet, so any value satisfies it.
type SyncerMetricsCollector interface{}

// SyncerMetrics holds the configuration and state used to periodically export
// syncer related metrics. Because it embeds a mutex it must be passed around
// by pointer and never copied.
type SyncerMetrics struct {
	// mu is meant to serialize access to the collected metrics so exports stay
	// consistent under concurrent updates.
	mu sync.Mutex

	// metricsInterval is the period between two metric exports; Run also uses
	// it as the delay before the first export.
	metricsInterval time.Duration

	// logger emits log messages for this collector.
	logger klog.Logger
}

// NewNegMetricsCollector builds a SyncerMetrics that exports with the given
// interval and logs through logger under the name "NegMetricsCollector".
// It only constructs the value; the periodic export loop is started by Run.
func NewNegMetricsCollector(exportInterval time.Duration, logger klog.Logger) *SyncerMetrics {
	namedLogger := logger.WithName("NegMetricsCollector")
	collector := &SyncerMetrics{
		metricsInterval: exportInterval,
		logger:          namedLogger,
	}
	return collector
}

// FakeSyncerMetrics returns a SyncerMetrics for use in tests. It is built via
// NewNegMetricsCollector with a fixed 5 second export interval and a
// klog.TODO() logger.
func FakeSyncerMetrics() *SyncerMetrics {
	const testExportInterval = 5 * time.Second
	return NewNegMetricsCollector(testExportInterval, klog.TODO())
}

// RegisterSyncerMetrics is the hook for registering syncer related metrics.
// It is currently a no-op placeholder: no syncer metrics are registered yet.
func RegisterSyncerMetrics() {}

// Run starts the periodic metrics export and blocks until stopCh is closed.
//
// A background goroutine waits one metricsInterval before the first export and
// then calls export every metricsInterval until stopCh is closed. The initial
// wait also watches stopCh, so the goroutine exits promptly on shutdown
// instead of lingering for up to a full interval.
func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {
	sm.logger.V(3).Info("Syncer Metrics initialized.", "exportInterval", sm.metricsInterval)
	// Compute and export metrics periodically.
	go func() {
		// Delay the first export by one interval, but give up immediately if
		// the collector is stopped in the meantime.
		initialDelay := time.NewTimer(sm.metricsInterval)
		defer initialDelay.Stop()
		select {
		case <-stopCh:
			return
		case <-initialDelay.C:
		}
		wait.Until(sm.export, sm.metricsInterval, stopCh)
	}()
	<-stopCh
}

// export publishes the syncer metrics. Run invokes it once per
// metricsInterval. It is currently a placeholder with an empty body.
func (sm *SyncerMetrics) export() {}
5 changes: 5 additions & 0 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type transactionSyncer struct {
// 4. Endpoint count from EPS or calculated endpoint list is 0
// Need to grab syncLock first for any reads or writes based on this value
inError bool

// syncCollector collects sync related metrics
syncCollector metrics.SyncerMetricsCollector
}

func NewTransactionSyncer(
Expand All @@ -121,6 +124,7 @@ func NewTransactionSyncer(
epc negtypes.NetworkEndpointsCalculator,
kubeSystemUID string,
svcNegClient svcnegclient.Interface,
syncerMetrics *metrics.SyncerMetrics,
customName bool,
enableEndpointSlices bool,
log klog.Logger) negtypes.NegSyncer {
Expand All @@ -145,6 +149,7 @@ func NewTransactionSyncer(
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
syncCollector: syncerMetrics,
customName: customName,
enableEndpointSlices: enableEndpointSlices,
inError: false,
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/client-go/tools/record"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -2396,6 +2397,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp
svcPort, mode, klog.TODO()),
string(kubeSystemUID),
testContext.SvcNegClient,
metrics.FakeSyncerMetrics(),
customName,
enableEndpointSlices,
klog.TODO(),
Expand Down