diff --git a/pkg/agent/metrics/prometheus.go b/pkg/agent/metrics/prometheus.go index 6402fa8d335..99ffbb0efe8 100644 --- a/pkg/agent/metrics/prometheus.go +++ b/pkg/agent/metrics/prometheus.go @@ -24,8 +24,9 @@ const ( metricNamespaceAntrea = "antrea" metricSubsystemAgent = "agent" - LabelPacketInMeterNetworkPolicy = "PacketInMeterNetworkPolicy" - LabelPacketInMeterTraceflow = "PacketInMeterTraceflow" + LabelPacketInMeterNetworkPolicy = "PacketInMeterNetworkPolicy" + LabelPacketInMeterTraceflow = "PacketInMeterTraceflow" + LabelPacketInMeterDNSInterception = "PacketInMeterDNSInterception" ) var ( @@ -240,7 +241,7 @@ func InitializeOVSMetrics() { OVSFlowOpsErrorCount.WithLabelValues(ops) OVSFlowOpsLatency.WithLabelValues(ops) } - for _, label := range []string{LabelPacketInMeterNetworkPolicy, LabelPacketInMeterTraceflow} { + for _, label := range []string{LabelPacketInMeterNetworkPolicy, LabelPacketInMeterTraceflow, LabelPacketInMeterDNSInterception} { OVSMeterPacketDroppedCount.WithLabelValues(label) } } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index d3590aeebe0..0347a5b0112 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -797,6 +797,9 @@ func (c *client) initialize() error { if err := c.genPacketInMeter(PacketInMeterIDTF, PacketInMeterRateTF).Add(); err != nil { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for TraceFlow packet-in rate limiting: %v", PacketInMeterIDTF, PacketInMeterRateTF, err) } + if err := c.genPacketInMeter(PacketInMeterIDDNS, PacketInMeterRateDNS).Add(); err != nil { + return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for DNS interception packet-in rate limiting: %v", PacketInMeterIDDNS, PacketInMeterRateDNS, err) + } } for _, activeFeature := range c.activatedFeatures { @@ -1561,6 +1564,8 @@ func (c *client) getMeterStats() { metrics.OVSMeterPacketDroppedCount.WithLabelValues(metrics.LabelPacketInMeterNetworkPolicy).Set(float64(packetCount)) case PacketInMeterIDTF: metrics.OVSMeterPacketDroppedCount.WithLabelValues(metrics.LabelPacketInMeterTraceflow).Set(float64(packetCount)) + case PacketInMeterIDDNS: + metrics.OVSMeterPacketDroppedCount.WithLabelValues(metrics.LabelPacketInMeterDNSInterception).Set(float64(packetCount)) default: klog.V(4).InfoS("Received unexpected meterID", "meterID", meterID) } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index bde15a4c63f..0e14c82734d 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2611,6 +2611,7 @@ func Test_client_ReplayFlows(t *testing.T) { }{ {id: PacketInMeterIDNP, rate: PacketInMeterRateNP}, {id: PacketInMeterIDTF, rate: PacketInMeterRateTF}, + {id: PacketInMeterIDDNS, rate: PacketInMeterRateDNS}, } { meter := ovsoftest.NewMockMeter(ctrl) meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl) diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index 979c4ad21b1..e2ce2390d70 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -75,14 +75,16 @@ var ( MatchLabelID = types.NewMatchKey(binding.ProtocolIP, types.LabelIDAddr, "tun_id") MatchTCPFlags = types.NewMatchKey(binding.ProtocolTCP, types.TCPFlagsAddr, "tcp_flags") MatchTCPv6Flags = types.NewMatchKey(binding.ProtocolTCPv6, types.TCPFlagsAddr, "tcp_flags") - Unsupported = types.NewMatchKey(binding.ProtocolIP, types.UnSupported, "unknown") + // MatchCTState should be used with ct_state condition as matchValue. + // MatchValue example: `+rpl+trk`. + MatchCTState = types.NewMatchKey(binding.ProtocolIP, types.CTStateAddr, "ct_state") + Unsupported = types.NewMatchKey(binding.ProtocolIP, types.UnSupported, "unknown") // metricFlowIdentifier is used to identify metric flows in metric table. // There could be other flows like default flow and Traceflow flows in the table. Only metric flows are supposed to // have normal priority. metricFlowIdentifier = fmt.Sprintf("priority=%d,", priorityNormal) - protocolUDP = v1beta2.ProtocolUDP protocolTCP = v1beta2.ProtocolTCP dnsPort = int32(53) ) @@ -706,53 +708,67 @@ func (c *client) NewDNSPacketInConjunction(id uint32) error { if err := c.ofEntryOperations.AddAll(conj.actionFlows); err != nil { return fmt.Errorf("error when adding action flows for the DNS conjunction: %w", err) } - udpService := v1beta2.Service{ - Protocol: &protocolUDP, - SrcPort: &dnsPort, - } + dnsPriority := priorityDNSIntercept + dnsCTState := &openflow15.CTStates{ + // Use ct_state=+trk+rpl as matching condition. + // CTState bit-state map: + // dnat | snat | trk | inv | rpl | rel | est | new + Data: 0b00101000, + Mask: 0b00101000, + } + dnsPortMatchValue := types.BitRange{Value: uint16(dnsPort)} + conj.serviceClause = conj.newClause(1, 2, getTableByID(conj.ruleTableID), nil) conj.toClause = conj.newClause(2, 2, getTableByID(conj.ruleTableID), nil) c.featureNetworkPolicy.conjMatchFlowLock.Lock() defer c.featureNetworkPolicy.conjMatchFlowLock.Unlock() - ctxChanges := conj.serviceClause.addServiceFlows(c.featureNetworkPolicy, []v1beta2.Service{udpService}, &dnsPriority, false) - - tcpFlags := TCPFlags{ - // URG|ACK|PSH|RST|SYN|FIN| - Flag: 0b011000, - Mask: 0b011000, - } - tcpDNSPort := types.BitRange{Value: uint16(dnsPort)} + var ctxChanges []*conjMatchFlowContextChange for _, proto := range c.featureNetworkPolicy.ipProtocols { - tcpServiceMatch := &conjunctiveMatch{ + tcpMatch := &conjunctiveMatch{ tableID: conj.serviceClause.ruleTable.GetID(), priority: &dnsPriority, - } - if proto == binding.ProtocolIP { - tcpServiceMatch.matchPairs = []matchPair{ + matchPairs: []matchPair{ { - matchKey: MatchTCPSrcPort, - matchValue: tcpDNSPort, + matchKey: MatchCTState, + matchValue: dnsCTState, }, + }, + } + udpMatch := &conjunctiveMatch{ + tableID: conj.serviceClause.ruleTable.GetID(), + priority: &dnsPriority, + matchPairs: []matchPair{ + // Add CTState for UDP as well to make sure only solicited DNS responses are sent + // to userspace. { - matchKey: MatchTCPFlags, - matchValue: tcpFlags, + matchKey: MatchCTState, + matchValue: dnsCTState, }, - } + }, + } + if proto == binding.ProtocolIP { + tcpMatch.matchPairs = append(tcpMatch.matchPairs, matchPair{ + matchKey: MatchTCPSrcPort, + matchValue: dnsPortMatchValue, + }) + udpMatch.matchPairs = append(udpMatch.matchPairs, matchPair{ + matchKey: MatchUDPSrcPort, + matchValue: dnsPortMatchValue, + }) } else if proto == binding.ProtocolIPv6 { - tcpServiceMatch.matchPairs = []matchPair{ - { - matchKey: MatchTCPv6SrcPort, - matchValue: tcpDNSPort, - }, - { - matchKey: MatchTCPv6Flags, - matchValue: tcpFlags, - }, - } + tcpMatch.matchPairs = append(tcpMatch.matchPairs, matchPair{ + matchKey: MatchTCPv6SrcPort, + matchValue: dnsPortMatchValue, + }) + udpMatch.matchPairs = append(udpMatch.matchPairs, matchPair{ + matchKey: MatchUDPv6SrcPort, + matchValue: dnsPortMatchValue, + }) } - ctxChange := conj.serviceClause.addConjunctiveMatchFlow(c.featureNetworkPolicy, tcpServiceMatch, false, false) - ctxChanges = append(ctxChanges, ctxChange) + tcpCtxChange := conj.serviceClause.addConjunctiveMatchFlow(c.featureNetworkPolicy, tcpMatch, false, false) + udpCtxChange := conj.serviceClause.addConjunctiveMatchFlow(c.featureNetworkPolicy, udpMatch, false, false) + ctxChanges = append(ctxChanges, tcpCtxChange, udpCtxChange) } if err := c.featureNetworkPolicy.applyConjunctiveMatchFlows(ctxChanges); err != nil { return err diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 6220eae93f4..833cd0edb9c 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -1447,6 +1447,47 @@ func Test_featureNetworkPolicy_initFlows(t *testing.T) { } func Test_NewDNSPacketInConjunction(t *testing.T) { + ovsMetersSupported := ovsMetersAreSupported() + ipv4ExpFlows := []string{ + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", + } + if ovsMetersSupported { + ipv4ExpFlows = []string{ + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", + } + } + ipv6ExpFlows := []string{ + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)", + } + if ovsMetersSupported { + ipv6ExpFlows = []string{ + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)", + } + } + dsExpFlows := []string{ + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)", + } + if ovsMetersSupported { + dsExpFlows = []string{ + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)", + } + } for _, tc := range []struct { name string enableIPv4 bool @@ -1455,39 +1496,25 @@ func Test_NewDNSPacketInConjunction(t *testing.T) { expectedFlows []string }{ { - name: "IPv4 only", - enableIPv4: true, - enableIPv6: false, - conjID: 1, - expectedFlows: []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp,tp_src=53 actions=conjunction(1,1/2)", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)", - }, + name: "IPv4 only", + enableIPv4: true, + enableIPv6: false, + conjID: 1, + expectedFlows: ipv4ExpFlows, }, { - name: "IPv6 only", - enableIPv4: false, - enableIPv6: true, - conjID: 1, - expectedFlows: []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp6,tp_src=53 actions=conjunction(1,1/2)", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp6,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)", - }, + name: "IPv6 only", + enableIPv4: false, + enableIPv6: true, + conjID: 1, + expectedFlows: ipv6ExpFlows, }, { - name: "dual stack", - enableIPv4: true, - enableIPv6: true, - conjID: 1, - expectedFlows: []string{ - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp,tp_src=53 actions=conjunction(1,1/2)", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp6,tp_src=53 actions=conjunction(1,1/2)", - "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp6,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)", - }, + name: "dual stack", + enableIPv4: true, + enableIPv6: true, + conjID: 1, + expectedFlows: dsExpFlows, }, } { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index 78373a456c0..1faf2991554 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -78,12 +78,14 @@ const ( // We use OpenFlow Meter for packetIn rate limiting on OVS side. // Meter Entry ID. - PacketInMeterIDNP = 1 - PacketInMeterIDTF = 2 + PacketInMeterIDNP = 1 + PacketInMeterIDTF = 2 + PacketInMeterIDDNS = 3 // Meter Entry Rate. It is represented as number of events per second. // Packets which exceed the rate will be dropped. - PacketInMeterRateNP = 100 - PacketInMeterRateTF = 100 + PacketInMeterRateNP = 100 + PacketInMeterRateTF = 100 + PacketInMeterRateDNS = 100 // PacketInQueueSize defines the size of PacketInQueue. // When PacketInQueue reaches PacketInQueueSize, new packetIn will be dropped. diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 659f69269fd..918ebd0e62f 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2094,6 +2094,9 @@ func (f *featureNetworkPolicy) addFlowMatch(fb binding.FlowBuilder, matchKey *ty fb = fb.MatchProtocol(matchKey.GetOFProtocol()) tcpFlag := matchValue.(TCPFlags) fb = fb.MatchTCPFlags(tcpFlag.Flag, tcpFlag.Mask) + case MatchCTState: + ctState := matchValue.(*openflow15.CTStates) + fb = fb.MatchCTState(ctState) } return fb } @@ -2177,13 +2180,16 @@ func (f *featureNetworkPolicy) multiClusterNetworkPolicySecurityDropFlow(table b // dnsPacketInFlow generates the flow to send dns response packets of fqdn policy selected Pods to the fqdnController for // processing. func (f *featureNetworkPolicy) dnsPacketInFlow(conjunctionID uint32) binding.Flow { - return AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priorityDNSIntercept). + fb := AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priorityDNSIntercept). Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchConjID(conjunctionID). - // FQDN should pause DNS response packets and send them to the controller. After - // the controller processes DNS response packets, like creating related flows in - // the OVS or no operations are needed, the controller will resume those packets. - Action().SendToController([]byte{uint8(PacketInCategoryDNS)}, true). + MatchConjID(conjunctionID) + if f.ovsMetersAreSupported { + fb = fb.Action().Meter(PacketInMeterIDDNS) + } + // FQDN should pause DNS response packets and send them to the controller. After + // the controller processes DNS response packets, like creating related flows in + // the OVS or no operations are needed, the controller will resume those packets. + return fb.Action().SendToController([]byte{uint8(PacketInCategoryDNS)}, true). Action().GotoTable(IngressMetricTable.GetID()). Done() } diff --git a/pkg/agent/types/networkpolicy.go b/pkg/agent/types/networkpolicy.go index 65e26e2b5f3..7722941cfcb 100644 --- a/pkg/agent/types/networkpolicy.go +++ b/pkg/agent/types/networkpolicy.go @@ -58,6 +58,7 @@ const ( IGMPAddr LabelIDAddr TCPFlagsAddr + CTStateAddr UnSupported ) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 0dad08d450b..917310b1b8a 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -273,6 +273,7 @@ type FlowBuilder interface { MatchARPTpa(ip net.IP) FlowBuilder MatchARPOp(op uint16) FlowBuilder MatchIPDSCP(dscp uint8) FlowBuilder + MatchCTState(ctStates *openflow15.CTStates) FlowBuilder MatchCTStateNew(isSet bool) FlowBuilder MatchCTStateRel(isSet bool) FlowBuilder MatchCTStateRpl(isSet bool) FlowBuilder diff --git a/pkg/ovs/openflow/ofctrl_builder.go b/pkg/ovs/openflow/ofctrl_builder.go index 5ac426d2c10..a91bc582d6e 100644 --- a/pkg/ovs/openflow/ofctrl_builder.go +++ b/pkg/ovs/openflow/ofctrl_builder.go @@ -123,6 +123,11 @@ func (b *ofFlowBuilder) MatchRegFieldWithValue(field *RegField, data uint32) Flo return b.matchRegRange(field.regID, data, field.rng) } +func (b *ofFlowBuilder) MatchCTState(ctStates *openflow15.CTStates) FlowBuilder { + b.ctStates = ctStates + return b +} + func (b *ofFlowBuilder) MatchCTStateNew(set bool) FlowBuilder { if b.ctStates == nil { b.ctStates = openflow15.NewCTStates() diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index ca0c2762c2a..731a9a25aaf 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -1607,6 +1607,20 @@ func (mr *MockFlowBuilderMockRecorder) MatchCTSrcPort(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchCTSrcPort", reflect.TypeOf((*MockFlowBuilder)(nil).MatchCTSrcPort), arg0) } +// MatchCTState mocks base method +func (m *MockFlowBuilder) MatchCTState(arg0 *openflow15.CTStates) openflow.FlowBuilder { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MatchCTState", arg0) + ret0, _ := ret[0].(openflow.FlowBuilder) + return ret0 +} + +// MatchCTState indicates an expected call of MatchCTState +func (mr *MockFlowBuilderMockRecorder) MatchCTState(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchCTState", reflect.TypeOf((*MockFlowBuilder)(nil).MatchCTState), arg0) +} + // MatchCTStateDNAT mocks base method func (m *MockFlowBuilder) MatchCTStateDNAT(arg0 bool) openflow.FlowBuilder { m.ctrl.T.Helper()