From bc08da76fa5a588aede4e82d579ed494deeba688 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Mon, 22 May 2023 15:12:03 +0800 Subject: [PATCH 1/2] Avoid ServiceCIDR flapping on agent start The previous implementation always generated intermediate values for ServiceCIDR on agent start, which may interrupt the Service traffic and causes difficulty for cleaning up stale routes as the value calculated at one point may not be reliable to identify all stale routes. This commit waits for the Service Informer to be synced first, and calculates the ServiceCIDR based on all Services. Ideally the Service route won't change in most cases, and hence avoid the above issues. Besides, it fixes an issue that stale routes on Linux were not cleaned up correctly due to incorrect check. Signed-off-by: Quan Tian --- cmd/antrea-agent/agent.go | 3 + pkg/agent/route/route_linux.go | 18 +++- pkg/agent/route/route_linux_test.go | 24 +++++ pkg/agent/servicecidr/discoverer.go | 118 +++++++++++++++++------ pkg/agent/servicecidr/discoverer_test.go | 15 +-- pkg/util/ip/ip.go | 43 +++++++++ pkg/util/ip/ip_test.go | 90 +++++++++++++++++ 7 files changed, 275 insertions(+), 36 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 0094f8af2ab..59a1c6bba5f 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -222,6 +222,9 @@ func run(o *Options) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Must start after registering all event handlers. + go serviceCIDRProvider.Run(stopCh) + // Get all available NodePort addresses. var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP if o.config.AntreaProxy.ProxyAll { diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index c6cb5cee8fc..6c4addacb2e 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -41,6 +41,7 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/env" + utilip "antrea.io/antrea/pkg/util/ip" ) const ( @@ -1389,7 +1390,22 @@ func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error { return fmt.Errorf("error listing ip routes: %w", err) } for i := 0; i < len(routes); i++ { - if routes[i].Gw.Equal(gw) && !routes[i].Dst.IP.Equal(serviceCIDR.IP) && routes[i].Dst.Contains(serviceCIDR.IP) { + // Not the routes we are interested in. + if !routes[i].Gw.Equal(gw) { + continue + } + // It's the latest route we just installed. + if utilip.IPNetEqual(routes[i].Dst, serviceCIDR) { + continue + } + // The route covers the desired route. It was installed when the calculated ServiceCIDR was larger than the + // current one, which could happen after some Services are deleted. + if utilip.IPNetContains(routes[i].Dst, serviceCIDR) { + staleRoutes = append(staleRoutes, &routes[i]) + } + // The desired route covers the route. It was installed when the calculated ServiceCIDR was smaller than the + // current one, which could happen after some Services are added. + if utilip.IPNetContains(serviceCIDR, routes[i].Dst) { staleRoutes = append(staleRoutes, &routes[i]) } } diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index 8b9650898b2..1a081634763 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -1338,6 +1338,30 @@ func TestAddServiceCIDRRoute(t *testing.T) { }) }, }, + { + name: "Add route for Service IPv4 CIDR and clean up stale routes", + curServiceIPv4CIDR: nil, + newServiceIPv4CIDR: ip.MustParseCIDR("10.96.0.0/28"), + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + mockNetlink.RouteReplace(&netlink.Route{ + Dst: &net.IPNet{IP: net.ParseIP("10.96.0.0").To4(), Mask: net.CIDRMask(28, 32)}, + Gw: config.VirtualServiceIPv4, + Scope: netlink.SCOPE_UNIVERSE, + LinkIndex: 10, + }) + mockNetlink.RouteListFiltered(netlink.FAMILY_V4, &netlink.Route{LinkIndex: 10}, netlink.RT_FILTER_OIF).Return([]netlink.Route{ + {Dst: ip.MustParseCIDR("10.96.0.0/24"), Gw: config.VirtualServiceIPv4}, + {Dst: ip.MustParseCIDR("10.96.0.0/30"), Gw: config.VirtualServiceIPv4}, + }, nil) + mockNetlink.RouteListFiltered(netlink.FAMILY_V6, &netlink.Route{LinkIndex: 10}, netlink.RT_FILTER_OIF).Return([]netlink.Route{}, nil) + mockNetlink.RouteDel(&netlink.Route{ + Dst: ip.MustParseCIDR("10.96.0.0/24"), Gw: config.VirtualServiceIPv4, + }) + mockNetlink.RouteDel(&netlink.Route{ + Dst: ip.MustParseCIDR("10.96.0.0/30"), Gw: config.VirtualServiceIPv4, + }) + }, + }, { name: "Update route for Service IPv4 CIDR", curServiceIPv4CIDR: serviceIPv4CIDR1, diff --git a/pkg/agent/servicecidr/discoverer.go b/pkg/agent/servicecidr/discoverer.go index 5136081e0c7..a6a2dc74ba5 100644 --- a/pkg/agent/servicecidr/discoverer.go +++ b/pkg/agent/servicecidr/discoverer.go @@ -21,10 +21,15 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" + "k8s.io/utils/strings/slices" "antrea.io/antrea/pkg/agent/util" ) @@ -42,17 +47,22 @@ type Interface interface { AddEventHandler(handler EventHandler) } -type discoverer struct { +type Discoverer struct { serviceInformer cache.SharedIndexInformer + serviceLister corelisters.ServiceLister sync.RWMutex serviceIPv4CIDR *net.IPNet serviceIPv6CIDR *net.IPNet eventHandlers []EventHandler + // queue maintains the Service objects that need to be synced. + queue workqueue.Interface } -func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) Interface { - d := &discoverer{ +func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) *Discoverer { + d := &Discoverer{ serviceInformer: serviceInformer.Informer(), + serviceLister: serviceInformer.Lister(), + queue: workqueue.New(), } d.serviceInformer.AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -64,7 +74,37 @@ func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) Int return d } -func (d *discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) { +func (d *Discoverer) Run(stopCh <-chan struct{}) { + defer d.queue.ShutDown() + + klog.Info("Starting ServiceCIDRDiscoverer") + defer klog.Info("Stopping ServiceCIDRDiscoverer") + if !cache.WaitForCacheSync(stopCh, d.serviceInformer.HasSynced) { + return + } + svcs, _ := d.serviceLister.List(labels.Everything()) + d.updateServiceCIDR(svcs...) + + go func() { + for { + obj, quit := d.queue.Get() + if quit { + return + } + nn := obj.(types.NamespacedName) + + svc, _ := d.serviceLister.Services(nn.Namespace).Get(nn.Name) + // Ignore it if not found. + if svc != nil { + d.updateServiceCIDR(svc) + } + d.queue.Done(obj) + } + }() + <-stopCh +} + +func (d *Discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) { d.RLock() defer d.RUnlock() if isIPv6 { @@ -79,32 +119,37 @@ func (d *discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) { return d.serviceIPv4CIDR, nil } -func (d *discoverer) AddEventHandler(handler EventHandler) { +func (d *Discoverer) AddEventHandler(handler EventHandler) { d.eventHandlers = append(d.eventHandlers, handler) } -func (d *discoverer) addService(obj interface{}) { - svc := obj.(*corev1.Service) - d.updateServiceCIDR(svc) -} - -func (d *discoverer) updateService(_, obj interface{}) { +func (d *Discoverer) addService(obj interface{}) { svc := obj.(*corev1.Service) - d.updateServiceCIDR(svc) + klog.V(2).InfoS("Processing Service ADD event", "Service", klog.KObj(svc)) + d.queue.Add(types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}) } -func (d *discoverer) updateServiceCIDR(svc *corev1.Service) { - clusterIPs := svc.Spec.ClusterIPs - if len(clusterIPs) == 0 { - return +func (d *Discoverer) updateService(old, obj interface{}) { + oldSvc := old.(*corev1.Service) + curSvc := obj.(*corev1.Service) + klog.V(2).InfoS("Processing Service UPDATE event", "Service", klog.KObj(curSvc)) + if !slices.Equal(oldSvc.Spec.ClusterIPs, curSvc.Spec.ClusterIPs) { + d.queue.Add(types.NamespacedName{Namespace: curSvc.Namespace, Name: curSvc.Name}) } +} +func (d *Discoverer) updateServiceCIDR(svcs ...*corev1.Service) { var newServiceCIDRs []*net.IPNet - klog.V(2).InfoS("Processing Service ADD or UPDATE event", "Service", klog.KObj(svc)) - func() { - d.Lock() - defer d.Unlock() - for _, clusterIPStr := range clusterIPs { + + curServiceIPv4CIDR, curServiceIPv6CIDR := func() (*net.IPNet, *net.IPNet) { + d.RLock() + defer d.RUnlock() + return d.serviceIPv4CIDR, d.serviceIPv6CIDR + }() + + updated := false + for _, svc := range svcs { + for _, clusterIPStr := range svc.Spec.ClusterIPs { clusterIP := net.ParseIP(clusterIPStr) if clusterIP == nil { klog.V(2).InfoS("Skip invalid ClusterIP", "ClusterIP", clusterIPStr) @@ -112,10 +157,10 @@ func (d *discoverer) updateServiceCIDR(svc *corev1.Service) { } isIPv6 := utilnet.IsIPv6(clusterIP) - curServiceCIDR := d.serviceIPv4CIDR + curServiceCIDR := curServiceIPv4CIDR mask := net.IPv4len * 8 if isIPv6 { - curServiceCIDR = d.serviceIPv6CIDR + curServiceCIDR = curServiceIPv6CIDR mask = net.IPv6len * 8 } @@ -138,16 +183,31 @@ func (d *discoverer) updateServiceCIDR(svc *corev1.Service) { } if isIPv6 { - d.serviceIPv6CIDR = newServiceCIDR - klog.V(4).InfoS("Service IPv6 CIDR was updated", "ServiceCIDR", newServiceCIDR) + curServiceIPv6CIDR = newServiceCIDR } else { - d.serviceIPv4CIDR = newServiceCIDR - klog.V(4).InfoS("Service IPv4 CIDR was updated", "ServiceCIDR", newServiceCIDR) + curServiceIPv4CIDR = newServiceCIDR } - newServiceCIDRs = append(newServiceCIDRs, newServiceCIDR) + updated = true } - }() + } + if !updated { + return + } + func() { + d.Lock() + defer d.Unlock() + if d.serviceIPv4CIDR != curServiceIPv4CIDR { + d.serviceIPv4CIDR = curServiceIPv4CIDR + klog.InfoS("Service IPv4 CIDR was updated", "ServiceCIDR", curServiceIPv4CIDR) + newServiceCIDRs = append(newServiceCIDRs, curServiceIPv4CIDR) + } + if d.serviceIPv6CIDR != curServiceIPv6CIDR { + d.serviceIPv6CIDR = curServiceIPv6CIDR + klog.InfoS("Service IPv6 CIDR was updated", "ServiceCIDR", curServiceIPv6CIDR) + newServiceCIDRs = append(newServiceCIDRs, curServiceIPv6CIDR) + } + }() for _, handler := range d.eventHandlers { handler(newServiceCIDRs) } diff --git a/pkg/agent/servicecidr/discoverer_test.go b/pkg/agent/servicecidr/discoverer_test.go index 308c9801124..850db83de6a 100644 --- a/pkg/agent/servicecidr/discoverer_test.go +++ b/pkg/agent/servicecidr/discoverer_test.go @@ -67,6 +67,7 @@ func TestServiceCIDRProvider(t *testing.T) { defer close(stopCh) informerFactory.Start(stopCh) informerFactory.WaitForCacheSync(stopCh) + go serviceCIDRProvider.Run(stopCh) check := func(expectedServiceCIDR string, isServiceCIDRUpdated, isIPv6 bool) { if isServiceCIDRUpdated { @@ -84,15 +85,18 @@ func TestServiceCIDRProvider(t *testing.T) { } } serviceCIDR, err := serviceCIDRProvider.GetServiceCIDR(isIPv6) - assert.NoError(t, err) - assert.Equal(t, expectedServiceCIDR, serviceCIDR.String()) + if expectedServiceCIDR != "" { + assert.NoError(t, err) + assert.Equal(t, expectedServiceCIDR, serviceCIDR.String()) + } else { + assert.ErrorContains(t, err, "CIDR is not available yet") + } } svc := makeService("ns1", "svc0", "None", corev1.ProtocolTCP) _, err := client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - _, err = serviceCIDRProvider.GetServiceCIDR(false) - assert.ErrorContains(t, err, "Service IPv4 CIDR is not available yet") + check("", false, false) svc = makeService("ns1", "svc1", "10.10.0.1", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) @@ -121,8 +125,7 @@ func TestServiceCIDRProvider(t *testing.T) { svc = makeService("ns1", "svc60", "None", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - _, err = serviceCIDRProvider.GetServiceCIDR(true) - assert.ErrorContains(t, err, "Service IPv6 CIDR is not available yet") + check("", false, true) svc = makeService("ns1", "svc61", "10::1", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) diff --git a/pkg/util/ip/ip.go b/pkg/util/ip/ip.go index fc504508a43..bd82ea18989 100644 --- a/pkg/util/ip/ip.go +++ b/pkg/util/ip/ip.go @@ -195,6 +195,49 @@ func MustParseCIDR(cidr string) *net.IPNet { return ipNet } +// IPNetEqual returns if the provided IPNets are the same subnet. +func IPNetEqual(ipNet1, ipNet2 *net.IPNet) bool { + if ipNet1 == nil && ipNet2 == nil { + return true + } + if ipNet1 == nil || ipNet2 == nil { + return false + } + if !bytes.Equal(ipNet1.Mask, ipNet2.Mask) { + return false + } + if !ipNet1.IP.Equal(ipNet2.IP) { + return false + } + return true +} + +// IPNetContains returns if the first IPNet contains the second IPNet. +// For example: +// +// 10.0.0.0/24 contains 10.0.0.0/24. +// 10.0.0.0/24 contains 10.0.0.0/25. +// 10.0.0.0/24 contains 10.0.0.128/25. +// 10.0.0.0/24 does not contain 10.0.0.0/23. +// 10.0.0.0/24 does not contain 10.0.1.0/25. +func IPNetContains(ipNet1, ipNet2 *net.IPNet) bool { + if ipNet1 == nil || ipNet2 == nil { + return false + } + ones1, bits1 := ipNet1.Mask.Size() + ones2, bits2 := ipNet2.Mask.Size() + if bits1 != bits2 { + return false + } + if ones1 > ones2 { + return false + } + if !ipNet1.Contains(ipNet2.IP) { + return false + } + return true +} + func MustIPv6(s string) net.IP { ip := net.ParseIP(s) if !utilnet.IsIPv6(ip) { diff --git a/pkg/util/ip/ip_test.go b/pkg/util/ip/ip_test.go index ef812257fba..caf12b50f52 100644 --- a/pkg/util/ip/ip_test.go +++ b/pkg/util/ip/ip_test.go @@ -239,3 +239,93 @@ func TestAppendPortIfMissing(t *testing.T) { }) } } + +func TestIPNetEqual(t *testing.T) { + tests := []struct { + name string + ipNet1 *net.IPNet + ipNet2 *net.IPNet + want bool + }{ + { + name: "equal", + ipNet1: MustParseCIDR("1.1.1.0/30"), + ipNet2: MustParseCIDR("1.1.1.0/30"), + want: true, + }, + { + name: "different mask", + ipNet1: MustParseCIDR("1.1.1.0/30"), + ipNet2: MustParseCIDR("1.1.1.0/29"), + want: false, + }, + { + name: "different prefix", + ipNet1: MustParseCIDR("1.1.1.4/30"), + ipNet2: MustParseCIDR("1.1.1.0/30"), + want: false, + }, + { + name: "different family", + ipNet1: MustParseCIDR("1.1.1.4/30"), + ipNet2: MustParseCIDR("1:1:1:4::/30"), + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, IPNetEqual(tt.ipNet1, tt.ipNet2)) + }) + } +} + +func TestIPNetContains(t *testing.T) { + tests := []struct { + name string + ipNet1 *net.IPNet + ipNet2 *net.IPNet + want bool + }{ + { + name: "equal", + ipNet1: MustParseCIDR("10.0.0.0/24"), + ipNet2: MustParseCIDR("10.0.0.0/24"), + want: true, + }, + { + name: "contain smaller subnet", + ipNet1: MustParseCIDR("10.0.0.0/24"), + ipNet2: MustParseCIDR("10.0.0.0/25"), + want: true, + }, + { + name: "contain smaller subnet with different prefix", + ipNet1: MustParseCIDR("10.0.0.0/24"), + ipNet2: MustParseCIDR("10.0.0.128/25"), + want: true, + }, + { + name: "not contain larger subnet", + ipNet1: MustParseCIDR("10.0.0.0/24"), + ipNet2: MustParseCIDR("10.0.0.0/23"), + want: false, + }, + { + name: "not contain smaller subnet with different prefix", + ipNet1: MustParseCIDR("10.0.0.0/24"), + ipNet2: MustParseCIDR("10.0.1.0/25"), + want: false, + }, + { + name: "not contain subnet of different family", + ipNet1: MustParseCIDR("1.1.1.4/30"), + ipNet2: MustParseCIDR("1:1:1:4::/30"), + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, IPNetContains(tt.ipNet1, tt.ipNet2)) + }) + } +} From 41b469d7bf5a20a0f67609acb47d241fa0b8ff84 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Mon, 18 Sep 2023 20:22:07 +0800 Subject: [PATCH 2/2] Do not apply Egress to traffic destined for ServiceCIDRs When AntreaProxy is asked to skip some Services or is not running at all, Pod-to-Service traffic would be forwarded to Egress Node and be load-balanced remotely, as opposed to locally, which could incur performance issue and unexpected behaviors. This patch installs flows to prevent traffic destined for ServiceCIDRs from being SNAT'd. Signed-off-by: Quan Tian --- cmd/antrea-agent/agent.go | 2 +- .../controller/egress/egress_controller.go | 52 +++++++++++ .../egress/egress_controller_test.go | 87 +++++++++++++++---- pkg/agent/openflow/client.go | 18 +++- pkg/agent/openflow/client_test.go | 81 +++++++++++++++++ pkg/agent/openflow/pipeline.go | 20 +++-- pkg/agent/openflow/testing/mock_openflow.go | 14 +++ pkg/agent/servicecidr/discoverer.go | 28 +++--- pkg/agent/servicecidr/discoverer_test.go | 48 +++++----- .../servicecidr/testing/mock_servicecidr.go | 14 +-- 10 files changed, 296 insertions(+), 68 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 59a1c6bba5f..4ef09d4d911 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -491,7 +491,7 @@ func run(o *Options) error { if o.enableEgress { egressController, err = egress.NewEgressController( ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName, - memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode, + memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode, ) if err != nil { return fmt.Errorf("error creating new Egress controller: %v", err) diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 4901e29f27d..0f4c0884d4c 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -42,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" + "antrea.io/antrea/pkg/agent/servicecidr" "antrea.io/antrea/pkg/agent/types" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" @@ -147,6 +148,11 @@ type EgressController struct { ipAssigner ipassigner.IPAssigner egressIPScheduler *egressIPScheduler + + serviceCIDRInterface servicecidr.Interface + serviceCIDRUpdateCh chan struct{} + // Declared for testing. + serviceCIDRUpdateRetryDelay time.Duration } func NewEgressController( @@ -161,6 +167,7 @@ func NewEgressController( egressInformer crdinformers.EgressInformer, nodeInformers coreinformers.NodeInformer, podUpdateSubscriber channel.Subscriber, + serviceCIDRInterface servicecidr.Interface, maxEgressIPsPerNode int, ) (*EgressController, error) { c := &EgressController{ @@ -181,6 +188,10 @@ func NewEgressController( localIPDetector: ipassigner.NewLocalIPDetector(), idAllocator: newIDAllocator(minEgressMark, maxEgressMark), cluster: cluster, + serviceCIDRInterface: serviceCIDRInterface, + // One buffer is enough as we just use it to ensure the target handler is executed once. + serviceCIDRUpdateCh: make(chan struct{}, 1), + serviceCIDRUpdateRetryDelay: 10 * time.Second, } ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) if err != nil { @@ -214,6 +225,7 @@ func NewEgressController( podUpdateSubscriber.Subscribe(c.processPodUpdate) c.localIPDetector.AddEventHandler(c.onLocalIPUpdate) c.egressIPScheduler.AddEventHandler(c.onEgressIPSchedule) + c.serviceCIDRInterface.AddEventHandler(c.onServiceCIDRUpdate) return c, nil } @@ -222,6 +234,44 @@ func (c *EgressController) onEgressIPSchedule(egress string) { c.queue.Add(egress) } +// onServiceCIDRUpdate will be called when ServiceCIDRs change. +// It ensures updateServiceCIDRs will be executed once after this call. +func (c *EgressController) onServiceCIDRUpdate(_ []*net.IPNet) { + select { + case c.serviceCIDRUpdateCh <- struct{}{}: + default: + // The previous event is not processed yet, discard the new event. + } +} + +func (c *EgressController) updateServiceCIDRs(stopCh <-chan struct{}) { + timer := time.NewTimer(0) + defer timer.Stop() + <-timer.C // Consume the first tick. + for { + select { + case <-stopCh: + return + case <-c.serviceCIDRUpdateCh: + klog.V(2).InfoS("Received service CIDR update") + case <-timer.C: + klog.V(2).InfoS("Service CIDR update timer expired") + } + serviceCIDRs, err := c.serviceCIDRInterface.GetServiceCIDRs() + if err != nil { + klog.ErrorS(err, "Failed to get Service CIDRs") + // No need to retry in this case as the Service CIDRs won't be available until it receives a service CIDRs update. + continue + } + err = c.ofClient.InstallSNATBypassServiceFlows(serviceCIDRs) + if err != nil { + klog.ErrorS(err, "Failed to install SNAT bypass flows for Service CIDRs, will retry", "serviceCIDRs", serviceCIDRs) + // Schedule a retry as it should be transient error. + timer.Reset(c.serviceCIDRUpdateRetryDelay) + } + } +} + // processPodUpdate will be called when CNIServer publishes a Pod update event. // It triggers reconciling the effective Egress of the Pod. func (c *EgressController) processPodUpdate(e interface{}) { @@ -314,6 +364,8 @@ func (c *EgressController) Run(stopCh <-chan struct{}) { go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh) + go c.updateServiceCIDRs(stopCh) + for i := 0; i < defaultWorkers; i++ { go wait.Until(c.worker, time.Second, stopCh) } diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index 7bf14c436df..c97ccae774b 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -41,6 +41,7 @@ import ( "antrea.io/antrea/pkg/agent/memberlist" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" + servicecidrtest "antrea.io/antrea/pkg/agent/servicecidr/testing" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" @@ -49,6 +50,7 @@ import ( fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" ) @@ -128,14 +130,15 @@ func mockNewIPAssigner(ipAssigner ipassigner.IPAssigner) func() { type fakeController struct { *EgressController - mockController *gomock.Controller - mockOFClient *openflowtest.MockClient - mockRouteClient *routetest.MockInterface - crdClient *fakeversioned.Clientset - crdInformerFactory crdinformers.SharedInformerFactory - informerFactory informers.SharedInformerFactory - mockIPAssigner *ipassignertest.MockIPAssigner - podUpdateChannel *channel.SubscribableChannel + mockController *gomock.Controller + mockOFClient *openflowtest.MockClient + mockRouteClient *routetest.MockInterface + crdClient *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + informerFactory informers.SharedInformerFactory + mockIPAssigner *ipassignertest.MockIPAssigner + mockServiceCIDRInterface *servicecidrtest.MockInterface + podUpdateChannel *channel.SubscribableChannel } func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController { @@ -163,7 +166,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll addPodInterface(ifaceStore, "ns4", "pod4", 4) podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) - + mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller) + mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any()) egressController, _ := NewEgressController(mockOFClient, &antreaClientGetter{clientset}, crdClient, @@ -175,19 +179,21 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll egressInformer, nodeInformer, podUpdateChannel, + mockServiceCIDRProvider, 255, ) egressController.localIPDetector = localIPDetector return &fakeController{ - EgressController: egressController, - mockController: controller, - mockOFClient: mockOFClient, - mockRouteClient: mockRouteClient, - crdClient: crdClient, - crdInformerFactory: crdInformerFactory, - informerFactory: informerFactory, - mockIPAssigner: mockIPAssigner, - podUpdateChannel: podUpdateChannel, + EgressController: egressController, + mockController: controller, + mockOFClient: mockOFClient, + mockRouteClient: mockRouteClient, + crdClient: crdClient, + crdInformerFactory: crdInformerFactory, + informerFactory: informerFactory, + mockIPAssigner: mockIPAssigner, + mockServiceCIDRInterface: mockServiceCIDRProvider, + podUpdateChannel: podUpdateChannel, } } @@ -1093,6 +1099,51 @@ func TestGetEgressIPByMark(t *testing.T) { } } +func TestUpdateServiceCIDRs(t *testing.T) { + c := newFakeController(t, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + // Retry immediately. + c.serviceCIDRUpdateRetryDelay = 0 + + serviceCIDRs := []*net.IPNet{ + ip.MustParseCIDR("10.96.0.0/16"), + ip.MustParseCIDR("1096::/64"), + } + assert.Len(t, c.serviceCIDRUpdateCh, 0) + // Call the handler the 1st time, it should enqueue an event. + c.onServiceCIDRUpdate(serviceCIDRs) + assert.Len(t, c.serviceCIDRUpdateCh, 1) + // Call the handler the 2nd time, it should not block and should discard the event. + c.onServiceCIDRUpdate(serviceCIDRs) + assert.Len(t, c.serviceCIDRUpdateCh, 1) + + // In the 1st round, returning the ServiceCIDRs fails, it should not retry. + c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(nil, fmt.Errorf("not initialized")) + + go c.updateServiceCIDRs(stopCh) + + // Wait for the event to be processed. + require.Eventually(t, func() bool { + return len(c.serviceCIDRUpdateCh) == 0 + }, time.Second, 100*time.Millisecond) + // In the 2nd round, returning the ServiceCIDR succeeds but installing flows fails, it should retry. + c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil) + c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Return(fmt.Errorf("transient error")) + // In the 3rd round, both succeed. + finishCh := make(chan struct{}) + c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil) + c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Do(func(_ []*net.IPNet) { close(finishCh) }).Return(nil) + // Enqueue only one event as the 2nd failure is supposed to trigger a retry. + c.onServiceCIDRUpdate(serviceCIDRs) + + select { + case <-finishCh: + case <-time.After(time.Second): + t.Errorf("InstallSNATBypassServiceFlows didn't succeed in time") + } +} + func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) { t.Logf("queue len %d", queue.Len()) require.Eventually(t, func() bool { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 03064d039b9..841f3c2b627 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -133,6 +133,12 @@ type Client interface { // are removed from PolicyRule.From, else from PolicyRule.To. DeletePolicyRuleAddress(ruleID uint32, addrType types.AddressType, addresses []types.Address, priority *uint16) error + // InstallSNATBypassServiceFlows installs flows to prevent traffic destined for the specified Service CIDRs from + // being SNAT'd. Otherwise, such Pod-to-Service traffic would be forwarded to Egress Node and be load-balanced + // remotely, as opposed to locally, when AntreaProxy is asked to skip some Services or is not running at all. + // Calling the method with new CIDRs will override the flows installed for previous CIDRs. + InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error + // InstallSNATMarkFlows installs flows for a local SNAT IP. On Linux, a // single flow is added to mark the packets tunnelled from remote Nodes // that should be SNAT'd with the SNAT IP. @@ -144,7 +150,7 @@ type Client interface { // InstallPodSNATFlows installs the SNAT flows for a local Pod. If the // SNAT IP for the Pod is on the local Node, a non-zero SNAT ID should - // allocated for the SNAT IP, and the installed flow sets the SNAT IP + // be allocated for the SNAT IP, and the installed flow sets the SNAT IP // mark on the egress packets from the ofPort; if the SNAT IP is on a // remote Node, snatMark should be set to 0, and the installed flow // tunnels egress packets to the remote Node using the SNAT IP as the @@ -980,6 +986,16 @@ func (c *client) generatePipelines() { } } +func (c *client) InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error { + var flows []binding.Flow + for _, serviceCIDR := range serviceCIDRs { + flows = append(flows, c.featureEgress.snatSkipCIDRFlow(*serviceCIDR)) + } + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.modifyFlows(c.featureEgress.cachedFlows, "svc-cidrs", flows) +} + func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error { flow := c.featureEgress.snatIPFromTunnelFlow(snatIP, mark) cacheKey := fmt.Sprintf("s%x", mark) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 0785773c3a1..a0a6903e4f9 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -1229,6 +1229,87 @@ func Test_client_GetServiceFlowKeys(t *testing.T) { assert.ElementsMatch(t, expectedFlowKeys, flowKeys) } +func Test_client_InstallSNATBypassServiceFlows(t *testing.T) { + testCases := []struct { + name string + serviceCIDRs []*net.IPNet + newServiceCIDRs []*net.IPNet + expectedFlows []string + expectedNewFlows []string + }{ + { + name: "IPv4", + serviceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/24"), + }, + newServiceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/16"), + }, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + expectedNewFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + }, + { + name: "IPv6", + serviceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("1096::/80"), + }, + newServiceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("1096::/64"), + }, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + expectedNewFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + }, + { + name: "dual-stack", + serviceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/24"), + utilip.MustParseCIDR("1096::/80"), + }, + newServiceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/16"), + utilip.MustParseCIDR("1096::/64"), + }, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + expectedNewFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + m := oftest.NewMockOFEntryOperations(ctrl) + + fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap) + defer resetPipelines() + + m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) + assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.serviceCIDRs)) + fCacheI, ok := fc.featureEgress.cachedFlows.Load("svc-cidrs") + require.True(t, ok) + assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI)) + + m.EXPECT().BundleOps(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.newServiceCIDRs)) + fCacheI, ok = fc.featureEgress.cachedFlows.Load("svc-cidrs") + require.True(t, ok) + assert.ElementsMatch(t, tc.expectedNewFlows, getFlowStrings(fCacheI)) + }) + } +} + func Test_client_InstallSNATMarkFlows(t *testing.T) { mark := uint32(100) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 4ad46329b5d..9a5d6db2106 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2194,6 +2194,18 @@ func (f *featureNetworkPolicy) ingressClassifierFlows() []binding.Flow { return flows } +// snatSkipCIDRFlow generates the flow to skip SNAT for connection destined for the provided CIDR. +func (f *featureEgress) snatSkipCIDRFlow(cidr net.IPNet) binding.Flow { + ipProtocol := getIPProtocol(cidr.IP) + return EgressMarkTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(ipProtocol). + MatchDstIPNet(cidr). + Action().LoadRegMark(ToGatewayRegMark). + Action().GotoStage(stageSwitching). + Done() +} + // snatSkipNodeFlow generates the flow to skip SNAT for connection destined for the transport IP of a remote Node. func (f *featureEgress) snatSkipNodeFlow(nodeIP net.IP) binding.Flow { ipProtocol := getIPProtocol(nodeIP) @@ -2588,13 +2600,7 @@ func (f *featureEgress) externalFlows() []binding.Flow { ) // This generates the flows to bypass the packets sourced from local Pods and destined for the except CIDRs for Egress. for _, cidr := range f.exceptCIDRs[ipProtocol] { - flows = append(flows, EgressMarkTable.ofTable.BuildFlow(priorityHigh). - Cookie(cookieID). - MatchProtocol(ipProtocol). - MatchDstIPNet(cidr). - Action().LoadRegMark(ToGatewayRegMark). - Action().GotoStage(stageSwitching). - Done()) + flows = append(flows, f.snatSkipCIDRFlow(cidr)) } } // This generates the flow to match the packets of tracked Egress connection and forward them to stageSwitching. diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 5209a6bb51e..92f0ada8fcb 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -452,6 +452,20 @@ func (mr *MockClientMockRecorder) InstallPolicyRuleFlows(arg0 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyRuleFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyRuleFlows), arg0) } +// InstallSNATBypassServiceFlows mocks base method +func (m *MockClient) InstallSNATBypassServiceFlows(arg0 []*net.IPNet) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallSNATBypassServiceFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallSNATBypassServiceFlows indicates an expected call of InstallSNATBypassServiceFlows +func (mr *MockClientMockRecorder) InstallSNATBypassServiceFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallSNATBypassServiceFlows", reflect.TypeOf((*MockClient)(nil).InstallSNATBypassServiceFlows), arg0) +} + // InstallSNATMarkFlows mocks base method func (m *MockClient) InstallSNATMarkFlows(arg0 net.IP, arg1 uint32) error { m.ctrl.T.Helper() diff --git a/pkg/agent/servicecidr/discoverer.go b/pkg/agent/servicecidr/discoverer.go index a6a2dc74ba5..9dbe18afc81 100644 --- a/pkg/agent/servicecidr/discoverer.go +++ b/pkg/agent/servicecidr/discoverer.go @@ -42,7 +42,7 @@ const ( type EventHandler func(serviceCIDRs []*net.IPNet) type Interface interface { - GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) + GetServiceCIDRs() ([]*net.IPNet, error) // The added handlers will be called when Service CIDR changes. AddEventHandler(handler EventHandler) } @@ -56,6 +56,8 @@ type Discoverer struct { eventHandlers []EventHandler // queue maintains the Service objects that need to be synced. queue workqueue.Interface + // initialized indicates whether the Discoverer has been initialized. + initialized bool } func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) *Discoverer { @@ -104,19 +106,20 @@ func (d *Discoverer) Run(stopCh <-chan struct{}) { <-stopCh } -func (d *Discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) { +func (d *Discoverer) GetServiceCIDRs() ([]*net.IPNet, error) { d.RLock() defer d.RUnlock() - if isIPv6 { - if d.serviceIPv6CIDR == nil { - return nil, fmt.Errorf("Service IPv6 CIDR is not available yet") - } - return d.serviceIPv6CIDR, nil + if !d.initialized { + return nil, fmt.Errorf("Service CIDR discoverer is not initialized yet") + } + var serviceCIDRs []*net.IPNet + if d.serviceIPv4CIDR != nil { + serviceCIDRs = append(serviceCIDRs, d.serviceIPv4CIDR) } - if d.serviceIPv4CIDR == nil { - return nil, fmt.Errorf("Service IPv4 CIDR is not available yet") + if d.serviceIPv6CIDR != nil { + serviceCIDRs = append(serviceCIDRs, d.serviceIPv6CIDR) } - return d.serviceIPv4CIDR, nil + return serviceCIDRs, nil } func (d *Discoverer) AddEventHandler(handler EventHandler) { @@ -178,8 +181,10 @@ func (d *Discoverer) updateServiceCIDR(svcs ...*corev1.Service) { continue } } else { + mask := net.CIDRMask(mask, mask) + clusterIP := clusterIP.Mask(mask) // If the calculated Service CIDR doesn't exist, generate a new Service CIDR with the ClusterIP. - newServiceCIDR = &net.IPNet{IP: clusterIP, Mask: net.CIDRMask(mask, mask)} + newServiceCIDR = &net.IPNet{IP: clusterIP, Mask: mask} } if isIPv6 { @@ -207,6 +212,7 @@ func (d *Discoverer) updateServiceCIDR(svcs ...*corev1.Service) { klog.InfoS("Service IPv6 CIDR was updated", "ServiceCIDR", curServiceIPv6CIDR) newServiceCIDRs = append(newServiceCIDRs, curServiceIPv6CIDR) } + d.initialized = true }() for _, handler := range d.eventHandlers { handler(newServiceCIDRs) diff --git a/pkg/agent/servicecidr/discoverer_test.go b/pkg/agent/servicecidr/discoverer_test.go index 850db83de6a..48d13f243a2 100644 --- a/pkg/agent/servicecidr/discoverer_test.go +++ b/pkg/agent/servicecidr/discoverer_test.go @@ -25,6 +25,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + + "antrea.io/antrea/pkg/util/ip" ) func makeService(namespace, name string, clusterIP string, protocol corev1.Protocol) *corev1.Service { @@ -54,12 +56,10 @@ func TestServiceCIDRProvider(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, 0) svcInformer := informerFactory.Core().V1().Services() serviceCIDRProvider := NewServiceCIDRDiscoverer(svcInformer) - serviceCIDRChan := make(chan *net.IPNet) + serviceCIDRChan := make(chan []*net.IPNet) serviceCIDRProvider.AddEventHandler(func(serviceCIDRs []*net.IPNet) { { - for _, serviceCIDR := range serviceCIDRs { - serviceCIDRChan <- serviceCIDR - } + serviceCIDRChan <- serviceCIDRs } }) @@ -69,11 +69,11 @@ func TestServiceCIDRProvider(t *testing.T) { informerFactory.WaitForCacheSync(stopCh) go serviceCIDRProvider.Run(stopCh) - check := func(expectedServiceCIDR string, isServiceCIDRUpdated, isIPv6 bool) { - if isServiceCIDRUpdated { + check := func(expectedServiceCIDRs []*net.IPNet, expectedEvent []*net.IPNet) { + if expectedEvent != nil { select { case event := <-serviceCIDRChan: - assert.Equal(t, expectedServiceCIDR, event.String()) + assert.Equal(t, expectedEvent, event) case <-time.After(time.Second): t.Fatalf("timed out waiting for expected Service CIDR") } @@ -84,70 +84,72 @@ func TestServiceCIDRProvider(t *testing.T) { case <-time.After(100 * time.Millisecond): } } - serviceCIDR, err := serviceCIDRProvider.GetServiceCIDR(isIPv6) - if expectedServiceCIDR != "" { + serviceCIDRs, err := serviceCIDRProvider.GetServiceCIDRs() + if expectedServiceCIDRs != nil { assert.NoError(t, err) - assert.Equal(t, expectedServiceCIDR, serviceCIDR.String()) + assert.Equal(t, expectedServiceCIDRs, serviceCIDRs) } else { - assert.ErrorContains(t, err, "CIDR is not available yet") + assert.ErrorContains(t, err, "Service CIDR discoverer is not initialized yet") } } + check(nil, nil) + svc := makeService("ns1", "svc0", "None", corev1.ProtocolTCP) _, err := client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("", false, false) + check(nil, nil) svc = makeService("ns1", "svc1", "10.10.0.1", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.1/32", true, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.1/32")}, []*net.IPNet{ip.MustParseCIDR("10.10.0.1/32")}) svc = makeService("ns1", "svc2", "10.10.0.2", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.0/30", true, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/30")}, []*net.IPNet{ip.MustParseCIDR("10.10.0.0/30")}) svc = makeService("ns1", "svc5", "10.10.0.5", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.0/29", true, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, []*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}) svc = makeService("ns1", "svc4", "10.10.0.4", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.0/29", false, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, nil) err = client.CoreV1().Services("ns1").Delete(context.TODO(), "svc4", metav1.DeleteOptions{}) assert.NoError(t, err) - check("10.10.0.0/29", false, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, nil) svc = makeService("ns1", "svc60", "None", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("", false, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, nil) svc = makeService("ns1", "svc61", "10::1", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::1/128", true, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::1/128")}, []*net.IPNet{ip.MustParseCIDR("10::1/128")}) svc = makeService("ns1", "svc62", "10::2", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::/126", true, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/126")}, []*net.IPNet{ip.MustParseCIDR("10::/126")}) svc = makeService("ns1", "svc65", "10::5", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::/125", true, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/125")}, []*net.IPNet{ip.MustParseCIDR("10::/125")}) svc = makeService("ns1", "svc64", "10::4", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::/125", false, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/125")}, nil) err = client.CoreV1().Services("ns1").Delete(context.TODO(), "svc64", metav1.DeleteOptions{}) assert.NoError(t, err) - check("10::/125", false, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/125")}, nil) } diff --git a/pkg/agent/servicecidr/testing/mock_servicecidr.go b/pkg/agent/servicecidr/testing/mock_servicecidr.go index 3c149aba119..56e940f0e8f 100644 --- a/pkg/agent/servicecidr/testing/mock_servicecidr.go +++ b/pkg/agent/servicecidr/testing/mock_servicecidr.go @@ -61,17 +61,17 @@ func (mr *MockInterfaceMockRecorder) AddEventHandler(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddEventHandler", reflect.TypeOf((*MockInterface)(nil).AddEventHandler), arg0) } -// GetServiceCIDR mocks base method -func (m *MockInterface) GetServiceCIDR(arg0 bool) (*net.IPNet, error) { +// GetServiceCIDRs mocks base method +func (m *MockInterface) GetServiceCIDRs() ([]*net.IPNet, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetServiceCIDR", arg0) - ret0, _ := ret[0].(*net.IPNet) + ret := m.ctrl.Call(m, "GetServiceCIDRs") + ret0, _ := ret[0].([]*net.IPNet) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetServiceCIDR indicates an expected call of GetServiceCIDR -func (mr *MockInterfaceMockRecorder) GetServiceCIDR(arg0 interface{}) *gomock.Call { +// GetServiceCIDRs indicates an expected call of GetServiceCIDRs +func (mr *MockInterfaceMockRecorder) GetServiceCIDRs() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceCIDR", reflect.TypeOf((*MockInterface)(nil).GetServiceCIDR), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceCIDRs", reflect.TypeOf((*MockInterface)(nil).GetServiceCIDRs)) }