Skip to content

Commit

Permalink
Remove the limit on the number of Endpoints in AntreaProxy (#4167)
Browse files Browse the repository at this point in the history
Fixes: #2092

Due to the message size of Openflow, the maximum number of Endpoints for
a Service in AntreaProxy is 800. Since Openflow 1.5 is used in Antrea,
the operation `insert_buckets` introduced in Openflow 1.5 can be used to
create a Service with more than 800 Endpoints. To sync Service with more
than 800 Endpoints to OVS, multiple Openflow messages will be sent to
OVS in a bundle (the first message uses `add` to sync the OVS group and
its corresponding buckets to OVS, other messages use `insert_buckets` to
sync other buckets to OVS).

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl authored Sep 27, 2022
1 parent 0f77529 commit e7486d5
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 150 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.19

require (
antrea.io/libOpenflow v0.8.0
antrea.io/ofnet v0.6.1
antrea.io/libOpenflow v0.8.1
antrea.io/ofnet v0.6.2
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Mellanox/sriovnet v1.1.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.8.0 h1:Xm6mlSqdXtDD418nf1lndoDvMi8scqUan8pkEUZ2oas=
antrea.io/libOpenflow v0.8.0/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/ofnet v0.6.1 h1:w/FIagCrN7dKt2A2R9grlmcSyGrqlCu+uFYPthtfXeg=
antrea.io/ofnet v0.6.1/go.mod h1:qWqi11pI3kBYcS9SYWm92ZOiOPBx04Jx21cDmJlJhOg=
antrea.io/libOpenflow v0.8.1 h1:uxkXvhlPRXAVw26LW6Pt2jCSEh8NR56vQW70YGvy7aU=
antrea.io/libOpenflow v0.8.1/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/ofnet v0.6.2 h1:CCW2lOVBtEgpbsIT+DatHcYCfQ5+MDTHSSlQrc2Qelc=
antrea.io/ofnet v0.6.2/go.mod h1:/gjpTqhUpyn8uZnef+ytdCCAeY5oGG1jCr/szPUqVXU=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down
19 changes: 13 additions & 6 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,15 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
defer c.replayMutex.RUnlock()

group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
_, installed := c.featureService.groupCache.Load(groupID)
if !installed {
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group %d: %w", groupID, err)
}
} else {
if err := group.Modify(); err != nil {
return fmt.Errorf("error when modifying Service Endpoints Group %d: %w", groupID, err)
}
}
c.featureService.groupCache.Store(groupID, group)
return nil
Expand All @@ -633,8 +640,8 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
return fmt.Errorf("group %d delete failed", groupID)
if err := c.bridge.DeleteGroup(groupID); err != nil {
return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err)
}
c.featureService.groupCache.Delete(groupID)
return nil
Expand Down Expand Up @@ -1328,8 +1335,8 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive
func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
return fmt.Errorf("group %d delete failed", groupID)
if err := c.bridge.DeleteGroup(groupID); err != nil {
return fmt.Errorf("error when deleting Multicast receiver Group %d: %w", groupID, err)
}
c.featureMulticast.groupCache.Delete(groupID)
return nil
Expand Down
26 changes: 13 additions & 13 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,12 +613,12 @@ func TestServiceGroupInstallAndUninstall(t *testing.T) {
groupID1 := binding.GroupIDType(1)
groupID2 := binding.GroupIDType(2)
for _, tc := range []struct {
groupID binding.GroupIDType
sessionAffinity bool
deleteSucceeded bool
groupID binding.GroupIDType
sessionAffinity bool
deleteGroupError error
}{
{groupID: groupID1, deleteSucceeded: false, sessionAffinity: true},
{groupID: groupID2, deleteSucceeded: true, sessionAffinity: false},
{groupID: groupID1, deleteGroupError: fmt.Errorf("failed to delete"), sessionAffinity: true},
{groupID: groupID2, deleteGroupError: nil, sessionAffinity: false},
} {
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
Expand All @@ -638,10 +638,10 @@ func TestServiceGroupInstallAndUninstall(t *testing.T) {
require.NoError(t, err)
_, exists := client.featureService.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError)
err = client.UninstallServiceGroup(tc.groupID)
_, exists = client.featureService.groupCache.Load(tc.groupID)
if tc.deleteSucceeded {
if tc.deleteGroupError == nil {
assert.NoError(t, err)
assert.False(t, exists)
} else {
Expand Down Expand Up @@ -675,11 +675,11 @@ func TestMulticastGroupInstallAndUninstall(t *testing.T) {
groupID1 := binding.GroupIDType(1)
groupID2 := binding.GroupIDType(2)
for _, tc := range []struct {
groupID binding.GroupIDType
deleteSucceeded bool
groupID binding.GroupIDType
deleteGroupError error
}{
{groupID: groupID1, deleteSucceeded: false},
{groupID: groupID2, deleteSucceeded: true},
{groupID: groupID1, deleteGroupError: fmt.Errorf("failed to delete")},
{groupID: groupID2, deleteGroupError: nil},
} {
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
Expand All @@ -696,9 +696,9 @@ func TestMulticastGroupInstallAndUninstall(t *testing.T) {
require.NoError(t, err)
_, exists := client.featureMulticast.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError)
err = client.UninstallMulticastGroup(tc.groupID)
if tc.deleteSucceeded {
if tc.deleteGroupError == nil {
assert.NoError(t, err)
_, exists = client.featureMulticast.groupCache.Load(tc.groupID)
assert.False(t, exists)
Expand Down
20 changes: 15 additions & 5 deletions pkg/agent/openflow/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,16 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType,
ResubmitToTable(MulticastOutputTable.GetID()).
Done()
}
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Multicast receiver Group: %w", err)

_, installed := f.groupCache.Load(groupID)
if !installed {
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err)
}
} else {
if err := group.Modify(); err != nil {
return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err)
}
}
f.groupCache.Store(groupID, group)
return nil
Expand Down Expand Up @@ -176,14 +184,16 @@ func (f *featureMulticast) multicastPodMetricFlows(podIP net.IP, podOFPort uint3
}

func (f *featureMulticast) replayGroups() {
var groups []binding.OFEntry
f.groupCache.Range(func(id, value interface{}) bool {
group := value.(binding.Group)
group.Reset()
if err := group.Add(); err != nil {
klog.ErrorS(err, "Error when replaying cached group", "group", id)
}
groups = append(groups, group)
return true
})
if err := f.bridge.AddOFEntriesInBundle(groups, nil, nil); err != nil {
klog.ErrorS(err, "error when replaying cached groups for Multicast")
}
}

func (f *featureMulticast) multicastRemoteReportFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow {
Expand Down
8 changes: 5 additions & 3 deletions pkg/agent/openflow/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,14 @@ func (f *featureService) replayFlows() []binding.Flow {
}

func (f *featureService) replayGroups() {
var groups []binding.OFEntry
f.groupCache.Range(func(id, value interface{}) bool {
group := value.(binding.Group)
group.Reset()
if err := group.Add(); err != nil {
klog.Errorf("Error when replaying cached group %d: %v", id, err)
}
groups = append(groups, group)
return true
})
if err := f.bridge.AddOFEntriesInBundle(groups, nil, nil); err != nil {
klog.ErrorS(err, "error when replaying cached groups for Service")
}
}
80 changes: 10 additions & 70 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"math"
"net"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand All @@ -29,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
k8sapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand All @@ -50,10 +48,6 @@ import (
const (
resyncPeriod = time.Minute
componentName = "antrea-agent-proxy"
// Due to the maximum message size in Openflow 1.3 and the implementation of Services in Antrea, the maximum number
// of Endpoints that Antrea can support at the moment is 800. If the number of Endpoints for a given Service exceeds
// 800, extra Endpoints will be dropped.
maxEndpoints = 800
// SessionAffinity timeout is implemented using a hard_timeout in OVS. hard_timeout is
// represented by a uint16 in the OpenFlow protocol,
maxSupportedAffinityTimeout = math.MaxUint16
Expand Down Expand Up @@ -107,8 +101,6 @@ type proxier struct {
serviceStringMap map[string]k8sproxy.ServicePortName
// serviceStringMapMutex protects serviceStringMap object.
serviceStringMapMutex sync.Mutex
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

serviceHealthServer healthcheck.ServiceHealthServer
numLocalEndpoints map[apimachinerytypes.NamespacedName]int
Expand Down Expand Up @@ -156,9 +148,6 @@ func (p *proxier) removeStaleServices() {
}
svcInfo := svcPort.(*types.ServiceInfo)
klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String())
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
continue
Expand Down Expand Up @@ -420,66 +409,18 @@ func (p *proxier) installServices() {
}

var allEndpointUpdateList, localEndpointUpdateList []k8sproxy.Endpoint
if len(endpoints) > maxEndpoints {
if !p.oversizeServiceSet.Has(svcPortName.String()) {
klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
p.oversizeServiceSet.Insert(svcPortName.String())
}
// If the length of endpoints > maxEndpoints, endpoints should be cut. However, since endpoints is a map, iterate
// the map and append every Endpoint to a target slice. When the Endpoint is local, append Endpoint to slice
// localEndpointList, otherwise append the Endpoint to slice remoteEndpointList. Since the iteration order
// of map in Golang is not guaranteed, if split the Endpoints directly without any sorting, some Endpoints
// may not be installed, so split the endpointList after sorting.
var remoteEndpointList, localEndpointList []k8sproxy.Endpoint
for _, endpoint := range endpoints {
if endpoint.GetIsLocal() {
localEndpointList = append(localEndpointList, endpoint)
} else {
remoteEndpointList = append(remoteEndpointList, endpoint)
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range endpoints {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
}

sort.Sort(byEndpoint(remoteEndpointList))
sort.Sort(byEndpoint(localEndpointList))

if len(localEndpointList) > maxEndpoints {
// When the number of local Endpoints is greater than maxEndpoints, choose maxEndpoints Endpoints from
// localEndpointList to install.
allEndpointUpdateList = localEndpointList[:maxEndpoints]
} else {
// When the number of local Endpoints is smaller than maxEndpoints, choose all Endpoints of localEndpointList
// and part of remoteEndpointList to install.
localEndpointUpdateList = localEndpointList
allEndpointUpdateList = append(localEndpointList, remoteEndpointList[:maxEndpoints-len(localEndpointList)]...)
}
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range allEndpointUpdateList {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
break
}
}
}
} else {
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range endpoints {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
}
allEndpointUpdateList = append(allEndpointUpdateList, endpoint)
if endpoint.GetIsLocal() {
localEndpointUpdateList = append(localEndpointUpdateList, endpoint)
}
allEndpointUpdateList = append(allEndpointUpdateList, endpoint)
if endpoint.GetIsLocal() {
localEndpointUpdateList = append(localEndpointUpdateList, endpoint)
}
}

Expand Down Expand Up @@ -1011,7 +952,6 @@ func NewProxier(
endpointReferenceCounter: map[string]int{},
nodeLabels: map[string]string{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
oversizeServiceSet: sets.NewString(),
groupCounter: groupCounter,
ofClient: ofClient,
routeClient: routeClient,
Expand Down
8 changes: 4 additions & 4 deletions pkg/ovs/openflow/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Bridge interface {
DeleteTable(id uint8) bool
CreateGroupTypeAll(id GroupIDType) Group
CreateGroup(id GroupIDType) Group
DeleteGroup(id GroupIDType) bool
DeleteGroup(id GroupIDType) error
CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter
DeleteMeter(id MeterIDType) bool
DeleteMeterAll() error
Expand Down Expand Up @@ -183,9 +183,9 @@ type OFEntry interface {
// Modify / Delete methods can be called on this object. This method
// should be called if a reconnection event happened.
Reset()
// GetBundleMessage returns ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation specifies what
// operation is expected to be taken on the OFEntry.
GetBundleMessage(operation OFOperation) (ofctrl.OpenFlowModMessage, error)
// GetBundleMessages returns a slice of ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation
// specifies what operation is expected to be taken on the OFEntry.
GetBundleMessages(operation OFOperation) ([]ofctrl.OpenFlowModMessage, error)
}

type Flow interface {
Expand Down
Loading

0 comments on commit e7486d5

Please sign in to comment.