Skip to content

Commit

Permalink
Reject the request to a Service without an Endpoint
Browse files Browse the repository at this point in the history
When requesting a Service without an Endpoint, the connection should be rejected,
rather than timeout according to the expectation of Kubernetes sig-network tests.

This PR also reorders `NestedServiceRegMark` in pkg/agent/openflow/field.go.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Mar 21, 2023
1 parent 8bccfc1 commit f3225bc
Show file tree
Hide file tree
Showing 15 changed files with 405 additions and 95 deletions.
4 changes: 2 additions & 2 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error {
f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh)
}
go func() {
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
// Can't parse the packet. Forward it to the Pod.
waitCh <- nil
Expand Down Expand Up @@ -821,7 +821,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error {

// sendDNSPacketout forwards the DNS response packet to the original requesting client.
func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error {
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
return err
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"time"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
"github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -210,11 +208,3 @@ func isAntreaPolicyEgressTable(tableID uint8) bool {
}
return false
}

func getEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) {
ethernetPkt := new(protocol.Ethernet)
if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil {
return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err)
}
return ethernetPkt, nil
}
50 changes: 4 additions & 46 deletions pkg/agent/controller/networkpolicy/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package networkpolicy

import (
"encoding/binary"
"fmt"
"net"

Expand Down Expand Up @@ -92,7 +91,7 @@ const (
func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
// All src/dst mean the source/destination of the reject packet, which are destination/source of the incoming packet.
// Get ethernet data.
ethernetPkt, err := getEthernetPacket(pktIn)
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
if err != nil {
return err
}
Expand Down Expand Up @@ -191,57 +190,16 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort)
mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType, isFlexibleIPAMSrc, isFlexibleIPAMDst, ctZone)

if proto == protocol.Type_TCP {
// Get TCP data.
oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data)
if err != nil {
return err
}
// While sending TCP reject packet-out, switch original src/dst port,
// set the ackNum as original seqNum+1 and set the flag as ack+rst.
return c.ofClient.SendTCPPacketOut(
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
oriTCPDstPort,
oriTCPSrcPort,
0,
oriTCPSeqNum+1,
0,
TCPAck|TCPRst,
0,
nil,
mutateFunc)
}
// Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject.
icmpType := ICMPDstUnreachableType
icmpCode := ICMPDstHostAdminProhibitedCode
ipHdrLen := IPv4HdrLen
if isIPv6 {
icmpType = ICMPv6DstUnreachableType
icmpCode = ICMPv6DstAdminProhibitedCode
ipHdrLen = IPv6HdrLen
}
ipHdr, _ := ethernetPkt.Data.MarshalBinary()
icmpData := make([]byte, int(ICMPUnusedHdrLen+ipHdrLen+8))
// Put ICMP unused header in Data prop and set it to zero.
binary.BigEndian.PutUint32(icmpData[:ICMPUnusedHdrLen], 0)
copy(icmpData[ICMPUnusedHdrLen:], ipHdr[:ipHdrLen+8])
return c.ofClient.SendICMPPacketOut(
return openflow.SendRejectPacketOut(c.ofClient,
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
icmpType,
icmpCode,
icmpData,
ethernetPkt,
proto,
mutateFunc)
}

Expand Down
24 changes: 4 additions & 20 deletions pkg/agent/multicast/mcast_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"
"time"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
Expand All @@ -33,17 +32,13 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
binding "antrea.io/antrea/pkg/ovs/openflow"
)

const (
IGMPProtocolNumber = 2
)

const (
openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC"
openflowKeyInPort = "OXM_OF_IN_PORT"
)

