Skip to content

Commit

Permalink
[Multicast] Clone remoteMembers when updating a GroupMemberStatus (#4903
Browse files Browse the repository at this point in the history
)

When updating GroupMemberStatus, a clone of remoteMembers should assign to the
updated GroupMemberStatus. Simply setting it with original remoteMembers
will cause race conditions.

Fixes #4904

Signed-off-by: ceclinux <src655@gmail.com>
  • Loading branch information
ceclinux authored Apr 27, 2023
1 parent feb4122 commit a1e96e4
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 5 deletions.
7 changes: 2 additions & 5 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent
newStatus := &GroupMemberStatus{
group: status.group,
localMembers: make(map[string]time.Time),
remoteMembers: status.remoteMembers,
remoteMembers: status.remoteMembers.Union(nil),
lastIGMPReport: status.lastIGMPReport,
ofGroupID: status.ofGroupID,
}
Expand Down Expand Up @@ -559,10 +559,7 @@ func (c *Controller) syncGroup(groupKey string) error {
func (c *Controller) groupIsStale(status *GroupMemberStatus) bool {
membersCount := len(status.localMembers)
diff := time.Now().Sub(status.lastIGMPReport)
if membersCount == 0 || diff > c.mcastGroupTimeout {
return true
}
return false
return membersCount == 0 || diff > c.mcastGroupTimeout
}

func (c *Controller) groupHasInstalled(groupKey string) bool {
Expand Down
87 changes: 87 additions & 0 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package multicast
import (
"context"
"fmt"
"math/rand"
"net"
"os"
"sync"
Expand Down Expand Up @@ -1031,6 +1032,92 @@ func TestMemberChanged(t *testing.T) {
}
}

func TestConcurrentEventHandlerAndWorkers(t *testing.T) {
c := newMockMulticastController(t, true)
c.ifaceStore = interfacestore.NewInterfaceStore()
stopCh := make(chan struct{})
defer close(stopCh)
groupIP := net.ParseIP("224.3.4.5")
numEvents := 10
var wg sync.WaitGroup
wg.Add(4)

eventFunc := func(eType eventType, isLocal bool) {
leastSignificantByteArr := rand.Perm(numEvents)
ifaceNamePrefix := "local-interfaceName"
ifaceType := interfacestore.ContainerInterface

if !isLocal {
ifaceNamePrefix = "remote-interfaceName"
ifaceType = interfacestore.TunnelInterface
}
for i := 0; i < len(leastSignificantByteArr); i++ {
var srcNode net.IP
var containerCfg *interfacestore.ContainerInterfaceConfig
if !isLocal {
srcNode = net.ParseIP(fmt.Sprintf("10.20.30.%d", leastSignificantByteArr[i]+2))
} else {
containerCfg = &interfacestore.ContainerInterfaceConfig{
ContainerID: fmt.Sprintf("container-%d", i),
}
}
iface := &interfacestore.InterfaceConfig{
Type: ifaceType,
InterfaceName: fmt.Sprintf("%s-%d", ifaceNamePrefix, i),
OVSPortConfig: &interfacestore.OVSPortConfig{
OFPort: int32(i),
},
ContainerInterfaceConfig: containerCfg,
}
if eType == groupJoin {
c.ifaceStore.AddInterface(iface)
}
c.groupEventCh <- &mcastGroupEvent{
group: groupIP,
eType: eType,
time: time.Now(),
iface: iface,
srcNode: srcNode,
}
}
}
// Below func adds local group join events.
go func() {
defer wg.Done()
eventFunc(groupJoin, true)
}()
// Below func adds local group leave events.
go func() {
defer wg.Done()
eventFunc(groupLeave, true)
}()
// Below func adds remote group join events.
go func() {
defer wg.Done()
eventFunc(groupJoin, false)
}()
// Below func adds remote group leave events.
go func() {
defer wg.Done()
eventFunc(groupLeave, false)
}()

mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockOFClient.EXPECT().InstallMulticastFlows(groupIP, gomock.Any()).AnyTimes()
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any()).AnyTimes()
mockOFClient.EXPECT().UninstallMulticastFlows(groupIP).AnyTimes()
mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockOFClient.EXPECT().SendIGMPQueryPacketOut(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
go c.eventHandler(stopCh)
for i := 0; i < 2; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
wg.Wait()
assert.Eventually(t, func() bool {
return len(c.groupEventCh) == 0 && c.queue.Len() == 0
}, time.Second, time.Millisecond*100)
}

func TestRemoteMemberJoinLeave(t *testing.T) {
mockController := newMockMulticastController(t, true)
_ = mockController.initialize(t)
Expand Down

0 comments on commit a1e96e4

Please sign in to comment.