Skip to content

Commit

Permalink
Reject the request to a Service without an Endpoint
Browse files Browse the repository at this point in the history
When requesting a Service without an Endpoint, the connection should be rejected,
rather than timeout according to the expectation of Kubernetes sig-network tests.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Mar 1, 2023
1 parent aafea18 commit 9a6448a
Show file tree
Hide file tree
Showing 16 changed files with 326 additions and 74 deletions.
6 changes: 3 additions & 3 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error {
f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh)
}
go func() {
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
return
}
Expand Down Expand Up @@ -793,7 +793,7 @@ func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error {
prot uint8
isIPv6 bool
)
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
return err
}
Expand Down Expand Up @@ -823,7 +823,7 @@ func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error {
// Use the original in_port number in the packetIn message to avoid an invalid input port number. Note that,
// this should not happen in container case as antrea-gw0 always exists. This check is for security.
matches := pktIn.GetMatches()
inPortField := matches.GetMatchByName("OXM_OF_IN_PORT")
inPortField := matches.GetMatchByName(binding.OxmFieldInPort)
if inPortField != nil {
inPort = inPortField.GetValue().(uint32)
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"time"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
"github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -210,11 +208,3 @@ func isAntreaPolicyEgressTable(tableID uint8) bool {
}
return false
}

func getEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) {
ethernetPkt := new(protocol.Ethernet)
if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil {
return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err)
}
return ethernetPkt, nil
}
46 changes: 4 additions & 42 deletions pkg/agent/controller/networkpolicy/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package networkpolicy

import (
"encoding/binary"
"fmt"

"antrea.io/libOpenflow/protocol"
Expand Down Expand Up @@ -87,7 +86,7 @@ const (
// packet-in message.
func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
// Get ethernet data.
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
return err
}
Expand Down Expand Up @@ -168,53 +167,16 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort)
mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType)

if proto == protocol.Type_TCP {
// Get TCP data.
oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data)
if err != nil {
return err
}
// While sending TCP reject packet-out, switch original src/dst port,
// set the ackNum as original seqNum+1 and set the flag as ack+rst.
return c.ofClient.SendTCPPacketOut(
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
oriTCPDstPort,
oriTCPSrcPort,
oriTCPSeqNum+1,
TCPAck|TCPRst,
mutateFunc)
}
// Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject.
icmpType := ICMPDstUnreachableType
icmpCode := ICMPDstHostAdminProhibitedCode
ipHdrLen := IPv4HdrLen
if isIPv6 {
icmpType = ICMPv6DstUnreachableType
icmpCode = ICMPv6DstAdminProhibitedCode
ipHdrLen = IPv6HdrLen
}
ipHdr, _ := ethernetPkt.Data.MarshalBinary()
icmpData := make([]byte, int(ICMPUnusedHdrLen+ipHdrLen+8))
// Put ICMP unused header in Data prop and set it to zero.
binary.BigEndian.PutUint32(icmpData[:ICMPUnusedHdrLen], 0)
copy(icmpData[ICMPUnusedHdrLen:], ipHdr[:ipHdrLen+8])
return c.ofClient.SendICMPPacketOut(
return openflow.SendRejectPacketOut(c.ofClient,
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
icmpType,
icmpCode,
icmpData,
ethernetPkt,
proto,
mutateFunc)
}

Expand Down
10 changes: 3 additions & 7 deletions pkg/agent/multicast/mcast_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
binding "antrea.io/antrea/pkg/ovs/openflow"
)

const (
IGMPProtocolNumber = 2
)

const (
openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC"
openflowKeyInPort = "OXM_OF_IN_PORT"
)

var (
// igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the
// "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message
Expand Down Expand Up @@ -103,7 +99,7 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32,

func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) {
matches := pktIn.GetMatches()
ofPortField := matches.GetMatchByName(openflowKeyInPort)
ofPortField := matches.GetMatchByName(binding.OxmFieldInPort)
if ofPortField == nil {
return nil, errors.New("in_port field not found")
}
Expand Down Expand Up @@ -327,7 +323,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error {

func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) {
matches := pktIn.GetMatches()
tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc)
tunSrcField := matches.GetMatchByName(binding.NxmFieldTunIPv4Src)
if tunSrcField == nil {
return nil, errors.New("in_port field not found")
}
Expand Down
28 changes: 26 additions & 2 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ type Client interface {
// DeleteAddressFromDNSConjunction removes addresses from the toAddresses of the dns packetIn conjunction.
DeleteAddressFromDNSConjunction(id uint32, addrs []types.Address) error

InstallServiceNoEndpointGroup(groupID binding.GroupIDType) error

// InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0
// to ensure it can be forwarded to the external addresses.
InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error
Expand Down Expand Up @@ -700,8 +702,9 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
if affinityTimeout != 0 {
flows = append(flows, c.featureService.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, svcType))
}

cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
return c.addFlows(c.featureService.cachedFlows, cacheKey, flows)
return c.modifyFlows(c.featureService.cachedFlows, cacheKey, flows)
}