var (
// igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the
// "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message
Expand Down Expand Up @@ -79,7 +74,7 @@ func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
if match == nil {
return fmt.Errorf("error getting match from IGMP marks in CustomField")
}
customReasons, err := getInfoInReg(match, openflow.CustomReasonField.GetRange().ToNXRange())
customReasons, err := openflow.GetInfoInReg(match, openflow.CustomReasonField.GetRange().ToNXRange())
if err != nil {
klog.ErrorS(err, "Received error while unloading customReason from OVS reg")
return err
Expand All @@ -90,20 +85,9 @@ func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
return nil
}

func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32, error) {
regValue, ok := regMatch.GetValue().(*ofctrl.NXRegister)
if !ok {
return 0, errors.New("register value cannot be retrieved")
}
if rng != nil {
return ofctrl.GetUint32ValueWithRange(regValue.Data, rng), nil
}
return regValue.Data, nil
}

func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) {
matches := pktIn.GetMatches()
ofPortField := matches.GetMatchByName(openflowKeyInPort)
ofPortField := matches.GetMatchByName(binding.OxmFieldInPort)
if ofPortField == nil {
return nil, errors.New("in_port field not found")
}
Expand Down Expand Up @@ -327,7 +311,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error {

func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) {
matches := pktIn.GetMatches()
tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc)
tunSrcField := matches.GetMatchByName(binding.NxmFieldTunIPv4Src)
if tunSrcField == nil {
return nil, errors.New("in_port field not found")
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,12 @@ func Test_client_InstallServiceGroup(t *testing.T) {
"bucket=bucket_id:1,weight:100,actions=set_field:0xa0a0065->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT",
deleteOFEntriesError: fmt.Errorf("error when deleting Openflow entries for Service Endpoints Group 100"),
},
{
name: "No Endpoints",
endpoints: []proxy.Endpoint{},
expectedGroup: "group_id=100,type=select," +
"bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x70000->reg4,resubmit:EndpointDNAT",
},
}

for _, tc := range testCases {
Expand Down
8 changes: 5 additions & 3 deletions pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ var (
// - 0b001: packet need to do service selection.
// - 0b010: packet has done service selection.
// - 0b011: packet has done service selection and the selection result needs to be cached.
// - 0b100: there is no Endpoint to do service selection.
ServiceEPStateField = binding.NewRegField(4, 16, 18)
EpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b001)
EpSelectedRegMark = binding.NewRegMark(ServiceEPStateField, 0b010)
EpToLearnRegMark = binding.NewRegMark(ServiceEPStateField, 0b011)
NoEpToSelectRegMark = binding.NewRegMark(ServiceEPStateField, 0b100)
// reg4[0..18]: Field to store the union value of Endpoint port and Endpoint status. It is used as a single match
// when needed.
EpUnionField = binding.NewRegField(4, 0, 18)
Expand All @@ -128,11 +130,11 @@ var (
// externalTrafficPolicy is Cluster.
ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21)
// reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include:
TrafficControlActionField = binding.NewRegField(4, 22, 23)
// reg4[24]: Mark to indicate whether the Endpoints of a Service includes another Service's ClusterIP.
NestedServiceRegMark = binding.NewOneBitRegMark(4, 24)
TrafficControlActionField = binding.NewRegField(4, 22, 23)
TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlActionField, 0b01)
TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlActionField, 0b10)
// reg4[24]: Mark to indicate whether the Endpoints of a Service includes another Service's ClusterIP.
NestedServiceRegMark = binding.NewOneBitRegMark(4, 24)

// reg5(NXM_NX_REG5)
// Field to cache the Egress conjunction ID hit by TraceFlow packet.
Expand Down
29 changes: 27 additions & 2 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package openflow

