Skip to content

Commit

Permalink
Reject the request to a Service without an Endpoint
Browse files Browse the repository at this point in the history
When requesting a Service without an Endpoint, the connection should be rejected,
rather than timeout according to the expectation of Kubernetes sig-network tests.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Mar 20, 2023
1 parent a66f078 commit be3fba1
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 78 deletions.
4 changes: 2 additions & 2 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error {
f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh)
}
go func() {
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
// Can't parse the packet. Forward it to the Pod.
waitCh <- nil
Expand Down Expand Up @@ -821,7 +821,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error {

// sendDNSPacketout forwards the DNS response packet to the original requesting client.
func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error {
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
return err
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"time"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
"github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -210,11 +208,3 @@ func isAntreaPolicyEgressTable(tableID uint8) bool {
}
return false
}

func getEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) {
ethernetPkt := new(protocol.Ethernet)
if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil {
return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err)
}
return ethernetPkt, nil
}
50 changes: 4 additions & 46 deletions pkg/agent/controller/networkpolicy/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package networkpolicy

import (
"encoding/binary"
"fmt"
"net"

Expand Down Expand Up @@ -92,7 +91,7 @@ const (
func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
// All src/dst mean the source/destination of the reject packet, which are destination/source of the incoming packet.
// Get ethernet data.
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
return err
}
Expand Down Expand Up @@ -191,57 +190,16 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort)
mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType, isFlexibleIPAMSrc, isFlexibleIPAMDst, ctZone)

if proto == protocol.Type_TCP {
// Get TCP data.
oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data)
if err != nil {
return err
}
// While sending TCP reject packet-out, switch original src/dst port,
// set the ackNum as original seqNum+1 and set the flag as ack+rst.
return c.ofClient.SendTCPPacketOut(
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
oriTCPDstPort,
oriTCPSrcPort,
0,
oriTCPSeqNum+1,
0,
TCPAck|TCPRst,
0,
nil,
mutateFunc)
}
// Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject.
icmpType := ICMPDstUnreachableType
icmpCode := ICMPDstHostAdminProhibitedCode
ipHdrLen := IPv4HdrLen
if isIPv6 {
icmpType = ICMPv6DstUnreachableType
icmpCode = ICMPv6DstAdminProhibitedCode
ipHdrLen = IPv6HdrLen
}
ipHdr, _ := ethernetPkt.Data.MarshalBinary()
icmpData := make([]byte, int(ICMPUnusedHdrLen+ipHdrLen+8))
// Put ICMP unused header in Data prop and set it to zero.
binary.BigEndian.PutUint32(icmpData[:ICMPUnusedHdrLen], 0)
copy(icmpData[ICMPUnusedHdrLen:], ipHdr[:ipHdrLen+8])
return c.ofClient.SendICMPPacketOut(
return openflow.SendRejectPacketOut(c.ofClient,
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
icmpType,
icmpCode,
icmpData,
ethernetPkt,
proto,
mutateFunc)
}

Expand Down
10 changes: 3 additions & 7 deletions pkg/agent/multicast/mcast_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
binding "antrea.io/antrea/pkg/ovs/openflow"
)

const (
IGMPProtocolNumber = 2
)

const (
openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC"
openflowKeyInPort = "OXM_OF_IN_PORT"
)

