From 9a6448a38f78b94d1973e8fec9e09d309af7938c Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Wed, 22 Feb 2023 14:53:26 +0800 Subject: [PATCH] Reject the request to a Service without an Endpoint When requesting a Service without an Endpoint, the connection should be rejected, rather than timeout according to the expectation of Kubernetes sig-network tests. Signed-off-by: Hongliang Liu --- pkg/agent/controller/networkpolicy/fqdn.go | 6 +- .../controller/networkpolicy/packetin.go | 10 -- pkg/agent/controller/networkpolicy/reject.go | 46 +------- pkg/agent/multicast/mcast_discovery.go | 10 +- pkg/agent/openflow/client.go | 28 ++++- pkg/agent/openflow/client_test.go | 41 +++++++ pkg/agent/openflow/fields.go | 2 + pkg/agent/openflow/packetin.go | 7 +- pkg/agent/openflow/packetout.go | 111 ++++++++++++++++++ pkg/agent/openflow/service.go | 20 ++++ pkg/agent/openflow/service_test.go | 4 + pkg/agent/openflow/testing/mock_openflow.go | 16 ++- pkg/agent/proxy/proxier.go | 88 +++++++++++++- pkg/agent/proxy/proxier_test.go | 7 ++ pkg/ovs/openflow/interfaces.go | 2 + pkg/ovs/openflow/testing/mock_openflow.go | 2 +- 16 files changed, 326 insertions(+), 74 deletions(-) create mode 100644 pkg/agent/openflow/packetout.go diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 48a7eb3b70e..b78b6a55e7f 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -756,7 +756,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error { f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh) } go func() { - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { return } @@ -793,7 +793,7 @@ func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error { prot uint8 isIPv6 bool ) - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { return err } @@ -823,7 +823,7 @@ func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error { // Use the original in_port number in the packetIn message to avoid an invalid input port number. Note that, // this should not happen in container case as antrea-gw0 always exists. This check is for security. matches := pktIn.GetMatches() - inPortField := matches.GetMatchByName("OXM_OF_IN_PORT") + inPortField := matches.GetMatchByName(binding.OxmFieldInPort) if inPortField != nil { inPort = inPortField.GetValue().(uint32) } diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index 7730641f12e..abe0dbd0973 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -21,8 +21,6 @@ import ( "time" "antrea.io/libOpenflow/openflow15" - "antrea.io/libOpenflow/protocol" - "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "github.com/vmware/go-ipfix/pkg/registry" "k8s.io/klog/v2" @@ -210,11 +208,3 @@ func isAntreaPolicyEgressTable(tableID uint8) bool { } return false } - -func getEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) { - ethernetPkt := new(protocol.Ethernet) - if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil { - return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err) - } - return ethernetPkt, nil -} diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index c468f11a6d3..b9ed8345256 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -15,7 +15,6 @@ package networkpolicy import ( - "encoding/binary" "fmt" "antrea.io/libOpenflow/protocol" @@ -87,7 +86,7 @@ const ( // packet-in message. func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { // Get ethernet data. - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { return err } @@ -168,43 +167,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort) mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType) - if proto == protocol.Type_TCP { - // Get TCP data. - oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data) - if err != nil { - return err - } - // While sending TCP reject packet-out, switch original src/dst port, - // set the ackNum as original seqNum+1 and set the flag as ack+rst. - return c.ofClient.SendTCPPacketOut( - srcMAC, - dstMAC, - srcIP, - dstIP, - inPort, - outPort, - isIPv6, - oriTCPDstPort, - oriTCPSrcPort, - oriTCPSeqNum+1, - TCPAck|TCPRst, - mutateFunc) - } - // Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject. - icmpType := ICMPDstUnreachableType - icmpCode := ICMPDstHostAdminProhibitedCode - ipHdrLen := IPv4HdrLen - if isIPv6 { - icmpType = ICMPv6DstUnreachableType - icmpCode = ICMPv6DstAdminProhibitedCode - ipHdrLen = IPv6HdrLen - } - ipHdr, _ := ethernetPkt.Data.MarshalBinary() - icmpData := make([]byte, int(ICMPUnusedHdrLen+ipHdrLen+8)) - // Put ICMP unused header in Data prop and set it to zero. - binary.BigEndian.PutUint32(icmpData[:ICMPUnusedHdrLen], 0) - copy(icmpData[ICMPUnusedHdrLen:], ipHdr[:ipHdrLen+8]) - return c.ofClient.SendICMPPacketOut( + return openflow.SendRejectPacketOut(c.ofClient, srcMAC, dstMAC, srcIP, @@ -212,9 +175,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { inPort, outPort, isIPv6, - icmpType, - icmpCode, - icmpData, + ethernetPkt, + proto, mutateFunc) } diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index d775368e215..4bdc95a41f1 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -33,17 +33,13 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/apis/crd/v1alpha1" + binding "antrea.io/antrea/pkg/ovs/openflow" ) const ( IGMPProtocolNumber = 2 ) -const ( - openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC" - openflowKeyInPort = "OXM_OF_IN_PORT" -) - var ( // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message @@ -103,7 +99,7 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32, func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) { matches := pktIn.GetMatches() - ofPortField := matches.GetMatchByName(openflowKeyInPort) + ofPortField := matches.GetMatchByName(binding.OxmFieldInPort) if ofPortField == nil { return nil, errors.New("in_port field not found") } @@ -327,7 +323,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) { matches := pktIn.GetMatches() - tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc) + tunSrcField := matches.GetMatchByName(binding.NxmFieldTunIPv4Src) if tunSrcField == nil { return nil, errors.New("in_port field not found") } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index a425dee9757..dfb070246f8 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -280,6 +280,8 @@ type Client interface { // DeleteAddressFromDNSConjunction removes addresses from the toAddresses of the dns packetIn conjunction. DeleteAddressFromDNSConjunction(id uint32, addrs []types.Address) error + InstallServiceNoEndpointGroup(groupID binding.GroupIDType) error + // InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error @@ -700,8 +702,9 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, if affinityTimeout != 0 { flows = append(flows, c.featureService.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, svcType)) } + cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol) - return c.addFlows(c.featureService.cachedFlows, cacheKey, flows) + return c.modifyFlows(c.featureService.cachedFlows, cacheKey, flows) } func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error { @@ -1103,7 +1106,9 @@ func setBasePacketOutBuilder(packetOutBuilder binding.PacketOutBuilder, srcMAC s packetOutBuilder = packetOutBuilder.SetTTL(128) - packetOutBuilder = packetOutBuilder.SetInport(inPort) + if inPort != 0 { + packetOutBuilder = packetOutBuilder.SetInport(inPort) + } if outPort != 0 { packetOutBuilder = packetOutBuilder.SetOutport(outPort) } @@ -1316,6 +1321,25 @@ func (c *client) SendIGMPRemoteReportPacketOut( return c.bridge.SendPacketOut(packetOutObj) } +func (c *client) InstallServiceNoEndpointGroup(groupID binding.GroupIDType) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + group := c.featureService.serviceNoEndpointGroup(groupID) + _, installed := c.featureService.groupCache.Load(groupID) + if !installed { + if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil { + return fmt.Errorf("error when installing Service without Endpoint group %d: %w", groupID, err) + } + } else { + if err := c.ofEntryOperations.ModifyOFEntries([]binding.OFEntry{group}); err != nil { + return fmt.Errorf("error when modifying Service without Endpoint group %d: %w", groupID, err) + } + } + c.featureService.groupCache.Store(groupID, group) + return nil +} + func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index c4bd7bf0b30..1bf944147be 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -1972,6 +1972,47 @@ func Test_client_InstallTrafficControlReturnPortFlow(t *testing.T) { require.False(t, ok) } +func Test_client_InstallServiceNoEndpointGroup(t *testing.T) { + groupID := binding.GroupIDType(100) + testCases := []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + expectedGroup string + }{ + { + name: "IPv4", + ipv4Enabled: true, + expectedGroup: "group_id=100,type=select," + + "bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x70000->reg4,resubmit:EndpointDNAT", + }, + { + name: "IPv6", + ipv6Enabled: true, + expectedGroup: "group_id=100,type=select," + + "bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x70000->reg4,resubmit:EndpointDNAT", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + m := oftest.NewMockOFEntryOperations(ctrl) + + fc := newFakeClient(m, tc.ipv4Enabled, tc.ipv6Enabled, config.K8sNode, config.TrafficEncapModeEncap) + defer resetPipelines() + + m.EXPECT().AddOFEntries(gomock.Any()).Return(nil).Times(1) + + assert.NoError(t, fc.InstallServiceNoEndpointGroup(groupID)) + gCacheI, ok := fc.featureService.groupCache.Load(groupID) + require.True(t, ok) + group := getGroupFromCache(gCacheI.(binding.Group)) + assert.Equal(t, tc.expectedGroup, group) + }) + } +} + func Test_client_InstallMulticastGroup(t *testing.T) { groupID := binding.GroupIDType(101) localReceivers := []uint32{50, 100} diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index 72ddd62b3be..74bf3648d76 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -106,10 +106,12 @@ var ( // - 0b001: packet need to do service selection. // - 0b010: packet has done service selection. // - 0b011: packet has done service selection and the selection result needs to be cached. + // - 0b100: there is no Endpoint to do service selection. ServiceEPStateField = binding.NewRegField(4, 16, 18) EpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b001) EpSelectedRegMark = binding.NewRegMark(ServiceEPStateField, 0b010) EpToLearnRegMark = binding.NewRegMark(ServiceEPStateField, 0b011) + NoEpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b100) // reg4[0..18]: Field to store the union value of Endpoint port and Endpoint status. It is used as a single match // when needed. EpUnionField = binding.NewRegField(4, 0, 18) diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index c0b88bfa5dc..f4d1a620cfe 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -52,7 +52,8 @@ const ( PacketInReasonNP ofpPacketInReason = 0 // PacketInReasonMC shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action // only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected. - PacketInReasonMC = PacketInReasonNP + PacketInReasonMC = PacketInReasonNP + PacketInReasonSvcReject = PacketInReasonNP // PacketInQueueSize defines the size of PacketInQueue. // When PacketInQueue reaches PacketInQueueSize, new packet-in will be dropped. PacketInQueueSize = 200 @@ -81,7 +82,7 @@ type featureStartPacketIn struct { packetInQueue *openflow.PacketInQueue } -func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { +func newFeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh} featurePacketIn.packetInQueue = openflow.NewPacketInQueue(PacketInQueueSize, rate.Limit(PacketInQueueRate)) @@ -96,7 +97,7 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) { // Iterate through each feature that starts packetin. Subscribe with their specified reason. for reason := range c.packetInHandlers { - featurePacketIn := newfeatureStartPacketIn(reason, stopCh) + featurePacketIn := newFeatureStartPacketIn(reason, stopCh) err := c.subscribeFeaturePacketIn(featurePacketIn) if err != nil { klog.Errorf("received error %+v while subscribing packetin for each feature", err) diff --git a/pkg/agent/openflow/packetout.go b/pkg/agent/openflow/packetout.go new file mode 100644 index 00000000000..91e7c1e309c --- /dev/null +++ b/pkg/agent/openflow/packetout.go @@ -0,0 +1,111 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "encoding/binary" + "fmt" + + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + + binding "antrea.io/antrea/pkg/ovs/openflow" +) + +const ( + ipv4HdrLen uint16 = 20 + ipv6HdrLen uint16 = 40 + + icmpUnusedHdrLen uint16 = 4 + + tcpAck uint8 = 0b010000 + tcpRst uint8 = 0b000100 + + icmpDstUnreachableType uint8 = 3 + icmpDstHostAdminProhibitedCode uint8 = 10 + + icmpv6DstUnreachableType uint8 = 1 + icmpv6DstAdminProhibitedCode uint8 = 1 +) + +func GetEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) { + ethernetPkt := new(protocol.Ethernet) + if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil { + return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err) + } + return ethernetPkt, nil +} + +func SendRejectPacketOut(ofClient Client, + srcMAC string, + dstMAC string, + srcIP string, + dstIP string, + inPort uint32, + outPort uint32, + isIPv6 bool, + ethernetPkt *protocol.Ethernet, + proto uint8, + mutateFunc func(binding.PacketOutBuilder) binding.PacketOutBuilder) error { + if proto == protocol.Type_TCP { + // Get TCP data. + oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data) + if err != nil { + return err + } + // While sending TCP reject packet-out, switch original src/dst port, + // set the ackNum as original seqNum+1 and set the flag as ack+rst. + return ofClient.SendTCPPacketOut( + srcMAC, + dstMAC, + srcIP, + dstIP, + inPort, + outPort, + isIPv6, + oriTCPDstPort, + oriTCPSrcPort, + oriTCPSeqNum+1, + tcpAck|tcpRst, + mutateFunc) + } + // Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject. + icmpType := icmpDstUnreachableType + icmpCode := icmpDstHostAdminProhibitedCode + ipHdrLen := ipv4HdrLen + if isIPv6 { + icmpType = icmpv6DstUnreachableType + icmpCode = icmpv6DstAdminProhibitedCode + ipHdrLen = ipv6HdrLen + } + ipHdr, _ := ethernetPkt.Data.MarshalBinary() + icmpData := make([]byte, int(icmpUnusedHdrLen+ipHdrLen+8)) + // Put ICMP unused header in Data prop and set it to zero. + binary.BigEndian.PutUint32(icmpData[:icmpUnusedHdrLen], 0) + copy(icmpData[icmpUnusedHdrLen:], ipHdr[:ipHdrLen+8]) + return ofClient.SendICMPPacketOut( + srcMAC, + dstMAC, + srcIP, + dstIP, + inPort, + outPort, + isIPv6, + icmpType, + icmpCode, + icmpData, + mutateFunc) +} diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 5f7e4048972..0ad6e136c0a 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -124,6 +124,25 @@ func newFeatureService( } } +// serviceNoEndpointGroup creates a shared group for the Services without Endpoint. The packets to any Service that +// doesn't have Endpoint will be forwarded to this group. +func (f *featureService) serviceNoEndpointGroup(groupID binding.GroupIDType) binding.Group { + return f.bridge.CreateGroup(groupID).ResetBuckets(). + Bucket().Weight(100). + LoadToRegField(ServiceEPStateField, NoEpToSelectRegMark.GetValue()). + ResubmitToTable(EndpointDNATTable.GetID()). + Done() +} + +// serviceNoEndpointFlow generates the flow to match the packets to Service without Endpoint and send them to controller. +func (f *featureService) serviceNoEndpointFlow() binding.Flow { + return EndpointDNATTable.ofTable.BuildFlow(priorityLow). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchRegMark(NoEpToSelectRegMark). + Action().SendToController(uint8(PacketInReasonSvcReject)). + Done() +} + func (f *featureService) initFlows() []binding.Flow { var flows []binding.Flow if f.enableProxy { @@ -134,6 +153,7 @@ func (f *featureService) initFlows() []binding.Flow { flows = append(flows, f.snatConntrackFlows()...) flows = append(flows, f.serviceNeedLBFlow()) flows = append(flows, f.sessionAffinityReselectFlow()) + flows = append(flows, f.serviceNoEndpointFlow()) flows = append(flows, f.l2ForwardOutputHairpinServiceFlow()) if f.proxyAll { // This installs the flows to match the first packet of NodePort connection. The flows set a bit of a register diff --git a/pkg/agent/openflow/service_test.go b/pkg/agent/openflow/service_test.go index afe2f87e34b..8fb1be931e0 100644 --- a/pkg/agent/openflow/service_test.go +++ b/pkg/agent/openflow/service_test.go @@ -41,6 +41,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ip actions=resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", @@ -62,6 +63,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))", @@ -86,6 +88,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=NodePortMark, priority=200,ip,nw_dst=169.254.0.252 actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", @@ -110,6 +113,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=NodePortMark, priority=200,ipv6,ipv6_dst=fc01::aabb:ccdd:eefe actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))", diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 06356eb9391..49598ed201d 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -479,6 +479,20 @@ func (mr *MockClientMockRecorder) InstallServiceGroup(arg0, arg1, arg2 interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallServiceGroup", reflect.TypeOf((*MockClient)(nil).InstallServiceGroup), arg0, arg1, arg2) } +// InstallServiceNoEndpointGroup mocks base method +func (m *MockClient) InstallServiceNoEndpointGroup(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallServiceNoEndpointGroup", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallServiceNoEndpointGroup indicates an expected call of InstallServiceNoEndpointGroup +func (mr *MockClientMockRecorder) InstallServiceNoEndpointGroup(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallServiceNoEndpointGroup", reflect.TypeOf((*MockClient)(nil).InstallServiceNoEndpointGroup), arg0) +} + // InstallTraceflowFlows mocks base method func (m *MockClient) InstallTraceflowFlows(arg0 byte, arg1, arg2, arg3 bool, arg4 *openflow.Packet, arg5 uint32, arg6 uint16) error { m.ctrl.T.Helper() diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 063b86ad33b..88a03766183 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -15,6 +15,7 @@ package proxy import ( + "errors" "fmt" "math" "net" @@ -23,6 +24,8 @@ import ( "sync" "time" + "antrea.io/libOpenflow/protocol" + "antrea.io/ofnet/ofctrl" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" @@ -95,7 +98,8 @@ type proxier struct { // endpointReferenceCounter stores the number of times an Endpoint is referenced by Services. endpointReferenceCounter map[string]int // groupCounter is used to allocate groupID. - groupCounter types.GroupCounter + groupCounter types.GroupCounter + serviceNoEndpointGroupID binding.GroupIDType // serviceStringMap provides map from serviceString(ClusterIP:Port/Proto) to ServicePortName. serviceStringMap map[string]k8sproxy.ServicePortName // serviceStringMapMutex protects serviceStringMap object. @@ -357,8 +361,12 @@ func (p *proxier) installServices() { if p.topologyAwareHintsEnabled { endpoints = filterEndpoints(endpoints, svcInfo, p.nodeLabels) } - // If both expected Endpoints number and installed Endpoints number are 0, we don't need to take care of this Service. + // If both expected Endpoints number and installed Endpoints number are 0, we only need to install flows for the + // Service to reject the requests to the Service without Endpoints. if len(endpoints) == 0 && len(endpointsInstalled) == 0 { + if err := p.ofClient.InstallServiceFlows(p.serviceNoEndpointGroupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, 0, false, ""); err != nil { + klog.ErrorS(err, "Error when installing Service flows", "Service", svcPortName) + } continue } @@ -422,13 +430,20 @@ func (p *proxier) installServices() { } } - // If there are expired Endpoints, Endpoints installed should be updated. - if internalNodeLocal && externalNodeLocal && len(localEndpointUpdateList) < len(endpointsInstalled) || - !(internalNodeLocal && externalNodeLocal) && len(allEndpointUpdateList) < len(endpointsInstalled) { + // If there are expired Endpoints, installed flows for Endpoints should be updated. + if internalNodeLocal && (externalNodeLocal || svcInfo.NodePort() == 0) && len(localEndpointUpdateList) < len(endpointsInstalled) || + (!internalNodeLocal || !externalNodeLocal && svcInfo.NodePort() > 0) && len(allEndpointUpdateList) < len(endpointsInstalled) { klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String()) needUpdateEndpoints = true } + // If either expected Endpoints number or installed Endpoints number is 0, installed flows for Service should be + // updated. + if internalNodeLocal && (externalNodeLocal || svcInfo.NodePort() == 0) && (len(localEndpointUpdateList) == 0 || len(endpointsInstalled) == 0) || + (!internalNodeLocal || !externalNodeLocal && svcInfo.NodePort() > 0) && (len(allEndpointUpdateList) == 0 || len(endpointsInstalled) == 0) { + needUpdateService = true + } + var deletedLoadBalancerIPs, addedLoadBalancerIPs []string if p.proxyLoadBalancerIPs { if pSvcInfo != nil { @@ -554,6 +569,10 @@ func (p *proxier) installServices() { // Install ClusterIP flows for the Service. groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalNodeLocal) + // If the number of expected Endpoints is 0, use a zero group ID to install a reject flow. + if internalNodeLocal && len(localEndpointUpdateList) == 0 || len(allEndpointUpdateList) == 0 { + groupID = p.serviceNoEndpointGroupID + } if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), externalNodeLocal, corev1.ServiceTypeClusterIP); err != nil { klog.Errorf("Error when installing Service flows: %v", err) continue @@ -561,6 +580,10 @@ func (p *proxier) installServices() { if p.proxyAll { nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalNodeLocal) + // If the number of expected Endpoints is 0, use a zero group ID to install a reject flow. + if internalNodeLocal && len(localEndpointUpdateList) == 0 || len(allEndpointUpdateList) == 0 { + nGroupID = p.serviceNoEndpointGroupID + } // Install ClusterIP route on Node so that ClusterIP can be accessed on Node. Every time a new ClusterIP // is created, the routing target IP block will be recalculated for expansion to be able to route the new // created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR @@ -582,6 +605,10 @@ func (p *proxier) installServices() { if p.proxyLoadBalancerIPs { nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalNodeLocal) + // If the number of expected Endpoints is 0, use a zero group ID to install a reject flow. + if internalNodeLocal && len(localEndpointUpdateList) == 0 || len(allEndpointUpdateList) == 0 { + nGroupID = p.serviceNoEndpointGroupID + } // Service LoadBalancer flows can be partially updated. var toDelete, toAdd []string if needRemoval { @@ -844,6 +871,12 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { func (p *proxier) Run(stopCh <-chan struct{}) { p.once.Do(func() { + p.ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonSvcReject), "svc-reject", p) + p.serviceNoEndpointGroupID = p.groupCounter.AllocateIfNotExist(k8sproxy.ServicePortName{}, false) + if err := p.ofClient.InstallServiceNoEndpointGroup(p.serviceNoEndpointGroupID); err != nil { + klog.ErrorS(err, "Failed to install Openflow group for Service without Endpoint") + return + } go p.serviceConfig.Run(stopCh) if p.endpointSliceEnabled { go p.endpointSliceConfig.Run(stopCh) @@ -904,6 +937,51 @@ func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, [ return flows, groups, found } +func (p *proxier) HandlePacketIn(pktIn *ofctrl.PacketIn) error { + if pktIn == nil { + return errors.New("empty packetin for Antrea Proxy") + } + // Get ethernet data. + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) + if err != nil { + return err + } + srcMAC := ethernetPkt.HWDst.String() + dstMAC := ethernetPkt.HWSrc.String() + + var ( + srcIP, dstIP string + proto uint8 + isIPv6 bool + ) + switch ipPkt := ethernetPkt.Data.(type) { + case *protocol.IPv4: + srcIP = ipPkt.NWDst.String() + dstIP = ipPkt.NWSrc.String() + proto = ipPkt.Protocol + isIPv6 = false + case *protocol.IPv6: + srcIP = ipPkt.NWDst.String() + dstIP = ipPkt.NWSrc.String() + proto = ipPkt.NextHeader + isIPv6 = true + } + + matches := pktIn.GetMatches() + outPort := matches.GetMatchByName(binding.OxmFieldInPort).GetValue().(uint32) + return openflow.SendRejectPacketOut(p.ofClient, + srcMAC, + dstMAC, + srcIP, + dstIP, + 0, + outPort, + isIPv6, + ethernetPkt, + proto, + nil) +} + func NewProxier( hostname string, informerFactory informers.SharedInformerFactory, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 94394a1c8d1..10aeb83ad1b 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -337,6 +337,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP if o.endpointSliceEnabled { p.endpointsChanges = newEndpointsChangesTracker(hostname, o.endpointSliceEnabled, isIPv6) } + p.serviceNoEndpointGroupID = p.groupCounter.AllocateIfNotExist(k8sproxy.ServicePortName{}, false) return p } @@ -1060,6 +1061,8 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { svc := makeTestClusterIPService(&svcPortName, svcIP, int32(svcPort), corev1.ProtocolTCP, nil, nil) makeServiceMap(fp, svc) makeEndpointsMap(fp) + + mockOFClient.EXPECT().InstallServiceFlows(fp.serviceNoEndpointGroupID, svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, gomock.Any()).Times(1) fp.syncProxyRules() } @@ -1110,6 +1113,7 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) fp.syncProxyRules() + mockOFClient.EXPECT().InstallServiceFlows(fp.serviceNoEndpointGroupID, svcIP, uint16(svcPort), protocolUDP, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) fp.endpointsChanges.OnEndpointUpdate(epUDP, nil) fp.syncProxyRules() } @@ -1149,6 +1153,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.syncProxyRules() + mockOFClient.EXPECT().InstallServiceFlows(fp.serviceNoEndpointGroupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) fp.endpointsChanges.OnEndpointUpdate(ep, nil) fp.syncProxyRules() } @@ -1254,6 +1259,8 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne }) makeServiceMap(fp, svc) makeEndpointsMap(fp) + + mockOFClient.EXPECT().InstallServiceFlows(fp.serviceNoEndpointGroupID, svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, gomock.Any()).Times(1) fp.syncProxyRules() } diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 6fbc0d735b2..1f68455c6f9 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -76,8 +76,10 @@ const ( NxmFieldDstIPv4 = "NXM_OF_IP_DST" NxmFieldSrcIPv6 = "NXM_NX_IPV6_SRC" NxmFieldDstIPv6 = "NXM_NX_IPV6_DST" + NxmFieldTunIPv4Src = "NXM_NX_TUN_IPV4_SRC" OxmFieldVLANVID = "OXM_OF_VLAN_VID" + OxmFieldInPort = "OXM_OF_IN_PORT" ) const ( diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index ce28d5953de..57d137440b4 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.