Skip to content

Commit

Permalink
Expect event.Notifier interface instead of concrete
Browse files Browse the repository at this point in the history
Sames goes for discovery.DiscoveryInterface (rather than
discovery.DiscoveryClient). Will make unit tests easier.
  • Loading branch information
bpineau committed Apr 9, 2018
1 parent 1899edf commit 881d036
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 18 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ const maxProcessRetry = 6
type Controller struct {
stopCh chan struct{}
doneCh chan struct{}
notifier *event.Notifier
notifier event.Notifier
config *config.KfConfig
name string
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
}

// New return a kubernetes controller using the provided client
func New(client cache.ListerWatcher, notifier *event.Notifier, name string, config *config.KfConfig) *Controller {
func New(client cache.ListerWatcher, notifier event.Notifier, name string, config *config.KfConfig) *Controller {

selector := metav1.ListOptions{LabelSelector: config.Filter}
lw := &cache.ListWatch{
Expand Down
27 changes: 19 additions & 8 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,29 @@ type Notification struct {
}

// Notifier mediates notifications between controllers and recorder
type Notifier struct {
C chan Notification
type Notifier interface {
Send(notif *Notification)
ReadChan() <-chan Notification
}

// New creates a new event.Notifier
func New() *Notifier {
return &Notifier{
C: make(chan Notification),
// Unbuffered implements Notifier
type Unbuffered struct {
c chan Notification
}

// New creates an Unbuffered
func New() *Unbuffered {
return &Unbuffered{
c: make(chan Notification),
}
}

// Send sends a notification
func (n *Notifier) Send(notif *Notification) {
n.C <- *notif
func (n *Unbuffered) Send(notif *Notification) {
n.c <- *notif
}

// ReadChan returns a channel to read Notifications from
func (n *Unbuffered) ReadChan() <-chan Notification {
return n.c
}
6 changes: 3 additions & 3 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ const discoveryInterval = 60 * time.Second
type Observer struct {
stop chan struct{}
done chan struct{}
notif *event.Notifier
disc *discovery.DiscoveryClient
notif event.Notifier
disc discovery.DiscoveryInterface
cpool dynamic.ClientPool
ctrls map[string]*controller.Controller
config *config.KfConfig
Expand All @@ -40,7 +40,7 @@ type gvk struct {
type resources map[string]*gvk

// New returns a new observer, that will watch API resources and create controllers
func New(config *config.KfConfig, notif *event.Notifier) *Observer {
func New(config *config.KfConfig, notif event.Notifier) *Observer {
return &Observer{
config: config,
notif: notif,
Expand Down
7 changes: 4 additions & 3 deletions pkg/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ type activeFiles map[string]bool
// Listener receive events from controllers and save them to disk as yaml files
type Listener struct {
config *config.KfConfig
events *event.Notifier
events event.Notifier
actives activeFiles
activesLock sync.RWMutex
stopch chan struct{}
donech chan struct{}
}

// New creates a new Listener
func New(config *config.KfConfig, events *event.Notifier) *Listener {
func New(config *config.KfConfig, events event.Notifier) *Listener {
return &Listener{
config: config,
events: events,
Expand All @@ -47,6 +47,7 @@ func (w *Listener) Start() *Listener {
}

go func() {
evCh := w.events.ReadChan()
gcTick := time.NewTicker(w.config.ResyncIntv * 2)
w.stopch = make(chan struct{})
w.donech = make(chan struct{})
Expand All @@ -57,7 +58,7 @@ func (w *Listener) Start() *Listener {
select {
case <-w.stopch:
return
case ev := <-w.events.C:
case ev := <-evCh:
w.processNextEvent(&ev)
case <-gcTick.C:
w.deleteObsoleteFiles()
Expand Down
4 changes: 2 additions & 2 deletions pkg/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Run(config *config.KfConfig) {

evts := event.New()
reco := recorder.New(config, evts).Start()
ctrl := observer.New(config, evts).Start()
obsv := observer.New(config, evts).Start()

http, err := health.New(config).Start()
if err != nil {
Expand All @@ -36,7 +36,7 @@ func Run(config *config.KfConfig) {
signal.Notify(sigterm, syscall.SIGINT)
<-sigterm

ctrl.Stop()
obsv.Stop()
repo.Stop()
reco.Stop()
http.Stop()
Expand Down

0 comments on commit 881d036

Please sign in to comment.