Skip to content

Commit

Permalink
Multi Network support in the NEG controller
Browse files Browse the repository at this point in the history
  • Loading branch information
mmamczur committed Apr 28, 2023
1 parent 136d422 commit 3416001
Show file tree
Hide file tree
Showing 36 changed files with 2,213 additions and 84 deletions.
2 changes: 2 additions & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ func runControllers(ctx *ingctx.ControllerContext) {
ctx.NodeInformer,
ctx.EndpointSliceInformer,
ctx.SvcNegInformer,
ctx.NetworkInformer,
ctx.GKENetworkParamsInformer,
ctx.HasSynced,
ctx.ControllerMetrics,
ctx.L4Namer,
Expand Down
60 changes: 37 additions & 23 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type Controller struct {
hasSynced func() bool
ingressLister cache.Indexer
serviceLister cache.Indexer
networkLister cache.Indexer
gkeNetworkParamSetLister cache.Indexer
client kubernetes.Interface
defaultBackendService utils.ServicePort
enableASM bool
Expand Down Expand Up @@ -116,6 +118,8 @@ func NewController(
nodeInformer cache.SharedIndexInformer,
endpointSliceInformer cache.SharedIndexInformer,
svcNegInformer cache.SharedIndexInformer,
networkInformer cache.SharedIndexInformer,
gkeNetworkParamSetInformer cache.SharedIndexInformer,
hasSynced func() bool,
controllerMetrics *usageMetrics.ControllerMetrics,
l4Namer namer2.L4ResourcesNamer,
Expand Down Expand Up @@ -191,29 +195,39 @@ func NewController(
}
manager.reflector = reflector

var networkIndexer cache.Indexer
if networkInformer != nil {
networkIndexer = networkInformer.GetIndexer()
}
var gkeNetworkParamSetIndexer cache.Indexer
if gkeNetworkParamSetInformer != nil {
gkeNetworkParamSetIndexer = gkeNetworkParamSetInformer.GetIndexer()
}
negController := &Controller{
client: kubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
cloud: cloud,
namer: namer,
l4Namer: l4Namer,
defaultBackendService: defaultBackendService,
hasSynced: hasSynced,
ingressLister: ingressInformer.GetIndexer(),
serviceLister: serviceInformer.GetIndexer(),
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
usageCollector: controllerMetrics,
syncerMetrics: syncerMetrics,
runL4: runL4Controller,
logger: logger,
client: kubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
cloud: cloud,
namer: namer,
l4Namer: l4Namer,
defaultBackendService: defaultBackendService,
hasSynced: hasSynced,
ingressLister: ingressInformer.GetIndexer(),
serviceLister: serviceInformer.GetIndexer(),
networkLister: networkIndexer,
gkeNetworkParamSetLister: gkeNetworkParamSetIndexer,
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
usageCollector: controllerMetrics,
syncerMetrics: syncerMetrics,
runL4: runL4Controller,
logger: logger,
}
if runIngress {
ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -451,7 +465,7 @@ func (c *Controller) processService(key string) error {
}
negUsage := usageMetrics.NegServiceState{}
svcPortInfoMap := make(negtypes.PortInfoMap)
networkInfo, err := network.ServiceNetwork(service, c.cloud)
networkInfo, err := network.ServiceNetwork(service, c.networkLister, c.gkeNetworkParamSetLister, c.cloud)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
testContext.NodeInformer,
testContext.EndpointSliceInformer,
testContext.SvcNegInformer,
testContext.NetworkInformer,
testContext.GKENetworkParamSetInformer,
func() bool { return true },
metrics.FakeControllerMetrics(),
testContext.L4Namer,
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
portInfo.EpCalculatorMode,
manager.logger.WithValues("service", klog.KRef(syncerKey.Namespace, syncerKey.Name), "negName", syncerKey.NegName),
manager.enableDualStackNEG,
&portInfo.NetworkInfo,
)
syncer = negsyncer.NewTransactionSyncer(
syncerKey,
Expand Down
26 changes: 20 additions & 6 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
)
Expand All @@ -43,15 +44,17 @@ type LocalL4ILBEndpointsCalculator struct {
subsetSizeLimit int
svcId string
logger klog.Logger
networkInfo *network.NetworkInfo
}

func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger) *LocalL4ILBEndpointsCalculator {
func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger, networkInfo *network.NetworkInfo) *LocalL4ILBEndpointsCalculator {
return &LocalL4ILBEndpointsCalculator{
nodeLister: nodeLister,
zoneGetter: zoneGetter,
subsetSizeLimit: maxSubsetSizeLocal,
svcId: svcId,
logger: logger.WithName("LocalL4ILBEndpointsCalculator"),
networkInfo: networkInfo,
}
}

Expand Down Expand Up @@ -91,6 +94,10 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
l.logger.Info("Dropping Node from subset since it is not a valid LB candidate", "nodeName", node.Name)
continue
}
if !l.networkInfo.IsNodeConnected(node) {
l.logger.Info("Node not connected to service network", "nodeName", node.Name, "network", l.networkInfo.K8sNetwork)
continue
}
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
Expand All @@ -105,7 +112,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
}
// Compute the networkEndpoints, with total endpoints count <= l.subsetSizeLimit
klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger, l.networkInfo)
return subsetMap, nil, 0, err
}

Expand Down Expand Up @@ -133,18 +140,21 @@ type ClusterL4ILBEndpointsCalculator struct {
// subsetSizeLimit is the max value of the subset size in this mode.
subsetSizeLimit int
// svcId is the unique identifier for the service, that is used as a salt when hashing nodenames.
svcId string
svcId string
networkInfo *network.NetworkInfo

logger klog.Logger
}

func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger) *ClusterL4ILBEndpointsCalculator {
func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger, networkInfo *network.NetworkInfo) *ClusterL4ILBEndpointsCalculator {
return &ClusterL4ILBEndpointsCalculator{
nodeLister: nodeLister,
zoneGetter: zoneGetter,
subsetSizeLimit: maxSubsetSizeDefault,
svcId: svcId,
logger: logger.WithName("ClusterL4ILBEndpointsCalculator")}
logger: logger.WithName("ClusterL4ILBEndpointsCalculator"),
networkInfo: networkInfo,
}
}

// Mode indicates the mode that the EndpointsCalculator is operating in.
Expand All @@ -159,6 +169,10 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints

zoneNodeMap := make(map[string][]*v1.Node)
for _, node := range nodes {
if !l.networkInfo.IsNodeConnected(node) {
l.logger.Info("Node not connected to service network", "nodeName", node.Name, "network", l.networkInfo.K8sNetwork)
continue
}
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
Expand All @@ -168,7 +182,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
}
klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
// Compute the networkEndpoints, with total endpoints <= l.subsetSizeLimit.
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger, l.networkInfo)
return subsetMap, nil, 0, err
}

Expand Down
Loading

0 comments on commit 3416001

Please sign in to comment.