Skip to content

Commit

Permalink
Support service destination in traceflow
Browse files Browse the repository at this point in the history
  • Loading branch information
gran-vmv committed Jul 30, 2020
1 parent be28dd1 commit eae46c3
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 16 deletions.
5 changes: 5 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -221,6 +224,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -221,6 +224,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -221,6 +224,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ spec:
- required:
- pod
- namespace
- required:
- service
- namespace
- required:
- ip
properties:
Expand All @@ -221,6 +224,8 @@ spec:
type: string
pod:
type: string
service:
type: string
type: object
packet:
properties:
Expand Down
3 changes: 3 additions & 0 deletions build/yamls/base/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ spec:
properties:
pod:
type: string
service:
type: string
namespace:
type: string
ip:
type: string
format: ipv4
oneOf:
- required: ["pod", "namespace"]
- required: ["service", "namespace"]
- required: ["ip"]
packet:
type: object
Expand Down
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(
k8sClient,
informerFactory,
crdClient,
traceflowInformer,
ofClient,
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func run(o *Options) error {

var traceflowController *traceflow.Controller
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(crdClient, traceflowInformer)
traceflowController = traceflow.NewTraceflowController(crdClient, podInformer, traceflowInformer)
}

apiServerConfig, err := createAPIServerConfig(o.config.ClientConnection.Kubeconfig,
Expand Down
34 changes: 34 additions & 0 deletions pkg/agent/controller/traceflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/contiv/libOpenflow/openflow13"
"github.com/contiv/libOpenflow/protocol"
"github.com/contiv/ofnet/ofctrl"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -100,6 +101,27 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*opsv1alpha1.Tracefl
obs = append(obs, *ob)
}

// Collect service DNAT.
if pktIn.Data.Ethertype == 0x800 {
ipPacket, ok := pktIn.Data.Data.(*protocol.IPv4)
if !ok {
return nil, nil, errors.New("invalid traceflow IPv4 packet")
}
ctNwDst, err := getInfoInCtNwDstField(matchers)
if err != nil {
return nil, nil, err
}
ipDst := ipPacket.NWDst.String()
if ctNwDst != "" && ipDst != ctNwDst {
ob := &opsv1alpha1.Observation{
Component: opsv1alpha1.LB,
Action: opsv1alpha1.Forwarded,
TranslatedDstIP: ipDst,
}
obs = append(obs, *ob)
}
}

// Collect egress conjunctionID and get NetworkPolicy from cache.
if match = getMatchRegField(matchers, uint32(openflow.EgressReg)); match != nil {
egressInfo, err := getInfoInReg(match, nil)
Expand Down Expand Up @@ -194,3 +216,15 @@ func getInfoInTunnelDst(regMatch *ofctrl.MatchField) (string, error) {
}
return regValue.String(), nil
}

func getInfoInCtNwDstField(matchers *ofctrl.Matchers) (string, error) {
match := matchers.GetMatchByName("NXM_NX_CT_NW_DST")
if match == nil {
return "", nil
}
regValue, ok := match.GetValue().(net.IP)
if !ok {
return "", errors.New("packet-in conntrack IP destination value cannot be retrieved from metadata")
}
return regValue.String(), nil
}
50 changes: 43 additions & 7 deletions pkg/agent/controller/traceflow/traceflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
Expand All @@ -38,6 +40,7 @@ import (
clientsetversioned "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned"
opsinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions/ops/v1alpha1"
opslisters "github.com/vmware-tanzu/antrea/pkg/client/listers/ops/v1alpha1"
"github.com/vmware-tanzu/antrea/pkg/features"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
)

Expand Down Expand Up @@ -65,6 +68,8 @@ const (
// the switch for traceflow request.
type Controller struct {
kubeClient clientset.Interface
serviceLister corelisters.ServiceLister
serviceListerSynced cache.InformerSynced
traceflowClient clientsetversioned.Interface
traceflowInformer opsinformers.TraceflowInformer
traceflowLister opslisters.TraceflowLister
Expand All @@ -85,6 +90,7 @@ type Controller struct {
// events.
func NewTraceflowController(
kubeClient clientset.Interface,
informerFactory informers.SharedInformerFactory,
traceflowClient clientsetversioned.Interface,
traceflowInformer opsinformers.TraceflowInformer,
client openflow.Client,
Expand Down Expand Up @@ -118,6 +124,11 @@ func NewTraceflowController(
)
// Register packetInHandler
c.ofClient.RegisterPacketInHandler("traceflow", c)
// Add serviceLister if AntreaProxy enabled
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
c.serviceLister = informerFactory.Core().V1().Services().Lister()
c.serviceListerSynced = informerFactory.Core().V1().Services().Informer().HasSynced
}
return c
}

Expand All @@ -135,6 +146,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
defer klog.Infof("Shutting down %s", controllerName)

klog.Infof("Waiting for caches to sync for %s", controllerName)
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
if !cache.WaitForCacheSync(stopCh, c.serviceListerSynced) {
klog.Errorf("Unable to sync service cache for %s", controllerName)
return
}
}
if !cache.WaitForCacheSync(stopCh, c.traceflowListerSynced) {
klog.Errorf("Unable to sync caches for %s", controllerName)
return
Expand Down Expand Up @@ -244,17 +261,21 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
// startTraceflow deploys OVS flow entries for Traceflow and inject packet if current Node
// is Sender Node.
func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error {
// Deploy flow entries for traceflow
klog.V(2).Infof("Deploy flow entries for Traceflow %s", tf.Name)
err := c.ofClient.InstallTraceflowFlows(tf.Status.DataplaneTag)
err := c.validateTraceflow(tf)
defer func() {
if err != nil {
c.errorTraceflowCRD(tf, fmt.Sprintf("Node: %s, error: %+v", tf.Name, err))
c.errorTraceflowCRD(tf, fmt.Sprintf("Node: %s, error: %+v", c.nodeConfig.Name, err))
}
}()
if err != nil {
return err
}
// Deploy flow entries for traceflow
klog.V(2).Infof("Deploy flow entries for Traceflow %s", tf.Name)
err = c.ofClient.InstallTraceflowFlows(tf.Status.DataplaneTag)
if err != nil {
return err
}

// TODO: let controller compute the source Node, and the source Node can just return an error,
// if fails to find the Pod.
Expand All @@ -268,6 +289,13 @@ func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error {
return err
}

func (c *Controller) validateTraceflow(tf *opsv1alpha1.Traceflow) error {
if tf.Spec.Destination.Service != "" && !features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
return errors.New("service destination requires AntreaProxy feature enabled")
}
return nil
}

func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error {
podInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Source.Pod, tf.Spec.Source.Namespace)
// Update Traceflow phase to Running.
Expand All @@ -285,7 +313,7 @@ func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error {
if hasInterface {
dstMAC = dstPodInterface.MAC.String()
}
} else {
} else if tf.Spec.Destination.Pod != "" {
dstPodInterfaces := c.interfaceStore.GetContainerInterfacesByPod(tf.Spec.Destination.Pod, tf.Spec.Destination.Namespace)
if len(dstPodInterfaces) > 0 {
dstMAC = dstPodInterfaces[0].MAC.String()
Expand All @@ -299,10 +327,18 @@ func (c *Controller) injectPacket(tf *opsv1alpha1.Traceflow) error {
dstIP = dstPod.Status.PodIP
dstNodeIP = dstPod.Status.HostIP
}
} else if tf.Spec.Destination.Service != "" {
dstSvc, err := c.serviceLister.Services(tf.Spec.Destination.Namespace).Get(tf.Spec.Destination.Service)
if err != nil {
return err
}
dstIP = dstSvc.Spec.ClusterIP
}
if dstNodeIP != "" {
// Check encap status if Node of destination Pod is remote or destination is IP/Service
if dstNodeIP != "" || tf.Spec.Destination.Pod == "" {
peerIP := net.ParseIP(dstNodeIP)
if c.networkConfig.TunnelType == ovsconfig.GeneveTunnel && peerIP != nil && c.networkConfig.TrafficEncapMode.NeedsEncapToPeer(peerIP, c.nodeConfig.NodeIPAddr) {
if c.networkConfig.TunnelType == ovsconfig.GeneveTunnel && (tf.Spec.Destination.Pod == "" || c.networkConfig.TrafficEncapMode.NeedsEncapToPeer(peerIP, c.nodeConfig.NodeIPAddr)) {
// Destination is IP/Service or packet will be encapsulated to remote Node
// Wait a small period for other Nodes.
time.Sleep(time.Duration(injectPacketDelay) * time.Second)
} else {
Expand Down
15 changes: 11 additions & 4 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,12 +513,19 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
// avoid unexpected packet drop in Traceflow.
func (c *client) traceflowConnectionTrackFlows(dataplaneTag uint8, category cookie.Category) binding.Flow {
connectionTrackStateTable := c.pipeline[conntrackStateTable]
return connectionTrackStateTable.BuildFlow(priorityNormal+2).
flowBuilder := connectionTrackStateTable.BuildFlow(priorityLow+2).
MatchRegRange(int(TraceflowReg), uint32(dataplaneTag), OfTraceflowMarkRange).
SetHardTimeout(300).
Action().ResubmitToTable(connectionTrackStateTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done()
Cookie(c.cookieAllocator.Request(category).Raw())
if c.enableProxy {
flowBuilder = flowBuilder.
Action().ResubmitToTable(sessionAffinityTable).
Action().ResubmitToTable(serviceLBTable)
} else {
flowBuilder = flowBuilder.
Action().ResubmitToTable(connectionTrackStateTable.GetNext())
}
return flowBuilder.Done()
}

// reEntranceBypassCTFlow generates flow that bypass CT for traffic re-entering host network space.
Expand Down
Loading

0 comments on commit eae46c3

Please sign in to comment.