From a532eade5841f36434f9212d25b475837f33d276 Mon Sep 17 00:00:00 2001 From: Yang Ding Date: Mon, 31 Aug 2020 16:42:48 -0700 Subject: [PATCH] Fix deadlock caused by acquiring priorityMutex twice --- .../controller/networkpolicy/reconciler.go | 13 +-- .../networkpolicy/reconciler_test.go | 88 +++++++++++++++++-- 2 files changed, 89 insertions(+), 12 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/reconciler.go b/pkg/agent/controller/networkpolicy/reconciler.go index e3ef6b3c869..2b0c64098a4 100644 --- a/pkg/agent/controller/networkpolicy/reconciler.go +++ b/pkg/agent/controller/networkpolicy/reconciler.go @@ -635,11 +635,6 @@ func (r *reconciler) updateOFRule(ofID uint32, addedFrom []types.Address, addedT func (r *reconciler) uninstallOFRule(ofID uint32, table binding.TableIDType) error { klog.V(2).Infof("Uninstalling ofRule %d", ofID) - priorityAssigner, exists := r.priorityAssigners[table] - if exists { - priorityAssigner.mutex.Lock() - defer priorityAssigner.mutex.Unlock() - } stalePriorities, err := r.ofClient.UninstallPolicyRuleFlows(ofID) if err != nil { return fmt.Errorf("error uninstalling ofRule %v: %v", ofID, err) @@ -652,6 +647,8 @@ func (r *reconciler) uninstallOFRule(ofID uint32, table binding.TableIDType) err // Cannot parse the priority str. Theoretically this should never happen. return err } + // If there are stalePriorities, priorityAssigners[table] must not be nil. + priorityAssigner, _ := r.priorityAssigners[table] priorityAssigner.assigner.Release(uint16(priorityNum)) } } @@ -668,7 +665,6 @@ func (r *reconciler) Forget(ruleID string) error { klog.Infof("Forgetting rule %v", ruleID) value, exists := r.lastRealizeds.Load(ruleID) - if !exists { // No-op if the rule was not realized before. return nil @@ -676,6 +672,11 @@ func (r *reconciler) Forget(ruleID string) error { lastRealized := value.(*lastRealized) table := r.getOFRuleTable(lastRealized.CompletedRule) + priorityAssigner, exists := r.priorityAssigners[table] + if exists { + priorityAssigner.mutex.Lock() + defer priorityAssigner.mutex.Unlock() + } for svcKey, ofID := range lastRealized.ofIDs { if err := r.uninstallOFRule(ofID, table); err != nil { return err diff --git a/pkg/agent/controller/networkpolicy/reconciler_test.go b/pkg/agent/controller/networkpolicy/reconciler_test.go index bb4f9324464..5c1a570acb2 100644 --- a/pkg/agent/controller/networkpolicy/reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/reconciler_test.go @@ -38,7 +38,7 @@ var ( appliedToGroup1 = v1beta1.NewGroupMemberPodSet(newAppliedToGroupMember("pod1", "ns1")) appliedToGroup2 = v1beta1.NewGroupMemberPodSet(newAppliedToGroupMember("pod2", "ns1")) - appliedToGroup3 = v1beta1.NewGroupMemberPodSet(newAppliedToGroupMember("pod3", "ns1")) + appliedToGroup3 = v1beta1.NewGroupMemberPodSet(newAppliedToGroupMember("pod4", "ns1")) appliedToGroupWithSameContainerPort = v1beta1.NewGroupMemberPodSet( newAppliedToGroupMember("pod1", "ns1", v1beta1.NamedPort{Name: "http", Protocol: v1beta1.ProtocolTCP, Port: 80}), newAppliedToGroupMember("pod3", "ns1", v1beta1.NamedPort{Name: "http", Protocol: v1beta1.ProtocolTCP, Port: 80}), @@ -47,6 +47,8 @@ var ( newAppliedToGroupMember("pod1", "ns1", v1beta1.NamedPort{Name: "http", Protocol: v1beta1.ProtocolTCP, Port: 80}), newAppliedToGroupMember("pod3", "ns1", v1beta1.NamedPort{Name: "http", Protocol: v1beta1.ProtocolTCP, Port: 443}), ) + appliedToGroupWithSingleContainerPort = v1beta1.NewGroupMemberPodSet( + newAppliedToGroupMember("pod1", "ns1", v1beta1.NamedPort{Name: "http", Protocol: v1beta1.ProtocolTCP, Port: 80})) protocolTCP = v1beta1.ProtocolTCP @@ -67,6 +69,9 @@ var ( servicesKey1 = normalizeServices(services1) services2 = []v1beta1.Service{serviceTCP} servicesKey2 = normalizeServices(services2) + + policyPriority = float64(1) + tierPriority = v1beta1.TierPriority(1) ) func newCIDR(cidrStr string) *net.IPNet { @@ -124,6 +129,20 @@ func TestReconcilerForget(t *testing.T) { []uint32{8, 9}, false, }, + { + "known-multiple-ofrule-cnp", + map[string]*lastRealized{ + "foo": { + ofIDs: map[servicesKey]uint32{servicesKey1: 8, servicesKey2: 9}, + CompletedRule: &CompletedRule{ + rule: &rule{Direction: v1beta1.DirectionIn, PolicyPriority: &policyPriority, TierPriority: &tierPriority}, + }, + }, + }, + "foo", + []uint32{8, 9}, + false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -602,6 +621,12 @@ func TestReconcilerUpdate(t *testing.T) { IP: net.ParseIP("3.3.3.3"), ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "pod2", PodNamespace: "ns1", ContainerID: "container2"}, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 2}}) + ifaceStore.AddInterface( + &interfacestore.InterfaceConfig{ + InterfaceName: util.GenerateContainerInterfaceName("pod3", "ns1", "container3"), + IP: net.ParseIP("4.4.4.4"), + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "pod3", PodNamespace: "ns1", ContainerID: "container3"}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 3}}) tests := []struct { name string originalRule *CompletedRule @@ -610,6 +635,7 @@ func TestReconcilerUpdate(t *testing.T) { expectedAddedTo []types.Address expectedDeletedFrom []types.Address expectedDeletedTo []types.Address + expectUninstall bool wantErr bool }{ { @@ -629,6 +655,7 @@ func TestReconcilerUpdate(t *testing.T) { ipsToOFAddresses(sets.NewString("1.1.1.1")), ofPortsToOFAddresses(sets.NewInt32(1)), false, + false, }, { "updating-egress-rule", @@ -647,6 +674,7 @@ func TestReconcilerUpdate(t *testing.T) { ipsToOFAddresses(sets.NewString("2.2.2.2")), ipsToOFAddresses(sets.NewString("1.1.1.1")), false, + false, }, { "updating-ingress-rule-with-missing-ofport", @@ -665,6 +693,7 @@ func TestReconcilerUpdate(t *testing.T) { ipsToOFAddresses(sets.NewString("1.1.1.1")), ofPortsToOFAddresses(sets.NewInt32(1)), false, + false, }, { "updating-egress-rule-with-missing-ip", @@ -683,6 +712,7 @@ func TestReconcilerUpdate(t *testing.T) { ipsToOFAddresses(sets.NewString("2.2.2.2")), ipsToOFAddresses(sets.NewString("1.1.1.1")), false, + false, }, { "updating-egress-rule-deny-all", @@ -701,6 +731,45 @@ func TestReconcilerUpdate(t *testing.T) { ipsToOFAddresses(sets.NewString("2.2.2.2")), []types.Address{}, false, + false, + }, + { + "updating-cnp-ingress-rule", + &CompletedRule{ + rule: &rule{ID: "ingress-rule", Direction: v1beta1.DirectionIn, PolicyPriority: &policyPriority, TierPriority: &tierPriority}, + FromAddresses: addressGroup1, + Pods: appliedToGroup1, + }, + &CompletedRule{ + rule: &rule{ID: "ingress-rule", Direction: v1beta1.DirectionIn, PolicyPriority: &policyPriority, TierPriority: &tierPriority}, + FromAddresses: addressGroup2, + Pods: appliedToGroup2, + }, + ipsToOFAddresses(sets.NewString("1.1.1.2")), + ofPortsToOFAddresses(sets.NewInt32(2)), + ipsToOFAddresses(sets.NewString("1.1.1.1")), + ofPortsToOFAddresses(sets.NewInt32(1)), + false, + false, + }, + { + "updating-cnp-ingress-rule-uninstall", + &CompletedRule{ + rule: &rule{ID: "ingress-rule", Direction: v1beta1.DirectionIn, PolicyPriority: &policyPriority, TierPriority: &tierPriority, Services: []v1beta1.Service{serviceHTTP}}, + FromAddresses: addressGroup1, + Pods: appliedToGroupWithDiffContainerPort, + }, + &CompletedRule{ + rule: &rule{ID: "ingress-rule", Direction: v1beta1.DirectionIn, PolicyPriority: &policyPriority, TierPriority: &tierPriority, Services: []v1beta1.Service{serviceHTTP}}, + FromAddresses: addressGroup1, + Pods: appliedToGroupWithSingleContainerPort, + }, + []types.Address{}, + []types.Address{}, + []types.Address{}, + []types.Address{}, + true, + false, }, } for _, tt := range tests { @@ -708,18 +777,25 @@ func TestReconcilerUpdate(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() mockOFClient := openflowtest.NewMockClient(controller) - mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any()) + mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any()).MaxTimes(2) + priority := gomock.Any() + if !tt.originalRule.isAntreaNetworkPolicyRule() { + priority = nil + } + if tt.expectUninstall { + mockOFClient.EXPECT().UninstallPolicyRuleFlows(gomock.Any()) + } if len(tt.expectedAddedFrom) > 0 { - mockOFClient.EXPECT().AddPolicyRuleAddress(gomock.Any(), types.SrcAddress, gomock.Eq(tt.expectedAddedFrom), nil) + mockOFClient.EXPECT().AddPolicyRuleAddress(gomock.Any(), types.SrcAddress, gomock.Eq(tt.expectedAddedFrom), priority) } if len(tt.expectedAddedTo) > 0 { - mockOFClient.EXPECT().AddPolicyRuleAddress(gomock.Any(), types.DstAddress, gomock.Eq(tt.expectedAddedTo), nil) + mockOFClient.EXPECT().AddPolicyRuleAddress(gomock.Any(), types.DstAddress, gomock.Eq(tt.expectedAddedTo), priority) } if len(tt.expectedDeletedFrom) > 0 { - mockOFClient.EXPECT().DeletePolicyRuleAddress(gomock.Any(), types.SrcAddress, gomock.Eq(tt.expectedDeletedFrom), nil) + mockOFClient.EXPECT().DeletePolicyRuleAddress(gomock.Any(), types.SrcAddress, gomock.Eq(tt.expectedDeletedFrom), priority) } if len(tt.expectedDeletedTo) > 0 { - mockOFClient.EXPECT().DeletePolicyRuleAddress(gomock.Any(), types.DstAddress, gomock.Eq(tt.expectedDeletedTo), nil) + mockOFClient.EXPECT().DeletePolicyRuleAddress(gomock.Any(), types.DstAddress, gomock.Eq(tt.expectedDeletedTo), priority) } r := newReconciler(mockOFClient, ifaceStore) if err := r.Reconcile(tt.originalRule); (err != nil) != tt.wantErr {