Skip to content

Commit

Permalink
Add prometheus metrics for antrea controller (antrea-io#325)
Browse files Browse the repository at this point in the history
Add metrics for antrea controller events processing, 
includes syncing time, processed number, and queue length.

Signed-off-by: Weiqiang TANG <weiqiangt@vmware.com>
  • Loading branch information
weiqiangt authored and McCodeman committed Jun 2, 2020
1 parent e0df6b7 commit 0dba3d3
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 5 deletions.
68 changes: 67 additions & 1 deletion pkg/controller/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,45 @@ import (
"github.com/vmware-tanzu/antrea/pkg/util/env"
)

var (
OpsAppliedToGroupProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "antrea_controller_applied_to_group_processed",
Help: "The total number of applied-to-group processed",
})
OpsAddressGroupProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "antrea_controller_address_group_processed",
Help: "The total number of address-group processed ",
})
OpsInternalNetworkPolicyProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "antrea_controller_network_policy_processed",
Help: "The total number of internal-networkpolicy processed",
})
DurationAppliedToGroupSyncing = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "antrea_controller_applied_to_group_sync_duration_milliseconds",
Help: "The duration of syncing applied-to-group",
})
DurationAddressGroupSyncing = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "antrea_controller_address_group_sync_duration_milliseconds",
Help: "The duration of syncing address-group",
})
DurationInternalNetworkPolicySyncing = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "antrea_controller_network_policy_sync_duration_milliseconds",
Help: "The duration of syncing internal-networkpolicy",
})
LengthAppliedToGroupQueue = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "antrea_controller_length_applied_to_group_queue",
Help: "The length of AppliedToGroupQueue",
})
LengthAddressGroupQueue = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "antrea_controller_length_address_group_queue",
Help: "The length of AddressGroupQueue",
})
LengthInternalNetworkPolicyQueue = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "antrea_controller_length_network_policy_queue",
Help: "The length of InternalNetworkPolicyQueue",
})
)

