Skip to content

Commit

Permalink
Optimize processing of egress rules that have no named port (#1100)
Browse files Browse the repository at this point in the history
If an egress rule allows all addresses but has no named port, it doesn't
need to have the "matchAllPodsPeer" in its "To" Peer which is used for
named port resolving. This could avoid the transmission of the
"matchAllPods" AddressGroup and the reconciliation of the relevant
rules.
  • Loading branch information
tnqn authored Aug 20, 2020
1 parent 255b195 commit f8acc2b
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 131 deletions.
43 changes: 25 additions & 18 deletions pkg/controller/networkpolicy/clusternetworkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package networkpolicy
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

Expand Down Expand Up @@ -135,18 +136,23 @@ func (n *NetworkPolicyController) deleteCNP(old interface{}) {
n.deleteDereferencedAddressGroups(oldInternalNP)
}

// toAntreaServicesForCRD converts a secv1alpha1.NetworkPolicyPort object to an
// Antrea Service object.
func toAntreaServicesForCRD(npPorts []secv1alpha1.NetworkPolicyPort) []networking.Service {
// toAntreaServicesForCRD converts a slice of secv1alpha1.NetworkPolicyPort
// objects to a slice of Antrea Service objects. A bool is returned along with
// the Service objects to indicate whether any named port exists.
func toAntreaServicesForCRD(npPorts []secv1alpha1.NetworkPolicyPort) ([]networking.Service, bool) {
var antreaServices []networking.Service
var namedPortExists bool
for _, npPort := range npPorts {
if npPort.Port != nil && npPort.Port.Type == intstr.String {
namedPortExists = true
}
antreaService := networking.Service{
Protocol: toAntreaProtocol(npPort.Protocol),
Port: npPort.Port,
}
antreaServices = append(antreaServices, antreaService)
}
return antreaServices
return antreaServices, namedPortExists
}

// toAntreaIPBlockForCRD converts a secv1alpha1.IPBlock to an Antrea IPBlock.
Expand Down Expand Up @@ -174,24 +180,23 @@ func getTierPriority(tier string) networking.TierPriority {
return tierPriorityMap[tier]
}

func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []secv1alpha1.NetworkPolicyPeer, cnp *secv1alpha1.ClusterNetworkPolicy, dir networking.Direction) *networking.NetworkPolicyPeer {
func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []secv1alpha1.NetworkPolicyPeer, cnp *secv1alpha1.ClusterNetworkPolicy, dir networking.Direction, namedPortExists bool) *networking.NetworkPolicyPeer {
var addressGroups []string
// Empty NetworkPolicyPeer is supposed to match all addresses.
// It's treated as an IPBlock "0.0.0.0/0".
if len(peers) == 0 {
// For an ingress Peer, skip adding the AddressGroup matching all Pods
// because in case of ingress Rule, the named Port resolution happens on
// Pods in AppliedToGroup.
if dir == networking.DirectionIn {
// For an egress Peer that specifies any named ports, it creates or
// reuses the AddressGroup matching all Pods in all Namespaces and
// appends the AddressGroup UID to the returned Peer such that it can be
// used to resolve the named ports.
// For other cases it uses the IPBlock "0.0.0.0/0" to avoid the overhead
// of handling member updates of the AddressGroup.
if dir == networking.DirectionIn || !namedPortExists {
return &matchAllPeer
}
// For an egress Peer, create an AddressGroup matching all Pods in all
// Namespaces such that it can be used to resolve named Ports. This
// AddressGroup is set in the NetworkPolicyPeer of matchAllPeer.
allPodsGroupUID := n.createAddressGroupForCRD(matchAllPodsPeerCrd, cnp)
podsPeer := matchAllPeer
addressGroups = append(addressGroups, allPodsGroupUID)
podsPeer.AddressGroups = addressGroups
podsPeer.AddressGroups = append(addressGroups, allPodsGroupUID)
return &podsPeer
}
var ipBlocks []networking.IPBlock
Expand Down Expand Up @@ -253,21 +258,23 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *secv1alpha1.C
// Compute NetworkPolicyRule for Egress Rule.
for idx, ingressRule := range cnp.Spec.Ingress {
// Set default action to ALLOW to allow traffic.
services, namedPortExists := toAntreaServicesForCRD(ingressRule.Ports)
rules = append(rules, networking.NetworkPolicyRule{
Direction: networking.DirectionIn,
From: *n.toAntreaPeerForCRD(ingressRule.From, cnp, networking.DirectionIn),
Services: toAntreaServicesForCRD(ingressRule.Ports),
From: *n.toAntreaPeerForCRD(ingressRule.From, cnp, networking.DirectionIn, namedPortExists),
Services: services,
Action: ingressRule.Action,
Priority: int32(idx),
})
}
// Compute NetworkPolicyRule for Egress Rule.
for idx, egressRule := range cnp.Spec.Egress {
// Set default action to ALLOW to allow traffic.
services, namedPortExists := toAntreaServicesForCRD(egressRule.Ports)
rules = append(rules, networking.NetworkPolicyRule{
Direction: networking.DirectionOut,
To: *n.toAntreaPeerForCRD(egressRule.To, cnp, networking.DirectionOut),
Services: toAntreaServicesForCRD(egressRule.Ports),
To: *n.toAntreaPeerForCRD(egressRule.To, cnp, networking.DirectionOut, namedPortExists),
Services: services,
Action: egressRule.Action,
Priority: int32(idx),
})
Expand Down
81 changes: 46 additions & 35 deletions pkg/controller/networkpolicy/clusternetworkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

Expand All @@ -31,32 +30,46 @@ import (
)

func TestToAntreaServicesForCRD(t *testing.T) {
tcpProto := v1.ProtocolTCP
portNum := intstr.FromInt(80)
tables := []struct {
ports []secv1alpha1.NetworkPolicyPort
expValues []networking.Service
ports []secv1alpha1.NetworkPolicyPort
expServices []networking.Service
expNamedPortExists bool
}{
{
getCNPPorts(tcpProto),
[]networking.Service{
ports: []secv1alpha1.NetworkPolicyPort{
{
Protocol: toAntreaProtocol(&tcpProto),
Port: &portNum,
Protocol: &k8sProtocolTCP,
Port: &int80,
},
},
expServices: []networking.Service{
{
Protocol: toAntreaProtocol(&k8sProtocolTCP),
Port: &int80,
},
},
expNamedPortExists: false,
},
{
ports: []secv1alpha1.NetworkPolicyPort{
{
Protocol: &k8sProtocolTCP,
Port: &strHTTP,
},
},
expServices: []networking.Service{
{
Protocol: toAntreaProtocol(&k8sProtocolTCP),
Port: &strHTTP,
},
},
expNamedPortExists: true,
},
}
for _, table := range tables {
services := toAntreaServicesForCRD(table.ports)
service := services[0]
expValue := table.expValues[0]
if *service.Protocol != *expValue.Protocol {
t.Errorf("Unexpected Antrea Protocol in Antrea Service. Expected %v, got %v", *expValue.Protocol, *service.Protocol)
}
if *service.Port != *expValue.Port {
t.Errorf("Unexpected Antrea Port in Antrea Service. Expected %v, got %v", *expValue.Port, *service.Port)
}
services, namedPortExist := toAntreaServicesForCRD(table.ports)
assert.Equal(t, table.expServices, services)
assert.Equal(t, table.expNamedPortExists, namedPortExist)
}
}

Expand Down Expand Up @@ -123,10 +136,11 @@ func TestToAntreaPeerForCRD(t *testing.T) {
matchAllPodsPeer := matchAllPeer
matchAllPodsPeer.AddressGroups = []string{getNormalizedUID(toGroupSelector("", nil, &selectorAll).NormalizedName)}
tests := []struct {
name string
inPeers []secv1alpha1.NetworkPolicyPeer
outPeer networking.NetworkPolicyPeer
direction networking.Direction
name string
inPeers []secv1alpha1.NetworkPolicyPeer
outPeer networking.NetworkPolicyPeer
direction networking.Direction
namedPortExists bool
}{
{
name: "pod-ns-selector-peer-ingress",
Expand Down Expand Up @@ -205,16 +219,23 @@ func TestToAntreaPeerForCRD(t *testing.T) {
direction: networking.DirectionIn,
},
{
name: "empty-peer-egress",
name: "empty-peer-egress-with-named-port",
inPeers: []secv1alpha1.NetworkPolicyPeer{},
outPeer: matchAllPodsPeer,
direction: networking.DirectionOut,
namedPortExists: true,
},
{
name: "empty-peer-egress-without-named-port",
inPeers: []secv1alpha1.NetworkPolicyPeer{},
outPeer: matchAllPodsPeer,
outPeer: matchAllPeer,
direction: networking.DirectionOut,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, npc := newController()
actualPeer := npc.toAntreaPeerForCRD(tt.inPeers, testCNPObj, tt.direction)
actualPeer := npc.toAntreaPeerForCRD(tt.inPeers, testCNPObj, tt.direction, tt.namedPortExists)
if !reflect.DeepEqual(tt.outPeer.AddressGroups, (*actualPeer).AddressGroups) {
t.Errorf("Unexpected AddressGroups in Antrea Peer conversion. Expected %v, got %v", tt.outPeer.AddressGroups, (*actualPeer).AddressGroups)
}
Expand Down Expand Up @@ -981,13 +1002,3 @@ func getCNP() *secv1alpha1.ClusterNetworkPolicy {
return npObj

}

func getCNPPorts(proto v1.Protocol) []secv1alpha1.NetworkPolicyPort {
portNum := intstr.FromInt(80)
port := secv1alpha1.NetworkPolicyPort{
Protocol: &proto,
Port: &portNum,
}
ports := []secv1alpha1.NetworkPolicyPort{port}
return ports
}
43 changes: 25 additions & 18 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
Expand Down Expand Up @@ -474,18 +475,23 @@ func toAntreaProtocol(npProtocol *v1.Protocol) *networking.Protocol {
return &internalProtocol
}

// toAntreaServices converts a networkingv1.NetworkPolicyPort object to an
// Antrea Service object.
func toAntreaServices(npPorts []networkingv1.NetworkPolicyPort) []networking.Service {
// toAntreaServices converts a slice of networkingv1.NetworkPolicyPort objects
// to a slice of Antrea Service objects. A bool is returned along with the
// Service objects to indicate whether any named port exists.
func toAntreaServices(npPorts []networkingv1.NetworkPolicyPort) ([]networking.Service, bool) {
var antreaServices []networking.Service
var namedPortExists bool
for _, npPort := range npPorts {
if npPort.Port != nil && npPort.Port.Type == intstr.String {
namedPortExists = true
}
antreaService := networking.Service{
Protocol: toAntreaProtocol(npPort.Protocol),
Port: npPort.Port,
}
antreaServices = append(antreaServices, antreaService)
}
return antreaServices
return antreaServices, namedPortExists
}

// toAntreaIPBlock converts a networkingv1.IPBlock to an Antrea IPBlock.
Expand Down Expand Up @@ -524,21 +530,23 @@ func (n *NetworkPolicyController) processNetworkPolicy(np *networkingv1.NetworkP
// Compute NetworkPolicyRule for Ingress Rule.
for _, ingressRule := range np.Spec.Ingress {
ingressRuleExists = true
services, namedPortExists := toAntreaServices(ingressRule.Ports)
rules = append(rules, networking.NetworkPolicyRule{
Direction: networking.DirectionIn,
From: *n.toAntreaPeer(ingressRule.From, np, networking.DirectionIn),
Services: toAntreaServices(ingressRule.Ports),
From: *n.toAntreaPeer(ingressRule.From, np, networking.DirectionIn, namedPortExists),
Services: services,
Priority: defaultRulePriority,
Action: &defaultAction,
})
}
// Compute NetworkPolicyRule for Egress Rule.
for _, egressRule := range np.Spec.Egress {
egressRuleExists = true
services, namedPortExists := toAntreaServices(egressRule.Ports)
rules = append(rules, networking.NetworkPolicyRule{
Direction: networking.DirectionOut,
To: *n.toAntreaPeer(egressRule.To, np, networking.DirectionOut),
Services: toAntreaServices(egressRule.Ports),
To: *n.toAntreaPeer(egressRule.To, np, networking.DirectionOut, namedPortExists),
Services: services,
Priority: defaultRulePriority,
Action: &defaultAction,
})
Expand Down Expand Up @@ -575,25 +583,24 @@ func (n *NetworkPolicyController) processNetworkPolicy(np *networkingv1.NetworkP
return internalNetworkPolicy
}

func (n *NetworkPolicyController) toAntreaPeer(peers []networkingv1.NetworkPolicyPeer, np *networkingv1.NetworkPolicy, dir networking.Direction) *networking.NetworkPolicyPeer {
func (n *NetworkPolicyController) toAntreaPeer(peers []networkingv1.NetworkPolicyPeer, np *networkingv1.NetworkPolicy, dir networking.Direction, namedPortExists bool) *networking.NetworkPolicyPeer {
var addressGroups []string
// Empty NetworkPolicyPeer is supposed to match all addresses.
// See https://kubernetes.io/docs/concepts/services-networking/network-policies/#default-allow-all-ingress-traffic.
// It's treated as an IPBlock "0.0.0.0/0".
if len(peers) == 0 {
// For an ingress Peer, skip adding the AddressGroup matching all Pods
// because in case of ingress Rule, the named Port resolution happens on
// Pods in AppliedToGroup.
if dir == networking.DirectionIn {
// For an egress Peer that specifies any named ports, it creates or
// reuses the AddressGroup matching all Pods in all Namespaces and
// appends the AddressGroup UID to the returned Peer such that it can be
// used to resolve the named ports.
// For other cases it uses the IPBlock "0.0.0.0/0" to avoid the overhead
// of handling member updates of the AddressGroup.
if dir == networking.DirectionIn || !namedPortExists {
return &matchAllPeer
}
// For an egress Peer, create an AddressGroup matching all Pods in all
// Namespaces such that it can be used to resolve named Ports. This
// AddressGroup is set in the NetworkPolicyPeer of matchAllPeer.
allPodsGroupUID := n.createAddressGroup(matchAllPodsPeer, np)
podsPeer := matchAllPeer
addressGroups = append(addressGroups, allPodsGroupUID)
podsPeer.AddressGroups = addressGroups
podsPeer.AddressGroups = append(addressGroups, allPodsGroupUID)
return &podsPeer
}
var ipBlocks []networking.IPBlock
Expand Down
Loading

0 comments on commit f8acc2b

Please sign in to comment.