Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Harden NEG GC #459

Merged
merged 2 commits into from
Sep 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func runControllers(ctx *context.ControllerContext) {

if ctx.NEGEnabled {
// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(neg.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod)
negController := neg.NewController(neg.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod)
go negController.Run(stopCh)
glog.V(0).Infof("negController started")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ var (
WatchNamespace string
NodePortRanges PortRanges
EnableBackendConfig bool
NegGCPeriod time.Duration

LeaderElection LeaderElectionConfiguration
}{}
Expand Down Expand Up @@ -215,6 +216,8 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
`This flag is deprecated. Use -v to control verbosity.`)
flag.Bool("use-real-cloud", false,
`This flag has been deprecated and no longer has any effect.`)
flag.DurationVar(&F.NegGCPeriod, "neg-gc-period", 120*time.Second,
`Relist and garbage collect NEGs this often.`)
}

type RateLimitSpecs struct {
Expand Down
12 changes: 10 additions & 2 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func init() {
type Controller struct {
manager negSyncerManager
resyncPeriod time.Duration
gcPeriod time.Duration
recorder record.EventRecorder
namer networkEndpointGroupNamer
zoneGetter zoneGetter
Expand All @@ -74,7 +75,8 @@ func NewController(
ctx *context.ControllerContext,
zoneGetter zoneGetter,
namer networkEndpointGroupNamer,
resyncPeriod time.Duration) *Controller {
resyncPeriod time.Duration,
gcPeriod time.Duration) *Controller {
// init event recorder
// TODO: move event recorder initializer to main. Reuse it among controllers.
eventBroadcaster := record.NewBroadcaster()
Expand All @@ -91,6 +93,7 @@ func NewController(
client: ctx.KubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
Expand Down Expand Up @@ -167,7 +170,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}()

go wait.Until(c.serviceWorker, time.Second, stopCh)
go wait.Until(c.gc, c.resyncPeriod, stopCh)
go func() {
// Wait for gcPeriod to run the first GC
// This is to make sure that all services are fully processed before running GC.
time.Sleep(c.gcPeriod)
wait.Until(c.gc, c.gcPeriod, stopCh)
}()

<-stopCh
}
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
NewFakeZoneGetter(),
namer,
1*time.Second,
1*time.Second,
)
return controller
}
Expand Down
24 changes: 7 additions & 17 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ func (manager *syncerManager) ShutDown() {
func (manager *syncerManager) GC() error {
glog.V(2).Infof("Start NEG garbage collection.")
defer glog.V(2).Infof("NEG garbage collection finished.")
for _, key := range manager.getAllStoppedSyncerKeys() {
manager.garbageCollectSyncer(key)
}
// Garbage collect Syncers
manager.garbageCollectSyncer()

// Garbage collect NEGs
if err := manager.garbageCollectNEG(); err != nil {
Expand All @@ -178,24 +177,15 @@ func (manager *syncerManager) GC() error {
return nil
}

func (manager *syncerManager) garbageCollectSyncer(key servicePort) {
// garbageCollectSyncer removes stopped syncers from syncerMap
func (manager *syncerManager) garbageCollectSyncer() {
manager.mu.Lock()
defer manager.mu.Unlock()
if manager.syncerMap[key].IsStopped() && !manager.syncerMap[key].IsShuttingDown() {
delete(manager.syncerMap, key)
}
}

func (manager *syncerManager) getAllStoppedSyncerKeys() []servicePort {
manager.mu.Lock()
defer manager.mu.Unlock()
ret := []servicePort{}
for key, syncer := range manager.syncerMap {
if syncer.IsStopped() {
ret = append(ret, key)
if syncer.IsStopped() && !syncer.IsShuttingDown() {
delete(manager.syncerMap, key)
}
}
return ret
}

func (manager *syncerManager) garbageCollectNEG() error {
Expand All @@ -219,7 +209,7 @@ func (manager *syncerManager) garbageCollectNEG() error {
manager.mu.Lock()
defer manager.mu.Unlock()
for key, ports := range manager.svcPortMap {
for sp, _ := range ports {
for sp := range ports {
name := manager.namer.NEG(key.namespace, key.name, sp)
negNames.Delete(name)
}
Expand Down