Skip to content

Commit

Permalink
Add prometheus metrics for antrea controller
Browse files Browse the repository at this point in the history
This CL brings metrics of antrea controller events
processing, includes: syncing time, processed number,
and queue length.

Signed-off-by: Weiqiang TANG <weiqiangt@vmware.com>
  • Loading branch information
weiqiangt committed Apr 29, 2020
1 parent ec17ec0 commit 4867626
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 5 deletions.
68 changes: 67 additions & 1 deletion pkg/controller/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,45 @@ import (
"github.com/vmware-tanzu/antrea/pkg/util/env"
)

var (
OpsAppliedToGroupProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "antrea_controller_applied_to_group_processed",
Help: "The total number of applied-to-group processed",
})
OpsAddressGroupProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "antrea_controller_address_group_processed",
Help: "The total number of address-group processed ",
})
OpsInternalNetworkPolicyProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "antrea_controller_network_policy_processed",
Help: "The total number of internal-networkpolicy processed",
})
DurationAppliedToGroupSyncing = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "antrea_controller_applied_to_group_sync_duration_milliseconds",
Help: "The duration of syncing applied-to-group",
})
DurationAddressGroupSyncing = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "antrea_controller_address_group_sync_duration_milliseconds",
Help: "The duration of syncing address-group",
})
DurationInternalNetworkPolicySyncing = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "antrea_controller_network_policy_sync_duration_milliseconds",
Help: "The duration of syncing internal-networkpolicy",
})
LengthAppliedToGroupQueue = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "antrea_controller_length_applied_to_group_queue",
Help: "The length of AppliedToGroupQueue",
})
LengthAddressGroupQueue = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "antrea_controller_length_address_group_queue",
Help: "The length of AddressGroupQueue",
})
LengthInternalNetworkPolicyQueue = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "antrea_controller_length_network_policy_queue",
Help: "The length of InternalNetworkPolicyQueue",
})
)

