Skip to content

Commit

Permalink
Clean up stale NetworkPolicies after reconnection (#704)
Browse files Browse the repository at this point in the history
We must do a full sync between the NetworkPolicy cache and the latest
objects after reconnecting to antrea-controller, otherwise stale
NetworkPolicies won't be cleaned up and will cause unexpected behavior.

This PR adds a dummy Bookmark event after the initial events of a new watch
so that antrea-agent can tell where the initial events end and then update its
cache atomically, which lets it identify and remove stale objects.
  • Loading branch information
tnqn authored May 19, 2020
1 parent 50aee19 commit 66df572
Show file tree
Hide file tree
Showing 18 changed files with 792 additions and 218 deletions.
98 changes: 96 additions & 2 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

Expand Down Expand Up @@ -332,6 +333,29 @@ func (c *ruleCache) GetAddressGroupNum() int {
return len(c.addressSetByGroup)
}

// ReplaceAddressGroups makes the cached AddressGroups match the given list in
// one step while holding addressSetLock: every group in the list is added (or
// refreshed), and every cached group that is absent from the list is removed.
// It is used to bring the cache back in sync with the apiserver when a watch
// is restarted.
func (c *ruleCache) ReplaceAddressGroups(groups []*v1beta1.AddressGroup) {
	c.addressSetLock.Lock()
	defer c.addressSetLock.Unlock()

	// Start by treating every cached group as stale; names that appear in the
	// new list are struck off below.
	staleNames := make(sets.String, len(c.addressSetByGroup))
	for name := range c.addressSetByGroup {
		staleNames.Insert(name)
	}

	for i := range groups {
		staleNames.Delete(groups[i].Name)
		// The error result is discarded: this method has no error return.
		_ = c.addAddressGroupLocked(groups[i])
	}

	// Anything still marked stale was not in the new list, so drop it.
	for name := range staleNames {
		delete(c.addressSetByGroup, name)
	}
}

// AddAddressGroup adds a new *v1beta1.AddressGroup to the cache. The rules
// referencing it will be regarded as dirty.
// It's safe to add an AddressGroup multiple times as it only overrides the
Expand All @@ -340,6 +364,10 @@ func (c *ruleCache) AddAddressGroup(group *v1beta1.AddressGroup) error {
c.addressSetLock.Lock()
defer c.addressSetLock.Unlock()

return c.addAddressGroupLocked(group)
}

func (c *ruleCache) addAddressGroupLocked(group *v1beta1.AddressGroup) error {
podSet := v1beta1.GroupMemberPodSet{}
for i := range group.Pods {
// Must not store address of loop iterator variable as it's the same
Expand All @@ -348,6 +376,10 @@ func (c *ruleCache) AddAddressGroup(group *v1beta1.AddressGroup) error {
// https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable
podSet.Insert(&group.Pods[i])
}
oldPodSet, exists := c.addressSetByGroup[group.Name]
if exists && oldPodSet.Equal(podSet) {
return nil
}
c.addressSetByGroup[group.Name] = podSet
c.onAddressGroupUpdate(group.Name)
return nil
Expand Down Expand Up @@ -392,6 +424,29 @@ func (c *ruleCache) GetAppliedToGroupNum() int {
return len(c.podSetByGroup)
}

// ReplaceAppliedToGroups makes the cached AppliedToGroups match the given list
// in one step while holding podSetLock: every group in the list is added (or
// refreshed), and every cached group that is absent from the list is removed.
// It is used to bring the cache back in sync with the apiserver when a watch
// is restarted.
func (c *ruleCache) ReplaceAppliedToGroups(groups []*v1beta1.AppliedToGroup) {
	c.podSetLock.Lock()
	defer c.podSetLock.Unlock()

	// Start by treating every cached group as stale; names that appear in the
	// new list are struck off below.
	staleNames := make(sets.String, len(c.podSetByGroup))
	for name := range c.podSetByGroup {
		staleNames.Insert(name)
	}

	for i := range groups {
		staleNames.Delete(groups[i].Name)
		// The error result is discarded: this method has no error return.
		_ = c.addAppliedToGroupLocked(groups[i])
	}

	// Anything still marked stale was not in the new list, so drop it.
	for name := range staleNames {
		delete(c.podSetByGroup, name)
	}
}

// AddAppliedToGroup adds a new *v1beta1.AppliedToGroup to the cache. The rules
// referencing it will be regarded as dirty.
// It's safe to add an AppliedToGroup multiple times as it only overrides the
Expand All @@ -400,10 +455,18 @@ func (c *ruleCache) AddAppliedToGroup(group *v1beta1.AppliedToGroup) error {
c.podSetLock.Lock()
defer c.podSetLock.Unlock()

return c.addAppliedToGroupLocked(group)
}

func (c *ruleCache) addAppliedToGroupLocked(group *v1beta1.AppliedToGroup) error {
podSet := v1beta1.GroupMemberPodSet{}
for i := range group.Pods {
podSet.Insert(&group.Pods[i])
}
oldPodSet, exists := c.podSetByGroup[group.Name]
if exists && oldPodSet.Equal(podSet) {
return nil
}
c.podSetByGroup[group.Name] = podSet
c.onAppliedToGroupUpdate(group.Name)
return nil
Expand Down Expand Up @@ -464,6 +527,29 @@ func (c *ruleCache) GetNetworkPolicyNum() int {
return len(c.policyMap)
}

// ReplaceNetworkPolicies makes the cached NetworkPolicies match the given list
// in one step while holding policyMapLock: every policy in the list is added
// (or refreshed), and every cached policy whose UID is absent from the list is
// deleted. It is used to bring the cache back in sync with the apiserver when
// a watch is restarted.
func (c *ruleCache) ReplaceNetworkPolicies(policies []*v1beta1.NetworkPolicy) {
	c.policyMapLock.Lock()
	defer c.policyMapLock.Unlock()

	// Start by treating every cached policy UID as stale; UIDs that appear in
	// the new list are struck off below.
	staleUIDs := make(sets.String, len(c.policyMap))
	for uid := range c.policyMap {
		staleUIDs.Insert(uid)
	}

	for _, policy := range policies {
		staleUIDs.Delete(string(policy.UID))
		// The error result is discarded: this method has no error return.
		_ = c.addNetworkPolicyLocked(policy)
	}

	// Anything still marked stale was not in the new list, so delete it.
	for uid := range staleUIDs {
		// The error result is discarded: this method has no error return.
		_ = c.deleteNetworkPolicyLocked(uid)
	}
}

// AddNetworkPolicy adds a new *v1beta1.NetworkPolicy to the cache.
// It could happen that an existing NetworkPolicy is "added" again when the
// watcher reconnects to the Apiserver, we use the same processing as
Expand All @@ -472,6 +558,10 @@ func (c *ruleCache) AddNetworkPolicy(policy *v1beta1.NetworkPolicy) error {
c.policyMapLock.Lock()
defer c.policyMapLock.Unlock()

return c.addNetworkPolicyLocked(policy)
}

// addNetworkPolicyLocked records the policy's namespace and name in policyMap,
// keyed by the policy's UID, and then hands the policy to UpdateNetworkPolicy,
// whose result it returns.
// The callers visible in this file acquire policyMapLock before calling it.
func (c *ruleCache) addNetworkPolicyLocked(policy *v1beta1.NetworkPolicy) error {
	// Keyed fields keep this literal correct if NamespacedName's field order
	// ever changes, and satisfy go vet's composite-literal check.
	c.policyMap[string(policy.UID)] = &types.NamespacedName{
		Namespace: policy.Namespace,
		Name:      policy.Name,
	}
	return c.UpdateNetworkPolicy(policy)
}
Expand Down Expand Up @@ -512,8 +602,12 @@ func (c *ruleCache) DeleteNetworkPolicy(policy *v1beta1.NetworkPolicy) error {
c.policyMapLock.Lock()
defer c.policyMapLock.Unlock()

delete(c.policyMap, string(policy.UID))
existingRules, _ := c.rules.ByIndex(policyIndex, string(policy.UID))
return c.deleteNetworkPolicyLocked(string(policy.UID))
}

func (c *ruleCache) deleteNetworkPolicyLocked(uid string) error {
delete(c.policyMap, uid)
existingRules, _ := c.rules.ByIndex(policyIndex, uid)
for _, r := range existingRules {
ruleID := r.(*rule).ID
c.rules.Delete(r)
Expand Down
Loading

0 comments on commit 66df572

Please sign in to comment.