Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support service destination in traceflow #979

Merged
merged 1 commit into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -233,6 +236,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -233,6 +236,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -233,6 +236,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -233,6 +236,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -233,6 +236,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
3 changes: 3 additions & 0 deletions build/yamls/base/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ spec:
properties:
pod:
type: string
service:
type: string
namespace:
type: string
ip:
type: string
format: ipv4
oneOf:
- required: ["pod", "namespace"]
- required: ["service", "namespace"]
- required: ["ip"]
packet:
type: object
Expand Down
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(
k8sClient,
informerFactory,
crdClient,
traceflowInformer,
ofClient,
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func run(o *Options) error {

var traceflowController *traceflow.Controller
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(crdClient, traceflowInformer)
traceflowController = traceflow.NewTraceflowController(crdClient, podInformer, traceflowInformer)
}

apiServerConfig, err := createAPIServerConfig(o.config.ClientConnection.Kubeconfig,
Expand Down
34 changes: 34 additions & 0 deletions pkg/agent/controller/traceflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/contiv/libOpenflow/openflow13"
"github.com/contiv/libOpenflow/protocol"
"github.com/contiv/ofnet/ofctrl"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -100,6 +101,27 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*opsv1alpha1.Tracefl
obs = append(obs, *ob)
}

// Collect Service DNAT.
if pktIn.Data.Ethertype == 0x800 {
ipPacket, ok := pktIn.Data.Data.(*protocol.IPv4)
if !ok {
return nil, nil, errors.New("invalid traceflow IPv4 packet")
}
ctNwDst, err := getInfoInCtNwDstField(matchers)
if err != nil {
return nil, nil, err
}
ipDst := ipPacket.NWDst.String()
if ctNwDst != "" && ipDst != ctNwDst {
ob := &opsv1alpha1.Observation{
Component: opsv1alpha1.LB,
Action: opsv1alpha1.Forwarded,
TranslatedDstIP: ipDst,
}
obs = append(obs, *ob)
}
}

// Collect egress conjunctionID and get NetworkPolicy from cache.
if match = getMatchRegField(matchers, uint32(openflow.EgressReg)); match != nil {
egressInfo, err := getInfoInReg(match, nil)
Expand Down Expand Up @@ -194,3 +216,15 @@ func getInfoInTunnelDst(regMatch *ofctrl.MatchField) (string, error) {
}
return regValue.String(), nil
}

func getInfoInCtNwDstField(matchers *ofctrl.Matchers) (string, error) {
match := matchers.GetMatchByName("NXM_NX_CT_NW_DST")
if match == nil {
return "", nil
}
regValue, ok := match.GetValue().(net.IP)
if !ok {
return "", errors.New("packet-in conntrack IP destination value cannot be retrieved from metadata")
}
return regValue.String(), nil
}
51 changes: 43 additions & 8 deletions pkg/agent/controller/traceflow/traceflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand All @@ -38,6 +40,7 @@ import (
clientsetversioned "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned"
opsinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions/ops/v1alpha1"
opslisters "github.com/vmware-tanzu/antrea/pkg/client/listers/ops/v1alpha1"
"github.com/vmware-tanzu/antrea/pkg/features"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
)

Expand Down Expand Up @@ -65,6 +68,8 @@ const (
// the switch for traceflow request.
type Controller struct {
kubeClient clientset.Interface
serviceLister corelisters.ServiceLister
serviceListerSynced cache.InformerSynced
traceflowClient clientsetversioned.Interface
traceflowInformer opsinformers.TraceflowInformer
traceflowLister opslisters.TraceflowLister
Expand All @@ -85,6 +90,7 @@ type Controller struct {
// events.
func NewTraceflowController(
kubeClient clientset.Interface,
informerFactory informers.SharedInformerFactory,
traceflowClient clientsetversioned.Interface,
traceflowInformer opsinformers.TraceflowInformer,
client openflow.Client,
Expand Down Expand Up @@ -118,6 +124,11 @@ func NewTraceflowController(
)
// Register packetInHandler
c.ofClient.RegisterPacketInHandler("traceflow", c)
// Add serviceLister if AntreaProxy enabled
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
c.serviceLister = informerFactory.Core().V1().Services().Lister()
jianjuns marked this conversation as resolved.
Show resolved Hide resolved
c.serviceListerSynced = informerFactory.Core().V1().Services().Informer().HasSynced
}
return c
}

Expand All @@ -135,6 +146,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
defer klog.Infof("Shutting down %s", controllerName)

klog.Infof("Waiting for caches to sync for %s", controllerName)
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
if !cache.WaitForCacheSync(stopCh, c.serviceListerSynced) {
klog.Errorf("Unable to sync service cache for %s", controllerName)
return
}
}
if !cache.WaitForCacheSync(stopCh, c.traceflowListerSynced) {
klog.Errorf("Unable to sync caches for %s", controllerName)
return
Expand Down Expand Up @@ -244,17 +261,21 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
// startTraceflow deploys OVS flow entries for Traceflow and inject packet if current Node
// is Sender Node.
func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error {
// Deploy flow entries for traceflow
klog.V(2).Infof("Deploy flow entries for Traceflow %s", tf.Name)
err := c.ofClient.InstallTraceflowFlows(tf.Status.DataplaneTag)
err := c.validateTraceflow(tf)
defer func() {
if err != nil {
c.errorTraceflowCRD(tf, fmt.Sprintf("Node: %s, error: %+v", tf.Name, err))
c.errorTraceflowCRD(tf, fmt.Sprintf("Node: %s, error: %+v", c.nodeConfig.Name, err))
}
}()
if err != nil {
return err
gran-vmv marked this conversation as resolved.
Show resolved Hide resolved
}
// Deploy flow entries for traceflow
klog.V(2).Infof("Deploy flow entries for Traceflow %s", tf.Name)
err = c.ofClient.InstallTraceflowFlows(tf.Status.DataplaneTag)
if err != nil {
return err
}

// TODO: let controller compute the source Node, and the source Node can just return an error,
// if fails to find the Pod.
Expand All @@ -268,6 +289,13 @@ func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error {
return err
}

func (c *Controller) validateTraceflow(tf *opsv1alpha1.Traceflow) error {
if tf.Spec.Destination.Service != "" && !features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
return errors.New("using Service destination requires AntreaProxy feature enabled")
}
return nil
}

func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error {
podInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Source.Pod, tf.Spec.Source.Namespace)
// Update Traceflow phase to Running.
Expand All @@ -285,7 +313,7 @@ func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error {
if hasInterface {
dstMAC = dstPodInterface.MAC.String()
}
} else {
} else if tf.Spec.Destination.Pod != "" {
dstPodInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Destination.Pod, tf.Spec.Destination.Namespace)
if len(dstPodInterfaces) > 0 {
dstMAC = dstPodInterfaces[0].MAC.String()
Expand All @@ -299,11 +327,18 @@ func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error {
dstIP = dstPod.Status.PodIP
dstNodeIP = dstPod.Status.HostIP
}
} else if tf.Spec.Destination.Service != "" {
dstSvc, err := c.serviceLister.Services(tf.Spec.Destination.Namespace).Get(tf.Spec.Destination.Service)
if err != nil {
return err
}
dstIP = dstSvc.Spec.ClusterIP
}
if dstNodeIP != "" {
// Check encap status if no dstMAC found which means the destination is Service or the destination Pod/IP is not on local Node.
if dstMAC == "" {
peerIP := net.ParseIP(dstNodeIP)
if c.networkConfig.TunnelType == ovsconfig.GeneveTunnel && peerIP != nil && c.networkConfig.TrafficEncapMode.NeedsEncapToPeer(peerIP, c.nodeConfig.NodeIPAddr) {
// Wait a small period for other Nodes.
if c.networkConfig.TunnelType == ovsconfig.GeneveTunnel && (tf.Spec.Destination.Pod == "" || c.networkConfig.TrafficEncapMode.NeedsEncapToPeer(peerIP, c.nodeConfig.NodeIPAddr)) {
// If the destination is Service/IP or the packet will be encapsulated to remote Node, wait a small period for other Nodes.
time.Sleep(time.Duration(injectPacketDelay) * time.Second)
} else {
// Inter-node traceflow is only available when the packet is encapsulated in Geneve tunnel.
Expand Down
15 changes: 11 additions & 4 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,19 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
// avoid unexpected packet drop in Traceflow.
func (c *client) traceflowConnectionTrackFlows(dataplaneTag uint8, category cookie.Category) binding.Flow {
connectionTrackStateTable := c.pipeline[conntrackStateTable]
return connectionTrackStateTable.BuildFlow(priorityNormal+2).
flowBuilder := connectionTrackStateTable.BuildFlow(priorityLow+2).
MatchRegRange(int(TraceflowReg), uint32(dataplaneTag), OfTraceflowMarkRange).
SetHardTimeout(300).
Action().ResubmitToTable(connectionTrackStateTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done()
Cookie(c.cookieAllocator.Request(category).Raw())
if c.enableProxy {
flowBuilder = flowBuilder.
Action().ResubmitToTable(sessionAffinityTable).
Action().ResubmitToTable(serviceLBTable)
} else {
flowBuilder = flowBuilder.
Action().ResubmitToTable(connectionTrackStateTable.GetNext())
}
return flowBuilder.Done()
}

// ctRewriteDstMACFlow rewrites the destination MAC with local host gateway MAC if the packets has set ct_mark but not sent from the host gateway.
Expand Down
Loading