Skip to content

Commit

Permalink
Create Neg CRs when neg crd is enabled
Browse files Browse the repository at this point in the history
 - when EnableNegCrd flag is set, neg client is created
 - manager will create neg crs when creating syncers
 - manager will delete neg crs when deleting syncers
  • Loading branch information
swetharepakula committed Jul 1, 2020
1 parent 991b046 commit caa984c
Show file tree
Hide file tree
Showing 16 changed files with 1,050 additions and 36 deletions.
12 changes: 10 additions & 2 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"

ingctx "k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
Expand Down Expand Up @@ -126,11 +127,18 @@ func main() {
}
}

var networkEndpointGroupClient svcnegclient.Interface
if flags.F.EnableNegCrd {
negCRDMeta := svcneg.CRDMeta()
if _, err := crdHandler.EnsureCRD(negCRDMeta); err != nil {
klog.Fatalf("Failed to ensure ServiceNetworkEndpointGroup CRD: %v", err)
}

networkEndpointGroupClient, err = svcnegclient.NewForConfig(kubeConfig)
if err != nil {
klog.Fatalf("Failed to create NetworkEndpointGroup client: %v", err)
}

}

namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName)
Expand Down Expand Up @@ -160,7 +168,7 @@ func main() {
ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName,
}
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, cloud, namer, kubeSystemUID, ctxConfig)
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, networkEndpointGroupClient, cloud, namer, kubeSystemUID, ctxConfig)
go app.RunHTTPServer(ctx.HealthCheck)

if !flags.F.LeaderElection.LeaderElect {
Expand Down Expand Up @@ -253,7 +261,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
}

// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller)
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller, flags.F.EnableNegCrd)

go negController.Run(stopCh)
klog.V(0).Infof("negController started")
Expand Down
4 changes: 4 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned"
informerfrontendconfig "k8s.io/ingress-gce/pkg/frontendconfig/client/informers/externalversions/frontendconfig/v1beta1"
"k8s.io/ingress-gce/pkg/metrics"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"
Expand Down Expand Up @@ -85,6 +86,7 @@ type ControllerContext struct {

// Map of namespace => record.EventRecorder.
recorders map[string]record.EventRecorder
NegClient svcnegclient.Interface
}

// ControllerContextConfig encapsulates some settings that are tunable via command line flags.
Expand All @@ -106,6 +108,7 @@ func NewControllerContext(
kubeClient kubernetes.Interface,
backendConfigClient backendconfigclient.Interface,
frontendConfigClient frontendconfigclient.Interface,
svcnegClient svcnegclient.Interface,
cloud *gce.Cloud,
namer *namer.Namer,
kubeSystemUID types.UID,
Expand All @@ -127,6 +130,7 @@ func NewControllerContext(
NodeInformer: informerv1.NewNodeInformer(kubeClient, config.ResyncPeriod, utils.NewNamespaceIndexer()),
recorders: map[string]record.EventRecorder{},
healthChecks: make(map[string]func() error),
NegClient: svcnegClient,
}

if config.FrontendConfigEnabled {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newLoadBalancerController() *LoadBalancerController {
DefaultBackendSvcPort: test.DefaultBeSvcPort,
HealthCheckPath: "/",
}
ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
lbc := NewLoadBalancerController(ctx, stopCh)
// TODO(rramkumar): Fix this so we don't have to override with our fake
lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func fakeTranslator() *Translator {
DefaultBackendSvcPort: defaultBackend,
HealthCheckPath: "/",
}
ctx := context.NewControllerContext(nil, client, backendConfigClient, nil, nil, defaultNamer, "" /*kubeSystemUID*/, ctxConfig)
ctx := context.NewControllerContext(nil, client, backendConfigClient, nil, nil, nil, defaultNamer, "" /*kubeSystemUID*/, ctxConfig)
gce := &Translator{
ctx: ctx,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/firewalls/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newFirewallController() *FirewallController {
DefaultBackendSvcPort: test.DefaultBeSvcPort,
}

ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, fakeGCE, defaultNamer, "" /*kubeSystemUID*/, ctxConfig)
ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, fakeGCE, defaultNamer, "" /*kubeSystemUID*/, ctxConfig)
fwc := NewFirewallController(ctx, []string{"30000-32767"})
fwc.hasSynced = func() bool { return true }

Expand Down
2 changes: 1 addition & 1 deletion pkg/l4/l4controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newServiceController() *L4Controller {
Namespace: api_v1.NamespaceAll,
ResyncPeriod: 1 * time.Minute,
}
ctx := context.NewControllerContext(nil, kubeClient, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
ctx := context.NewControllerContext(nil, kubeClient, nil, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
return NewController(ctx, stopCh)
}

Expand Down
30 changes: 23 additions & 7 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -99,6 +100,9 @@ type Controller struct {

// runL4 indicates whether to run NEG controller that processes L4 ILB services
runL4 bool

// indicates whether neg crd have been enabled
enableNegCrd bool
}

// NewController returns a network endpoint group controller.
Expand All @@ -112,6 +116,7 @@ func NewController(
enableReadinessReflector bool,
runIngress bool,
runL4Controller bool,
enableNegCrd bool,
) *Controller {
// init event recorder
// TODO: move event recorder initializer to main. Reuse it among controllers.
Expand All @@ -123,7 +128,7 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
apiv1.EventSource{Component: "neg-controller"})

manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer())
manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.NegClient, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer())
var reflector readiness.Reflector
if enableReadinessReflector {
reflector = readiness.NewReadinessReflector(ctx, manager)
Expand Down Expand Up @@ -151,6 +156,7 @@ func NewController(
reflector: reflector,
collector: ctx.ControllerMetrics,
runL4: runL4Controller,
enableNegCrd: enableNegCrd,
}

if runIngress {
Expand Down Expand Up @@ -376,8 +382,8 @@ func (c *Controller) processService(key string) error {
}
if !exists {
c.collector.DeleteNegService(key)
c.manager.StopSyncer(namespace, name)
return nil

return c.manager.StopSyncer(namespace, name)
}

service := obj.(*apiv1.Service)
Expand Down Expand Up @@ -431,9 +437,15 @@ func (c *Controller) processService(key string) error {
klog.V(4).Infof("Service %q does not need any NEG. Skipping", key)
c.collector.DeleteNegService(key)
// neg annotation is not found or NEG is not enabled
c.manager.StopSyncer(namespace, name)
var errList []error
if err = c.manager.StopSyncer(namespace, name); err != nil {
errList = append(errList, err)
}
// delete the annotation
return c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap))
if err = c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap)); err != nil {
errList = append(errList, err)
}
return utilerrors.NewAggregate(errList)
}

// mergeIngressPortInfo merges Ingress PortInfo into portInfoMap if the service has Enable Ingress annotation.
Expand Down Expand Up @@ -481,12 +493,16 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty
)
}

exposedNegSvcPort, _, err := negServicePorts(negAnnotation, knowSvcPortSet)
exposedNegSvcPort, customNames, err := negServicePorts(negAnnotation, knowSvcPortSet)
if err != nil {
return err
}

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, nil)); err != nil {
if !c.enableNegCrd {
customNames = nil
}

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, customNames)); err != nil {
return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err)
}
}
Expand Down
Loading

0 comments on commit caa984c

Please sign in to comment.