Skip to content

Commit

Permalink
feat: support get pod gw0 ip.
Browse files Browse the repository at this point in the history
Signed-off-by: IRONICBo <boironic@gmail.com>
  • Loading branch information
IRONICBo committed Apr 9, 2024
1 parent fccd4e3 commit b866203
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 49 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ func run(o *Options) error {
nodeLatencyMonitor := monitortool.NewNodeLatencyMonitor(
nodeInformer,
nodeLatencyMonitorInformer,
nodeConfig.GatewayConfig,
)
go nodeLatencyMonitor.Run(stopCh)
}
Expand Down
99 changes: 64 additions & 35 deletions pkg/agent/monitortool/latency_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package monitortool

import (
"errors"
"net"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

coreinformers "k8s.io/client-go/informers/core/v1"
)
Expand All @@ -34,11 +37,10 @@ type LatencyStore struct {
// The map of node ip to connection
connectionMap map[string]*Connection
// The map of node name to the gateway (gw0) IP derived from the node's PodCIDR
nodeIPMap map[string]string
nodeGW0Map map[string]string
}

// TODO1: use LRU cache to store the latency of the connection?
// TODO2: we only support ipv4 now
type Connection struct {
// The source IP of the connection
FromIP string
Expand All @@ -57,7 +59,7 @@ type Connection struct {
func NewLatencyStore(nodeInformer coreinformers.NodeInformer) *LatencyStore {
store := &LatencyStore{
connectionMap: make(map[string]*Connection),
nodeIPMap: make(map[string]string),
nodeGW0Map: make(map[string]string),
nodeInformer: nodeInformer,
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -131,59 +133,86 @@ func (l *LatencyStore) ListConns() map[string]*Connection {
}

func (l *LatencyStore) addNode(node *corev1.Node) {
// Add first ip address to the map
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP || addr.Type == corev1.NodeExternalIP {
if net.ParseIP(addr.Address) != nil {
l.nodeIPMap[addr.Address] = node.Name
break
}
}
l.mutex.Lock()
defer l.mutex.Unlock()

gw0IP, err := getGW0IP(node)
if err != nil {
return
}

// Add the node to the node IP map
l.nodeGW0Map[node.Name] = gw0IP
}

// deleteNode removes everything the store tracks for the given Node: its
// entry in nodeGW0Map and any connection recorded for it.
func (l *LatencyStore) deleteNode(node *corev1.Node) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	// NOTE(review): connectionMap is documented as keyed by node IP, while
	// the Node is identified here by name. Remove the connection under the
	// Node's gateway IP as well as under its name so that no stale entry
	// survives whichever key writers use; confirm the key used by callers of
	// UpdateConnByKey and drop the redundant delete.
	if gw0IP, ok := l.nodeGW0Map[node.Name]; ok {
		delete(l.connectionMap, gw0IP)
	}
	delete(l.connectionMap, node.Name)

	// The gateway IP lookup above must happen before this delete.
	delete(l.nodeGW0Map, node.Name)
}

func (l *LatencyStore) updateNode(old *corev1.Node, new *corev1.Node) {
l.mutex.Lock()
defer l.mutex.Unlock()

// Delete the old node from the node IP map
for _, addr := range old.Status.Addresses {
if addr.Type == corev1.NodeInternalIP || addr.Type == corev1.NodeExternalIP {
if net.ParseIP(addr.Address) != nil {
delete(l.nodeIPMap, addr.Address)
}
}
gw0IP, err := getGW0IP(new)
if err != nil {
return
}

// Update the new node to the node IP map
l.nodeGW0Map[new.Name] = gw0IP
}

func getGW0IP(node *corev1.Node) (string, error) {
podCIDRStrs := getPodCIDRsOnNode(node)
if len(podCIDRStrs) == 0 {
// Skip the node if it does not have a PodCIDR.
klog.Warningf("Node %s does not have a PodCIDR", node.Name)
return "", errors.New("node does not have a PodCIDR")
}

// Add the new node to the node IP map
for _, addr := range new.Status.Addresses {
if addr.Type == corev1.NodeInternalIP || addr.Type == corev1.NodeExternalIP {
if net.ParseIP(addr.Address) != nil {
l.nodeIPMap[addr.Address] = new.Name
}
for _, podCIDR := range podCIDRStrs {
if podCIDR == "" {
klog.Errorf("PodCIDR is empty for Node %s", node.Name)
}

peerPodCIDRAddr, _, err := net.ParseCIDR(podCIDR)
if err != nil {
klog.Errorf("Failed to parse PodCIDR %s for Node %s", podCIDR, node.Name)
continue
}

// Add first ip in CIDR to the map
peerGatewayIP := ip.NextIP(peerPodCIDRAddr)
if peerGatewayIP.To4() != nil && peerGatewayIP.To16() != nil {
return peerGatewayIP.String(), nil
}
}

klog.Warningf("Node %s does not have a valid PodCIDR", node.Name)
return "", errors.New("node does not have a valid PodCIDR")
}

// getPodCIDRsOnNode returns the PodCIDR strings declared in the Node's spec.
// Spec.PodCIDRs is returned as-is whenever it is non-nil; otherwise the
// single Spec.PodCIDR is wrapped in a slice. When neither is set the result
// is nil rather than an error, since an error would only trigger controller
// retries that cannot help.
func getPodCIDRsOnNode(node *corev1.Node) []string {
	spec := &node.Spec
	switch {
	case spec.PodCIDRs != nil:
		return spec.PodCIDRs
	case spec.PodCIDR != "":
		return []string{spec.PodCIDR}
	default:
		return nil
	}
}

// ListNodeIPs returns a snapshot of the node name to gateway IP mapping.
//
// A copy is returned rather than the internal map: the read lock is released
// when this method returns, so handing out the internal map would let callers
// iterate it while the Node event handlers write to it.
func (l *LatencyStore) ListNodeIPs() map[string]string {
	l.mutex.RLock()
	defer l.mutex.RUnlock()

	nodeIPs := make(map[string]string, len(l.nodeGW0Map))
	for name, gw0IP := range l.nodeGW0Map {
		nodeIPs[name] = gw0IP
	}
	return nodeIPs
}
32 changes: 25 additions & 7 deletions pkg/agent/monitortool/latency_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,16 @@ var (
connectionMap = map[string]*Connection{
"127.0.0.1": conn,
}
nodeIPMap = map[string]string{
nodeGW0Map = map[string]string{
"127.0.0.1": "TestNode",
}
latencyStore = &LatencyStore{
connectionMap: connectionMap,
nodeIPMap: nodeIPMap,
}
)

func TestNewLatencyStore(t *testing.T) {
k8sClient := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
nodeInformer := informerFactory.Core().V1().Nodes()
latencyStore = NewLatencyStore(nodeInformer)
_ = NewLatencyStore(nodeInformer)

stopCh := make(chan struct{})
defer close(stopCh)
Expand Down Expand Up @@ -93,6 +89,10 @@ func TestNewLatencyStore(t *testing.T) {
}

func TestLatencyStore_GetConnByKey(t *testing.T) {
latencyStore := &LatencyStore{
connectionMap: connectionMap,
nodeGW0Map: nodeGW0Map,
}
tests := []struct {
key string
expectedConn *Connection
Expand Down Expand Up @@ -120,6 +120,10 @@ func TestLatencyStore_GetConnByKey(t *testing.T) {
}

func TestLatencyStore_DeleteConnByKey(t *testing.T) {
latencyStore := &LatencyStore{
connectionMap: connectionMap,
nodeGW0Map: nodeGW0Map,
}
tests := []struct {
key string
expectedConn *Connection
Expand All @@ -143,6 +147,10 @@ func TestLatencyStore_DeleteConnByKey(t *testing.T) {
}

func TestLatencyStore_UpdateConnByKey(t *testing.T) {
latencyStore := &LatencyStore{
connectionMap: connectionMap,
nodeGW0Map: nodeGW0Map,
}
tests := []struct {
key string
updatedConn *Connection
Expand All @@ -164,11 +172,21 @@ func TestLatencyStore_UpdateConnByKey(t *testing.T) {
}

func TestLatencyStore_ListConns(t *testing.T) {
latencyStore := &LatencyStore{
connectionMap: connectionMap,
nodeGW0Map: nodeGW0Map,
}

conns := latencyStore.ListConns()
assert.Equal(t, connectionMap, conns)
}

func TestLatencyStore_ListNodeIPs(t *testing.T) {
latencyStore := &LatencyStore{
connectionMap: connectionMap,
nodeGW0Map: nodeGW0Map,
}

nodeIPs := latencyStore.ListNodeIPs()
assert.Equal(t, nodeIPMap, nodeIPs)
assert.Equal(t, nodeGW0Map, nodeIPs)
}
16 changes: 13 additions & 3 deletions pkg/agent/monitortool/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"

config "antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2"
)
Expand Down Expand Up @@ -62,6 +63,8 @@ func getICMPSeq() uint16 {

// MonitorTool is a tool to monitor the latency of the node.
type MonitorTool struct {
// Gateway config
gatewayConfig *config.GatewayConfig
// latencyStore is the cache to store the latency of each nodes.
latencyStore *LatencyStore
// latencyConfig is the config for the latency monitor.
Expand All @@ -86,8 +89,10 @@ type LatencyConfig struct {
}

func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer,
nlmInformer crdinformers.NodeLatencyMonitorInformer) *MonitorTool {
nlmInformer crdinformers.NodeLatencyMonitorInformer,
gatewayConfig *config.GatewayConfig) *MonitorTool {
m := &MonitorTool{
gatewayConfig: gatewayConfig,
latencyStore: NewLatencyStore(nodeInformer),
latencyConfig: &LatencyConfig{Enable: false},
latencyConfigChanged: make(chan struct{}, 1),
Expand Down Expand Up @@ -188,15 +193,20 @@ func (m *MonitorTool) pingAll() {
nodeIPs := m.latencyStore.ListNodeIPs()

// TODO: Get current node internal/external IP.
fromIP := ""
var fromIP string
if m.gatewayConfig.IPv4 != nil {
fromIP = m.gatewayConfig.IPv4.String()
} else {
fromIP = m.gatewayConfig.IPv6.String()
}

// A simple rate limiter
limiter := getRate(len(nodeIPs), m.latencyConfig.Interval, m.latencyConfig.Limit)
limitGroup := make(chan bool, int(m.latencyConfig.Limit))

klog.InfoS("Start to ping all nodes")
wg := sync.WaitGroup{}
for toIP, name := range nodeIPs {
for name, toIP := range nodeIPs {
limitGroup <- true
limiter.Wait(context.Background())
wg.Add(1)
Expand Down
18 changes: 14 additions & 4 deletions pkg/agent/monitortool/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ var (
latencyConfig3 = &LatencyConfig{
Enable: false,
}
monitorTool = &MonitorTool{
)

func TestMonitorTool_onNodeLatencyMonitorAdd(t *testing.T) {
monitorTool := &MonitorTool{
// Buffer size is 10 to avoid blocking
latencyConfigChanged: make(chan struct{}, 10),
latencyConfig: latencyConfig1,
}
)

func TestMonitorTool_onNodeLatencyMonitorAdd(t *testing.T) {
tests := []struct {
nodeLatencyMonitor *v1alpha2.NodeLatencyMonitor
expected *LatencyConfig
Expand All @@ -82,6 +82,11 @@ func TestMonitorTool_onNodeLatencyMonitorAdd(t *testing.T) {
}

func TestMonitorTool_onNodeLatencyMonitorUpdate(t *testing.T) {
monitorTool := &MonitorTool{
// Buffer size is 10 to avoid blocking
latencyConfigChanged: make(chan struct{}, 10),
latencyConfig: latencyConfig1,
}
tests := []struct {
oldNodeLatencyMonitor *v1alpha2.NodeLatencyMonitor
newNodeLatencyMonitor *v1alpha2.NodeLatencyMonitor
Expand All @@ -101,6 +106,11 @@ func TestMonitorTool_onNodeLatencyMonitorUpdate(t *testing.T) {
}

func TestMonitorTool_onNodeLatencyMonitorDelete(t *testing.T) {
monitorTool := &MonitorTool{
// Buffer size is 10 to avoid blocking
latencyConfigChanged: make(chan struct{}, 10),
latencyConfig: latencyConfig1,
}
tests := []struct {
nodeLatencyMonitor *v1alpha2.NodeLatencyMonitor
expected *LatencyConfig
Expand Down

0 comments on commit b866203

Please sign in to comment.