From f3225bc888431d749426d7b2818fd396dc4f7566 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Thu, 9 Mar 2023 00:30:53 +0800 Subject: [PATCH] Reject the request to a Service without an Endpoint When requesting a Service without an Endpoint, the connection should be rejected, rather than timeout according to the expectation of Kubernetes sig-network tests. This PR also reorders `NestedServiceRegMark` in pkg/agent/openflow/field.go. Signed-off-by: Hongliang Liu --- pkg/agent/controller/networkpolicy/fqdn.go | 4 +- .../controller/networkpolicy/packetin.go | 10 -- pkg/agent/controller/networkpolicy/reject.go | 50 +----- pkg/agent/multicast/mcast_discovery.go | 24 +-- pkg/agent/openflow/client_test.go | 6 + pkg/agent/openflow/fields.go | 8 +- pkg/agent/openflow/packetin.go | 29 ++- pkg/agent/openflow/packetout.go | 104 +++++++++++ pkg/agent/openflow/pipeline.go | 8 + pkg/agent/openflow/service.go | 10 ++ pkg/agent/openflow/service_test.go | 4 + pkg/agent/proxy/proxier.go | 74 +++++++- pkg/agent/proxy/proxier_test.go | 166 +++++++++++++++++- pkg/ovs/openflow/interfaces.go | 1 + pkg/ovs/openflow/testing/mock_openflow.go | 2 +- 15 files changed, 405 insertions(+), 95 deletions(-) create mode 100644 pkg/agent/openflow/packetout.go diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 7cbaa2e50c0..46282a965d3 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -756,7 +756,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error { f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh) } go func() { - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { // Can't parse the packet. Forward it to the Pod. waitCh <- nil @@ -821,7 +821,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error { // sendDNSPacketout forwards the DNS response packet to the original requesting client. func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error { - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { return err } diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index 7730641f12e..abe0dbd0973 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -21,8 +21,6 @@ import ( "time" "antrea.io/libOpenflow/openflow15" - "antrea.io/libOpenflow/protocol" - "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "github.com/vmware/go-ipfix/pkg/registry" "k8s.io/klog/v2" @@ -210,11 +208,3 @@ func isAntreaPolicyEgressTable(tableID uint8) bool { } return false } - -func getEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) { - ethernetPkt := new(protocol.Ethernet) - if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil { - return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err) - } - return ethernetPkt, nil -} diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index 5581ac15790..70cbcb732ad 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -15,7 +15,6 @@ package networkpolicy import ( - "encoding/binary" "fmt" "net" @@ -92,7 +91,7 @@ const ( func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { // All src/dst mean the source/destination of the reject packet, which are destination/source of the incoming packet. // Get ethernet data. - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { return err } @@ -191,47 +190,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort) mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType, isFlexibleIPAMSrc, isFlexibleIPAMDst, ctZone) - if proto == protocol.Type_TCP { - // Get TCP data. - oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data) - if err != nil { - return err - } - // While sending TCP reject packet-out, switch original src/dst port, - // set the ackNum as original seqNum+1 and set the flag as ack+rst. - return c.ofClient.SendTCPPacketOut( - srcMAC, - dstMAC, - srcIP, - dstIP, - inPort, - outPort, - isIPv6, - oriTCPDstPort, - oriTCPSrcPort, - 0, - oriTCPSeqNum+1, - 0, - TCPAck|TCPRst, - 0, - nil, - mutateFunc) - } - // Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject. - icmpType := ICMPDstUnreachableType - icmpCode := ICMPDstHostAdminProhibitedCode - ipHdrLen := IPv4HdrLen - if isIPv6 { - icmpType = ICMPv6DstUnreachableType - icmpCode = ICMPv6DstAdminProhibitedCode - ipHdrLen = IPv6HdrLen - } - ipHdr, _ := ethernetPkt.Data.MarshalBinary() - icmpData := make([]byte, int(ICMPUnusedHdrLen+ipHdrLen+8)) - // Put ICMP unused header in Data prop and set it to zero. - binary.BigEndian.PutUint32(icmpData[:ICMPUnusedHdrLen], 0) - copy(icmpData[ICMPUnusedHdrLen:], ipHdr[:ipHdrLen+8]) - return c.ofClient.SendICMPPacketOut( + return openflow.SendRejectPacketOut(c.ofClient, srcMAC, dstMAC, srcIP, @@ -239,9 +198,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { inPort, outPort, isIPv6, - icmpType, - icmpCode, - icmpData, + ethernetPkt, + proto, mutateFunc) } diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index d775368e215..1cae3a61a9d 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "antrea.io/libOpenflow/openflow15" "antrea.io/libOpenflow/protocol" "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" @@ -33,17 +32,13 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/apis/crd/v1alpha1" + binding "antrea.io/antrea/pkg/ovs/openflow" ) const ( IGMPProtocolNumber = 2 ) -const ( - openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC" - openflowKeyInPort = "OXM_OF_IN_PORT" -) - var ( // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message @@ -79,7 +74,7 @@ func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { if match == nil { return fmt.Errorf("error getting match from IGMP marks in CustomField") } - customReasons, err := getInfoInReg(match, openflow.CustomReasonField.GetRange().ToNXRange()) + customReasons, err := openflow.GetInfoInReg(match, openflow.CustomReasonField.GetRange().ToNXRange()) if err != nil { klog.ErrorS(err, "Received error while unloading customReason from OVS reg") return err @@ -90,20 +85,9 @@ func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { return nil } -func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32, error) { - regValue, ok := regMatch.GetValue().(*ofctrl.NXRegister) - if !ok { - return 0, errors.New("register value cannot be retrieved") - } - if rng != nil { - return ofctrl.GetUint32ValueWithRange(regValue.Data, rng), nil - } - return regValue.Data, nil -} - func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) { matches := pktIn.GetMatches() - ofPortField := matches.GetMatchByName(openflowKeyInPort) + ofPortField := matches.GetMatchByName(binding.OxmFieldInPort) if ofPortField == nil { return nil, errors.New("in_port field not found") } @@ -327,7 +311,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) { matches := pktIn.GetMatches() - tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc) + tunSrcField := matches.GetMatchByName(binding.NxmFieldTunIPv4Src) if tunSrcField == nil { return nil, errors.New("in_port field not found") } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 05e0a1ce17c..3a5eabcffb6 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -905,6 +905,12 @@ func Test_client_InstallServiceGroup(t *testing.T) { "bucket=bucket_id:1,weight:100,actions=set_field:0xa0a0065->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT", deleteOFEntriesError: fmt.Errorf("error when deleting Openflow entries for Service Endpoints Group 100"), }, + { + name: "No Endpoints", + endpoints: []proxy.Endpoint{}, + expectedGroup: "group_id=100,type=select," + + "bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x70000->reg4,resubmit:EndpointDNAT", + }, } for _, tc := range testCases { diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index ab21ad84624..b24736b979a 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -109,10 +109,12 @@ var ( // - 0b001: packet need to do service selection. // - 0b010: packet has done service selection. // - 0b011: packet has done service selection and the selection result needs to be cached. + // - 0b100: there is no Endpoint to do service selection. ServiceEPStateField = binding.NewRegField(4, 16, 18) EpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b001) EpSelectedRegMark = binding.NewRegMark(ServiceEPStateField, 0b010) EpToLearnRegMark = binding.NewRegMark(ServiceEPStateField, 0b011) + NoEpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b100) // reg4[0..18]: Field to store the union value of Endpoint port and Endpoint status. It is used as a single match // when needed. EpUnionField = binding.NewRegField(4, 0, 18) @@ -128,11 +130,11 @@ var ( // externalTrafficPolicy is Cluster. ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21) // reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include: - TrafficControlActionField = binding.NewRegField(4, 22, 23) - // reg4[24]: Mark to indicate whether the Endpoints of a Service includes another Service's ClusterIP. - NestedServiceRegMark = binding.NewOneBitRegMark(4, 24) + TrafficControlActionField = binding.NewRegField(4, 22, 23) TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlActionField, 0b01) TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlActionField, 0b10) + // reg4[24]: Mark to indicate whether the Endpoints of a Service includes another Service's ClusterIP. + NestedServiceRegMark = binding.NewOneBitRegMark(4, 24) // reg5(NXM_NX_REG5) // Field to cache the Egress conjunction ID hit by TraceFlow packet. diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index fde9ea88c75..8883cb75c4d 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -16,9 +16,12 @@ package openflow import ( "encoding/binary" + "errors" "fmt" "antrea.io/libOpenflow/openflow15" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "golang.org/x/time/rate" "k8s.io/klog/v2" @@ -53,6 +56,9 @@ const ( // PacketInReasonMC shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action // only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected. PacketInReasonMC = PacketInReasonNP + // PacketInReasonSvcReject shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action + // only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected. + PacketInReasonSvcReject = PacketInReasonNP // PacketInQueueSize defines the size of PacketInQueue. // When PacketInQueue reaches PacketInQueueSize, new packet-in will be dropped. PacketInQueueSize = 200 @@ -81,7 +87,7 @@ type featureStartPacketIn struct { packetInQueue *openflow.PacketInQueue } -func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { +func newFeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh} featurePacketIn.packetInQueue = openflow.NewPacketInQueue(PacketInQueueSize, rate.Limit(PacketInQueueRate)) @@ -96,7 +102,7 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) { // Iterate through each feature that starts packetin. Subscribe with their specified reason. for reason := range c.packetInHandlers { - featurePacketIn := newfeatureStartPacketIn(reason, stopCh) + featurePacketIn := newFeatureStartPacketIn(reason, stopCh) err := c.subscribeFeaturePacketIn(featurePacketIn) if err != nil { klog.Errorf("received error %+v while subscribing packetin for each feature", err) @@ -148,3 +154,22 @@ func GetMatchFieldByRegID(matchers *ofctrl.Matchers, regID int) *ofctrl.MatchFie } return &ofctrl.MatchField{MatchField: openflow15.NewRegMatchFieldWithMask(regID, data, mask)} } + +func GetInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32, error) { + regValue, ok := regMatch.GetValue().(*ofctrl.NXRegister) + if !ok { + return 0, errors.New("register value cannot be retrieved") + } + if rng != nil { + return ofctrl.GetUint32ValueWithRange(regValue.Data, rng), nil + } + return regValue.Data, nil +} + +func GetEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) { + ethernetPkt := new(protocol.Ethernet) + if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil { + return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err) + } + return ethernetPkt, nil +} diff --git a/pkg/agent/openflow/packetout.go b/pkg/agent/openflow/packetout.go new file mode 100644 index 00000000000..2ab0a4b89c6 --- /dev/null +++ b/pkg/agent/openflow/packetout.go @@ -0,0 +1,104 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "encoding/binary" + + "antrea.io/libOpenflow/protocol" + + binding "antrea.io/antrea/pkg/ovs/openflow" +) + +const ( + ipv4HdrLen uint16 = 20 + ipv6HdrLen uint16 = 40 + + icmpUnusedHdrLen uint16 = 4 + + tcpAck uint8 = 0b010000 + tcpRst uint8 = 0b000100 + + icmpDstUnreachableType uint8 = 3 + icmpDstHostAdminProhibitedCode uint8 = 10 + + icmpv6DstUnreachableType uint8 = 1 + icmpv6DstAdminProhibitedCode uint8 = 1 +) + +func SendRejectPacketOut(ofClient Client, + srcMAC string, + dstMAC string, + srcIP string, + dstIP string, + inPort uint32, + outPort uint32, + isIPv6 bool, + ethernetPkt *protocol.Ethernet, + proto uint8, + mutateFunc func(binding.PacketOutBuilder) binding.PacketOutBuilder) error { + if proto == protocol.Type_TCP { + // Get TCP data. + oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data) + if err != nil { + return err + } + // While sending TCP reject packet-out, switch original src/dst port, + // set the ackNum as original seqNum+1 and set the flag as ack+rst. + return ofClient.SendTCPPacketOut( + srcMAC, + dstMAC, + srcIP, + dstIP, + inPort, + outPort, + isIPv6, + oriTCPDstPort, + oriTCPSrcPort, + 0, + oriTCPSeqNum+1, + 0, + tcpAck|tcpRst, + 0, + nil, + mutateFunc) + } + // Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject. + icmpType := icmpDstUnreachableType + icmpCode := icmpDstHostAdminProhibitedCode + ipHdrLen := ipv4HdrLen + if isIPv6 { + icmpType = icmpv6DstUnreachableType + icmpCode = icmpv6DstAdminProhibitedCode + ipHdrLen = ipv6HdrLen + } + ipHdr, _ := ethernetPkt.Data.MarshalBinary() + icmpData := make([]byte, int(icmpUnusedHdrLen+ipHdrLen+8)) + // Put ICMP unused header in Data prop and set it to zero. + binary.BigEndian.PutUint32(icmpData[:icmpUnusedHdrLen], 0) + copy(icmpData[icmpUnusedHdrLen:], ipHdr[:ipHdrLen+8]) + return ofClient.SendICMPPacketOut( + srcMAC, + dstMAC, + srcIP, + dstIP, + inPort, + outPort, + isIPv6, + icmpType, + icmpCode, + icmpData, + mutateFunc) +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 71891efa825..34b4e5fe0e0 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2480,6 +2480,14 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16 // EndpointDNATTable. Otherwise, buckets will resubmit packets to EndpointDNATTable directly. func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group { group := f.bridge.CreateGroup(groupID).ResetBuckets() + + if len(endpoints) == 0 { + return group.Bucket().Weight(100). + LoadToRegField(ServiceEPStateField, NoEpToSelectRegMark.GetValue()). + ResubmitToTable(EndpointDNATTable.GetID()). + Done() + } + var resubmitTableID uint8 if withSessionAffinity { resubmitTableID = ServiceLBTable.GetID() diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 5f7e4048972..b2913086ba2 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -124,6 +124,15 @@ func newFeatureService( } } +// serviceNoEndpointFlow generates the flow to match the packets to Service without Endpoint and send them to controller. +func (f *featureService) serviceNoEndpointFlow() binding.Flow { + return EndpointDNATTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchRegMark(NoEpToSelectRegMark). + Action().SendToController(uint8(PacketInReasonSvcReject)). + Done() +} + func (f *featureService) initFlows() []binding.Flow { var flows []binding.Flow if f.enableProxy { @@ -134,6 +143,7 @@ func (f *featureService) initFlows() []binding.Flow { flows = append(flows, f.snatConntrackFlows()...) flows = append(flows, f.serviceNeedLBFlow()) flows = append(flows, f.sessionAffinityReselectFlow()) + flows = append(flows, f.serviceNoEndpointFlow()) flows = append(flows, f.l2ForwardOutputHairpinServiceFlow()) if f.proxyAll { // This installs the flows to match the first packet of NodePort connection. The flows set a bit of a register diff --git a/pkg/agent/openflow/service_test.go b/pkg/agent/openflow/service_test.go index afe2f87e34b..ff3c56bfcfc 100644 --- a/pkg/agent/openflow/service_test.go +++ b/pkg/agent/openflow/service_test.go @@ -41,6 +41,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ip actions=resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", @@ -62,6 +63,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))", @@ -86,6 +88,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=NodePortMark, priority=200,ip,nw_dst=169.254.0.252 actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", @@ -110,6 +113,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=NodePortMark, priority=200,ipv6,ipv6_dst=fc01::aabb:ccdd:eefe actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))", diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 09a80478d1e..eb3f59f7c46 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -15,6 +15,7 @@ package proxy import ( + "errors" "fmt" "math" "net" @@ -23,6 +24,8 @@ import ( "sync" "time" + "antrea.io/libOpenflow/protocol" + "antrea.io/ofnet/ofctrl" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" @@ -409,10 +412,6 @@ func (p *proxier) installServices() { p.endpointsInstalledMap[svcPortName] = endpointsInstalled } endpointsToInstall := p.endpointsMap[svcPortName] - // If both expected Endpoints number and installed Endpoints number are 0, we don't need to take care of this Service. - if len(endpointsToInstall) == 0 && len(endpointsInstalled) == 0 { - continue - } installedSvcPort, ok := p.serviceInstalledMap[svcPortName] var pSvcInfo *types.ServiceInfo @@ -960,6 +959,7 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { func (p *proxier) Run(stopCh <-chan struct{}) { p.once.Do(func() { + p.ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonSvcReject), "svc-reject", p) go p.serviceConfig.Run(stopCh) if p.endpointSliceEnabled { go p.endpointSliceConfig.Run(stopCh) @@ -1023,6 +1023,72 @@ func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, [ return flows, groups, found } +func (p *proxier) HandlePacketIn(pktIn *ofctrl.PacketIn) error { + if pktIn == nil { + return errors.New("empty packetin for Antrea Proxy") + } + matches := pktIn.GetMatches() + + noEpToSelectRegField := openflow.NoEpToSelectRegMark.GetField() + noEpToSelectRegValue := openflow.NoEpToSelectRegMark.GetValue() + match := openflow.GetMatchFieldByRegID(matches, noEpToSelectRegField.GetRegID()) + if match == nil { + return fmt.Errorf("error getting match NoEpToSelectRegMark") + } + regValue, err := openflow.GetInfoInReg(match, noEpToSelectRegField.GetRange().ToNXRange()) + if err != nil { + klog.ErrorS(err, "Received error while unloading NoEpToSelectRegMark from OVS reg") + return err + } + // Filter out the packets that don't have reg mark NoEpToSelectRegMark. + if regValue&noEpToSelectRegValue != noEpToSelectRegValue { + return nil + } + + // Get ethernet data. + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) + if err != nil { + return err + } + srcMAC := ethernetPkt.HWDst.String() + dstMAC := ethernetPkt.HWSrc.String() + + var ( + srcIP, dstIP string + proto uint8 + isIPv6 bool + ) + switch ipPkt := ethernetPkt.Data.(type) { + case *protocol.IPv4: + srcIP = ipPkt.NWDst.String() + dstIP = ipPkt.NWSrc.String() + proto = ipPkt.Protocol + isIPv6 = false + case *protocol.IPv6: + srcIP = ipPkt.NWDst.String() + dstIP = ipPkt.NWSrc.String() + proto = ipPkt.NextHeader + isIPv6 = true + } + + inPortField := matches.GetMatchByName(binding.OxmFieldInPort) + if inPortField == nil { + return fmt.Errorf("error when getting match field inPort") + } + outPort := inPortField.GetValue().(uint32) + return openflow.SendRejectPacketOut(p.ofClient, + srcMAC, + dstMAC, + srcIP, + dstIP, + 0, + outPort, + isIPv6, + ethernetPkt, + proto, + nil) +} + func NewProxier( hostname string, informerFactory informers.SharedInformerFactory, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 52f88619ba1..ac4c921067f 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -1360,10 +1360,20 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6) svc := makeTestClusterIPService(&svcPortName, svcIP, int32(svcPort), corev1.ProtocolTCP, nil, nil, false) + updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false) makeServiceMap(fp, svc) makeEndpointSliceMap(fp) + + groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, []k8sproxy.Endpoint{}).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) + fp.syncProxyRules() + assert.Contains(t, fp.serviceInstalledMap, svcPortName) + + mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) + fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() - assert.NotContains(t, fp.serviceInstalledMap, svcPortName) } func TestClusterIPNoEndpoint(t *testing.T) { @@ -1375,6 +1385,137 @@ func TestClusterIPNoEndpoint(t *testing.T) { }) } +func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, isIPv6 bool) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator(isIPv6) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + + svc := makeTestNodePortService(&svcPortName, svcIP, + int32(svcPort), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + corev1.ServiceInternalTrafficPolicyCluster, + corev1.ServiceExternalTrafficPolicyTypeLocal) + updatedSvc := makeTestNodePortService(&svcPortName, svcIP, + int32(svcPort+1), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + corev1.ServiceInternalTrafficPolicyCluster, + corev1.ServiceExternalTrafficPolicyTypeLocal) + makeServiceMap(fp, svc) + makeEndpointSliceMap(fp) + + vIP := agentconfig.VirtualNodePortDNATIPv4 + if isIPv6 { + vIP = agentconfig.VirtualNodePortDNATIPv6 + } + + groupIDCluster := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) + mockOFClient.EXPECT().InstallServiceGroup(groupIDCluster, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + fp.syncProxyRules() + + mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) + fp.syncProxyRules() +} + +func TestNodePortNoEndpoint(t *testing.T) { + t.Run("IPv4", func(t *testing.T) { + testNodePortNoEndpoint(t, nodePortAddressesIPv4, svc1IPv4, false) + }) + t.Run("IPv6", func(t *testing.T) { + testNodePortNoEndpoint(t, nodePortAddressesIPv6, svc1IPv6, true) + }) +} + +func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, loadBalancerIP net.IP, isIPv6 bool) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator(isIPv6) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + + internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster + externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeLocal + + svc := makeTestLoadBalancerService(&svcPortName, svcIP, + []net.IP{loadBalancerIP}, + int32(svcPort), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + &internalTrafficPolicy, + externalTrafficPolicy) + updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, + []net.IP{loadBalancerIP}, + int32(svcPort+1), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + &internalTrafficPolicy, + externalTrafficPolicy) + makeServiceMap(fp, svc) + makeEndpointSliceMap(fp) + + vIP := agentconfig.VirtualNodePortDNATIPv4 + if isIPv6 { + vIP = agentconfig.VirtualNodePortDNATIPv6 + } + + groupIDCluster := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) + mockOFClient.EXPECT().InstallServiceGroup(groupIDCluster, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) + fp.syncProxyRules() + + mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) + fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) + fp.syncProxyRules() +} + +func TestLoadBalancerNoEndpoint(t *testing.T) { + t.Run("IPv4", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, false) + }) + t.Run("IPv6", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, true) + }) +} + func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1391,10 +1532,9 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP epTCP, epPortTCP := makeTestEndpointSliceEndpointAndPort(&svcPortNameTCP, epIP, int32(svcPort), corev1.ProtocolTCP, false) epsTCP := makeTestEndpointSlice(svcPortNameTCP.Namespace, svcPortNameTCP.Name, []discovery.Endpoint{*epTCP}, []discovery.EndpointPort{*epPortTCP}, isIPv6) - makeEndpointSliceMap(fp, epsTCP) epUDP, epPortUDP := makeTestEndpointSliceEndpointAndPort(&svcPortNameUDP, epIP, int32(svcPort), corev1.ProtocolUDP, false) epsUDP := makeTestEndpointSlice(svcPortNameUDP.Namespace, svcPortNameUDP.Name, []discovery.Endpoint{*epUDP}, []discovery.EndpointPort{*epPortUDP}, isIPv6) - makeEndpointSliceMap(fp, epsUDP) + makeEndpointSliceMap(fp, epsTCP, epsUDP) protocolTCP := binding.ProtocolTCP protocolUDP := binding.ProtocolUDP @@ -1406,16 +1546,22 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP groupID := fp.groupCounter.AllocateIfNotExist(svcPortNameTCP, false) groupIDUDP := fp.groupCounter.AllocateIfNotExist(svcPortNameUDP, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(2) + mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), protocolTCP, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDUDP, svcIP, uint16(svcPort), protocolUDP, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) fp.syncProxyRules() + mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) fp.endpointsChanges.OnEndpointSliceUpdate(epsUDP, true) fp.syncProxyRules() + + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) + fp.endpointsChanges.OnEndpointSliceUpdate(epsTCP, true) + fp.syncProxyRules() } func TestClusterIPRemoveSamePortEndpoint(t *testing.T) { @@ -1447,15 +1593,16 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv } groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(2) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) assert.Contains(t, fp.endpointsInstalledMap, svcPortName) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.endpointsChanges.OnEndpointSliceUpdate(eps, true) fp.syncProxyRules() @@ -1463,6 +1610,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv endpointsMap, ok := fp.endpointsInstalledMap[svcPortName] assert.True(t, ok) assert.Equal(t, 0, len(endpointsMap)) + fp.syncProxyRules() } func TestClusterIPRemoveEndpoints(t *testing.T) { @@ -1566,6 +1714,10 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne }) makeServiceMap(fp, svc) makeEndpointsMap(fp) + + groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + mockOFClient.EXPECT().InstallServiceGroup(groupID, true, []k8sproxy.Endpoint{}).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), gomock.Any(), uint16(10800), false, gomock.Any(), false).Times(1) fp.syncProxyRules() } diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index f9656162a0a..bd1ded899ba 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -77,6 +77,7 @@ const ( NxmFieldDstIPv4 = "NXM_OF_IP_DST" NxmFieldSrcIPv6 = "NXM_NX_IPV6_SRC" NxmFieldDstIPv6 = "NXM_NX_IPV6_DST" + NxmFieldTunIPv4Src = "NXM_NX_TUN_IPV4_SRC" OxmFieldVLANVID = "OXM_OF_VLAN_VID" OxmFieldInPort = "OXM_OF_IN_PORT" diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index b0e78e64c2f..5d618fe8c5b 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.