Skip to content

Commit

Permalink
Add prometheus metrics for antrea controller
Browse files Browse the repository at this point in the history
This CL brings metrics of antrea controller events
processing, includes: syncing time, processed number,
and queue length.

Signed-off-by: Weiqiang TANG <weiqiangt@vmware.com>
  • Loading branch information
weiqiangt committed Jan 17, 2020
1 parent 36e5e35 commit a2ca44a
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 5 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/j-keck/arping v1.0.0
github.com/json-iterator/go v1.1.6 // indirect
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
github.com/satori/go.uuid v1.2.0
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
Expand Down
73 changes: 73 additions & 0 deletions pkg/controller/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package metrics provides metrics declaration for the antrea controller.

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
OpsAppliedToGroupProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "applied_to_group_processed",
Help: "The total number of applied-to-group processed",
})
OpsAddressGroupProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "address_group_processed",
Help: "The total number of address-group processed ",
})
OpsInternalNetworkPolicyProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "network_policy_processed",
Help: "The total number of internal-networkpolicy processed",
})
DurationAppliedToGroupProcessing = promauto.NewSummary(prometheus.SummaryOpts{
Name: "applied_to_group_process_duration_milliseconds",
Help: "The duration of processing applied-to groups",
})
DurationAddressGroupProcessing = promauto.NewSummary(prometheus.SummaryOpts{
Name: "address_group_process_duration_milliseconds",
Help: "The duration of processing address-group",
})
DurationInternalNetworkPolicyProcessing = promauto.NewSummary(prometheus.SummaryOpts{
Name: "network_policy_process_duration_milliseconds",
Help: "The duration of processing internal-networkpolicy",
})
DurationAppliedToGroupSyncing = promauto.NewSummary(prometheus.SummaryOpts{
Name: "applied_to_group_sync_duration_milliseconds",
Help: "The duration of syncing applied-to-group",
})
DurationAddressGroupSyncing = promauto.NewSummary(prometheus.SummaryOpts{
Name: "address_group_sync_duration_milliseconds",
Help: "The duration of syncing address-group",
})
DurationInternalNetworkPolicySyncing = promauto.NewSummary(prometheus.SummaryOpts{
Name: "network_policy_sync_duration_milliseconds",
Help: "The duration of syncing internal-networkpolicy",
})
LengthAppliedToGroupQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "length_applied_to_group_queue",
Help: "The length of AppliedToGroupQueue",
})
LengthAddressGroupQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "length_address_group_queue",
Help: "The length of AddressGroupQueue",
})
LengthInternalNetworkPolicyQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "length_network_policy_queue",
Help: "The length of InternalNetworkPolicyQueue",
})
)
38 changes: 33 additions & 5 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/apis/networking"
"github.com/vmware-tanzu/antrea/pkg/apiserver/storage"
"github.com/vmware-tanzu/antrea/pkg/controller/metrics"
"github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/store"
antreatypes "github.com/vmware-tanzu/antrea/pkg/controller/types"
)
Expand Down Expand Up @@ -810,6 +811,7 @@ func (n *NetworkPolicyController) deleteNamespace(old interface{}) {
func (n *NetworkPolicyController) enqueueAppliedToGroup(key string) {
klog.V(4).Infof("Adding new key %s to AppliedToGroup queue", key)
n.appliedToGroupQueue.Add(key)
metrics.LengthAppliedToGroupQueue.Set(float64(n.appliedToGroupQueue.Len()))
}

// deleteDereferencedAddressGroups deletes the AddressGroup keys which are no
Expand Down Expand Up @@ -864,11 +866,13 @@ func (n *NetworkPolicyController) deleteDereferencedAppliedToGroup(key string) {
func (n *NetworkPolicyController) enqueueAddressGroup(key string) {
klog.V(4).Infof("Adding new key %s to AddressGroup queue", key)
n.addressGroupQueue.Add(key)
metrics.LengthAddressGroupQueue.Set(float64(n.addressGroupQueue.Len()))
}

func (n *NetworkPolicyController) enqueueInternalNetworkPolicy(key string) {
klog.V(4).Infof("Adding new key %s to internal NetworkPolicy queue", key)
n.internalNetworkPolicyQueue.Add(key)
metrics.LengthInternalNetworkPolicyQueue.Set(float64(n.internalNetworkPolicyQueue.Len()))
}

// Run begins watching and syncing of a NetworkPolicyController.
Expand Down Expand Up @@ -897,16 +901,22 @@ func (n *NetworkPolicyController) Run(stopCh <-chan struct{}) {

func (n *NetworkPolicyController) appliedToGroupWorker() {
for n.processNextAppliedToGroupWorkItem() {
metrics.OpsAppliedToGroupProcessed.Inc()
metrics.LengthAppliedToGroupQueue.Set(float64(n.appliedToGroupQueue.Len()))
}
}

func (n *NetworkPolicyController) addressGroupWorker() {
for n.processNextAddressGroupWorkItem() {
metrics.OpsAddressGroupProcessed.Inc()
metrics.LengthAddressGroupQueue.Set(float64(n.addressGroupQueue.Len()))
}
}

func (n *NetworkPolicyController) internalNetworkPolicyWorker() {
for n.processNextInternalNetworkPolicyWorkItem() {
metrics.OpsInternalNetworkPolicyProcessed.Inc()
metrics.LengthInternalNetworkPolicyQueue.Set(float64(n.internalNetworkPolicyQueue.Len()))
}
}

Expand All @@ -919,6 +929,10 @@ func (n *NetworkPolicyController) internalNetworkPolicyWorker() {
// return false if and only if the work queue was shutdown (no more items will
// be processed).
func (n *NetworkPolicyController) processNextInternalNetworkPolicyWorkItem() bool {
start := time.Now()
defer func() {
metrics.DurationInternalNetworkPolicyProcessing.Observe(float64(time.Since(start).Milliseconds()))
}()
key, quit := n.internalNetworkPolicyQueue.Get()
if quit {
return false
Expand Down Expand Up @@ -950,6 +964,10 @@ func (n *NetworkPolicyController) processNextInternalNetworkPolicyWorkItem() boo
// of a new change. This function return false if and only if the work queue
// was shutdown (no more items will be processed).
func (n *NetworkPolicyController) processNextAddressGroupWorkItem() bool {
start := time.Now()
defer func() {
metrics.DurationAddressGroupProcessing.Observe(float64(time.Since(start).Milliseconds()))
}()
key, quit := n.addressGroupQueue.Get()
if quit {
return false
Expand Down Expand Up @@ -977,6 +995,10 @@ func (n *NetworkPolicyController) processNextAddressGroupWorkItem() bool {
// queue until we get notify of a new change. This function return false if
// and only if the work queue was shutdown (no more items will be processed).
func (n *NetworkPolicyController) processNextAppliedToGroupWorkItem() bool {
start := time.Now()
defer func() {
metrics.DurationAppliedToGroupProcessing.Observe(float64(time.Since(start).Milliseconds()))
}()
key, quit := n.appliedToGroupQueue.Get()
if quit {
return false
Expand All @@ -1002,7 +1024,9 @@ func (n *NetworkPolicyController) processNextAppliedToGroupWorkItem() bool {
func (n *NetworkPolicyController) syncAddressGroup(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing AddressGroup %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationAddressGroupSyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing AddressGroup %s. (%v)", key, d)
}()
// Get all internal NetworkPolicy objects that refers this AddressGroup.
nps, err := n.internalNetworkPolicyStore.GetByIndex(store.AddressGroupIndex, key)
Expand Down Expand Up @@ -1053,7 +1077,7 @@ func (n *NetworkPolicyController) syncAddressGroup(key string) error {
addresses := sets.String{}
for _, pod := range pods {
if pod.Status.PodIP == "" {
// No need to insert Pod IPAdddress when it is unset.
// No need to insert Pod IPAddress when it is unset.
continue
}
addresses.Insert(pod.Status.PodIP)
Expand All @@ -1078,7 +1102,9 @@ func (n *NetworkPolicyController) syncAddressGroup(key string) error {
func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing AppliedToGroup %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationAppliedToGroupSyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing AppliedToGroup %s. (%v)", key, d)
}()
podsByNodes := make(map[string]antreatypes.PodSet)
var pods []*v1.Pod
Expand Down Expand Up @@ -1149,7 +1175,9 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
func (n *NetworkPolicyController) syncInternalNetworkPolicy(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing internal NetworkPolicy %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationInternalNetworkPolicySyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing internal NetworkPolicy %s. (%v)", key, d)
}()
klog.V(2).Infof("Syncing internal NetworkPolicy %s", key)
nodeNames := sets.String{}
Expand All @@ -1161,7 +1189,7 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key string) error {
if !found {
// Make sure to unlock the store before returning.
n.internalNetworkPolicyMutex.Unlock()
return fmt.Errorf("Internal NetworkPolicy %s not found: %v", key, err)
return fmt.Errorf("internal NetworkPolicy %s not found: %w", key, err)
}
internalNP := internalNPObj.(*antreatypes.NetworkPolicy)
// Maintain a copy of old SpanMeta Nodenames so we can later enqueue Groups
Expand Down

0 comments on commit a2ca44a

Please sign in to comment.