import (
"encoding/binary"
"errors"
"fmt"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
"golang.org/x/time/rate"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -53,6 +56,9 @@ const (
// PacketInReasonMC shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action
// only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected.
PacketInReasonMC = PacketInReasonNP
// PacketInReasonSvcReject shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action
// only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected.
PacketInReasonSvcReject = PacketInReasonNP
// PacketInQueueSize defines the size of PacketInQueue.
// When PacketInQueue reaches PacketInQueueSize, new packet-in will be dropped.
PacketInQueueSize = 200
Expand Down Expand Up @@ -81,7 +87,7 @@ type featureStartPacketIn struct {
packetInQueue *openflow.PacketInQueue
}

func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
func newFeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh}
featurePacketIn.packetInQueue = openflow.NewPacketInQueue(PacketInQueueSize, rate.Limit(PacketInQueueRate))

Expand All @@ -96,7 +102,7 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) {

// Iterate through each feature that starts packetin. Subscribe with their specified reason.
for reason := range c.packetInHandlers {
featurePacketIn := newfeatureStartPacketIn(reason, stopCh)
featurePacketIn := newFeatureStartPacketIn(reason, stopCh)
err := c.subscribeFeaturePacketIn(featurePacketIn)
if err != nil {
klog.Errorf("received error %+v while subscribing packetin for each feature", err)
Expand Down Expand Up @@ -148,3 +154,22 @@ func GetMatchFieldByRegID(matchers *ofctrl.Matchers, regID int) *ofctrl.MatchFie
}
return &ofctrl.MatchField{MatchField: openflow15.NewRegMatchFieldWithMask(regID, data, mask)}
}

func GetInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32, error) {
regValue, ok := regMatch.GetValue().(*ofctrl.NXRegister)
if !ok {
return 0, errors.New("register value cannot be retrieved")
}
if rng != nil {
return ofctrl.GetUint32ValueWithRange(regValue.Data, rng), nil
}
return regValue.Data, nil
}

func GetEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) {
ethernetPkt := new(protocol.Ethernet)
if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil {
return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err)
}
return ethernetPkt, nil
}
104 changes: 104 additions & 0 deletions pkg/agent/openflow/packetout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package openflow

import (
"encoding/binary"

"antrea.io/libOpenflow/protocol"

binding "antrea.io/antrea/pkg/ovs/openflow"
)

const (
ipv4HdrLen uint16 = 20
ipv6HdrLen uint16 = 40

icmpUnusedHdrLen uint16 = 4

tcpAck uint8 = 0b010000
tcpRst uint8 = 0b000100

icmpDstUnreachableType uint8 = 3
icmpDstHostAdminProhibitedCode uint8 = 10

icmpv6DstUnreachableType uint8 = 1
icmpv6DstAdminProhibitedCode uint8 = 1
)

func SendRejectPacketOut(ofClient Client,
srcMAC string,
dstMAC string,
srcIP string,
dstIP string,
inPort uint32,
outPort uint32,
isIPv6 bool,
ethernetPkt *protocol.Ethernet,
proto uint8,
mutateFunc func(binding.PacketOutBuilder) binding.PacketOutBuilder) error {
if proto == protocol.Type_TCP {
// Get TCP data.
oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data)
if err != nil {
return err
}
// While sending TCP reject packet-out, switch original src/dst port,
// set the ackNum as original seqNum+1 and set the flag as ack+rst.
return ofClient.SendTCPPacketOut(
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
oriTCPDstPort,
oriTCPSrcPort,
0,
oriTCPSeqNum+1,
0,
tcpAck|tcpRst,
0,
nil,
mutateFunc)
}
// Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject.
icmpType := icmpDstUnreachableType
icmpCode := icmpDstHostAdminProhibitedCode
ipHdrLen := ipv4HdrLen
if isIPv6 {
icmpType = icmpv6DstUnreachableType
icmpCode = icmpv6DstAdminProhibitedCode
ipHdrLen = ipv6HdrLen
}
ipHdr, _ := ethernetPkt.Data.MarshalBinary()
icmpData := make([]byte, int(icmpUnusedHdrLen+ipHdrLen+8))
// Put ICMP unused header in Data prop and set it to zero.
binary.BigEndian.PutUint32(icmpData[:icmpUnusedHdrLen], 0)
copy(icmpData[icmpUnusedHdrLen:], ipHdr[:ipHdrLen+8])
return ofClient.SendICMPPacketOut(
srcMAC,
dstMAC,
srcIP,
dstIP,
inPort,
outPort,
isIPv6,
icmpType,
icmpCode,
icmpData,
mutateFunc)
}
Loading

0 comments on commit f3225bc

Please sign in to comment.