var (
// igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the
// "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message
Expand Down Expand Up @@ -103,7 +99,7 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32,

func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) {
matches := pktIn.GetMatches()
ofPortField := matches.GetMatchByName(openflowKeyInPort)
ofPortField := matches.GetMatchByName(binding.OxmFieldInPort)
if ofPortField == nil {
return nil, errors.New("in_port field not found")
}
Expand Down Expand Up @@ -327,7 +323,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error {

func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) {
matches := pktIn.GetMatches()
tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc)
tunSrcField := matches.GetMatchByName(binding.NxmFieldTunIPv4Src)
if tunSrcField == nil {
return nil, errors.New("in_port field not found")
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,12 @@ func Test_client_InstallServiceGroup(t *testing.T) {
"bucket=bucket_id:1,weight:100,actions=set_field:0xa0a0065->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT",
deleteOFEntriesError: fmt.Errorf("error when deleting Openflow entries for Service Endpoints Group 100"),
},
{
name: "No Endpoints",
endpoints: []proxy.Endpoint{},
expectedGroup: "group_id=100,type=select," +
"bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x70000->reg4,resubmit:EndpointDNAT",
},
}

for _, tc := range testCases {
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ var (
// - 0b001: packet need to do service selection.
// - 0b010: packet has done service selection.
// - 0b011: packet has done service selection and the selection result needs to be cached.
// - 0b100: there is no Endpoint to do service selection.
ServiceEPStateField = binding.NewRegField(4, 16, 18)
EpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b001)
EpSelectedRegMark = binding.NewRegMark(ServiceEPStateField, 0b010)
EpToLearnRegMark = binding.NewRegMark(ServiceEPStateField, 0b011)
NoEpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b100)
// reg4[0..18]: Field to store the union value of Endpoint port and Endpoint status. It is used as a single match
// when needed.
EpUnionField = binding.NewRegField(4, 0, 18)
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ const (
PacketInReasonNP ofpPacketInReason = 0
// PacketInReasonMC shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action
// only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected.
PacketInReasonMC = PacketInReasonNP
PacketInReasonMC = PacketInReasonNP
PacketInReasonSvcReject = PacketInReasonNP
// PacketInQueueSize defines the size of PacketInQueue.
// When PacketInQueue reaches PacketInQueueSize, new packet-in will be dropped.
PacketInQueueSize = 200
Expand Down Expand Up @@ -81,7 +82,7 @@ type featureStartPacketIn struct {
packetInQueue *openflow.PacketInQueue
}

func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
func newFeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh}
featurePacketIn.packetInQueue = openflow.NewPacketInQueue(PacketInQueueSize, rate.Limit(PacketInQueueRate))

Expand All @@ -96,7 +97,7 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) {

// Iterate through each feature that starts packetin. Subscribe with their specified reason.
for reason := range c.packetInHandlers {
featurePacketIn := newfeatureStartPacketIn(reason, stopCh)
featurePacketIn := newFeatureStartPacketIn(reason, stopCh)
err := c.subscribeFeaturePacketIn(featurePacketIn)
if err != nil {
klog.Errorf("received error %+v while subscribing packetin for each feature", err)
Expand Down
115 changes: 115 additions & 0 deletions pkg/agent/openflow/packetout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package openflow

import (
"encoding/binary"
"fmt"

"antrea.io/libOpenflow/protocol"
"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"

binding "antrea.io/antrea/pkg/ovs/openflow"
)

const (
ipv4HdrLen uint16 = 20
ipv6HdrLen uint16 = 40

icmpUnusedHdrLen uint16 = 4

tcpAck uint8 = 0b010000
tcpRst uint8 = 0b000100

icmpDstUnreachableType uint8 = 3
icmpDstHostAdminProhibitedCode uint8 = 10

icmpv6DstUnreachableType uint8 = 1
icmpv6DstAdminProhibitedCode uint8 = 1
)

func GetEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) {
ethernetPkt := new(protocol.Ethernet)
if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil {
return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err)
}
return ethernetPkt, nil
}

func SendRejectPacketOut(ofClient Client,
srcMAC string,
dstMAC string,
srcIP string,
dstIP string,
inPort uint32,
outPort uint32,
isIPv6 bool,
ethernetPkt *protocol.Ethernet,
proto uint8,
mutateFunc func(binding.PacketOutBuilder) binding.PacketOutBuilder) error {
if proto == protocol.Type_TCP {
// Get TCP data.
oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data)
if err != nil {
return err
}
// While sending TCP reject packet-out, switch original src/dst port,
// set the ackNum as original seqNum+1 and set the flag as ack+rst.
return ofClient.SendTCPPacketOut(
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
oriTCPDstPort,
oriTCPSrcPort,
0,
oriTCPSeqNum+1,
0,
tcpAck|tcpRst,
0,
nil,
mutateFunc)
}
// Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject.
icmpType := icmpDstUnreachableType
icmpCode := icmpDstHostAdminProhibitedCode
ipHdrLen := ipv4HdrLen
if isIPv6 {
icmpType = icmpv6DstUnreachableType
icmpCode = icmpv6DstAdminProhibitedCode
ipHdrLen = ipv6HdrLen
}
ipHdr, _ := ethernetPkt.Data.MarshalBinary()
icmpData := make([]byte, int(icmpUnusedHdrLen+ipHdrLen+8))
// Put ICMP unused header in Data prop and set it to zero.
binary.BigEndian.PutUint32(icmpData[:icmpUnusedHdrLen], 0)
copy(icmpData[icmpUnusedHdrLen:], ipHdr[:ipHdrLen+8])
return ofClient.SendICMPPacketOut(
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
icmpType,
icmpCode,
icmpData,
mutateFunc)
}
8 changes: 8 additions & 0 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2480,6 +2480,14 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16
// EndpointDNATTable. Otherwise, buckets will resubmit packets to EndpointDNATTable directly.
func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
group := f.bridge.CreateGroup(groupID).ResetBuckets()