// Initialize Prometheus metrics collection.
func InitializePrometheusMetrics() {
nodeName, err := env.GetNodeName()
Expand All @@ -36,6 +75,33 @@ func InitializePrometheusMetrics() {
})
gaugeHost.Set(1)
if err = prometheus.Register(gaugeHost); err != nil {
klog.Error("Failed to register antrea_controller_runtime_info with Prometheus")
klog.Errorf("Failed to register antrea_controller_runtime_info with Prometheus: %s", err.Error())
}
if err := prometheus.Register(OpsAppliedToGroupProcessed); err != nil {
klog.Errorf("Failed to register antrea_controller_applied_to_group_processed with Prometheus: %s", err.Error())
}
if err := prometheus.Register(OpsAddressGroupProcessed); err != nil {
klog.Errorf("Failed to register antrea_controller_address_group_processed with Prometheus: %s", err.Error())
}
if err := prometheus.Register(OpsInternalNetworkPolicyProcessed); err != nil {
klog.Errorf("Failed to register antrea_controller_network_policy_processed with Prometheus: %s", err.Error())
}
if err := prometheus.Register(DurationAppliedToGroupSyncing); err != nil {
klog.Errorf("Failed to register antrea_controller_applied_to_group_sync_duration_milliseconds with Prometheus: %s", err.Error())
}
if err := prometheus.Register(DurationAddressGroupSyncing); err != nil {
klog.Errorf("Failed to register antrea_controller_address_group_sync_duration_milliseconds with Prometheus: %s", err.Error())
}
if err := prometheus.Register(DurationInternalNetworkPolicySyncing); err != nil {
klog.Errorf("Failed to register antrea_controller_network_policy_sync_duration_milliseconds with Prometheus: %s", err.Error())
}
if err := prometheus.Register(LengthAppliedToGroupQueue); err != nil {
klog.Errorf("Failed to register antrea_controller_length_applied_to_group_queue with Prometheus: %s", err.Error())
}
if err := prometheus.Register(LengthAddressGroupQueue); err != nil {
klog.Errorf("Failed to register antrea_controller_length_address_group_queue with Prometheus: %s", err.Error())
}
if err := prometheus.Register(LengthInternalNetworkPolicyQueue); err != nil {
klog.Errorf("Failed to register antrea_controller_length_network_policy_queue with Prometheus: %s", err.Error())
}
}
24 changes: 20 additions & 4 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/apis/networking"
"github.com/vmware-tanzu/antrea/pkg/apiserver/storage"
"github.com/vmware-tanzu/antrea/pkg/controller/metrics"
"github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/store"
antreatypes "github.com/vmware-tanzu/antrea/pkg/controller/types"
)
Expand Down Expand Up @@ -838,6 +839,7 @@ func (n *NetworkPolicyController) deleteNamespace(old interface{}) {
func (n *NetworkPolicyController) enqueueAppliedToGroup(key string) {
klog.V(4).Infof("Adding new key %s to AppliedToGroup queue", key)
n.appliedToGroupQueue.Add(key)
metrics.LengthAppliedToGroupQueue.Set(float64(n.appliedToGroupQueue.Len()))
}

// deleteDereferencedAddressGroups deletes the AddressGroup keys which are no
Expand Down Expand Up @@ -892,11 +894,13 @@ func (n *NetworkPolicyController) deleteDereferencedAppliedToGroup(key string) {
func (n *NetworkPolicyController) enqueueAddressGroup(key string) {
klog.V(4).Infof("Adding new key %s to AddressGroup queue", key)
n.addressGroupQueue.Add(key)
metrics.LengthAddressGroupQueue.Set(float64(n.addressGroupQueue.Len()))
}

func (n *NetworkPolicyController) enqueueInternalNetworkPolicy(key string) {
klog.V(4).Infof("Adding new key %s to internal NetworkPolicy queue", key)
n.internalNetworkPolicyQueue.Add(key)
metrics.LengthInternalNetworkPolicyQueue.Set(float64(n.internalNetworkPolicyQueue.Len()))
}

// Run begins watching and syncing of a NetworkPolicyController.
Expand Down Expand Up @@ -925,16 +929,22 @@ func (n *NetworkPolicyController) Run(stopCh <-chan struct{}) {

func (n *NetworkPolicyController) appliedToGroupWorker() {
for n.processNextAppliedToGroupWorkItem() {
metrics.OpsAppliedToGroupProcessed.Inc()
metrics.LengthAppliedToGroupQueue.Set(float64(n.appliedToGroupQueue.Len()))
}
}

func (n *NetworkPolicyController) addressGroupWorker() {
for n.processNextAddressGroupWorkItem() {
metrics.OpsAddressGroupProcessed.Inc()
metrics.LengthAddressGroupQueue.Set(float64(n.addressGroupQueue.Len()))
}
}

func (n *NetworkPolicyController) internalNetworkPolicyWorker() {
for n.processNextInternalNetworkPolicyWorkItem() {
metrics.OpsInternalNetworkPolicyProcessed.Inc()
metrics.LengthInternalNetworkPolicyQueue.Set(float64(n.internalNetworkPolicyQueue.Len()))
}
}

Expand Down Expand Up @@ -1033,7 +1043,9 @@ func (n *NetworkPolicyController) processNextAppliedToGroupWorkItem() bool {
func (n *NetworkPolicyController) syncAddressGroup(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing AddressGroup %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationAddressGroupSyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing AddressGroup %s. (%v)", key, d)
}()
// Get all internal NetworkPolicy objects that refers this AddressGroup.
nps, err := n.internalNetworkPolicyStore.GetByIndex(store.AddressGroupIndex, key)
Expand Down Expand Up @@ -1079,7 +1091,7 @@ func (n *NetworkPolicyController) syncAddressGroup(key string) error {
podSet := networking.GroupMemberPodSet{}
for _, pod := range pods {
if pod.Status.PodIP == "" {
// No need to insert Pod IPAdddress when it is unset.
// No need to insert Pod IPAddress when it is unset.
continue
}
podSet.Insert(podToMemberPod(pod, true, false))
Expand Down Expand Up @@ -1136,7 +1148,9 @@ func podToMemberPod(pod *v1.Pod, includeIP, includePodRef bool) *networking.Grou
func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing AppliedToGroup %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationAppliedToGroupSyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing AppliedToGroup %s. (%v)", key, d)
}()
podSetByNode := make(map[string]networking.GroupMemberPodSet)
var pods []*v1.Pod
Expand Down Expand Up @@ -1201,7 +1215,9 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
func (n *NetworkPolicyController) syncInternalNetworkPolicy(key string) error {
startTime := time.Now()
defer func() {
klog.V(2).Infof("Finished syncing internal NetworkPolicy %s. (%v)", key, time.Since(startTime))
d := time.Since(startTime)
metrics.DurationInternalNetworkPolicySyncing.Observe(float64(d.Milliseconds()))
klog.V(2).Infof("Finished syncing internal NetworkPolicy %s. (%v)", key, d)
}()
klog.V(2).Infof("Syncing internal NetworkPolicy %s", key)
nodeNames := sets.String{}
Expand Down

0 comments on commit 4867626

Please sign in to comment.