Skip to content

Commit

Permalink
Multi Network support in the NEG controller
Browse files Browse the repository at this point in the history
  • Loading branch information
mmamczur committed Apr 25, 2023
1 parent 8c7d784 commit 4b5325d
Show file tree
Hide file tree
Showing 36 changed files with 2,092 additions and 70 deletions.
2 changes: 2 additions & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ func runControllers(ctx *ingctx.ControllerContext) {
ctx.NodeInformer,
ctx.EndpointSliceInformer,
ctx.SvcNegInformer,
ctx.NetworkInformer,
ctx.GKENetworkParamsInformer,
ctx.HasSynced,
ctx.ControllerMetrics,
ctx.L4Namer,
Expand Down
60 changes: 37 additions & 23 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type Controller struct {
hasSynced func() bool
ingressLister cache.Indexer
serviceLister cache.Indexer
networkLister cache.Indexer
gkeNetworkParamSetLister cache.Indexer
client kubernetes.Interface
defaultBackendService utils.ServicePort
enableASM bool
Expand Down Expand Up @@ -116,6 +118,8 @@ func NewController(
nodeInformer cache.SharedIndexInformer,
endpointSliceInformer cache.SharedIndexInformer,
svcNegInformer cache.SharedIndexInformer,
networkInformer cache.SharedIndexInformer,
gkeNetworkParamSetInformer cache.SharedIndexInformer,
hasSynced func() bool,
controllerMetrics *usageMetrics.ControllerMetrics,
l4Namer namer2.L4ResourcesNamer,
Expand Down Expand Up @@ -191,29 +195,39 @@ func NewController(
}
manager.reflector = reflector

var networkIndexer cache.Indexer
if networkInformer != nil {
networkIndexer = networkInformer.GetIndexer()
}
var gkeNetworkParamSetIndexer cache.Indexer
if gkeNetworkParamSetInformer != nil {
gkeNetworkParamSetIndexer = gkeNetworkParamSetInformer.GetIndexer()
}
negController := &Controller{
client: kubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
cloud: cloud,
namer: namer,
l4Namer: l4Namer,
defaultBackendService: defaultBackendService,
hasSynced: hasSynced,
ingressLister: ingressInformer.GetIndexer(),
serviceLister: serviceInformer.GetIndexer(),
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
usageCollector: controllerMetrics,
syncerMetrics: syncerMetrics,
runL4: runL4Controller,
logger: logger,
client: kubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
cloud: cloud,
namer: namer,
l4Namer: l4Namer,
defaultBackendService: defaultBackendService,
hasSynced: hasSynced,
ingressLister: ingressInformer.GetIndexer(),
serviceLister: serviceInformer.GetIndexer(),
networkLister: networkIndexer,
gkeNetworkParamSetLister: gkeNetworkParamSetIndexer,
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
usageCollector: controllerMetrics,
syncerMetrics: syncerMetrics,
runL4: runL4Controller,
logger: logger,
}
if runIngress {
ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -447,7 +461,7 @@ func (c *Controller) processService(key string) error {
}
negUsage := usageMetrics.NegServiceState{}
svcPortInfoMap := make(negtypes.PortInfoMap)
networkInfo, err := network.ServiceNetwork(service, c.cloud)
networkInfo, err := network.ServiceNetwork(service, c.networkLister, c.gkeNetworkParamSetLister, c.cloud)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
testContext.NodeInformer,
testContext.EndpointSliceInformer,
testContext.SvcNegInformer,
testContext.NetworkInformer,
testContext.GKENetworkParamSetInformer,
func() bool { return true },
metrics.FakeControllerMetrics(),
testContext.L4Namer,
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
portInfo.EpCalculatorMode,
manager.logger.WithValues("service", klog.KRef(syncerKey.Namespace, syncerKey.Name), "negName", syncerKey.NegName),
manager.enableDualStackNEG,
&portInfo.NetworkInfo,
)
syncer = negsyncer.NewTransactionSyncer(
syncerKey,
Expand Down
26 changes: 20 additions & 6 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
)
Expand All @@ -43,15 +44,17 @@ type LocalL4ILBEndpointsCalculator struct {
subsetSizeLimit int
svcId string
logger klog.Logger
networkInfo *network.NetworkInfo
}

func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger) *LocalL4ILBEndpointsCalculator {
func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger, networkInfo *network.NetworkInfo) *LocalL4ILBEndpointsCalculator {
return &LocalL4ILBEndpointsCalculator{
nodeLister: nodeLister,
zoneGetter: zoneGetter,
subsetSizeLimit: maxSubsetSizeLocal,
svcId: svcId,
logger: logger.WithName("LocalL4ILBEndpointsCalculator"),
networkInfo: networkInfo,
}
}

Expand Down Expand Up @@ -91,6 +94,10 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
l.logger.Info("Dropping Node from subset since it is not a valid LB candidate", "nodeName", node.Name)
continue
}
if !l.networkInfo.NodeConnected(node) {
l.logger.Info("Node not connected to service network", "nodeName", node.Name, "network", l.networkInfo.K8sNetwork)
continue
}
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
Expand All @@ -105,7 +112,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
}
// Compute the networkEndpoints, with total endpoints count <= l.subsetSizeLimit
klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger, l.networkInfo)
return subsetMap, nil, 0, err
}

