Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #5566 and #5583: Fix deadlock when accessing dirtyRules in fqdn controller #5579

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type ruleRealizationUpdate struct {
// ruleSyncTracker tracks the realization status of FQDN rules that are
// applied to workloads on this Node.
type ruleSyncTracker struct {
mutex sync.Mutex
mutex sync.RWMutex
// updateCh is the channel used by the rule reconciler to report rule realization status.
updateCh chan ruleRealizationUpdate
// ruleToSubscribers keeps track of the subscribers that are currently subscribed
Expand Down Expand Up @@ -497,14 +497,12 @@ func (f *fqdnController) syncDirtyRules(fqdn string, waitCh chan error, addressU
utilsets.MergeString(dirtyRules, f.selectorItemToRuleIDs[selectorItem])
}
if !addressUpdate {
f.ruleSyncTracker.mutex.Lock()
defer f.ruleSyncTracker.mutex.Unlock()
// If there is no address update for this FQDN, and rules selecting this FQDN
// were all previously realized successfully, then there will be no dirty rules
// left to be synced. On the contrary, if some rules that select this FQDN are
// still in the dirtyRules set of the ruleSyncTracker, then only those rules
// should be retried for reconciliation, and packetOut shall be blocked.
dirtyRules = f.ruleSyncTracker.dirtyRules.Intersection(dirtyRules)
dirtyRules = f.ruleSyncTracker.getDirtyRules().Intersection(dirtyRules)
}
if len(dirtyRules) > 0 {
klog.V(4).InfoS("Dirty rules blocking packetOut", "dirtyRules", dirtyRules)
Expand Down Expand Up @@ -533,6 +531,14 @@ func (rst *ruleSyncTracker) subscribe(waitCh chan error, dirtyRules sets.String)
}
}

// getDirtyRules returns a snapshot of the rule IDs currently stored in the
// dirtyRules set of the ruleSyncTracker. The tracker's read lock is held
// while the snapshot is built.
func (rst *ruleSyncTracker) getDirtyRules() sets.String {
	rst.mutex.RLock()
	defer rst.mutex.RUnlock()
	// Hand back a fresh set instead of the internal one, because the internal
	// set can be updated in-place by the Run func.
	snapshot := sets.NewString()
	for ruleID := range rst.dirtyRules {
		snapshot.Insert(ruleID)
	}
	return snapshot
}

func (rst *ruleSyncTracker) Run(stopCh <-chan struct{}) {
for {
select {
Expand Down
134 changes: 134 additions & 0 deletions pkg/agent/controller/networkpolicy/fqdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package networkpolicy

import (
"context"
"fmt"
"net"
"testing"
"time"
Expand Down Expand Up @@ -387,3 +388,136 @@ func TestGetIPsForFQDNSelectors(t *testing.T) {
})
}
}

// TestSyncDirtyRules drives fqdnController.syncDirtyRules for one or more
// FQDNs, feeds rule realization updates into the ruleSyncTracker, and then
// checks three things:
//   - which rule IDs were handed to the dirtyRuleHandler,
//   - that every non-nil wait channel receives a notification in time, and
//   - which rule IDs are still reported by getDirtyRules afterwards.
func TestSyncDirtyRules(t *testing.T) {
	testFQDN := "test.antrea.io"
	selectorItem := fqdnSelectorItem{
		matchName: testFQDN,
	}
	testFQDN2 := "dev.antrea.io"
	selectorItem2 := fqdnSelectorItem{
		matchName: testFQDN2,
	}
	tests := []struct {
		name string
		// fqdnsToSync, waitChs and addressUpdates are parallel slices: the i-th
		// entries together form the arguments of the i-th syncDirtyRules call.
		fqdnsToSync    []string
		waitChs        []chan error
		addressUpdates []bool
		// prevDirtyRules, when non-empty, is installed as the tracker's dirty
		// rule set before any sync happens.
		prevDirtyRules sets.String
		// notifications are written to the tracker's updateCh after all
		// syncDirtyRules calls have returned.
		notifications               []ruleRealizationUpdate
		expectedDirtyRuleSyncCalls  []string
		expectedDirtyRulesRemaining sets.String
		// expectErr allows (but does not require) a non-nil error on the wait
		// channels.
		expectErr bool
	}{
		{
			name:                        "test non-blocking dirty rule sync with address update",
			fqdnsToSync:                 []string{testFQDN},
			prevDirtyRules:              sets.NewString(),
			addressUpdates:              []bool{true},
			waitChs:                     []chan error{nil},
			notifications:               []ruleRealizationUpdate{{"1", nil}, {"2", nil}},
			expectedDirtyRuleSyncCalls:  []string{"1", "2"},
			expectedDirtyRulesRemaining: sets.NewString(),
			expectErr:                   false,
		},
		{
			name:                        "test blocking dirty rule sync with address update",
			fqdnsToSync:                 []string{testFQDN},
			prevDirtyRules:              sets.NewString(),
			waitChs:                     []chan error{make(chan error, 1)},
			addressUpdates:              []bool{true},
			notifications:               []ruleRealizationUpdate{{"1", nil}, {"2", nil}},
			expectedDirtyRuleSyncCalls:  []string{"1", "2"},
			expectedDirtyRulesRemaining: sets.NewString(),
			expectErr:                   false,
		},
		{
			name:                        "test blocking dirty rule sync with failed rule realization",
			fqdnsToSync:                 []string{testFQDN},
			prevDirtyRules:              sets.NewString(),
			waitChs:                     []chan error{make(chan error, 1)},
			addressUpdates:              []bool{true},
			notifications:               []ruleRealizationUpdate{{"1", nil}, {"2", fmt.Errorf("ovs err")}},
			expectedDirtyRuleSyncCalls:  []string{"1", "2"},
			expectedDirtyRulesRemaining: sets.NewString("2"),
			expectErr:                   true,
		},
		{
			name:                        "test blocking dirty rule sync without address update but previously failed rule realization",
			fqdnsToSync:                 []string{testFQDN},
			prevDirtyRules:              sets.NewString("2"),
			waitChs:                     []chan error{make(chan error, 1)},
			addressUpdates:              []bool{false},
			notifications:               []ruleRealizationUpdate{{"2", nil}},
			expectedDirtyRuleSyncCalls:  []string{"2"},
			expectedDirtyRulesRemaining: sets.NewString(),
			expectErr:                   false,
		},
		{
			name:                        "test blocking dirty rule sync without address update",
			fqdnsToSync:                 []string{testFQDN},
			prevDirtyRules:              sets.NewString(),
			waitChs:                     []chan error{make(chan error, 1)},
			addressUpdates:              []bool{false},
			notifications:               []ruleRealizationUpdate{},
			expectedDirtyRuleSyncCalls:  []string{},
			expectedDirtyRulesRemaining: sets.NewString(),
			expectErr:                   false,
		},
		{
			name:                        "test blocking single dirty rule multiple FQDN concurrent updates",
			fqdnsToSync:                 []string{testFQDN, testFQDN2},
			prevDirtyRules:              sets.NewString(),
			waitChs:                     []chan error{make(chan error, 1), make(chan error, 1)},
			addressUpdates:              []bool{true, false},
			notifications:               []ruleRealizationUpdate{{"1", nil}, {"2", nil}},
			expectedDirtyRuleSyncCalls:  []string{"1", "2", "2"},
			expectedDirtyRulesRemaining: sets.NewString(),
			expectErr:                   false,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			controller := gomock.NewController(t)
			f, _ := newMockFQDNController(t, controller, nil)
			// Record every rule ID passed to the dirty rule handler so it can be
			// compared with expectedDirtyRuleSyncCalls.
			var dirtyRuleSyncCalls []string
			f.dirtyRuleHandler = func(s string) {
				dirtyRuleSyncCalls = append(dirtyRuleSyncCalls, s)
			}
			// Rule "1" selects testFQDN only; rule "2" selects both FQDNs.
			f.addFQDNSelector("1", []string{testFQDN})
			f.addFQDNSelector("2", []string{testFQDN})
			f.addFQDNSelector("2", []string{testFQDN2})
			f.setFQDNMatchSelector(testFQDN, selectorItem)
			f.setFQDNMatchSelector(testFQDN2, selectorItem2)
			// This simulates failed rule syncs in previous syncDirtyRules() calls.
			if len(tc.prevDirtyRules) > 0 {
				f.ruleSyncTracker.dirtyRules = tc.prevDirtyRules
			}
			// The tracker goroutine is stopped when the subtest returns.
			stopCh := make(chan struct{})
			defer close(stopCh)
			go f.runRuleSyncTracker(stopCh)

			for i, fqdn := range tc.fqdnsToSync {
				f.syncDirtyRules(fqdn, tc.waitChs[i], tc.addressUpdates[i])
			}
			for _, update := range tc.notifications {
				f.ruleSyncTracker.updateCh <- update
			}
			assert.ElementsMatch(t, tc.expectedDirtyRuleSyncCalls, dirtyRuleSyncCalls)
			for _, waitCh := range tc.waitChs {
				if waitCh == nil {
					continue
				}
				// Receive directly with a timeout rather than polling with a
				// blocking condition function: a blocked condition would leak its
				// goroutine, and an unexpected error would be swallowed and only
				// surface later as a generic timeout.
				select {
				case err := <-waitCh:
					// NOTE(review): a nil error is also accepted when expectErr
					// is true; consider asserting a non-nil error once the
					// notification semantics of Run are confirmed.
					if !tc.expectErr {
						assert.NoError(t, err, "Failed to successfully wait for rule syncs")
					}
				case <-time.After(ruleRealizationTimeout):
					t.Errorf("Failed to successfully wait for rule syncs: no notification received within %v", ruleRealizationTimeout)
				}
			}
			assert.Equal(t, tc.expectedDirtyRulesRemaining, f.ruleSyncTracker.getDirtyRules())
		})
	}
}