Skip to content

Commit

Permalink
Clean controllers shutdown
Browse files Browse the repository at this point in the history
We didn't wait for the controllers shutdown before the program exited,
let's handle this properly. Also, fix a few comments and log healthcheck
start and stop.
  • Loading branch information
bpineau committed Apr 4, 2018
1 parent 2f84cb2 commit ab4a567
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 23 deletions.
33 changes: 25 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Event struct {

// Controller is a generic kubernetes controller
type Controller struct {
stopCh chan struct{}
doneCh chan struct{}
evchan chan Event
name string
config *config.KdnConfig
Expand Down Expand Up @@ -77,27 +79,42 @@ func NewController(lw cache.ListerWatcher, evchan chan Event, name string, confi
},
})

return &Controller{evchan, name, config, queue, informer}
return &Controller{
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
evchan: evchan,
name: name,
config: config,
queue: queue,
informer: informer,
}
}

// Run starts the controller in the foreground
func (c *Controller) Run(stopCh <-chan struct{}) {
// Start launchs the controller in the background
func (c *Controller) Start() {
c.config.Logger.Infof("Starting %s controller", c.name)
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

go c.informer.Run(stopCh)
go c.informer.Run(c.stopCh)

if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
if !cache.WaitForCacheSync(c.stopCh, c.informer.HasSynced) {
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}

wait.Until(c.runWorker, time.Second, stopCh)
// XXX needs a sync.wg to wait for that
go wait.Until(c.runWorker, time.Second, c.stopCh)
}

// Stop halts the controller
func (c *Controller) Stop() {
close(c.stopCh)
c.queue.ShutDown()
<-c.doneCh
c.config.Logger.Infof("Stopping %s controller", c.name)
}

func (c *Controller) runWorker() {
defer close(c.doneCh)
for c.processNextItem() {
// continue looping
}
Expand Down
18 changes: 8 additions & 10 deletions pkg/controller/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Observer struct {
evch chan Event
disc *discovery.DiscoveryClient
cpool dynamic.ClientPool
ctrls map[string]chan<- struct{}
ctrls map[string]*Controller
config *config.KdnConfig
}

Expand All @@ -46,7 +46,7 @@ func NewObserver(config *config.KdnConfig, evch chan Event) *Observer {
evch: evch,
disc: discovery.NewDiscoveryClientForConfigOrDie(config.Client),
cpool: dynamic.NewDynamicClientPool(config.Client),
ctrls: make(map[string]chan<- struct{}),
ctrls: make(map[string]*Controller),
}
}

Expand Down Expand Up @@ -85,8 +85,8 @@ func (c *Observer) Stop() {

close(c.stop)

for _, ch := range c.ctrls {
close(ch)
for _, c := range c.ctrls {
c.Stop()
}

<-c.done
Expand Down Expand Up @@ -120,10 +120,8 @@ func (c *Observer) refresh() error {
},
}

stop := make(chan struct{})
c.ctrls[name] = stop
name := strings.ToLower(res.ar.Kind)
go NewController(lw, c.evch, name, c.config).Run(stop)
c.ctrls[name] = NewController(lw, c.evch, strings.ToLower(res.ar.Kind), c.config)
go c.ctrls[name].Start()
}

return nil
Expand Down Expand Up @@ -160,8 +158,8 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
continue
}

g := &gvk{group: gv.Group, version: gv.Version, kind: ar.Kind, gv: gv, ar: ar}
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = g
resource := &gvk{group: gv.Group, version: gv.Version, kind: ar.Kind, gv: gv, ar: ar}
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = resource
}
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func (h *Listener) Start() (*Listener, error) {
return h, nil
}

h.config.Logger.Info("Starting http healtcheck handler")

h.srv = &http.Server{Addr: fmt.Sprintf(":%d", h.config.HealthPort)}

http.HandleFunc("/health", h.healthCheckReply)
Expand All @@ -52,11 +54,12 @@ func (h *Listener) Start() (*Listener, error) {

// Stop halts the http health check handler
func (h *Listener) Stop() {
h.config.Logger.Info("Stopping http healtcheck handler")
if h.srv == nil {
return
}

h.config.Logger.Info("Stopping http healtcheck handler")

err := h.srv.Shutdown(context.TODO())
if err != nil {
h.config.Logger.Warningf("failed to stop http healtcheck handler: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/run/run.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package run implements the main katafygio's loop, by
// launching the healthcheck service and all known controllers.
// the services and controllers.
package run

import (
Expand All @@ -14,7 +14,7 @@ import (
"github.com/bpineau/katafygio/pkg/store/git"
)

// Run launchs the effective controllers goroutines
// Run launchs the services
func Run(config *config.KdnConfig) {
repo, err := git.New(config).Start()
if err != nil {
Expand All @@ -23,7 +23,7 @@ func Run(config *config.KdnConfig) {

evchan := make(chan controller.Event)

rec := recorder.New(config, evchan).Start()
reco := recorder.New(config, evchan).Start()
ctrl := controller.NewObserver(config, evchan).Start()

http, err := health.New(config).Start()
Expand All @@ -38,6 +38,6 @@ func Run(config *config.KdnConfig) {

ctrl.Stop()
repo.Stop()
rec.Stop()
reco.Stop()
http.Stop()
}

0 comments on commit ab4a567

Please sign in to comment.