Expand Down Expand Up @@ -133,18 +140,21 @@ type ClusterL4ILBEndpointsCalculator struct {
// subsetSizeLimit is the max value of the subset size in this mode.
subsetSizeLimit int
// svcId is the unique identifier for the service, that is used as a salt when hashing nodenames.
svcId string
svcId string
networkInfo *network.NetworkInfo

logger klog.Logger
}

func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger) *ClusterL4ILBEndpointsCalculator {
func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger, networkInfo *network.NetworkInfo) *ClusterL4ILBEndpointsCalculator {
return &ClusterL4ILBEndpointsCalculator{
nodeLister: nodeLister,
zoneGetter: zoneGetter,
subsetSizeLimit: maxSubsetSizeDefault,
svcId: svcId,
logger: logger.WithName("ClusterL4ILBEndpointsCalculator")}
logger: logger.WithName("ClusterL4ILBEndpointsCalculator"),
networkInfo: networkInfo,
}
}

// Mode indicates the mode that the EndpointsCalculator is operating in.
Expand All @@ -159,6 +169,10 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints

zoneNodeMap := make(map[string][]*v1.Node)
for _, node := range nodes {
if !l.networkInfo.NodeConnected(node) {
l.logger.Info("Node not connected to service network", "nodeName", node.Name, "network", l.networkInfo.K8sNetwork)
continue
}
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
Expand All @@ -168,7 +182,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
}
klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
// Compute the networkEndpoints, with total endpoints <= l.subsetSizeLimit.
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger, l.networkInfo)
return subsetMap, nil, 0, err
}

