From adcce440d6e1123a9d255ab1c8b4af5911429a09 Mon Sep 17 00:00:00 2001 From: gran Date: Mon, 13 Jul 2020 16:56:46 +0800 Subject: [PATCH] Support service destination in traceflow --- build/yamls/antrea-eks.yml | 5 ++ build/yamls/antrea-gke.yml | 5 ++ build/yamls/antrea-ipsec.yml | 5 ++ build/yamls/antrea.yml | 5 ++ build/yamls/base/crds.yml | 3 + cmd/antrea-agent/agent.go | 1 + cmd/antrea-controller/controller.go | 2 +- pkg/agent/controller/traceflow/packetin.go | 34 ++++++++ .../traceflow/traceflow_controller.go | 50 +++++++++-- pkg/agent/openflow/pipeline.go | 15 +++- pkg/controller/traceflow/controller.go | 39 ++++++++- test/e2e/traceflow_test.go | 83 +++++++++++++++++++ 12 files changed, 231 insertions(+), 16 deletions(-) diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index d4a4016a660..e524f2fa9a0 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -211,6 +211,9 @@ spec: - required: - pod - namespace + - required: + - service + - namespace - required: - ip properties: @@ -221,6 +224,8 @@ spec: type: string pod: type: string + service: + type: string type: object packet: properties: diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index f7b17f07454..1a723e52ad2 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -211,6 +211,9 @@ spec: - required: - pod - namespace + - required: + - service + - namespace - required: - ip properties: @@ -221,6 +224,8 @@ spec: type: string pod: type: string + service: + type: string type: object packet: properties: diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index f4c14af81cd..b81dd7411fc 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -211,6 +211,9 @@ spec: - required: - pod - namespace + - required: + - service + - namespace - required: - ip properties: @@ -221,6 +224,8 @@ spec: type: string pod: type: string + service: + type: string type: object packet: properties: diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 4906e2ba7c4..6298920cf2c 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -211,6 +211,9 @@ spec: - required: - pod - namespace + - required: + - service + - namespace - required: - ip properties: @@ -221,6 +224,8 @@ spec: type: string pod: type: string + service: + type: string type: object packet: properties: diff --git a/build/yamls/base/crds.yml b/build/yamls/base/crds.yml index c2f591f170c..9edd43fe705 100644 --- a/build/yamls/base/crds.yml +++ b/build/yamls/base/crds.yml @@ -104,6 +104,8 @@ spec: properties: pod: type: string + service: + type: string namespace: type: string ip: @@ -111,6 +113,7 @@ spec: format: ipv4 oneOf: - required: ["pod", "namespace"] + - required: ["service", "namespace"] - required: ["ip"] packet: type: object diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 2dd9386f02e..86233bfbd90 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -138,6 +138,7 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.Traceflow) { traceflowController = traceflow.NewTraceflowController( k8sClient, + informerFactory, crdClient, traceflowInformer, ofClient, diff --git a/cmd/antrea-controller/controller.go b/cmd/antrea-controller/controller.go index 8fd19509ee7..96515464e3a 100644 --- a/cmd/antrea-controller/controller.go +++ b/cmd/antrea-controller/controller.go @@ -93,7 +93,7 @@ func run(o *Options) error { var traceflowController *traceflow.Controller if features.DefaultFeatureGate.Enabled(features.Traceflow) { - traceflowController = traceflow.NewTraceflowController(crdClient, traceflowInformer) + traceflowController = traceflow.NewTraceflowController(crdClient, podInformer, traceflowInformer) } apiServerConfig, err := createAPIServerConfig(o.config.ClientConnection.Kubeconfig, diff --git a/pkg/agent/controller/traceflow/packetin.go b/pkg/agent/controller/traceflow/packetin.go index 96ccce1c010..26ba08b5305 100644 --- a/pkg/agent/controller/traceflow/packetin.go +++ b/pkg/agent/controller/traceflow/packetin.go @@ -22,6 +22,7 @@ import ( "time" "github.com/contiv/libOpenflow/openflow13" + "github.com/contiv/libOpenflow/protocol" "github.com/contiv/ofnet/ofctrl" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/retry" @@ -100,6 +101,27 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*opsv1alpha1.Tracefl obs = append(obs, *ob) } + // Collect service DNAT. + if pktIn.Data.Ethertype == 0x800 { + ipPacket, ok := pktIn.Data.Data.(*protocol.IPv4) + if !ok { + return nil, nil, errors.New("invalid traceflow IPv4 packet") + } + ctNwDst, err := getInfoInCtNwDstField(matchers) + if err != nil { + return nil, nil, err + } + ipDst := ipPacket.NWDst.String() + if ctNwDst != "" && ipDst != ctNwDst { + ob := &opsv1alpha1.Observation{ + Component: opsv1alpha1.LB, + Action: opsv1alpha1.Forwarded, + TranslatedDstIP: ipDst, + } + obs = append(obs, *ob) + } + } + // Collect egress conjunctionID and get NetworkPolicy from cache. if match = getMatchRegField(matchers, uint32(openflow.EgressReg)); match != nil { egressInfo, err := getInfoInReg(match, nil) @@ -194,3 +216,15 @@ func getInfoInTunnelDst(regMatch *ofctrl.MatchField) (string, error) { } return regValue.String(), nil } + +func getInfoInCtNwDstField(matchers *ofctrl.Matchers) (string, error) { + match := matchers.GetMatchByName("NXM_NX_CT_NW_DST") + if match == nil { + return "", nil + } + regValue, ok := match.GetValue().(net.IP) + if !ok { + return "", errors.New("packet-in conntrack IP destination value cannot be retrieved from metadata") + } + return regValue.String(), nil +} diff --git a/pkg/agent/controller/traceflow/traceflow_controller.go b/pkg/agent/controller/traceflow/traceflow_controller.go index 23b4ca7b862..620b982e026 100644 --- a/pkg/agent/controller/traceflow/traceflow_controller.go +++ b/pkg/agent/controller/traceflow/traceflow_controller.go @@ -26,7 +26,9 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -38,6 +40,7 @@ import ( clientsetversioned "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned" opsinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions/ops/v1alpha1" opslisters "github.com/vmware-tanzu/antrea/pkg/client/listers/ops/v1alpha1" + "github.com/vmware-tanzu/antrea/pkg/features" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig" ) @@ -65,6 +68,8 @@ const ( // the switch for traceflow request. type Controller struct { kubeClient clientset.Interface + serviceLister corelisters.ServiceLister + serviceListerSynced cache.InformerSynced traceflowClient clientsetversioned.Interface traceflowInformer opsinformers.TraceflowInformer traceflowLister opslisters.TraceflowLister @@ -85,6 +90,7 @@ type Controller struct { // events. func NewTraceflowController( kubeClient clientset.Interface, + informerFactory informers.SharedInformerFactory, traceflowClient clientsetversioned.Interface, traceflowInformer opsinformers.TraceflowInformer, client openflow.Client, @@ -118,6 +124,11 @@ func NewTraceflowController( ) // Register packetInHandler c.ofClient.RegisterPacketInHandler("traceflow", c) + // Add serviceLister if AntreaProxy enabled + if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + c.serviceLister = informerFactory.Core().V1().Services().Lister() + c.serviceListerSynced = informerFactory.Core().V1().Services().Informer().HasSynced + } return c } @@ -135,6 +146,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) { defer klog.Infof("Shutting down %s", controllerName) klog.Infof("Waiting for caches to sync for %s", controllerName) + if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + if !cache.WaitForCacheSync(stopCh, c.serviceListerSynced) { + klog.Errorf("Unable to sync service cache for %s", controllerName) + return + } + } if !cache.WaitForCacheSync(stopCh, c.traceflowListerSynced) { klog.Errorf("Unable to sync caches for %s", controllerName) return @@ -244,17 +261,21 @@ func (c *Controller) syncTraceflow(traceflowName string) error { // startTraceflow deploys OVS flow entries for Traceflow and inject packet if current Node // is Sender Node. func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error { - // Deploy flow entries for traceflow - klog.V(2).Infof("Deploy flow entries for Traceflow %s", tf.Name) - err := c.ofClient.InstallTraceflowFlows(tf.Status.DataplaneTag) + err := c.validateTraceflow(tf) defer func() { if err != nil { - c.errorTraceflowCRD(tf, fmt.Sprintf("Node: %s, error: %+v", tf.Name, err)) + c.errorTraceflowCRD(tf, fmt.Sprintf("Node: %s, error: %+v", c.nodeConfig.Name, err)) } }() if err != nil { return err } + // Deploy flow entries for traceflow + klog.V(2).Infof("Deploy flow entries for Traceflow %s", tf.Name) + err = c.ofClient.InstallTraceflowFlows(tf.Status.DataplaneTag) + if err != nil { + return err + } // TODO: let controller compute the source Node, and the source Node can just return an error, // if fails to find the Pod. @@ -268,6 +289,13 @@ func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error { return err } +func (c *Controller) validateTraceflow(tf *opsv1alpha1.Traceflow) error { + if tf.Spec.Destination.Service != "" && !features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + return errors.New("service destination requires AntreaProxy feature enabled") + } + return nil +} + func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error { podInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Source.Pod, tf.Spec.Source.Namespace) // Update Traceflow phase to Running. @@ -285,7 +313,7 @@ func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error { if hasInterface { dstMAC = dstPodInterface.MAC.String() } - } else { + } else if tf.Spec.Destination.Pod != "" { dstPodInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Destination.Pod, tf.Spec.Destination.Namespace) if len(dstPodInterfaces) > 0 { dstMAC = dstPodInterfaces[0].MAC.String() @@ -299,10 +327,18 @@ func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error { dstIP = dstPod.Status.PodIP dstNodeIP = dstPod.Status.HostIP } + } else if tf.Spec.Destination.Service != "" { + dstSvc, err := c.serviceLister.Services(tf.Spec.Destination.Namespace).Get(tf.Spec.Destination.Service) + if err != nil { + return err + } + dstIP = dstSvc.Spec.ClusterIP } - if dstNodeIP != "" { + // Check encap status if Node of destination Pod is remote or destination is IP/Service + if dstNodeIP != "" || tf.Spec.Destination.Pod == "" { peerIP := net.ParseIP(dstNodeIP) - if c.networkConfig.TunnelType == ovsconfig.GeneveTunnel && peerIP != nil && c.networkConfig.TrafficEncapMode.NeedsEncapToPeer(peerIP, c.nodeConfig.NodeIPAddr) { + if c.networkConfig.TunnelType == ovsconfig.GeneveTunnel && (tf.Spec.Destination.Pod == "" || c.networkConfig.TrafficEncapMode.NeedsEncapToPeer(peerIP, c.nodeConfig.NodeIPAddr)) { + // Destination is IP/Service or packet will be encapsulated to remote Node // Wait a small period for other Nodes. time.Sleep(time.Duration(injectPacketDelay) * time.Second) } else { diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index fafb7b3803f..762ccc22059 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -513,12 +513,19 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow { // avoid unexpected packet drop in Traceflow. func (c *client) traceflowConnectionTrackFlows(dataplaneTag uint8, category cookie.Category) binding.Flow { connectionTrackStateTable := c.pipeline[conntrackStateTable] - return connectionTrackStateTable.BuildFlow(priorityNormal+2). + flowBuilder := connectionTrackStateTable.BuildFlow(priorityLow+2). MatchRegRange(int(TraceflowReg), uint32(dataplaneTag), OfTraceflowMarkRange). SetHardTimeout(300). - Action().ResubmitToTable(connectionTrackStateTable.GetNext()). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done() + Cookie(c.cookieAllocator.Request(category).Raw()) + if c.enableProxy { + flowBuilder = flowBuilder. + Action().ResubmitToTable(sessionAffinityTable). + Action().ResubmitToTable(serviceLBTable) + } else { + flowBuilder = flowBuilder. + Action().ResubmitToTable(connectionTrackStateTable.GetNext()) + } + return flowBuilder.Done() } // reEntranceBypassCTFlow generates flow that bypass CT for traffic re-entering host network space. diff --git a/pkg/controller/traceflow/controller.go b/pkg/controller/traceflow/controller.go index 5e75f3822e3..774ec009f2e 100644 --- a/pkg/controller/traceflow/controller.go +++ b/pkg/controller/traceflow/controller.go @@ -18,13 +18,16 @@ import ( "context" "encoding/json" "errors" + "fmt" "sync" "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -58,6 +61,7 @@ var ( // Controller is for traceflow. type Controller struct { client versioned.Interface + podInformer coreinformers.PodInformer traceflowInformer opsinformers.TraceflowInformer traceflowLister opslisters.TraceflowLister traceflowListerSynced cache.InformerSynced @@ -66,10 +70,11 @@ type Controller struct { runningTraceflows map[uint8]string // tag->traceflowName if tf.Status.Phase is Running. } -// NewTraceflowController creates a new traceflow controller. -func NewTraceflowController(client versioned.Interface, traceflowInformer opsinformers.TraceflowInformer) *Controller { +// NewTraceflowController creates a new traceflow controller and adds podIP indexer to podInformer. +func NewTraceflowController(client versioned.Interface, podInformer coreinformers.PodInformer, traceflowInformer opsinformers.TraceflowInformer) *Controller { c := &Controller{ client: client, + podInformer: podInformer, traceflowInformer: traceflowInformer, traceflowLister: traceflowInformer.Lister(), traceflowListerSynced: traceflowInformer.Informer().HasSynced, @@ -84,9 +89,21 @@ func NewTraceflowController(client versioned.Interface, traceflowInformer opsinf }, resyncPeriod, ) + podInformer.Informer().AddIndexers(cache.Indexers{"podIP": podIPIndexFunc}) return c } +func podIPIndexFunc(obj interface{}) ([]string, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("obj is not pod: %+v", obj) + } + if pod.Status.PodIP != "" && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed { + return []string{pod.Status.PodIP}, nil + } + return nil, nil +} + // enqueueTraceflow adds an object to the controller work queue. func (c *Controller) enqueueTraceflow(tf *opsv1alpha1.Traceflow) { c.queue.Add(tf.Name) @@ -224,14 +241,28 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) (retry bool retry = false sender := false receiver := false - for _, nodeResult := range tf.Status.Results { - for _, ob := range nodeResult.Observations { + for i, nodeResult := range tf.Status.Results { + for j, ob := range nodeResult.Observations { if ob.Component == opsv1alpha1.SpoofGuard { sender = true } if ob.Action == opsv1alpha1.Delivered || ob.Action == opsv1alpha1.Dropped { receiver = true } + if ob.TranslatedDstIP != "" { + // Add Pod ns/name to observation if TranslatedDstIP (a.k.a. Service Endpoint address) is Pod IP. + pods, err := c.podInformer.Informer().GetIndexer().ByIndex("podIP", ob.TranslatedDstIP) + if err != nil { + klog.Warningf("Get Pods by IP failed, error: %+v", err) + } else if len(pods) > 0 { + pod, ok := pods[0].(*corev1.Pod) + if !ok { + klog.Warningf("Invalid Pod obj in cache") + } else { + tf.Status.Results[i].Observations[j].Pod = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name) + } + } + } } } if sender && receiver { diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index 2e9aca17c59..6713eda0eac 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -59,6 +60,12 @@ func TestTraceflow(t *testing.T) { defer node1CleanupFn() defer node2CleanupFn() + require.NoError(t, data.createNginxPod("nginx", node2)) + nginxIP, err := data.podWaitForIP(defaultTimeout, "nginx", testNamespace) + require.NoError(t, err) + svc, err := data.createNginxClusterIPService(false) + require.NoError(t, err) + // Setup 2 NetworkPolicies: // 1. Allow all egress traffic. // 2. Deny ingress traffic on pod with label antrea-e2e = node1Pods[1]. So flow node1Pods[0] -> node1Pods[1] will be dropped. @@ -97,6 +104,7 @@ func TestTraceflow(t *testing.T) { // 2. node1Pods[0] -> node2Pods[0], inter node1 and node2. // 3. node1Pods[0] -> node1IPs[1], intra node1. // 4. node1Pods[0] -> node2IPs[0], inter node1 and node2. + // 5. node1Pods[0] -> service, inter node1 and node2. testcases := []testcase{ { name: "intraNodeTraceflow", @@ -321,6 +329,78 @@ func TestTraceflow(t *testing.T) { }, }, }, + { + name: "serviceTraceflow", + tf: &v1alpha1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-svc-%s-", testNamespace, node1Pods[0], svc.Name)), + }, + Spec: v1alpha1.TraceflowSpec{ + Source: v1alpha1.Source{ + Namespace: testNamespace, + Pod: node1Pods[0], + }, + Destination: v1alpha1.Destination{ + Namespace: testNamespace, + Service: svc.Name, + }, + Packet: v1alpha1.Packet{ + IPHeader: v1alpha1.IPHeader{ + Protocol: 6, + }, + TransportHeader: v1alpha1.TransportHeader{ + TCP: &v1alpha1.TCPHeader{ + DstPort: 80, + Flags: 2, + }, + }, + }, + }, + }, + expectedPhase: v1alpha1.Succeeded, + expectedResults: []v1alpha1.NodeResult{ + { + Node: node1, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.SpoofGuard, + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.LB, + Pod: fmt.Sprintf("%s/%s", testNamespace, "nginx"), + TranslatedDstIP: nginxIP, + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.NetworkPolicy, + ComponentInfo: "EgressRule", + Action: v1alpha1.Forwarded, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Forwarded, + }, + }, + }, + { + Node: node2, + Observations: []v1alpha1.Observation{ + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Classification", + Action: v1alpha1.Received, + }, + { + Component: v1alpha1.Forwarding, + ComponentInfo: "Output", + Action: v1alpha1.Delivered, + }, + }, + }, + }, + }, } t.Run("traceflowGroupTest", func(t *testing.T) { @@ -405,6 +485,7 @@ func (data *TestData) enableTraceflow(t *testing.T) error { configMap.Data["antrea-controller.conf"] = antreaControllerConf antreaAgentConf, _ := configMap.Data["antrea-agent.conf"] antreaAgentConf = strings.Replace(antreaAgentConf, "# Traceflow: false", " Traceflow: true", 1) + antreaAgentConf = strings.Replace(antreaAgentConf, "# AntreaProxy: false", " AntreaProxy: true", 1) antreaAgentConf = strings.Replace(antreaAgentConf, "#tunnelType: geneve", "tunnelType: geneve", 1) configMap.Data["antrea-agent.conf"] = antreaAgentConf @@ -436,6 +517,8 @@ func compareObservations(expected v1alpha1.NodeResult, actual v1alpha1.NodeResul for i := 0; i < len(exObs); i++ { if exObs[i].Component != acObs[i].Component || exObs[i].ComponentInfo != acObs[i].ComponentInfo || + exObs[i].Pod != acObs[i].Pod || + exObs[i].TranslatedDstIP != acObs[i].TranslatedDstIP || exObs[i].Action != acObs[i].Action { return fmt.Errorf("Observations should be %v, but got %v", exObs, acObs) }