// Initialize Prometheus metrics collection.
func InitializePrometheusMetrics() {
nodeName, err := env.GetNodeName()
Expand All @@ -36,6 +75,33 @@ func InitializePrometheusMetrics() {
})
gaugeHost.Set(1)
if err = prometheus.Register(gaugeHost); err != nil {
klog.Error("Failed to register antrea_controller_runtime_info with Prometheus")
klog.Errorf("Failed to register antrea_controller_runtime_info with Prometheus: %s", err.Error())
}
if err := prometheus.Register(OpsAppliedToGroupProcessed); err != nil {
klog.Errorf("Failed to register antrea_controller_applied_to_group_processed with Prometheus: %s", err.Error())
}
if err := prometheus.Register(OpsAddressGroupProcessed); err != nil {
klog.Errorf("Failed to register antrea_controller_address_group_processed with Prometheus: %s", err.Error())
}
if err := prometheus.Register(OpsInternalNetworkPolicyProcessed); err != nil {
klog.Errorf("Failed to register antrea_controller_network_policy_processed with Prometheus: %s", err.Error())
}
if err := prometheus.Register(DurationAppliedToGroupSyncing); err != nil {
klog.Errorf("Failed to register antrea_controller_applied_to_group_sync_duration_milliseconds with Prometheus: %s", err.Error())
}
if err := prometheus.Register(DurationAddressGroupSyncing); err != nil {
klog.Errorf("Failed to register antrea_controller_address_group_sync_duration_milliseconds with Prometheus: %s", err.Error())
}
if err := prometheus.Register(DurationInternalNetworkPolicySyncing); err != nil {
klog.Errorf("Failed to register antrea_controller_network_policy_sync_duration_milliseconds with Prometheus: %s", err.Error())
}
if err := prometheus.Register(LengthAppliedToGroupQueue); err != nil {
klog.Errorf("Failed to register antrea_controller_length_applied_to_group_queue with Prometheus: %s", err.Error())
}
if err := prometheus.Register(LengthAddressGroupQueue); err != nil {
klog.Errorf("Failed to register antrea_controller_length_address_group_queue with Prometheus: %s", err.Error())
}
if err := prometheus.Register(LengthInternalNetworkPolicyQueue); err != nil {
klog.Errorf("Failed to register antrea_controller_length_network_policy_queue with Prometheus: %s", err.Error())
}
}
24 changes: 20 additions & 4 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/apis/networking"
"github.com/vmware-tanzu/antrea/pkg/apiserver/storage"
"github.com/vmware-tanzu/antrea/pkg/controller/metrics"
"github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/store"
antreatypes "github.com/vmware-tanzu/antrea/pkg/controller/types"
)
Expand Down Expand Up @@ -838,6 +839,7 @@ func (n *NetworkPolicyController) deleteNamespace(old interface{}) {
func (n *NetworkPolicyController) enqueueAppliedToGroup(key string) {
klog.V(4).Infof("Adding new key %s to AppliedToGroup queue", key)
n.appliedToGroupQueue.Add(key)
metrics.LengthAppliedToGroupQueue.Set(float64(n.appliedToGroupQueue.Len()))
}

// deleteDereferencedAddressGroups deletes the AddressGroup keys which are no
Expand Down Expand Up @@ -892,11 +894,13 @@ func (n *NetworkPolicyController) deleteDereferencedAppliedToGroup(key string) {
func (n *NetworkPolicyController) enqueueAddressGroup(key string) {
klog.V(4).Infof("Adding new key %s to AddressGroup queue", key)
n.addressGroupQueue.Add(key)
metrics.LengthAddressGroupQueue.Set(float64(n.addressGroupQueue.Len()))
}

func (n *NetworkPolicyController) enqueueInternalNetworkPolicy(key string) {
klog.V(4).Infof("Adding new key %s to internal NetworkPolicy queue", key)
n.internalNetworkPolicyQueue.Add(key)
metrics.LengthInternalNetworkPolicyQueue.Set(float64(n.internalNetworkPolicyQueue.Len()))
}

// Run begins watching and syncing of a NetworkPolicyController.
Expand Down Expand Up @@ -925,16 +929,22 @@ func (n *NetworkPolicyController) Run(stopCh <-chan struct{}) {

func (n *NetworkPolicyController) appliedToGroupWorker() {
for n.processNextAppliedToGroupWorkItem() {
metrics.OpsAppliedToGroupProcessed.Inc()
metrics.LengthAppliedToGroupQueue.Set(float64(n.appliedToGroupQueue.Len()))
}
}

func (n *NetworkPolicyController) addressGroupWorker() {
for n.processNextAddressGroupWorkItem() {
metrics.OpsAddressGroupProcessed.Inc()
metrics.LengthAddressGroupQueue.Set(float64(n.addressGroupQueue.Len()))
}
}

func (n *NetworkPolicyController) internalNetworkPolicyWorker() {
for n.processNextInternalNetworkPolicyWorkItem() {
metrics.OpsInternalNetworkPolicyProcessed.Inc()
metrics.LengthInternalNetworkPolicyQueue.Set(float64(n.internalNetworkPolicyQueue.Len()))
}
}

Expand Down Expand Up @@ -1033,7 +1043,9 @@ func (n *NetworkPolicyController) processNextAppliedToGroupWorkItem() bool {
func (n *NetworkPolicyController) syncAddressGroup(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing AddressGroup %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationAddressGroupSyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing AddressGroup %s. (%v)", key, d)
}()
// Get all internal NetworkPolicy objects that refers this AddressGroup.
nps, err := n.internalNetworkPolicyStore.GetByIndex(store.AddressGroupIndex, key)
Expand Down Expand Up @@ -1079,7 +1091,7 @@ func (n *NetworkPolicyController) syncAddressGroup(key string) error {
podSet := networking.GroupMemberPodSet{}
for _, pod := range pods {
if pod.Status.PodIP == "" {
// No need to insert Pod IPAdddress when it is unset.
// No need to insert Pod IPAddress when it is unset.
continue
}
podSet.Insert(podToMemberPod(pod, true, false))
Expand Down Expand Up @@ -1136,7 +1148,9 @@ func podToMemberPod(pod *v1.Pod, includeIP, includePodRef bool) *networking.Grou
func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing AppliedToGroup %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationAppliedToGroupSyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing AppliedToGroup %s. (%v)", key, d)
}()
podSetByNode := make(map[string]networking.GroupMemberPodSet)
var pods []*v1.Pod
Expand Down Expand Up @@ -1201,7 +1215,9 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
func (n *NetworkPolicyController) syncInternalNetworkPolicy(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing internal NetworkPolicy %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationInternalNetworkPolicySyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing internal NetworkPolicy %s. (%v)", key, d)
}()
klog.V(2).Infof("Syncing internal NetworkPolicy %s", key)
nodeNames := sets.String{}
Expand Down

0 comments on commit 0dba3d3

Please sign in to comment.