func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
Expand Down Expand Up @@ -1103,7 +1106,9 @@ func setBasePacketOutBuilder(packetOutBuilder binding.PacketOutBuilder, srcMAC s

packetOutBuilder = packetOutBuilder.SetTTL(128)

packetOutBuilder = packetOutBuilder.SetInport(inPort)
if inPort != 0 {
packetOutBuilder = packetOutBuilder.SetInport(inPort)
}
if outPort != 0 {
packetOutBuilder = packetOutBuilder.SetOutport(outPort)
}
Expand Down Expand Up @@ -1316,6 +1321,25 @@ func (c *client) SendIGMPRemoteReportPacketOut(
return c.bridge.SendPacketOut(packetOutObj)
}

func (c *client) InstallServiceNoEndpointGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

group := c.featureService.serviceNoEndpointGroup(groupID)
_, installed := c.featureService.groupCache.Load(groupID)
if !installed {
if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil {
return fmt.Errorf("error when installing Service without Endpoint group %d: %w", groupID, err)
}
} else {
if err := c.ofEntryOperations.ModifyOFEntries([]binding.OFEntry{group}); err != nil {
return fmt.Errorf("error when modifying Service without Endpoint group %d: %w", groupID, err)
}
}
c.featureService.groupCache.Store(groupID, group)
return nil
}

func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
Expand Down
41 changes: 41 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1972,6 +1972,47 @@ func Test_client_InstallTrafficControlReturnPortFlow(t *testing.T) {
require.False(t, ok)
}

func Test_client_InstallServiceNoEndpointGroup(t *testing.T) {
groupID := binding.GroupIDType(100)
testCases := []struct {
name string
ipv4Enabled bool
ipv6Enabled bool
expectedGroup string
}{
{
name: "IPv4",
ipv4Enabled: true,
expectedGroup: "group_id=100,type=select," +
"bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x70000->reg4,resubmit:EndpointDNAT",
},
{
name: "IPv6",
ipv6Enabled: true,
expectedGroup: "group_id=100,type=select," +
"bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x70000->reg4,resubmit:EndpointDNAT",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)

fc := newFakeClient(m, tc.ipv4Enabled, tc.ipv6Enabled, config.K8sNode, config.TrafficEncapModeEncap)
defer resetPipelines()

m.EXPECT().AddOFEntries(gomock.Any()).Return(nil).Times(1)

assert.NoError(t, fc.InstallServiceNoEndpointGroup(groupID))
gCacheI, ok := fc.featureService.groupCache.Load(groupID)
require.True(t, ok)
group := getGroupFromCache(gCacheI.(binding.Group))
assert.Equal(t, tc.expectedGroup, group)
})
}
}

func Test_client_InstallMulticastGroup(t *testing.T) {
groupID := binding.GroupIDType(101)
localReceivers := []uint32{50, 100}
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,12 @@ var (
// - 0b001: packet need to do service selection.
// - 0b010: packet has done service selection.
// - 0b011: packet has done service selection and the selection result needs to be cached.
// - 0b100: there is no Endpoint to do service selection.
ServiceEPStateField = binding.NewRegField(4, 16, 18)
EpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b001)
EpSelectedRegMark = binding.NewRegMark(ServiceEPStateField, 0b010)
EpToLearnRegMark = binding.NewRegMark(ServiceEPStateField, 0b011)
NoEpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b100)
// reg4[0..18]: Field to store the union value of Endpoint port and Endpoint status. It is used as a single match
// when needed.
EpUnionField = binding.NewRegField(4, 0, 18)
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ const (
PacketInReasonNP ofpPacketInReason = 0
// PacketInReasonMC shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action
// only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected.
PacketInReasonMC = PacketInReasonNP
PacketInReasonMC = PacketInReasonNP
PacketInReasonSvcReject = PacketInReasonNP
// PacketInQueueSize defines the size of PacketInQueue.
// When PacketInQueue reaches PacketInQueueSize, new packet-in will be dropped.
PacketInQueueSize = 200
Expand Down Expand Up @@ -81,7 +82,7 @@ type featureStartPacketIn struct {
packetInQueue *openflow.PacketInQueue
}

func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
func newFeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh}
featurePacketIn.packetInQueue = openflow.NewPacketInQueue(PacketInQueueSize, rate.Limit(PacketInQueueRate))

Expand All @@ -96,7 +97,7 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) {

// Iterate through each feature that starts packetin. Subscribe with their specified reason.
for reason := range c.packetInHandlers {
featurePacketIn := newfeatureStartPacketIn(reason, stopCh)
featurePacketIn := newFeatureStartPacketIn(reason, stopCh)
err := c.subscribeFeaturePacketIn(featurePacketIn)
if err != nil {
klog.Errorf("received error %+v while subscribing packetin for each feature", err)
Expand Down
Loading

0 comments on commit 9a6448a

Please sign in to comment.