Expand Down
71 changes: 64 additions & 7 deletions pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"k8s.io/apimachinery/pkg/types"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
networkv1 "k8s.io/cloud-provider-gcp/crd/apis/network/v1"
"k8s.io/cloud-provider-gcp/providers/gce"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
)
Expand All @@ -48,8 +50,10 @@ func TestLocalGetEndpointSet(t *testing.T) {
endpointSets map[string]negtypes.NetworkEndpointSet
networkEndpointType negtypes.NetworkEndpointType
nodeLabelsMap map[string]map[string]string
nodeAnnotationsMap map[string]map[string]string
nodeReadyStatusMap map[string]v1.ConditionStatus
nodeNames []string
network network.NetworkInfo
}{
{
desc: "default endpoints",
Expand Down Expand Up @@ -101,11 +105,28 @@ func TestLocalGetEndpointSet(t *testing.T) {
networkEndpointType: negtypes.VmIpEndpointType,
nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6},
},
{
desc: "multinetwork endpoints",
endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()),
// only 4 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes.
endpointSets: map[string]negtypes.NetworkEndpointSet{
negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "20.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "20.2.3.2", Node: testInstance2}),
negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "20.2.3.3", Node: testInstance3}),
},
network: network.NetworkInfo{IsNonDefault: true, K8sNetwork: "other"},
nodeAnnotationsMap: map[string]map[string]string{
testInstance1: {networkv1.NorthInterfacesAnnotationKey: nodeInterfacesAnnotation(t, "other", "20.2.3.1")},
testInstance2: {networkv1.NorthInterfacesAnnotationKey: nodeInterfacesAnnotation(t, "other", "20.2.3.2")},
testInstance3: {networkv1.NorthInterfacesAnnotationKey: nodeInterfacesAnnotation(t, "other", "20.2.3.3")},
},
//networkEndpointType: negtypes.VmIpEndpointType,
nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6},
},
}
svcKey := fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace)
ec := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
for _, tc := range testCases {
createNodes(t, tc.nodeNames, tc.nodeLabelsMap, tc.nodeReadyStatusMap, transactionSyncer.nodeLister)
ec := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO(), &tc.network)
createNodes(t, tc.nodeNames, tc.nodeLabelsMap, tc.nodeAnnotationsMap, tc.nodeReadyStatusMap, transactionSyncer.nodeLister)
retSet, _, _, err := ec.CalculateEndpoints(tc.endpointsData, nil)
if err != nil {
t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err)
Expand All @@ -117,6 +138,20 @@ func TestLocalGetEndpointSet(t *testing.T) {
}
}

func nodeInterfacesAnnotation(t *testing.T, network, ip string) string {
annotation, err := networkv1.MarshalNorthInterfacesAnnotation(networkv1.NorthInterfacesAnnotation{
{
Network: network,
IpAddress: ip,
},
})
if err != nil {
t.Errorf("could not create node annotations")
return ""
}
return annotation
}

// TestClusterGetEndpointSet verifies the GetEndpointSet method implemented by the ClusterL4ILBEndpointsCalculator.
func TestClusterGetEndpointSet(t *testing.T) {
t.Parallel()
Expand All @@ -134,6 +169,7 @@ func TestClusterGetEndpointSet(t *testing.T) {
nodeAnnotationsMap map[string]map[string]string
nodeReadyStatusMap map[string]v1.ConditionStatus
nodeNames []string
network network.NetworkInfo
}{
{
desc: "default endpoints",
Expand Down Expand Up @@ -193,11 +229,29 @@ func TestClusterGetEndpointSet(t *testing.T) {
networkEndpointType: negtypes.VmIpEndpointType,
nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6},
},
{
desc: "multinetwork endpoints",
endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()),
// all nodes are picked since, in this mode, endpoints running do not need to run on the selected node.
endpointSets: map[string]negtypes.NetworkEndpointSet{
negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "20.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "20.2.3.2", Node: testInstance2}),
negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "20.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "20.2.3.6", Node: testInstance6}),
},
network: network.NetworkInfo{IsNonDefault: true, K8sNetwork: "other"},
nodeAnnotationsMap: map[string]map[string]string{
testInstance1: {networkv1.NorthInterfacesAnnotationKey: nodeInterfacesAnnotation(t, "other", "20.2.3.1")},
testInstance2: {networkv1.NorthInterfacesAnnotationKey: nodeInterfacesAnnotation(t, "other", "20.2.3.2")},
testInstance3: {networkv1.NorthInterfacesAnnotationKey: nodeInterfacesAnnotation(t, "other", "20.2.3.3")},
testInstance6: {networkv1.NorthInterfacesAnnotationKey: nodeInterfacesAnnotation(t, "other", "20.2.3.6")},
},
networkEndpointType: negtypes.VmIpEndpointType,
nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6},
},
}
svcKey := fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace)
ec := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
for _, tc := range testCases {
createNodes(t, tc.nodeNames, tc.nodeLabelsMap, tc.nodeReadyStatusMap, transactionSyncer.nodeLister)
ec := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO(), &tc.network)
createNodes(t, tc.nodeNames, tc.nodeLabelsMap, tc.nodeAnnotationsMap, tc.nodeReadyStatusMap, transactionSyncer.nodeLister)
retSet, _, _, err := ec.CalculateEndpoints(tc.endpointsData, nil)
if err != nil {
t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err)
Expand Down Expand Up @@ -234,8 +288,8 @@ func TestValidateEndpoints(t *testing.T) {
podLister := testContext.PodInformer.GetIndexer()
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO(), &network.NetworkInfo{})
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO(), &network.NetworkInfo{})

