Skip to content

Commit

Permalink
Add gateway monitor metrics and event (#2345)
Browse files Browse the repository at this point in the history
* add gateway monitor metrices and event

* add e2e

* refactor

* replace gaube to counter

* fix e2e1

* fix

* check is gateway node

* fix dual mode test fail

* add nf_tables and legacy_tables check
  • Loading branch information
changluyi authored Feb 22, 2023
1 parent c061ae1 commit ee53dfe
Show file tree
Hide file tree
Showing 13 changed files with 380 additions and 10 deletions.
20 changes: 19 additions & 1 deletion pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ func (c *Controller) enqueueUpdateSubnet(old, new interface{}) {
!reflect.DeepEqual(oldSubnet.Spec.Acls, newSubnet.Spec.Acls) ||
oldSubnet.Spec.U2OInterconnection != newSubnet.Spec.U2OInterconnection {
klog.V(3).Infof("enqueue update subnet %s", key)

if oldSubnet.Spec.GatewayType != newSubnet.Spec.GatewayType {
c.recorder.Eventf(newSubnet, v1.EventTypeNormal, "SubnetGatewayTypeChanged",
"subnet gateway type changes from %s to %s ", oldSubnet.Spec.GatewayType, newSubnet.Spec.GatewayType)
}

if oldSubnet.Spec.GatewayNode != newSubnet.Spec.GatewayNode {
c.recorder.Eventf(newSubnet, v1.EventTypeNormal, "SubnetGatewayNodeChanged",
"gateway node changes from %s to %s ", oldSubnet.Spec.GatewayNode, newSubnet.Spec.GatewayNode)
}

c.addOrUpdateSubnetQueue.Add(key)
}
}
Expand Down Expand Up @@ -470,6 +481,7 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason string, e
c.recorder.Eventf(subnet, v1.EventTypeWarning, reason, errStr)
} else {
subnet.Status.Validated(reason, "")
c.recorder.Eventf(subnet, v1.EventTypeNormal, reason, errStr)
if reason == "SetPrivateLogicalSwitchSuccess" || reason == "ResetLogicalSwitchAclSuccess" || reason == "ReconcileCentralizedGatewaySuccess" {
subnet.Status.Ready(reason, "")
}
Expand Down Expand Up @@ -1143,6 +1155,8 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}
subnet.Spec.GatewayNode = ""
subnet.Status.ActivateGateway = ""
c.recorder.Eventf(subnet, v1.EventTypeNormal, "ChangeToCentralizedGw", "")

