Skip to content

Commit

Permalink
Record an event when Egress IP assignment changes
Browse files Browse the repository at this point in the history
Record an event when EgressIP is assigned to the Node interface or when
it is unassigned from the Node interface.

Signed-off-by: Pulkit Jain <jainpu@vmware.com>
  • Loading branch information
Pulkit Jain committed Jan 2, 2024
1 parent 923b429 commit c2d24ad
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 31 deletions.
6 changes: 6 additions & 0 deletions build/charts/antrea/templates/agent/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,9 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
6 changes: 6 additions & 0 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6283,6 +6283,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6283,6 +6283,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6283,6 +6283,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6296,6 +6296,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
8 changes: 8 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6283,6 +6283,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- update
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func run(o *Options) error {
}
if o.enableEgress {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
ofClient, k8sClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode,
features.DefaultFeatureGate.Enabled(features.EgressTrafficShaping),
)
Expand Down
40 changes: 37 additions & 3 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand All @@ -49,6 +52,7 @@ import (
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned"
"antrea.io/antrea/pkg/client/clientset/versioned/scheme"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1beta1"
"antrea.io/antrea/pkg/controller/metrics"
Expand Down Expand Up @@ -136,6 +140,7 @@ type egressBinding struct {
type EgressController struct {
ofClient openflow.Client
routeClient route.Interface
k8sClient kubernetes.Interface
crdClient clientsetversioned.Interface
antreaClientProvider agent.AntreaClientProvider

Expand Down Expand Up @@ -175,10 +180,14 @@ type EgressController struct {
serviceCIDRUpdateRetryDelay time.Duration

trafficShapingEnabled bool

eventBroadcaster record.EventBroadcaster
record record.EventRecorder
}

func NewEgressController(
ofClient openflow.Client,
k8sClient kubernetes.Interface,
antreaClientGetter agent.AntreaClientProvider,
crdClient clientsetversioned.Interface,
ifaceStore interfacestore.InterfaceStore,
Expand All @@ -196,9 +205,17 @@ func NewEgressController(
if trafficShapingEnabled && !openflow.OVSMetersAreSupported() {
klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.")
}

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: controllerName},
)

c := &EgressController{
ofClient: ofClient,
routeClient: routeClient,
k8sClient: k8sClient,
antreaClientProvider: antreaClientGetter,
crdClient: crdClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "egressgroup"),
Expand All @@ -220,6 +237,9 @@ func NewEgressController(
serviceCIDRUpdateRetryDelay: 10 * time.Second,

trafficShapingEnabled: openflow.OVSMetersAreSupported() && trafficShapingEnabled,

eventBroadcaster: eventBroadcaster,
record: recorder,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
Expand Down Expand Up @@ -388,6 +408,12 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting %s", controllerName)
defer klog.Infof("Shutting down %s", controllerName)

c.eventBroadcaster.StartStructuredLogging(0)
c.eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{
Interface: c.k8sClient.CoreV1().Events(""),
})
defer c.eventBroadcaster.Shutdown()

go c.localIPDetector.Run(stopCh)
go c.egressIPScheduler.Run(stopCh)
go c.ipAssigner.Run(stopCh)
Expand Down Expand Up @@ -848,14 +874,22 @@ func (c *EgressController) syncEgress(egressName string) error {
// Ensure the Egress IP is assigned to the system. Force advertising the IP if it was previously assigned to
// another Node in the Egress API. This could force refreshing other peers' neighbor cache when the Egress IP is
// obtained by this Node and another Node at the same time in some situations, e.g. split brain.
if err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName); err != nil {
assigned, err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName)
if err != nil {
return err
}
if assigned {
c.record.Eventf(egress, corev1.EventTypeNormal, "IPAssigned", "Assigned Egress %s with IP %s on Node %s", egress.Name, desiredEgressIP, desiredNode)
}
} else {
// Unassign the Egress IP from the local Node if it was assigned by the agent.
if err := c.ipAssigner.UnassignIP(desiredEgressIP); err != nil {
unassigned, err := c.ipAssigner.UnassignIP(desiredEgressIP)
if err != nil {
return err
}
if unassigned {
c.record.Eventf(egress, corev1.EventTypeNormal, "IPUnassigned", "Unassigned Egress %s with IP %s from Node %s", egress.Name, desiredEgressIP, c.nodeName)
}
}

// Realize the latest EgressIP and get the desired mark.
Expand Down Expand Up @@ -951,7 +985,7 @@ func (c *EgressController) uninstallEgress(egressName string, eState *egressStat
}
}
// Unassign the Egress IP from the local Node if it was assigned by the agent.
if err := c.ipAssigner.UnassignIP(eState.egressIP); err != nil {
if _, err := c.ipAssigner.UnassignIP(eState.egressIP); err != nil {
return err
}
// Remove the Egress's state.
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller)
mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any())
egressController, _ := NewEgressController(mockOFClient,
k8sClient,
&antreaClientGetter{clientset},
crdClient,
ifaceStore,
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/controller/serviceexternalip/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *ServiceExternalIPController) assignIP(ip string, service apimachineryty
c.assignedIPsMutex.Lock()
defer c.assignedIPsMutex.Unlock()
if _, ok := c.assignedIPs[ip]; !ok {
if err := c.ipAssigner.AssignIP(ip, true); err != nil {
if _, err := c.ipAssigner.AssignIP(ip, true); err != nil {
return err
}
c.assignedIPs[ip] = sets.New[string](service.String())
Expand All @@ -411,7 +411,7 @@ func (c *ServiceExternalIPController) unassignIP(ip string, service apimachinery
return nil
}
if assigned.Len() == 1 && assigned.Has(service.String()) {
if err := c.ipAssigner.UnassignIP(ip); err != nil {
if _, err := c.ipAssigner.UnassignIP(ip); err != nil {
return err
}
delete(c.assignedIPs, ip)
Expand Down
9 changes: 7 additions & 2 deletions pkg/agent/ipassigner/ip_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ import "k8s.io/apimachinery/pkg/util/sets"
// IPAssigner provides methods to assign or unassign IP.
type IPAssigner interface {
// AssignIP ensures the provided IP is assigned to the system.
AssignIP(ip string, forceAdvertise bool) error
// It returns true only in the case when there is no error and the IP provided
// was not assigned to the interface before the operation, in all other cases it
// returns false.
AssignIP(ip string, forceAdvertise bool) (bool, error)
// UnassignIP ensures the provided IP is not assigned to the system.
UnassignIP(ip string) error
// It returns true only in the case when there is no error and the IP provided
// was assigned to the interface before the operation.
UnassignIP(ip string) (bool, error)
// AssignedIPs return the IPs that are assigned to the system by this IPAssigner.
AssignedIPs() sets.Set[string]
// InitIPs ensures the IPs that are assigned to the system match the given IPs.
Expand Down
28 changes: 14 additions & 14 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func (a *ipAssigner) loadIPAddresses() (sets.Set[string], error) {
}

// AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders.
func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {
func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) (bool, error) {
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP %s", ip)
return false, fmt.Errorf("invalid IP %s", ip)
}
a.mutex.Lock()
defer a.mutex.Unlock()
Expand All @@ -156,14 +156,14 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {
if forceAdvertise {
a.advertise(parsedIP)
}
return nil
return false, nil
}

if a.dummyDevice != nil {
addr := util.NewIPNet(parsedIP)
if err := netlink.AddrAdd(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil {
if !errors.Is(err, unix.EEXIST) {
return fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
return false, fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
} else {
klog.InfoS("IP was already assigned to interface", "ip", parsedIP, "interface", a.dummyDevice.Attrs().Name)
}
Expand All @@ -174,18 +174,18 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {

if utilnet.IsIPv4(parsedIP) && a.arpResponder != nil {
if err := a.arpResponder.AddIP(parsedIP); err != nil {
return fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err)
return false, fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err)
}
}
if utilnet.IsIPv6(parsedIP) && a.ndpResponder != nil {
if err := a.ndpResponder.AddIP(parsedIP); err != nil {
return fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err)
return false, fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err)
}
}
// Always advertise the IP when the IP is newly assigned to this Node.
a.advertise(parsedIP)
a.assignedIPs.Insert(ip)
return nil
return true, nil
}

func (a *ipAssigner) advertise(ip net.IP) {
Expand All @@ -203,24 +203,24 @@ func (a *ipAssigner) advertise(ip net.IP) {
}

// UnassignIP ensures the provided IP is not assigned to the dummy device.
func (a *ipAssigner) UnassignIP(ip string) error {
func (a *ipAssigner) UnassignIP(ip string) (bool, error) {
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP %s", ip)
return false, fmt.Errorf("invalid IP %s", ip)
}
a.mutex.Lock()
defer a.mutex.Unlock()

if !a.assignedIPs.Has(ip) {
klog.V(2).InfoS("The IP is not assigned", "ip", ip)
return nil
return false, nil
}

if a.dummyDevice != nil {
addr := util.NewIPNet(parsedIP)
if err := netlink.AddrDel(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil {
if !errors.Is(err, unix.EADDRNOTAVAIL) {
return fmt.Errorf("failed to delete IP %v from interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
return false, fmt.Errorf("failed to delete IP %v from interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
} else {
klog.InfoS("IP does not exist on interface", "ip", parsedIP, "interface", a.dummyDevice.Attrs().Name)
}
Expand All @@ -230,17 +230,17 @@ func (a *ipAssigner) UnassignIP(ip string) error {

if utilnet.IsIPv4(parsedIP) && a.arpResponder != nil {
if err := a.arpResponder.RemoveIP(parsedIP); err != nil {
return fmt.Errorf("failed to remove IP %v from ARP responder: %v", ip, err)
return false, fmt.Errorf("failed to remove IP %v from ARP responder: %v", ip, err)
}
}
if utilnet.IsIPv6(parsedIP) && a.ndpResponder != nil {
if err := a.ndpResponder.RemoveIP(parsedIP); err != nil {
return fmt.Errorf("failed to remove IP %v from NDP responder: %v", ip, err)
return false, fmt.Errorf("failed to remove IP %v from NDP responder: %v", ip, err)
}
}

a.assignedIPs.Delete(ip)
return nil
return true, nil
}

// AssignedIPs return the IPs that are assigned to the dummy device.
Expand Down
14 changes: 8 additions & 6 deletions pkg/agent/ipassigner/testing/mock_ipassigner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions test/e2e/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/client/clientset/versioned/scheme"
"antrea.io/antrea/pkg/features"
"antrea.io/antrea/pkg/util/k8s"
)
Expand Down Expand Up @@ -406,6 +407,11 @@ func testEgressCRUD(t *testing.T, data *TestData) {
exists, err := hasIP(data, egress.Status.EgressNode, egress.Spec.EgressIP)
require.NoError(t, err, "Failed to check if IP exists on Node")
assert.True(t, exists, "Didn't find desired IP on Node")
// Testing the events recorded during creation of an Egress resource.
expectedMessage := fmt.Sprintf("Assigned Egress %s with IP %s on Node %v", egress.Name, tt.expectedEgressIP, egress.Status.EgressNode)
events, err := data.clientset.CoreV1().Events("").Search(scheme.Scheme, egress)
require.NoError(t, err)
assert.Contains(t, events.Items[0].Message, expectedMessage)
}

checkEIPStatus := func(expectedUsed int) {
Expand Down
Loading

0 comments on commit c2d24ad

Please sign in to comment.