Merge pull request #2023 from mmamczur/multi-networking-neg-controller
Multi networking neg controller
k8s-ci-robot authored May 16, 2023
2 parents a97609e + dbc0dcb commit 0dab8fd
Showing 38 changed files with 2,241 additions and 91 deletions.
2 changes: 2 additions & 0 deletions cmd/glbc/main.go
@@ -357,6 +357,8 @@ func runControllers(ctx *ingctx.ControllerContext) {
 		ctx.NodeInformer,
 		ctx.EndpointSliceInformer,
 		ctx.SvcNegInformer,
+		ctx.NetworkInformer,
+		ctx.GKENetworkParamsInformer,
 		ctx.HasSynced,
 		ctx.ControllerMetrics,
 		ctx.L4Namer,
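The two new arguments come from the controller context; when multi-networking is disabled they may be nil (see the nil checks in pkg/neg/controller.go below). As a rough sketch under assumptions — the GroupVersionResources and the use of a dynamic informer factory are illustrative, not taken from this PR — informers for the two multi-networking CRDs could be built like this:

// Sketch only: one way to construct informers for the Network and GKENetworkParamSet
// CRDs with client-go's dynamic informer factory. The real ControllerContext likely
// uses generated typed informers; the GVRs below are assumptions.
import (
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
)

func newMultiNetworkInformers(dc dynamic.Interface, resync time.Duration) (cache.SharedIndexInformer, cache.SharedIndexInformer) {
	factory := dynamicinformer.NewDynamicSharedInformerFactory(dc, resync)
	// Assumed GroupVersionResources for the GKE multi-networking CRDs.
	networkGVR := schema.GroupVersionResource{Group: "networking.gke.io", Version: "v1", Resource: "networks"}
	paramSetGVR := schema.GroupVersionResource{Group: "networking.gke.io", Version: "v1", Resource: "gkenetworkparamsets"}
	return factory.ForResource(networkGVR).Informer(), factory.ForResource(paramSetGVR).Informer()
}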
60 changes: 37 additions & 23 deletions pkg/neg/controller.go
@@ -73,6 +73,8 @@ type Controller struct {
 	hasSynced func() bool
 	ingressLister cache.Indexer
 	serviceLister cache.Indexer
+	networkLister cache.Indexer
+	gkeNetworkParamSetLister cache.Indexer
 	client kubernetes.Interface
 	defaultBackendService utils.ServicePort
 	enableASM bool
@@ -116,6 +118,8 @@ func NewController(
 	nodeInformer cache.SharedIndexInformer,
 	endpointSliceInformer cache.SharedIndexInformer,
 	svcNegInformer cache.SharedIndexInformer,
+	networkInformer cache.SharedIndexInformer,
+	gkeNetworkParamSetInformer cache.SharedIndexInformer,
 	hasSynced func() bool,
 	controllerMetrics *usageMetrics.ControllerMetrics,
 	l4Namer namer2.L4ResourcesNamer,
@@ -192,29 +196,39 @@ func NewController(
 	}
 	manager.reflector = reflector
 
+	var networkIndexer cache.Indexer
+	if networkInformer != nil {
+		networkIndexer = networkInformer.GetIndexer()
+	}
+	var gkeNetworkParamSetIndexer cache.Indexer
+	if gkeNetworkParamSetInformer != nil {
+		gkeNetworkParamSetIndexer = gkeNetworkParamSetInformer.GetIndexer()
+	}
 	negController := &Controller{
-		client: kubeClient,
-		manager: manager,
-		resyncPeriod: resyncPeriod,
-		gcPeriod: gcPeriod,
-		recorder: recorder,
-		zoneGetter: zoneGetter,
-		cloud: cloud,
-		namer: namer,
-		l4Namer: l4Namer,
-		defaultBackendService: defaultBackendService,
-		hasSynced: hasSynced,
-		ingressLister: ingressInformer.GetIndexer(),
-		serviceLister: serviceInformer.GetIndexer(),
-		serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
-		endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
-		nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
-		syncTracker: utils.NewTimeTracker(),
-		reflector: reflector,
-		usageCollector: controllerMetrics,
-		syncerMetrics: syncerMetrics,
-		runL4: runL4Controller,
-		logger: logger,
+		client: kubeClient,
+		manager: manager,
+		resyncPeriod: resyncPeriod,
+		gcPeriod: gcPeriod,
+		recorder: recorder,
+		zoneGetter: zoneGetter,
+		cloud: cloud,
+		namer: namer,
+		l4Namer: l4Namer,
+		defaultBackendService: defaultBackendService,
+		hasSynced: hasSynced,
+		ingressLister: ingressInformer.GetIndexer(),
+		serviceLister: serviceInformer.GetIndexer(),
+		networkLister: networkIndexer,
+		gkeNetworkParamSetLister: gkeNetworkParamSetIndexer,
+		serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
+		endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
+		nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
+		syncTracker: utils.NewTimeTracker(),
+		reflector: reflector,
+		usageCollector: controllerMetrics,
+		syncerMetrics: syncerMetrics,
+		runL4: runL4Controller,
+		logger: logger,
 	}
 	if runIngress {
 		ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -452,7 +466,7 @@ func (c *Controller) processService(key string) error {
 	}
 	negUsage := usageMetrics.NegServiceState{}
 	svcPortInfoMap := make(negtypes.PortInfoMap)
-	networkInfo, err := network.ServiceNetwork(service, c.cloud)
+	networkInfo, err := network.ServiceNetwork(service, c.networkLister, c.gkeNetworkParamSetLister, c.cloud, c.logger)
 	if err != nil {
 		return err
 	}
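processService now resolves the service's network from the new listers instead of assuming the cluster default. The real resolver is network.ServiceNetwork in pkg/network; the sketch below only illustrates the general shape under stated assumptions — the annotation key and the lookup chain are hypothetical, and NetworkInfo stands for the pkg/network struct whose IsDefault/K8sNetwork fields appear in the controller_test.go diff below.

// Hedged sketch of a ServiceNetwork-style resolver; not the actual pkg/network code.
import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func serviceNetworkSketch(svc *v1.Service, networkLister, paramSetLister cache.Indexer) (*NetworkInfo, error) {
	name, ok := svc.Annotations["networking.example.com/network"] // hypothetical selection annotation
	if !ok || name == "default" {
		// No explicit selection: fall back to the cluster default network.
		return &NetworkInfo{IsDefault: true, K8sNetwork: "default"}, nil
	}
	obj, exists, err := networkLister.GetByKey(name) // assumes cluster-scoped Network objects
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, fmt.Errorf("network %q referenced by service %s/%s not found", name, svc.Namespace, svc.Name)
	}
	// A real resolver would follow the Network's parameters reference into the
	// GKENetworkParamSet lister to pick up VPC/subnetwork details.
	_, _ = obj, paramSetLister
	return &NetworkInfo{IsDefault: false, K8sNetwork: name}, nil
}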
3 changes: 3 additions & 0 deletions pkg/neg/controller_test.go
@@ -111,6 +111,7 @@ var (
 	}
 
 	defaultNetwork = &network.NetworkInfo{
+		IsDefault: true,
 		K8sNetwork: "default",
 	}
 )
@@ -129,6 +130,8 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
 		testContext.NodeInformer,
 		testContext.EndpointSliceInformer,
 		testContext.SvcNegInformer,
+		testContext.NetworkInformer,
+		testContext.GKENetworkParamSetInformer,
 		func() bool { return true },
 		metrics.FakeControllerMetrics(),
 		testContext.L4Namer,
1 change: 1 addition & 0 deletions pkg/neg/manager.go
@@ -237,6 +237,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
 			manager.logger.WithValues("service", klog.KRef(syncerKey.Namespace, syncerKey.Name), "negName", syncerKey.NegName),
 			manager.enableDualStackNEG,
 			manager.syncerMetrics,
+			&portInfo.NetworkInfo,
 		)
 		syncer = negsyncer.NewTransactionSyncer(
 			syncerKey,
26 changes: 20 additions & 6 deletions pkg/neg/syncers/endpoints_calculator.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/ingress-gce/pkg/neg/metrics"
 	"k8s.io/ingress-gce/pkg/neg/types"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
+	"k8s.io/ingress-gce/pkg/network"
 	"k8s.io/ingress-gce/pkg/utils"
 	"k8s.io/klog/v2"
 )
@@ -45,15 +46,17 @@ type LocalL4ILBEndpointsCalculator struct {
 	subsetSizeLimit int
 	svcId string
 	logger klog.Logger
+	networkInfo *network.NetworkInfo
 }
 
-func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger) *LocalL4ILBEndpointsCalculator {
+func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger, networkInfo *network.NetworkInfo) *LocalL4ILBEndpointsCalculator {
 	return &LocalL4ILBEndpointsCalculator{
 		nodeLister: nodeLister,
 		zoneGetter: zoneGetter,
 		subsetSizeLimit: maxSubsetSizeLocal,
 		svcId: svcId,
 		logger: logger.WithName("LocalL4ILBEndpointsCalculator"),
+		networkInfo: networkInfo,
 	}
 }
 
@@ -93,6 +96,10 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
 			l.logger.Info("Dropping Node from subset since it is not a valid LB candidate", "nodeName", node.Name)
 			continue
 		}
+		if !l.networkInfo.IsNodeConnected(node) {
+			l.logger.Info("Node not connected to service network", "nodeName", node.Name, "network", l.networkInfo.K8sNetwork)
+			continue
+		}
 		zone, err := l.zoneGetter.GetZoneForNode(node.Name)
 		if err != nil {
 			l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
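Both calculators now skip nodes that are not attached to the service's network. The real check is NetworkInfo.IsNodeConnected in pkg/network; the sketch below only conveys the idea, and the node annotation key and its format are assumptions.

// Sketch of an IsNodeConnected-style check; not the actual pkg/network implementation.
import (
	"strings"

	v1 "k8s.io/api/core/v1"
)

func (n *NetworkInfo) isNodeConnectedSketch(node *v1.Node) bool {
	if n.IsDefault {
		return true // every node is attached to the cluster's default network
	}
	attached, ok := node.Annotations["networking.example.com/attached-networks"] // hypothetical key
	if !ok {
		return false
	}
	for _, name := range strings.Split(attached, ",") {
		if strings.TrimSpace(name) == n.K8sNetwork {
			return true
		}
	}
	return false
}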
@@ -107,7 +114,7 @@
 	}
 	// Compute the networkEndpoints, with total endpoints count <= l.subsetSizeLimit
 	klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
-	subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
+	subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger, l.networkInfo)
 	return subsetMap, nil, 0, err
 }
 
@@ -135,18 +142,21 @@ type ClusterL4ILBEndpointsCalculator struct {
 	// subsetSizeLimit is the max value of the subset size in this mode.
 	subsetSizeLimit int
 	// svcId is the unique identifier for the service, that is used as a salt when hashing nodenames.
-	svcId string
+	svcId string
+	networkInfo *network.NetworkInfo
 
 	logger klog.Logger
 }
 
-func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger) *ClusterL4ILBEndpointsCalculator {
+func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string, logger klog.Logger, networkInfo *network.NetworkInfo) *ClusterL4ILBEndpointsCalculator {
 	return &ClusterL4ILBEndpointsCalculator{
 		nodeLister: nodeLister,
 		zoneGetter: zoneGetter,
 		subsetSizeLimit: maxSubsetSizeDefault,
 		svcId: svcId,
-		logger: logger.WithName("ClusterL4ILBEndpointsCalculator")}
+		logger: logger.WithName("ClusterL4ILBEndpointsCalculator"),
+		networkInfo: networkInfo,
+	}
 }
 
 // Mode indicates the mode that the EndpointsCalculator is operating in.
@@ -161,6 +171,10 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
 
 	zoneNodeMap := make(map[string][]*v1.Node)
 	for _, node := range nodes {
+		if !l.networkInfo.IsNodeConnected(node) {
+			l.logger.Info("Node not connected to service network", "nodeName", node.Name, "network", l.networkInfo.K8sNetwork)
+			continue
+		}
 		zone, err := l.zoneGetter.GetZoneForNode(node.Name)
 		if err != nil {
 			l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
@@ -170,7 +184,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
 	}
 	klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
 	// Compute the networkEndpoints, with total endpoints <= l.subsetSizeLimit.
-	subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
+	subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger, l.networkInfo)
 	return subsetMap, nil, 0, err
 }
 
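With the extra parameter, callers such as the syncer manager above supply the per-service NetworkInfo when constructing a calculator. A small usage sketch, assuming nodeLister, zoneGetter, svcID, and logger are already in scope:

// Example construction with the new networkInfo argument; here the service sits on
// the cluster's default network.
netInfo := &network.NetworkInfo{IsDefault: true, K8sNetwork: "default"}
localCalc := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcID, logger, netInfo)
clusterCalc := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcID, logger, netInfo)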