if len(endpoints) == 0 {
return group.Bucket().Weight(100).
LoadToRegField(ServiceEPStateField, NoEpToSelectRegMark.GetValue()).
ResubmitToTable(EndpointDNATTable.GetID()).
Done()
}

var resubmitTableID uint8
if withSessionAffinity {
resubmitTableID = ServiceLBTable.GetID()
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/openflow/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ func newFeatureService(
}
}

// serviceNoEndpointFlow generates the flow to match the packets to Service without Endpoint and send them to controller.
func (f *featureService) serviceNoEndpointFlow() binding.Flow {
return EndpointDNATTable.ofTable.BuildFlow(priorityLow).
Cookie(f.cookieAllocator.Request(f.category).Raw()).
MatchRegMark(NoEpToSelectRegMark).
Action().SendToController(uint8(PacketInReasonSvcReject)).
Done()
}

func (f *featureService) initFlows() []binding.Flow {
var flows []binding.Flow
if f.enableProxy {
Expand All @@ -134,6 +143,7 @@ func (f *featureService) initFlows() []binding.Flow {
flows = append(flows, f.snatConntrackFlows()...)
flows = append(flows, f.serviceNeedLBFlow())
flows = append(flows, f.sessionAffinityReselectFlow())
flows = append(flows, f.serviceNoEndpointFlow())
flows = append(flows, f.l2ForwardOutputHairpinServiceFlow())
if f.proxyAll {
// This installs the flows to match the first packet of NodePort connection. The flows set a bit of a register
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/openflow/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Test_featureService_initFlows(t *testing.T) {
"cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ip actions=resubmit:SessionAffinity,resubmit:ServiceLB",
"cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)",
"cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))",
Expand All @@ -62,6 +63,7 @@ func Test_featureService_initFlows(t *testing.T) {
"cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:SessionAffinity,resubmit:ServiceLB",
"cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)",
"cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))",
Expand All @@ -86,6 +88,7 @@ func Test_featureService_initFlows(t *testing.T) {
"cookie=0x1030000000000, table=NodePortMark, priority=200,ip,nw_dst=169.254.0.252 actions=set_field:0x80000/0x80000->reg4",
"cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)",
"cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))",
Expand All @@ -110,6 +113,7 @@ func Test_featureService_initFlows(t *testing.T) {
"cookie=0x1030000000000, table=NodePortMark, priority=200,ipv6,ipv6_dst=fc01::aabb:ccdd:eefe actions=set_field:0x80000/0x80000->reg4",
"cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB",
"cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x40000/0x70000 actions=controller:(reason=no_match,max_len=128,id=32776)",
"cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))",
"cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))",
Expand Down
Loading

0 comments on commit be3fba1

Please sign in to comment.