From ad7d64d2fa45436680b1d9b2422c21e8a96f27d8 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Thu, 4 Mar 2021 19:20:15 +0800 Subject: [PATCH] Do not construct intermediate sets when calculating union of multiple sets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An AddressGroup can be shared by multiple NetworkPolicies and its span is the union of the NetworkPolicies'. The syncAddressGroup method uses "Union" method to calculate the union set. However, the method always create a new sets and copies the original items when traversing each set. This performs badly when the number of the NetworkPolicies increase and the span is big. This patch optimizes it by using a "Merge" function to avoid above cost. The benchmark impact is as below when there are 1000 NetworkPolicies sharing the AddressGroup and the span is 1000 Nodes: name old time/op new time/op delta SyncAddressGroup-48 417µs ± 4% 326µs ± 4% -21.91% (p=0.008 n=5+5) name old alloc/op new alloc/op delta SyncAddressGroup-48 98.9kB ± 0% 51.1kB ± 1% -48.27% (p=0.008 n=5+5) name old allocs/op new allocs/op delta SyncAddressGroup-48 1.05k ± 0% 0.05k ± 8% -94.78% (p=0.008 n=5+5) --- .../networkpolicy/networkpolicy_controller.go | 5 +- .../networkpolicy_controller_perf_test.go | 204 +++++++++--------- .../networkpolicy/status_controller_test.go | 26 +-- pkg/util/sets/string.go | 33 +++ 4 files changed, 152 insertions(+), 116 deletions(-) create mode 100644 pkg/util/sets/string.go diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 0630018fe4e..55bf1dccdf7 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -59,6 +59,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/store" antreatypes "github.com/vmware-tanzu/antrea/pkg/controller/types" "github.com/vmware-tanzu/antrea/pkg/features" + utilsets "github.com/vmware-tanzu/antrea/pkg/util/sets" ) const ( @@ -1416,7 +1417,7 @@ func (n *NetworkPolicyController) syncAddressGroup(key string) error { addrGroupNodeNames := sets.String{} for _, internalNPObj := range nps { internalNP := internalNPObj.(*antreatypes.NetworkPolicy) - addrGroupNodeNames = addrGroupNodeNames.Union(internalNP.SpanMeta.NodeNames) + utilsets.Merge(addrGroupNodeNames, internalNP.SpanMeta.NodeNames) } memberSet := n.populateAddressGroupMemberSet(addressGroup) updatedAddressGroup := &antreatypes.AddressGroup{ @@ -1697,7 +1698,7 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key string) error { continue } appGroup := appGroupObj.(*antreatypes.AppliedToGroup) - nodeNames = nodeNames.Union(appGroup.SpanMeta.NodeNames) + utilsets.Merge(nodeNames, appGroup.SpanMeta.NodeNames) } updatedNetworkPolicy := &antreatypes.NetworkPolicy{ UID: internalNP.UID, diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go index 3ee9b6fefea..9b0b522d60e 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go @@ -49,77 +49,17 @@ The metrics are not accurate under the race detector, and will be skipped when t func TestInitXLargeScaleWithSmallNamespaces(t *testing.T) { getObjects := func() ([]*corev1.Namespace, []*networkingv1.NetworkPolicy, []*corev1.Pod) { namespace := rand.String(8) - namespaces := []*corev1.Namespace{ - { - ObjectMeta: metav1.ObjectMeta{Name: namespace, Labels: map[string]string{"app": namespace}}, - }, - } + namespaces := []*corev1.Namespace{newNamespace(namespace, map[string]string{"app": namespace})} networkPolicies := []*networkingv1.NetworkPolicy{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "default-deny-all", UID: types.UID(uuid.New().String())}, - Spec: networkingv1.NetworkPolicySpec{ - PodSelector: metav1.LabelSelector{}, - PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress, networkingv1.PolicyTypeEgress}, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "np-1", UID: types.UID(uuid.New().String())}, - Spec: networkingv1.NetworkPolicySpec{ - PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"app-1": "scale-1"}}, - PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress, networkingv1.PolicyTypeEgress}, - Ingress: []networkingv1.NetworkPolicyIngressRule{ - { - From: []networkingv1.NetworkPolicyPeer{ - { - PodSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app-1": "scale-1"}, - }, - }, - }, - }, - }, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "np-2", UID: types.UID(uuid.New().String())}, - Spec: networkingv1.NetworkPolicySpec{ - PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"app-2": "scale-2"}}, - PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress, networkingv1.PolicyTypeEgress}, - Ingress: []networkingv1.NetworkPolicyIngressRule{ - { - From: []networkingv1.NetworkPolicyPeer{ - { - PodSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app-2": "scale-2"}, - }, - }, - }, - }, - }, - }, - }, + newNetworkPolicy(namespace, "default-deny-all", nil, nil, nil), + newNetworkPolicy(namespace, "np-1", map[string]string{"app-1": "scale-1"}, map[string]string{"app-1": "scale-1"}, nil), + newNetworkPolicy(namespace, "np-2", map[string]string{"app-2": "scale-2"}, map[string]string{"app-2": "scale-2"}, nil), } pods := []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "pod1", UID: types.UID(uuid.New().String()), Labels: map[string]string{"app-1": "scale-1"}}, - Spec: corev1.PodSpec{NodeName: getRandomNodeName()}, - Status: corev1.PodStatus{PodIP: getRandomIP()}, - }, - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "pod2", UID: types.UID(uuid.New().String()), Labels: map[string]string{"app-1": "scale-1"}}, - Spec: corev1.PodSpec{NodeName: getRandomNodeName()}, - Status: corev1.PodStatus{PodIP: getRandomIP()}, - }, - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "pod3", UID: types.UID(uuid.New().String()), Labels: map[string]string{"app-2": "scale-2"}}, - Spec: corev1.PodSpec{NodeName: getRandomNodeName()}, - Status: corev1.PodStatus{PodIP: getRandomIP()}, - }, - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "pod4", UID: types.UID(uuid.New().String()), Labels: map[string]string{"app-2": "scale-2"}}, - Spec: corev1.PodSpec{NodeName: getRandomNodeName()}, - Status: corev1.PodStatus{PodIP: getRandomIP()}, - }, + newPod(namespace, "pod1", map[string]string{"app-1": "scale-1"}), + newPod(namespace, "pod2", map[string]string{"app-1": "scale-1"}), + newPod(namespace, "pod3", map[string]string{"app-2": "scale-2"}), + newPod(namespace, "pod4", map[string]string{"app-2": "scale-2"}), } return namespaces, networkPolicies, pods } @@ -139,39 +79,9 @@ The metrics are not accurate under the race detector, and will be skipped when t func TestInitXLargeScaleWithOneNamespace(t *testing.T) { namespace := rand.String(8) getObjects := func() ([]*corev1.Namespace, []*networkingv1.NetworkPolicy, []*corev1.Pod) { - namespaces := []*corev1.Namespace{ - { - ObjectMeta: metav1.ObjectMeta{Name: namespace, Labels: map[string]string{"app": namespace}}, - }, - } - uid := rand.String(8) - networkPolicies := []*networkingv1.NetworkPolicy{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "np-1" + uid, UID: types.UID(uuid.New().String())}, - Spec: networkingv1.NetworkPolicySpec{ - PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"app-1": "scale-1"}}, - PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress, networkingv1.PolicyTypeEgress}, - Ingress: []networkingv1.NetworkPolicyIngressRule{ - { - From: []networkingv1.NetworkPolicyPeer{ - { - PodSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app-1": "scale-1"}, - }, - }, - }, - }, - }, - }, - }, - } - pods := []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "pod1" + uid, UID: types.UID(uuid.New().String()), Labels: map[string]string{"app-1": "scale-1"}}, - Spec: corev1.PodSpec{NodeName: getRandomNodeName()}, - Status: corev1.PodStatus{PodIP: getRandomIP()}, - }, - } + namespaces := []*corev1.Namespace{newNamespace(namespace, map[string]string{"app": namespace})} + networkPolicies := []*networkingv1.NetworkPolicy{newNetworkPolicy(namespace, "", map[string]string{"app-1": "scale-1"}, map[string]string{"app-1": "scale-1"}, nil)} + pods := []*corev1.Pod{newPod(namespace, "", map[string]string{"app-1": "scale-1"})} return namespaces, networkPolicies, pods } namespaces, networkPolicies, pods := getXObjects(10000, getObjects) @@ -331,3 +241,95 @@ func toRunTimeObjects(namespaces []*corev1.Namespace, networkPolicies []*network } return objs } + +func newNamespace(name string, labels map[string]string) *corev1.Namespace { + return &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: name, Labels: labels}, + } +} + +func newPod(namespace, name string, labels map[string]string) *corev1.Pod { + if name == "" { + name = "pod-" + rand.String(8) + } + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name, UID: types.UID(uuid.New().String()), Labels: labels}, + Spec: corev1.PodSpec{NodeName: getRandomNodeName()}, + Status: corev1.PodStatus{PodIP: getRandomIP()}, + } + return pod +} + +func newNetworkPolicy(namespace, name string, podSelector, ingressPodSelector, egressPodSelector map[string]string) *networkingv1.NetworkPolicy { + if name == "" { + name = "np-" + rand.String(8) + } + policy := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name, UID: types.UID(uuid.New().String())}, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: podSelector}, + PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress, networkingv1.PolicyTypeEgress}, + }, + } + if ingressPodSelector != nil { + policy.Spec.Ingress = []networkingv1.NetworkPolicyIngressRule{ + { + From: []networkingv1.NetworkPolicyPeer{ + { + PodSelector: &metav1.LabelSelector{ + MatchLabels: ingressPodSelector, + }, + }, + }, + }, + } + } + if egressPodSelector != nil { + policy.Spec.Egress = []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{ + { + PodSelector: &metav1.LabelSelector{ + MatchLabels: egressPodSelector, + }, + }, + }, + }, + } + } + return policy +} + +func BenchmarkSyncAddressGroup(b *testing.B) { + namespace := "default" + labels := map[string]string{"app-1": "scale-1"} + getObjects := func() ([]*corev1.Namespace, []*networkingv1.NetworkPolicy, []*corev1.Pod) { + namespaces := []*corev1.Namespace{newNamespace(namespace, nil)} + networkPolicies := []*networkingv1.NetworkPolicy{newNetworkPolicy(namespace, "", labels, labels, nil)} + pods := []*corev1.Pod{newPod(namespace, "", labels)} + return namespaces, networkPolicies, pods + } + namespaces, networkPolicies, pods := getXObjects(1000, getObjects) + objs := toRunTimeObjects(namespaces[0:1], networkPolicies, pods) + stopCh := make(chan struct{}) + defer close(stopCh) + _, c := newController(objs...) + c.informerFactory.Start(stopCh) + + for c.appliedToGroupQueue.Len() > 0 { + key, _ := c.appliedToGroupQueue.Get() + c.syncAppliedToGroup(key.(string)) + c.appliedToGroupQueue.Done(key) + } + for c.internalNetworkPolicyQueue.Len() > 0 { + key, _ := c.internalNetworkPolicyQueue.Get() + c.syncInternalNetworkPolicy(key.(string)) + c.internalNetworkPolicyQueue.Done(key) + } + key, _ := c.addressGroupQueue.Get() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.syncAddressGroup(key.(string)) + } +} diff --git a/pkg/controller/networkpolicy/status_controller_test.go b/pkg/controller/networkpolicy/status_controller_test.go index 277f6553260..6a428b9eb61 100644 --- a/pkg/controller/networkpolicy/status_controller_test.go +++ b/pkg/controller/networkpolicy/status_controller_test.go @@ -88,7 +88,7 @@ func newTestStatusController(initialObjects ...runtime.Object) (*StatusControlle return statusController, antreaClientset, antreaInformerFactory, networkPolicyStore, networkPolicyControl } -func newNetworkPolicy(name string, generation int64, nodes []string, ref *controlplane.NetworkPolicyReference) *types.NetworkPolicy { +func newInternalNetworkPolicy(name string, generation int64, nodes []string, ref *controlplane.NetworkPolicyReference) *types.NetworkPolicy { return &types.NetworkPolicy{ SpanMeta: types.SpanMeta{NodeNames: sets.NewString(nodes...)}, Generation: generation, @@ -156,8 +156,8 @@ func TestCreateAntreaNetworkPolicy(t *testing.T) { { name: "no realization status", networkPolicy: []*types.NetworkPolicy{ - newNetworkPolicy("anp1", 1, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")), - newNetworkPolicy("cnp1", 1, []string{"node1", "node2"}, newAntreaClusterNetworkPolicyReference("cnp1")), + newInternalNetworkPolicy("anp1", 1, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")), + newInternalNetworkPolicy("cnp1", 1, []string{"node1", "node2"}, newAntreaClusterNetworkPolicyReference("cnp1")), }, expectedANPStatus: &secv1alpha1.NetworkPolicyStatus{ Phase: secv1alpha1.NetworkPolicyRealizing, @@ -175,8 +175,8 @@ func TestCreateAntreaNetworkPolicy(t *testing.T) { { name: "partially realized", networkPolicy: []*types.NetworkPolicy{ - newNetworkPolicy("anp1", 2, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")), - newNetworkPolicy("cnp1", 3, []string{"node1", "node2"}, newAntreaClusterNetworkPolicyReference("cnp1")), + newInternalNetworkPolicy("anp1", 2, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")), + newInternalNetworkPolicy("cnp1", 3, []string{"node1", "node2"}, newAntreaClusterNetworkPolicyReference("cnp1")), }, collectedNetworkPolicyStatus: []*controlplane.NetworkPolicyStatus{ newNetworkPolicyStatus("anp1", "node1", 1), @@ -200,8 +200,8 @@ func TestCreateAntreaNetworkPolicy(t *testing.T) { { name: "entirely realized", networkPolicy: []*types.NetworkPolicy{ - newNetworkPolicy("anp1", 3, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")), - newNetworkPolicy("cnp1", 4, []string{"node1", "node2"}, newAntreaClusterNetworkPolicyReference("cnp1")), + newInternalNetworkPolicy("anp1", 3, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")), + newInternalNetworkPolicy("cnp1", 4, []string{"node1", "node2"}, newAntreaClusterNetworkPolicyReference("cnp1")), }, collectedNetworkPolicyStatus: []*controlplane.NetworkPolicyStatus{ newNetworkPolicyStatus("anp1", "node1", 3), @@ -251,8 +251,8 @@ func TestCreateAntreaNetworkPolicy(t *testing.T) { } func TestUpdateAntreaNetworkPolicy(t *testing.T) { - anp1 := newNetworkPolicy("anp1", 1, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")) - cnp1 := newNetworkPolicy("cnp1", 2, []string{"node3", "node4", "node5"}, newAntreaClusterNetworkPolicyReference("cnp1")) + anp1 := newInternalNetworkPolicy("anp1", 1, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")) + cnp1 := newInternalNetworkPolicy("cnp1", 2, []string{"node3", "node4", "node5"}, newAntreaClusterNetworkPolicyReference("cnp1")) statusController, _, antreaInformerFactory, networkPolicyStore, networkPolicyControl := newTestStatusController(toAntreaNetworkPolicy(anp1), toAntreaNetworkPolicy(cnp1)) stopCh := make(chan struct{}) defer close(stopCh) @@ -281,8 +281,8 @@ func TestUpdateAntreaNetworkPolicy(t *testing.T) { DesiredNodesRealized: 3, }, networkPolicyControl.getAntreaClusterNetworkPolicyStatus()) - anp1Updated := newNetworkPolicy("anp1", 2, []string{"node1", "node2", "node3"}, newAntreaNetworkPolicyReference("ns1", "anp1")) - cnp1Updated := newNetworkPolicy("cnp1", 3, []string{"node4", "node5"}, newAntreaClusterNetworkPolicyReference("cnp1")) + anp1Updated := newInternalNetworkPolicy("anp1", 2, []string{"node1", "node2", "node3"}, newAntreaNetworkPolicyReference("ns1", "anp1")) + cnp1Updated := newInternalNetworkPolicy("cnp1", 3, []string{"node4", "node5"}, newAntreaClusterNetworkPolicyReference("cnp1")) networkPolicyStore.Update(anp1Updated) networkPolicyStore.Update(cnp1Updated) // TODO: Use a determinate mechanism. @@ -302,7 +302,7 @@ func TestUpdateAntreaNetworkPolicy(t *testing.T) { } func TestDeleteAntreaNetworkPolicy(t *testing.T) { - initialNetworkPolicy := newNetworkPolicy("anp1", 1, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")) + initialNetworkPolicy := newInternalNetworkPolicy("anp1", 1, []string{"node1", "node2"}, newAntreaNetworkPolicyReference("ns1", "anp1")) initialANP := toAntreaNetworkPolicy(initialNetworkPolicy) statusController, _, antreaInformerFactory, networkPolicyStore, _ := newTestStatusController(initialANP) stopCh := make(chan struct{}) @@ -334,7 +334,7 @@ func BenchmarkSyncHandler(b *testing.B) { for i := 0; i < nodeNum; i++ { nodes = append(nodes, fmt.Sprintf("node%d", i)) } - networkPolicy := newNetworkPolicy("anp1", 1, nodes, newAntreaNetworkPolicyReference("ns1", "anp1")) + networkPolicy := newInternalNetworkPolicy("anp1", 1, nodes, newAntreaNetworkPolicyReference("ns1", "anp1")) statusController, _, _, networkPolicyStore, _ := newTestStatusController() networkPolicyStore.Create(networkPolicy) diff --git a/pkg/util/sets/string.go b/pkg/util/sets/string.go new file mode 100644 index 00000000000..4ed8dfb7f08 --- /dev/null +++ b/pkg/util/sets/string.go @@ -0,0 +1,33 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sets + +import "k8s.io/apimachinery/pkg/util/sets" + +// Merge merges the src sets into dst and returns dst. +// This assumes that dst is non-nil. +// For example: +// s1 = {a1, a2, a3} +// s2 = {a1, a2, a4, a5} +// Merge(s1, s2) = {a1, a2, a3, a4, a5} +// s1 = {a1, a2, a3, a4, a5} +// +// It supersedes s1.Union(s2) when constructing a new sets is not the intention. +func Merge(dst, src sets.String) sets.String { + for item := range src { + dst.Insert(item) + } + return dst +}