testEndpointPodMap := map[negtypes.NetworkEndpoint]types.NamespacedName{
{
Expand Down Expand Up @@ -816,14 +870,17 @@ func TestValidateEndpoints(t *testing.T) {
}
}

func createNodes(t *testing.T, nodeNames []string, nodeLabels map[string]map[string]string, nodeReadyStatus map[string]v1.ConditionStatus, nodeIndexer cache.Indexer) {
func createNodes(t *testing.T, nodeNames []string, nodeLabels map[string]map[string]string, nodeAnnotations map[string]map[string]string, nodeReadyStatus map[string]v1.ConditionStatus, nodeIndexer cache.Indexer) {
t.Helper()
for i, nodeName := range nodeNames {
var labels, annotations map[string]string
readyStatus := v1.ConditionTrue
if nodeLabels != nil {
labels = nodeLabels[nodeName]
}
if nodeAnnotations != nil {
annotations = nodeAnnotations[nodeName]
}
if nodeReadyStatus != nil {
status, ok := nodeReadyStatus[nodeName]
if ok {
Expand Down
11 changes: 9 additions & 2 deletions pkg/neg/syncers/subsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

v1 "k8s.io/api/core/v1"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -158,7 +159,7 @@ func sortZones(nodesPerZone map[string][]*v1.Node) []ZoneInfo {
// Since the number of nodes will keep increasing in successive zones due to the sorting, even if fewer nodes were
// present in some zones, more nodes will be picked from other nodes, taking the total subset size to the given limit
// whenever possible.
func getSubsetPerZone(nodesPerZone map[string][]*v1.Node, totalLimit int, svcID string, currentMap map[string]negtypes.NetworkEndpointSet, logger klog.Logger) (map[string]negtypes.NetworkEndpointSet, error) {
func getSubsetPerZone(nodesPerZone map[string][]*v1.Node, totalLimit int, svcID string, currentMap map[string]negtypes.NetworkEndpointSet, logger klog.Logger, networkInfo *network.NetworkInfo) (map[string]negtypes.NetworkEndpointSet, error) {
result := make(map[string]negtypes.NetworkEndpointSet)
var currentList []negtypes.NetworkEndpoint

Expand All @@ -182,7 +183,13 @@ func getSubsetPerZone(nodesPerZone map[string][]*v1.Node, totalLimit int, svcID
}
subset := pickSubsetsMinRemovals(nodesPerZone[zone.Name], svcID, subsetSize, currentList)
for _, node := range subset {
result[zone.Name].Insert(negtypes.NetworkEndpoint{Node: node.Name, IP: utils.GetNodePrimaryIP(node)})
var ip string
if networkInfo.IsNonDefault {
ip = network.NodeIPForNetwork(node, networkInfo.K8sNetwork)
} else {
ip = utils.GetNodePrimaryIP(node)
}
result[zone.Name].Insert(negtypes.NetworkEndpoint{Node: node.Name, IP: ip})
}
totalLimit -= len(subset)
zonesRemaining--
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/subsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/klog/v2"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -189,7 +190,7 @@ func TestUnevenNodesInZones(t *testing.T) {
},
}
for _, tc := range testCases {
subsetMap, err := getSubsetPerZone(tc.nodesMap, tc.subsetLimit, tc.svcKey, nil, klog.TODO())
subsetMap, err := getSubsetPerZone(tc.nodesMap, tc.subsetLimit, tc.svcKey, nil, klog.TODO(), &network.NetworkInfo{})
if err != nil {
t.Errorf("Failed to get subset for test '%s', err %v", tc.description, err)
}
Expand Down
Loading

0 comments on commit 4b5325d

Please sign in to comment.