bytes, err := subnet.Status.Bytes()
if err != nil {
return err
Expand Down Expand Up @@ -1278,8 +1292,11 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
} else {
// centralized subnet
if subnet.Spec.GatewayNode == "" {
klog.Errorf("subnet %s Spec.GatewayNode field must be specified for centralized gateway type", subnet.Name)
errMsg := fmt.Sprintf("subnet %s Spec.GatewayNode field must be specified for centralized gateway type", subnet.Name)
klog.Errorf(errMsg)
reason := "NoReadyGateway"
subnet.Status.NotReady("NoReadyGateway", "")
c.recorder.Eventf(subnet, v1.EventTypeWarning, reason, errMsg)
bytes, err := subnet.Status.Bytes()
if err != nil {
return err
Expand Down Expand Up @@ -1428,6 +1445,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}
if newActivateNode == "" {
klog.Warningf("all subnet %s gws are not ready", subnet.Name)
c.recorder.Eventf(subnet, v1.EventTypeWarning, "NoActiveGatewayFound", fmt.Sprintf("subnet %s gws are not ready", subnet.Name))
subnet.Status.ActivateGateway = newActivateNode
bytes, err := subnet.Status.Bytes()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
go wait.Until(c.runPodWorker, time.Second, stopCh)
go wait.Until(c.runGateway, 3*time.Second, stopCh)
go wait.Until(c.loopEncapIpCheck, 3*time.Second, stopCh)
go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
go wait.Until(func() {
if err := c.markAndCleanInternalPort(); err != nil {
klog.Errorf("gc ovs port error: %v", err)
Expand Down
10 changes: 8 additions & 2 deletions pkg/daemon/controller_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ import (

// ControllerRuntime represents runtime specific controller members
type ControllerRuntime struct {
iptables map[string]*iptables.IPTables
ipsets map[string]*ipsets.IPSets
iptables map[string]*iptables.IPTables
ipsets map[string]*ipsets.IPSets
gwCounters map[string]*util.GwIPtableCounters
}

func (c *Controller) initRuntime() error {
c.ControllerRuntime.iptables = make(map[string]*iptables.IPTables)
c.ControllerRuntime.ipsets = make(map[string]*ipsets.IPSets)
c.ControllerRuntime.gwCounters = make(map[string]*util.GwIPtableCounters)

if c.protocol == kubeovnv1.ProtocolIPv4 || c.protocol == kubeovnv1.ProtocolDual {
iptables, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
Expand Down Expand Up @@ -531,6 +533,10 @@ func (c *Controller) loopEncapIpCheck() {
}
}

func (c *Controller) ovnMetricsUpdate() {
c.setOvnSubnetGatewayMetric()
}

func (c *Controller) operateMod() {
modules, ok := os.LookupEnv(util.KoENV)
if !ok || modules == "" {
Expand Down
4 changes: 4 additions & 0 deletions pkg/daemon/controller_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,5 +232,9 @@ func rotateLog() {
// TODO
}

func (c *Controller) ovnMetricsUpdate() {
// TODO
}

func (c *Controller) operateMod() {
}
8 changes: 5 additions & 3 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,24 +138,26 @@ func (c *Controller) getServicesCIDR(protocol string) []string {
return ret
}

func (c *Controller) getDefaultVpcSubnetsCIDR(protocol string) ([]string, error) {
func (c *Controller) getDefaultVpcSubnetsCIDR(protocol string) ([]string, map[string]string, error) {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Error("failed to list subnets")
return nil, err
return nil, nil, err
}

ret := make([]string, 0, len(subnets)+1)
subnetMap := make(map[string]string, len(subnets)+1)
if c.config.NodeLocalDnsIP != "" && net.ParseIP(c.config.NodeLocalDnsIP) != nil && util.CheckProtocol(c.config.NodeLocalDnsIP) == protocol {
ret = append(ret, c.config.NodeLocalDnsIP)
}
for _, subnet := range subnets {
if subnet.Spec.Vpc == util.DefaultVpc && (subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway) && subnet.Spec.CIDRBlock != "" {
cidrBlock := getCidrByProtocol(subnet.Spec.CIDRBlock, protocol)
ret = append(ret, cidrBlock)
subnetMap[subnet.Name] = cidrBlock
}
}
return ret, nil
return ret, subnetMap, nil
}

func (c *Controller) getOtherNodes(protocol string) ([]string, error) {
Expand Down
145 changes: 144 additions & 1 deletion pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *Controller) setIPSet() error {
continue
}
services := c.getServicesCIDR(protocol)
subnets, err := c.getDefaultVpcSubnetsCIDR(protocol)
subnets, _, err := c.getDefaultVpcSubnetsCIDR(protocol)
if err != nil {
klog.Errorf("get subnets failed, %+v", err)
return err
Expand Down Expand Up @@ -571,6 +571,48 @@ func (c *Controller) setIptables() error {
}
}

var ovnSubnetGatewayCountRules []util.IPTableRule
_, subnetCidrs, err := c.getDefaultVpcSubnetsCIDR(protocol)
if err != nil {
klog.Errorf("get subnets failed, %+v", err)
return err
}

for name, subnetCidr := range subnetCidrs {
ovnSubnetGatewayCountRules = []util.IPTableRule{
{Table: "filter", Chain: "FORWARD", Rule: strings.Fields(fmt.Sprintf(`-m comment --comment %s,%s -s %s`, util.OvnSubnetGatewayIptables, name, subnetCidr))},
{Table: "filter", Chain: "FORWARD", Rule: strings.Fields(fmt.Sprintf(`-m comment --comment %s,%s -d %s`, util.OvnSubnetGatewayIptables, name, subnetCidr))},
}
iptablesRules = append(iptablesRules, ovnSubnetGatewayCountRules...)
}

rules, err := c.iptables[protocol].List("filter", "FORWARD")
if err != nil {
klog.Errorf(`failed to list iptables rule table "filter" chain "FORWARD" with err %v `, err)
return err
}

for _, rule := range rules {
isAbandonRule := true
if !strings.Contains(rule, util.OvnSubnetGatewayIptables) {
continue
}

for name := range subnetCidrs {
keyStr := strings.Join([]string{util.OvnSubnetGatewayIptables, name}, ",")
if strings.Contains(rule, keyStr) {
isAbandonRule = false
break
}
}

if isAbandonRule {
rule := strings.ReplaceAll(rule, "\"", "")
// rule[11:] skip "-A FORWARD "
abandonedRules = append(abandonedRules, util.IPTableRule{Table: "filter", Chain: "FORWARD", Rule: strings.Fields(rule[11:])})
}
}

var natPreroutingRules, natPostroutingRules []util.IPTableRule
for _, rule := range iptablesRules {
if rule.Table == NAT {
Expand Down Expand Up @@ -641,6 +683,107 @@ func (c *Controller) setIptables() error {
return nil
}

func (c *Controller) setOvnSubnetGatewayMetric() {
hostname := os.Getenv(util.HostnameEnv)
for proto, iptables := range c.iptables {
rules, err := iptables.ListWithCounters("filter", "FORWARD")
if err != nil {
klog.Errorf("get proto %s iptables failed with err %v ", proto, err)
continue
}

for _, rule := range rules {
items := strings.Fields(rule)
cidr := ""
direction := ""
subnetName := ""
var currentPackets, currentPacketBytes int
if len(items) <= 10 {
continue
}
for _, item := range items {
if strings.Contains(item, util.OvnSubnetGatewayIptables) {
cidr = items[3]
if items[2] == "-s" {
direction = "egress"
} else if items[2] == "-d" {
direction = "ingress"
} else {
break
}

comments := strings.Split(items[7], ",")
if len(comments) != 2 {
break
}
subnetName = comments[1][:len(comments[1])-1]
currentPackets, err = strconv.Atoi(items[9])
if err != nil {
break
}
currentPacketBytes, err = strconv.Atoi(items[10])
if err != nil {
break
}
}
}

proto := util.CheckProtocol(cidr)

if cidr == "" || direction == "" || subnetName == "" && proto != "" {
continue
}

lastPacketBytes := 0
lastPackets := 0
diffPacketBytes := 0
diffPackets := 0

key := strings.Join([]string{subnetName, direction, proto}, "/")
if ret, ok := c.gwCounters[key]; ok {
lastPackets = ret.Packets
lastPacketBytes = ret.PacketBytes
} else {
c.gwCounters[key] = &util.GwIPtableCounters{
Packets: lastPackets,
PacketBytes: lastPacketBytes,
}
}

if lastPacketBytes == 0 && lastPackets == 0 {
// the gwCounters may just initialize don't cal the diff values,
// it may loss packets to calculate during a metric period
c.gwCounters[key].Packets = currentPackets
c.gwCounters[key].PacketBytes = currentPacketBytes
continue
}

if currentPackets >= lastPackets && currentPacketBytes >= lastPacketBytes {
diffPacketBytes = currentPacketBytes - lastPacketBytes
diffPackets = currentPackets - lastPackets
} else {
// if currentPacketBytes < lastPacketBytes, the reason is that iptables rule is reset ,
// it may loss packets to calculate during a metric period
c.gwCounters[key].Packets = currentPackets
c.gwCounters[key].PacketBytes = currentPacketBytes
continue
}

c.gwCounters[key].Packets = currentPackets
c.gwCounters[key].PacketBytes = currentPacketBytes

klog.V(3).Infof(`hostname %s key %s cidr %s direction %s proto %s has diffPackets %d diffPacketBytes %d currentPackets %d currentPacketBytes %d lastPackets %d lastPacketBytes %d`,
hostname, key, cidr, direction, proto, diffPackets, diffPacketBytes, currentPackets, currentPacketBytes, lastPackets, lastPacketBytes)
if diffPackets > 0 {
metricOvnSubnetGatewayPackets.WithLabelValues(hostname, key, cidr, direction, proto).Add(float64(diffPackets))
}
if diffPacketBytes > 0 {
metricOvnSubnetGatewayPacketBytes.WithLabelValues(hostname, key, cidr, direction, proto).Add(float64(diffPacketBytes))
}
}
}
}

func (c *Controller) addEgressConfig(subnet *kubeovnv1.Subnet, ip string) error {
if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) ||
subnet.Spec.GatewayType != kubeovnv1.GWDistributedType ||
Expand Down
31 changes: 31 additions & 0 deletions pkg/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ var (
[]string{"code", "method", "host"},
)

metricOvnSubnetGatewayPacketBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "ovn_subnet_gateway_packet_bytes",
Help: "the ovn subnet gateway packet bytes.",
}, []string{
"hostname",
"subnet_name",
"cidr",
"direction",
"protocol",
},
)

metricOvnSubnetGatewayPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "ovn_subnet_gateway_packets",
Help: "the ovn subnet gateway packet num.",
}, []string{
"hostname",
"subnet_name",
"cidr",
"direction",
"protocol",
},
)
// reflector metrics

// TODO(directxman12): update these to be histograms once the metrics overhaul KEP
Expand Down Expand Up @@ -125,11 +150,17 @@ var (
func InitMetrics() {
registerReflectorMetrics()
registerClientMetrics()
registerOvnSubnetGatewayMetrics()
prometheus.MustRegister(cniOperationHistogram)
prometheus.MustRegister(cniWaitAddressResult)
prometheus.MustRegister(cniConnectivityResult)
}

func registerOvnSubnetGatewayMetrics() {
prometheus.MustRegister(metricOvnSubnetGatewayPacketBytes)
prometheus.MustRegister(metricOvnSubnetGatewayPackets)
}

// registerClientMetrics sets up the client latency metrics from client-go
func registerClientMetrics() {
// register the metrics with our registry
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,6 @@ const (
U2OExcludeIPAg = "%s.u2o_exclude_ip.%s"

DefaultServiceSessionStickinessTimeout = 10800

OvnSubnetGatewayIptables = "ovn-subnet-gateway"
)
5 changes: 5 additions & 0 deletions pkg/util/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ type IPTableRule struct {
Chain string
Rule []string
}

type GwIPtableCounters struct {
Packets int
PacketBytes int
}
8 changes: 6 additions & 2 deletions test/e2e/framework/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ type EventClient struct {
typedcorev1.EventInterface
}

func (f *Framework) EventClient() *EventClient {
func (f *Framework) EventClient(namespace string) *EventClient {
ns := f.Namespace.Name
if namespace != "" {
ns = namespace
}
return &EventClient{
f: f,
EventInterface: f.ClientSet.CoreV1().Events(f.Namespace.Name),
EventInterface: f.ClientSet.CoreV1().Events(ns),
}
}

Expand Down
Loading

0 comments on commit ee53dfe

Please sign in to comment.