From 5fdb6d2b785d4db9f6db691530b311fd6071f81b Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Thu, 25 Mar 2021 17:19:06 +0800 Subject: [PATCH] Add controller to antrea-agent for implementing Egress --- build/yamls/antrea-aks.yml | 24 + build/yamls/antrea-eks.yml | 24 + build/yamls/antrea-gke.yml | 24 + build/yamls/antrea-ipsec.yml | 24 + build/yamls/antrea.yml | 24 + build/yamls/base/crds.yml | 22 + cmd/antrea-agent/agent.go | 8 +- .../controller/egress/egress_controller.go | 674 ++++++++++++++++++ pkg/agent/controller/egress/id_allocator.go | 59 ++ .../controller/egress/local_ip_detector.go | 41 ++ test/e2e/egress_test.go | 209 ++++++ test/e2e/framework.go | 22 +- 12 files changed, 1146 insertions(+), 9 deletions(-) create mode 100644 pkg/agent/controller/egress/egress_controller.go create mode 100644 pkg/agent/controller/egress/id_allocator.go create mode 100644 pkg/agent/controller/egress/local_ip_detector.go create mode 100644 test/e2e/egress_test.go diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 212c669de3e..70afd83dbf3 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -630,6 +630,30 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + labels: + app: antrea + name: egresses.crd.antrea.io +spec: + group: crd.antrea.io + names: + kind: Egress + plural: egresses + shortNames: + - eg + singular: egress + scope: Cluster + versions: + - name: v1alpha2 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: labels: app: antrea diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 4fbf4b00c54..68f0a691e71 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -630,6 +630,30 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + labels: + app: antrea + name: egresses.crd.antrea.io +spec: + group: crd.antrea.io + names: + kind: Egress + plural: egresses + shortNames: + - eg + singular: egress + scope: Cluster + versions: + - name: v1alpha2 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: labels: app: antrea diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index f473c28beb6..5fa2a5f83ce 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -630,6 +630,30 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + labels: + app: antrea + name: egresses.crd.antrea.io +spec: + group: crd.antrea.io + names: + kind: Egress + plural: egresses + shortNames: + - eg + singular: egress + scope: Cluster + versions: + - name: v1alpha2 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: labels: app: antrea diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 43b404e2662..baab0b76d36 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -630,6 +630,30 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + labels: + app: antrea + name: egresses.crd.antrea.io +spec: + group: crd.antrea.io + names: + kind: Egress + plural: egresses + shortNames: + - eg + singular: egress + scope: Cluster + versions: + - name: v1alpha2 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: labels: app: antrea diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 7aab91ef04c..a068e571a05 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -630,6 +630,30 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + labels: + app: antrea + name: egresses.crd.antrea.io +spec: + group: crd.antrea.io + names: + kind: Egress + plural: egresses + shortNames: + - eg + singular: egress + scope: Cluster + versions: + - name: v1alpha2 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: labels: app: antrea diff --git a/build/yamls/base/crds.yml b/build/yamls/base/crds.yml index 78b2e1f277e..da80bc8231b 100644 --- a/build/yamls/base/crds.yml +++ b/build/yamls/base/crds.yml @@ -1,6 +1,28 @@ --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + name: egresses.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha2 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + scope: Cluster + names: + plural: egresses + singular: egress + kind: Egress + shortNames: + - eg +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: name: antreacontrollerinfos.crd.antrea.io spec: diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 9265d92e96e..c879cbc4820 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -27,6 +27,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/cniserver" _ "github.com/vmware-tanzu/antrea/pkg/agent/cniserver/ipam" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/controller/egress" "github.com/vmware-tanzu/antrea/pkg/agent/controller/networkpolicy" "github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute" "github.com/vmware-tanzu/antrea/pkg/agent/controller/traceflow" @@ -75,6 +76,7 @@ func run(o *Options) error { informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync) crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) traceflowInformer := crdInformerFactory.Crd().V1alpha1().Traceflows() + egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -99,7 +101,7 @@ func run(o *Options) error { ofClient := openflow.NewClient(o.config.OVSBridge, ovsBridgeMgmtAddr, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy), features.DefaultFeatureGate.Enabled(features.AntreaPolicy), - false) + features.DefaultFeatureGate.Enabled(features.Egress)) _, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR) var serviceCIDRNetv6 *net.IPNet @@ -191,6 +193,8 @@ func run(o *Options) error { statsCollector = stats.NewCollector(antreaClientProvider, ofClient, networkPolicyController) } + egressController := egress.NewEgressController(ofClient, egressInformer, antreaClientProvider, ifaceStore, routeClient, nodeConfig.Name) + var proxier proxy.Proxier if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) @@ -282,6 +286,8 @@ func run(o *Options) error { go networkPolicyController.Run(stopCh) + go egressController.Run(stopCh) + if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) { go statsCollector.Run(stopCh) } diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go new file mode 100644 index 00000000000..57264028e56 --- /dev/null +++ b/pkg/agent/controller/egress/egress_controller.go @@ -0,0 +1,674 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package egress + +import ( + "context" + "fmt" + "net" + "reflect" + "strings" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" + + "github.com/vmware-tanzu/antrea/pkg/agent" + "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" + "github.com/vmware-tanzu/antrea/pkg/agent/route" + cpv1b2 "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2" + crdv1a2 "github.com/vmware-tanzu/antrea/pkg/apis/crd/v1alpha2" + crdinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions/crd/v1alpha2" + crdlisters "github.com/vmware-tanzu/antrea/pkg/client/listers/crd/v1alpha2" + "github.com/vmware-tanzu/antrea/pkg/k8s" +) + +const ( + controllerName = "AntreaAgentEgressController" + // How long to wait before retrying the processing of an Egress change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Default number of workers processing an Egress change. + defaultWorkers = 4 + // Disable resyncing. + resyncPeriod time.Duration = 0 + // maxEgressIPsPerNode is the maximum number of Egress IPs can be configured on a Node. + maxEgressIPsPerNode = 255 +) + +var emptyWatch = watch.NewEmptyWatch() + +// egressState keeps the actual state of an Egress that has been realized. +type egressState struct { + // The actual egress IP of the Egress. If it's different from the desired IP, there is an update to EgressIP, and we + // need to remove previously installed flows. + egressIP string + // The actual openflow ports for which we have installed SNAT rules. Used to identify stale openflow ports when + // updating or deleting an Egress. + ofPorts sets.Int32 + // The actual Pods of the Egress. Used to identify stale Pods when updating or deleting an Egress. + pods sets.String +} + +// egressIPState keeps the actual state of a local Egress IP. It's maintained separately from egressState because +// multiple Egresses can share an EgressIP. +type egressIPState struct { + egressIP net.IP + // The names of the Egresses that are currently referring to it. + egressNames sets.String + // The datapath mark of this Egress IP. + mark uint32 + // Whether its flows have been installed. + flowsInstalled bool + // Whether its iptables rule has been installed. + ruleInstalled bool +} + +// egressBinding keeps the Egresses applying to a Pod. +// There is one effective Egress for a Pod at any given time. +type egressBinding struct { + effectiveEgress string + alternativeEgresses sets.String +} + +type EgressController struct { + ofClient openflow.Client + routeClient route.Interface + antreaClientProvider agent.AntreaClientProvider + + egressInformer crdinformers.EgressInformer + egressLister crdlisters.EgressLister + egressListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + + // Use an interface for IP detector to enable testing. + localIPDetector LocalIPDetector + ifaceStore interfacestore.InterfaceStore + nodeName string + idAllocator *idAllocator + + egressGroups map[string]sets.String + egressGroupsMutex sync.RWMutex + + egressBindings map[string]*egressBinding + egressBindingsMutex sync.RWMutex + + egressStates map[string]*egressState + // The mutex is to protect the map, not the egressState items. The workqueue guarantees an Egress will only be + // processed by a single worker at any time. So the returned EgressState has no race condition. + egressStatesMutex sync.RWMutex + + egressIPStates map[string]*egressIPState + egressIPStatesMutex sync.Mutex +} + +func NewEgressController( + ofClient openflow.Client, + egressInformer crdinformers.EgressInformer, + antreaClientGetter agent.AntreaClientProvider, + ifaceStore interfacestore.InterfaceStore, + routeClient route.Interface, + nodeName string, +) *EgressController { + c := &EgressController{ + ofClient: ofClient, + routeClient: routeClient, + antreaClientProvider: antreaClientGetter, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "egressgroup"), + egressInformer: egressInformer, + egressLister: egressInformer.Lister(), + egressListerSynced: egressInformer.Informer().HasSynced, + nodeName: nodeName, + ifaceStore: ifaceStore, + egressGroups: map[string]sets.String{}, + egressStates: map[string]*egressState{}, + egressIPStates: map[string]*egressIPState{}, + egressBindings: map[string]*egressBinding{}, + localIPDetector: &localIPDetector{}, + idAllocator: newIDAllocator(maxEgressIPsPerNode), + } + + egressInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueEgress, + UpdateFunc: func(old, cur interface{}) { + c.enqueueEgress(cur) + }, + DeleteFunc: c.enqueueEgress, + }, + resyncPeriod, + ) + return c +} + +func (c *EgressController) enqueueEgress(obj interface{}) { + egress, isEgress := obj.(*crdv1a2.Egress) + if !isEgress { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Received unexpected object: %v", obj) + return + } + egress, ok = deletedState.Obj.(*crdv1a2.Egress) + if !ok { + klog.Errorf("DeletedFinalStateUnknown contains non-Egress object: %v", deletedState.Obj) + return + } + } + c.queue.Add(egress.Name) +} + +// Run will create defaultWorkers workers (go routines) which will process the Egress events from the +// workqueue. +func (c *EgressController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.Infof("Starting %s", controllerName) + defer klog.Infof("Shutting down %s", controllerName) + + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.egressListerSynced) { + return + } + + go wait.NonSlidingUntil(c.watch, 5*time.Second, stopCh) + + for i := 0; i < defaultWorkers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the workqueue. +func (c *EgressController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *EgressController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + // We expect strings (Egress name) to come off the workqueue. + if key, ok := obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call Forget here else we'd + // go into a loop of attempting to process a work item that is invalid. + // This should not happen. + c.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncEgress(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.Errorf("Error syncing Egress %s, requeuing. Error: %v", key, err) + } + return true +} + +// realizeEgressIP realizes a local Egress IP. Multiple Egresses can share the same Egress IP. +// If it's called the first time for an Egress IP, it allocates a locally-unique mark for the IP and installs flows and +// iptables rule for this IP and the mark. Otherwise it just returns the mark. +func (c *EgressController) realizeEgressIP(egressName, egressIP string) (uint32, error) { + c.egressIPStatesMutex.Lock() + defer c.egressIPStatesMutex.Unlock() + + ipState, exists := c.egressIPStates[egressIP] + // Create an egressIPState if this is the first Egress using the IP. + if !exists { + ipState = &egressIPState{ + egressIP: net.ParseIP(egressIP), + egressNames: sets.NewString(egressName), + } + c.egressIPStates[egressIP] = ipState + } else if !ipState.egressNames.Has(egressName) { + ipState.egressNames.Insert(egressName) + } + + // Ensure the Egress IP has a mark allocated. + var err error + if ipState.mark == 0 { + ipState.mark, err = c.idAllocator.allocate() + if err != nil { + return 0, fmt.Errorf("error allocating mark for IP %s: %v", egressIP, err) + } + } + // Ensure datapath is configured properly. + if !ipState.flowsInstalled { + if err := c.ofClient.InstallSNATMarkFlows(ipState.egressIP, ipState.mark); err != nil { + return 0, fmt.Errorf("error installing SNAT mark flows for IP %s: %v", ipState.egressIP, err) + } + ipState.flowsInstalled = true + } + if !ipState.ruleInstalled { + if err := c.routeClient.AddSNATRule(ipState.egressIP, ipState.mark); err != nil { + return 0, fmt.Errorf("error installing SNAT rule for IP %s: %v", ipState.egressIP, err) + } + ipState.ruleInstalled = true + } + return ipState.mark, nil +} + +// unrealizeEgressIP unrealizes a local Egress IP, reverts what realizeEgressIP does. +// Only when the last Egress unrealizes the Egress IP, it will releases the IP's mark and uninstalls corresponding flows +// and iptables rule. +func (c *EgressController) unrealizeEgressIP(egressName, egressIP string) error { + c.egressIPStatesMutex.Lock() + defer c.egressIPStatesMutex.Unlock() + + ipState, exist := c.egressIPStates[egressIP] + // The Egress IP was not configured before, do nothing. + if !exist { + return nil + } + // Unlink the Egress from the EgressIP. If it's the last Egress referring to it, uninstall its datapath rules and + // release the mark. + ipState.egressNames.Delete(egressName) + if len(ipState.egressNames) > 0 { + return nil + } + if ipState.mark != 0 { + if ipState.ruleInstalled { + if err := c.routeClient.DeleteSNATRule(ipState.mark); err != nil { + return err + } + ipState.ruleInstalled = false + } + if ipState.flowsInstalled { + if err := c.ofClient.UninstallSNATMarkFlows(ipState.mark); err != nil { + return err + } + ipState.flowsInstalled = false + } + c.idAllocator.release(ipState.mark) + } + delete(c.egressIPStates, egressIP) + return nil +} + +func (c *EgressController) getEgressState(egressName string) (*egressState, bool) { + c.egressStatesMutex.RLock() + defer c.egressStatesMutex.RUnlock() + state, exists := c.egressStates[egressName] + return state, exists +} + +func (c *EgressController) deleteEgressState(egressName string) { + c.egressStatesMutex.Lock() + defer c.egressStatesMutex.Unlock() + delete(c.egressStates, egressName) +} + +func (c *EgressController) newEgressState(egressName string, egressIP string) *egressState { + c.egressStatesMutex.Lock() + defer c.egressStatesMutex.Unlock() + state := &egressState{ + egressIP: egressIP, + ofPorts: sets.NewInt32(), + pods: sets.NewString(), + } + c.egressStates[egressName] = state + return state +} + +// uninstallEgress uninstalls datapath rules for an Egress. It's called when an Egress is removed or its Egress IP changes. +func (c *EgressController) uninstallEgress(egressName string) error { + egressState, exist := c.getEgressState(egressName) + // The Egress hasn't been installed, do nothing. + if !exist { + return nil + } + // Uninstall all of its Pod flows. + if err := c.uninstallPodFlows(egressState.ofPorts, egressState); err != nil { + return err + } + // Remove Pods from the Egress state after uninstalling Pod's flows to avoid overlapping. Otherwise another Egress + // may install new flows for the Pod before this Egress uninstalls its previous flows, causing conflicts. + c.removePodsFromEgress(egressName, egressState.pods, egressState) + // Release the EgressIP's mark if the Egress is the last one referring to it. + if err := c.unrealizeEgressIP(egressName, egressState.egressIP); err != nil { + return err + } + // Remove the Egress's state. + c.deleteEgressState(egressName) + return nil +} + +// bindPodEgress binds the Pod with the Egress and returns whether this Egress is the effective one for the Pod. +func (c *EgressController) bindPodEgress(pod, egress string) bool { + c.egressBindingsMutex.Lock() + defer c.egressBindingsMutex.Unlock() + + binding, exists := c.egressBindings[pod] + if !exists { + // Promote itself as the effective Egress if there was not one. + c.egressBindings[pod] = &egressBinding{ + effectiveEgress: egress, + alternativeEgresses: sets.NewString(), + } + return true + } + if binding.effectiveEgress == egress { + return true + } + if !binding.alternativeEgresses.Has(egress) { + binding.alternativeEgresses.Insert(egress) + } + return false +} + +// unbindPodEgress unbinds the Pod with the Egress. +// If the unbound Egress was the effective one for the Pod and there are any alternative ones, it will return the new +// effective Egress and true. Otherwise it return empty string and false. +func (c *EgressController) unbindPodEgress(pod, egress string) (string, bool) { + c.egressBindingsMutex.Lock() + defer c.egressBindingsMutex.Unlock() + + // The binding must exist. + binding := c.egressBindings[pod] + if binding.effectiveEgress == egress { + var popped bool + binding.effectiveEgress, popped = binding.alternativeEgresses.PopAny() + if !popped { + // Remove the Pod's binding if there is no alternative. + delete(c.egressBindings, pod) + return "", false + } + return binding.effectiveEgress, true + } + binding.alternativeEgresses.Delete(egress) + return "", false +} + +func (c *EgressController) syncEgress(egressName string) error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing Egress for %s. (%v)", egressName, time.Since(startTime)) + }() + + egress, err := c.egressLister.Get(egressName) + if err != nil { + if errors.IsNotFound(err) { + return c.uninstallEgress(egressName) + } + return err + } + + eState, exist := c.getEgressState(egressName) + // If the EgressIP changes, uninstall the Egress first, then installs the Egress with new IP. + if exist && eState.egressIP != egress.Spec.EgressIP { + if err := c.uninstallEgress(egressName); err != nil { + return err + } + exist = false + } + + if !exist { + eState = c.newEgressState(egressName, egress.Spec.EgressIP) + } + + var mark uint32 + // Get or allocate a mark for the EgressIP if it's local. + isLocalIP, err := c.localIPDetector.IsLocalIP(egress.Spec.EgressIP) + if err != nil { + return err + } + if isLocalIP { + mark, err = c.realizeEgressIP(egressName, egress.Spec.EgressIP) + if err != nil { + return err + } + } + + // Copy the previous ofPorts and Pods. They will be used to identify stale ofPorts and Pods. + staleOFPorts := eState.ofPorts.Union(nil) + stalePods := eState.pods.Union(nil) + + // Get a copy of the desired Pods. + pods := func() sets.String { + c.egressGroupsMutex.RLock() + defer c.egressGroupsMutex.RUnlock() + pods, exist := c.egressGroups[egressName] + if !exist { + return nil + } + return pods.Union(nil) + }() + + egressIP := net.ParseIP(eState.egressIP) + // Install SNAT flows for desired Pods. + for pod := range pods { + eState.pods.Insert(pod) + stalePods.Delete(pod) + + // If the Egress is not the effective one for the Pod, do nothing. + if !c.bindPodEgress(pod, egressName) { + continue + } + + // Get the Pod's openflow port. + parts := strings.Split(pod, "/") + podNamespace, podName := parts[0], parts[1] + ifaces := c.ifaceStore.GetContainerInterfacesByPod(podName, podNamespace) + if len(ifaces) == 0 { + klog.V(2).Infof("Interfaces of Pod %s/%s not found", podNamespace, podName) + continue + } + + ofPort := ifaces[0].OFPort + if eState.ofPorts.Has(ofPort) { + staleOFPorts.Delete(ofPort) + continue + } + if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil { + return err + } + eState.ofPorts.Insert(ofPort) + } + + // Uninstall SNAT flows for stale Pods. + if err := c.uninstallPodFlows(staleOFPorts, eState); err != nil { + return err + } + c.removePodsFromEgress(egressName, stalePods, eState) + return nil +} + +func (c *EgressController) uninstallPodFlows(ofPorts sets.Int32, egressState *egressState) error { + for ofPort := range ofPorts { + if err := c.ofClient.UninstallPodSNATFlows(uint32(ofPort)); err != nil { + return err + } + egressState.ofPorts.Delete(ofPort) + } + return nil +} + +// removePodsFromEgress removes Pods from the Egress's actual state. +// For each Pod, if the Egress was the Pod's effective Egress and there are other Egresses applying to it, it will pick +// one and trigger its resync. +func (c *EgressController) removePodsFromEgress(egressName string, pods sets.String, egressState *egressState) { + newEffectiveEgresses := sets.NewString() + for pod := range pods { + delete(egressState.pods, pod) + newEffectiveEgress, exists := c.unbindPodEgress(pod, egressName) + if exists { + newEffectiveEgresses.Insert(newEffectiveEgress) + } + } + // Trigger resyncing of the new effective Egresses of the removed Pods. + for egress := range newEffectiveEgresses { + c.queue.Add(egress) + } +} + +func (c *EgressController) watch() { + klog.Info("Starting watch for EgressGroup") + antreaClient, err := c.antreaClientProvider.GetAntreaClient() + if err != nil { + klog.Warningf("Failed to get antrea client: %v", err) + return + } + options := metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("nodeName", c.nodeName).String(), + } + watcher, err := antreaClient.ControlplaneV1beta2().EgressGroups().Watch(context.TODO(), options) + if err != nil { + klog.Warningf("Failed to start watch for EgressGroup: %v", err) + return + } + // Watch method doesn't return error but "emptyWatch" in case of some partial data errors, + // e.g. timeout error. Make sure that watcher is not empty and log warning otherwise. + if reflect.TypeOf(watcher) == reflect.TypeOf(emptyWatch) { + klog.Warning("Failed to start watch for EgressGroup, please ensure antrea service is reachable for the agent") + return + } + + klog.Info("Started watch for EgressGroup") + eventCount := 0 + defer func() { + klog.Infof("Stopped watch for EgressGroup, total items received: %d", eventCount) + watcher.Stop() + }() + + // First receive init events from the result channel and buffer them until + // a Bookmark event is received, indicating that all init events have been + // received. + var initObjects []*cpv1b2.EgressGroup +loop: + for { + select { + case event, ok := <-watcher.ResultChan(): + if !ok { + klog.Warningf("Result channel for EgressGroup was closed") + return + } + switch event.Type { + case watch.Added: + klog.V(2).Infof("Added EgressGroup (%#v)", event.Object) + initObjects = append(initObjects, event.Object.(*cpv1b2.EgressGroup)) + case watch.Bookmark: + break loop + } + } + } + klog.Infof("Received %d init events for EgressGroup", len(initObjects)) + + eventCount += len(initObjects) + c.replaceEgressGroups(initObjects) + + for { + select { + case event, ok := <-watcher.ResultChan(): + if !ok { + return + } + switch event.Type { + case watch.Added: + c.addEgressGroup(event.Object.(*cpv1b2.EgressGroup)) + klog.V(2).Infof("Added EgressGroup (%#v)", event.Object) + case watch.Modified: + c.patchEgressGroup(event.Object.(*cpv1b2.EgressGroupPatch)) + klog.V(2).Infof("Updated EgressGroup (%#v)", event.Object) + case watch.Deleted: + c.deleteEgressGroup(event.Object.(*cpv1b2.EgressGroup)) + klog.V(2).Infof("Removed EgressGroup (%#v)", event.Object) + default: + klog.Errorf("Unknown event: %v", event) + return + } + eventCount++ + } + } +} + +func (c *EgressController) replaceEgressGroups(groups []*cpv1b2.EgressGroup) { + c.egressGroupsMutex.Lock() + defer c.egressGroupsMutex.Unlock() + + oldGroupKeys := make(sets.String, len(c.egressGroups)) + for key := range c.egressGroups { + oldGroupKeys.Insert(key) + } + + for _, group := range groups { + oldGroupKeys.Delete(group.Name) + pods := sets.NewString() + for _, member := range group.GroupMembers { + pods.Insert(k8s.NamespacedName(member.Pod.Namespace, member.Pod.Name)) + } + prevPods := c.egressGroups[group.Name] + if pods.Equal(prevPods) { + continue + } + c.egressGroups[group.Name] = pods + c.queue.Add(group.Name) + } + + for key := range oldGroupKeys { + delete(c.egressGroups, key) + c.queue.Add(key) + } +} + +func (c *EgressController) addEgressGroup(group *cpv1b2.EgressGroup) { + pods := sets.NewString() + for _, member := range group.GroupMembers { + pods.Insert(k8s.NamespacedName(member.Pod.Namespace, member.Pod.Name)) + } + + c.egressGroupsMutex.Lock() + defer c.egressGroupsMutex.Unlock() + + c.egressGroups[group.Name] = pods + c.queue.Add(group.Name) +} + +func (c *EgressController) patchEgressGroup(patch *cpv1b2.EgressGroupPatch) { + c.egressGroupsMutex.Lock() + defer c.egressGroupsMutex.Unlock() + + for _, member := range patch.AddedGroupMembers { + c.egressGroups[patch.Name].Insert(k8s.NamespacedName(member.Pod.Namespace, member.Pod.Name)) + + } + for _, member := range patch.RemovedGroupMembers { + c.egressGroups[patch.Name].Delete(k8s.NamespacedName(member.Pod.Namespace, member.Pod.Name)) + } + c.queue.Add(patch.Name) +} + +func (c *EgressController) deleteEgressGroup(group *cpv1b2.EgressGroup) { + c.egressGroupsMutex.Lock() + defer c.egressGroupsMutex.Unlock() + + delete(c.egressGroups, group.Name) + c.queue.Add(group.Name) +} diff --git a/pkg/agent/controller/egress/id_allocator.go b/pkg/agent/controller/egress/id_allocator.go new file mode 100644 index 00000000000..5ab4849b460 --- /dev/null +++ b/pkg/agent/controller/egress/id_allocator.go @@ -0,0 +1,59 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package egress + +import ( + "container/list" + "fmt" + "sync" +) + +type idAllocator struct { + sync.Mutex + unallocatedPool *list.List +} + +func (a *idAllocator) allocate() (uint32, error) { + a.Lock() + defer a.Unlock() + + if a.unallocatedPool.Len() == 0 { + return 0, fmt.Errorf("no ID available") + } + + firstUnallocated := a.unallocatedPool.Front() + value := a.unallocatedPool.Remove(firstUnallocated) + + v := value.(uint32) + return v, nil +} + +func (a *idAllocator) release(id uint32) error { + a.Lock() + defer a.Unlock() + + a.unallocatedPool.PushFront(id) + return nil +} + +func newIDAllocator(size uint32) *idAllocator { + unallocatedPool := list.New() + for i := uint32(1); i <= size; i++ { + unallocatedPool.PushBack(i) + } + return &idAllocator{ + unallocatedPool: unallocatedPool, + } +} diff --git a/pkg/agent/controller/egress/local_ip_detector.go b/pkg/agent/controller/egress/local_ip_detector.go new file mode 100644 index 00000000000..bc14969f382 --- /dev/null +++ b/pkg/agent/controller/egress/local_ip_detector.go @@ -0,0 +1,41 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package egress + +import ( + "net" +) + +type LocalIPDetector interface { + IsLocalIP(ip string) (bool, error) +} + +type localIPDetector struct{} + +// IsLocalIP checks if the provided IP is configured on the Node. +// TODO: Instead of listing all IP addresses every time, it can maintain a cache and subscribe the IP address change to +// keep the cache up-to-date. +func (d *localIPDetector) IsLocalIP(ip string) (bool, error) { + addrs, err := net.InterfaceAddrs() + if err != nil { + return false, err + } + for _, addr := range addrs { + if ip == addr.(*net.IPNet).IP.String() { + return true, nil + } + } + return false, nil +} diff --git a/test/e2e/egress_test.go b/test/e2e/egress_test.go new file mode 100644 index 00000000000..09e9257f9a3 --- /dev/null +++ b/test/e2e/egress_test.go @@ -0,0 +1,209 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/vmware-tanzu/antrea/pkg/apis/crd/v1alpha2" +) + +func TestEgress(t *testing.T) { + skipIfNotIPv4Cluster(t) + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + cc := []configChange{ + {"Egress", "true", true}, + } + ac := []configChange{ + {"Egress", "true", true}, + } + if err := data.mutateAntreaConfigMap(cc, ac, true, true); err != nil { + t.Fatalf("Failed to enable NetworkPolicyStats feature: %v", err) + } + + egressNode := controlPlaneNodeName() + egressNodeIP := controlPlaneNodeIP() + localIP0 := "1.1.1.10" + localIP1 := "1.1.1.11" + serverIP := "1.1.1.20" + fakeServer := "fakeserver" + + // Create a http server in another netns to fake an external server connected to the egress Node. + cmd := fmt.Sprintf(`ip netns add %[1]s && \ +ip link add dev %[1]s-a type veth peer name %[1]s-b && \ +ip link set dev %[1]s-a netns %[1]s && \ +ip addr add %[3]s/24 dev %[1]s-b && \ +ip addr add %[4]s/24 dev %[1]s-b && \ +ip link set dev %[1]s-b up && \ +ip netns exec %[1]s ip addr add %[2]s/24 dev %[1]s-a && \ +ip netns exec %[1]s ip link set dev %[1]s-a up && \ +ip netns exec %[1]s ip route replace default via %[3]s && \ +ip netns exec %[1]s /agnhost netexec +`, fakeServer, serverIP, localIP0, localIP1) + if err := data.createPodOnNode(fakeServer, egressNode, agnhostImage, []string{"sh", "-c", cmd}, nil, nil, nil, true, func(pod *v1.Pod) { + privileged := true + pod.Spec.Containers[0].SecurityContext = &v1.SecurityContext{Privileged: &privileged} + }); err != nil { + t.Fatalf("Failed to create server Pod: %v", err) + } + defer deletePodWrapper(t, data, fakeServer) + if err := data.podWaitForRunning(defaultTimeout, fakeServer, testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", fakeServer) + } + + localPod := "localpod" + remotePod := "remotepod" + if err := data.createBusyboxPodOnNode(localPod, egressNode); err != nil { + t.Fatalf("Failed to create local Pod: %v", err) + } + defer deletePodWrapper(t, data, localPod) + if err := data.podWaitForRunning(defaultTimeout, localPod, testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", localPod) + } + if err := data.createBusyboxPodOnNode(remotePod, workerNodeName(1)); err != nil { + t.Fatalf("Failed to create remote Pod: %v", err) + } + defer deletePodWrapper(t, data, remotePod) + if err := data.podWaitForRunning(defaultTimeout, remotePod, testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", remotePod) + } + + getClientIP := func(pod string) (string, string, error) { + cmd := []string{"wget", "-T", "3", "-O", "-", fmt.Sprintf("%s:8080/clientip", serverIP)} + return data.runCommandFromPod(testNamespace, pod, busyboxContainerName, cmd) + } + + assertClientIP := func(pod string, clientIP string) { + var exeErr error + var stdout, stderr string + if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (done bool, err error) { + stdout, stderr, exeErr = getClientIP(pod) + if exeErr != nil { + return false, nil + } + // The stdout return is in this format: x.x.x.x:port or [xx:xx:xx::x]:port + host, _, err := net.SplitHostPort(stdout) + if err != nil { + return false, nil + } + return host == clientIP, nil + }); err != nil { + t.Fatalf("Failed to get expected client IP %s for Pod %s, stdout: %s, stderr: %s, err: %v", clientIP, pod, stdout, stderr, exeErr) + } + } + + assertConnError := func(pod string) { + var exeErr error + var stdout, stderr string + if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (done bool, err error) { + stdout, stderr, exeErr = getClientIP(pod) + if exeErr != nil { + return true, nil + } + return false, nil + }); err != nil { + t.Fatalf("Failed to get expected error, stdout: %v, stderr: %v, err: %v", stdout, stderr, exeErr) + } + } + + // As the fake server runs in a netns of the Egress Node, only egress Node can reach the server, Pods running on + // other Nodes cannot reach it before Egress is added. + assertClientIP(localPod, localIP0) + assertConnError(remotePod) + + t.Logf("Creating an Egress applying to both Pods") + egress := &v1alpha2.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "test-egress"}, + Spec: v1alpha2.EgressSpec{ + AppliedTo: v1alpha2.AppliedTo{ + PodSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "antrea-e2e", + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + }, + // Due to the implementation restriction that SNAT IPs must be reachable from worker Nodes (because SNAT IPs + // are used as tunnel destination IP), it can only use the Node IP of the Egress Node to verify the case of + // remote Pod. + EgressIP: egressNodeIP, + }, + } + egress, err = data.crdClient.CrdV1alpha2().Egresses().Create(context.TODO(), egress, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Egress %v: %v", egress, err) + } + defer data.crdClient.CrdV1alpha2().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) + assertClientIP(localPod, egressNodeIP) + assertClientIP(remotePod, egressNodeIP) + + t.Log("Updating the Egress's AppliedTo to remotePod only") + egress.Spec.AppliedTo = v1alpha2.AppliedTo{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"antrea-e2e": remotePod}, + }, + } + egress, err = data.crdClient.CrdV1alpha2().Egresses().Update(context.TODO(), egress, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update Egress %v: %v", egress, err) + } + assertClientIP(localPod, localIP0) + assertClientIP(remotePod, egressNodeIP) + + t.Log("Updating the Egress's AppliedTo to localPod only") + egress.Spec.AppliedTo = v1alpha2.AppliedTo{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"antrea-e2e": localPod}, + }, + } + egress, err = data.crdClient.CrdV1alpha2().Egresses().Update(context.TODO(), egress, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update Egress %v: %v", egress, err) + } + assertClientIP(localPod, egressNodeIP) + assertConnError(remotePod) + + t.Logf("Updating the Egress's EgressIP to %s", localIP1) + egress.Spec.EgressIP = localIP1 + egress, err = data.crdClient.CrdV1alpha2().Egresses().Update(context.TODO(), egress, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update Egress %v: %v", egress, err) + } + assertClientIP(localPod, localIP1) + assertConnError(remotePod) + + t.Log("Deleting the Egress") + err = data.crdClient.CrdV1alpha2().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Failed to delete Egress %v: %v", egress, err) + } + assertClientIP(localPod, localIP0) + assertConnError(remotePod) +} diff --git a/test/e2e/framework.go b/test/e2e/framework.go index ef303148758..3a2ab2ca471 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -122,6 +122,7 @@ type ClusterInfo struct { svcV4NetworkCIDR string svcV6NetworkCIDR string controlPlaneNodeName string + controlPlaneNodeIP string nodes map[int]ClusterNode k8sServerVersion string } @@ -230,6 +231,10 @@ func controlPlaneNodeName() string { return clusterInfo.controlPlaneNodeName } +func controlPlaneNodeIP() string { + return clusterInfo.controlPlaneNodeIP +} + // nodeName returns an empty string if there is no Node with the provided idx. If idx is 0, the name // of the control-plane Node will be returned. func nodeName(idx int) string { @@ -308,24 +313,25 @@ func collectClusterInfo() error { return ok }() + var nodeIP string + for _, address := range node.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + nodeIP = address.Address + break + } + } + var nodeIdx int // If multiple control-plane Nodes (HA), we will select the last one in the list if isControlPlaneNode { nodeIdx = 0 clusterInfo.controlPlaneNodeName = node.Name + clusterInfo.controlPlaneNodeIP = nodeIP } else { nodeIdx = workerIdx workerIdx++ } - var nodeIP string - for _, address := range node.Status.Addresses { - if address.Type == corev1.NodeInternalIP { - nodeIP = address.Address - break - } - } - var podV4NetworkCIDR, podV6NetworkCIDR string var gwV4Addr, gwV6Addr string processPodCIDR := func(podCIDR string) error {