Skip to content

Commit

Permalink
split EventBus to its own file, clean up comments and docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Mar 23, 2017
1 parent fd05aad commit 182057e
Show file tree
Hide file tree
Showing 20 changed files with 142 additions and 200 deletions.
8 changes: 4 additions & 4 deletions checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

const eventBufferSize = 1000

// HealthCheck ...
// HealthCheck manages state of periodic health checks.
type HealthCheck struct {
Name string
serviceName string
Expand All @@ -21,7 +21,7 @@ type HealthCheck struct {
events.EventHandler // Event handling
}

// NewHealthCheck ...
// NewHealthCheck creates a HealthCheck from a validated Config struct
func NewHealthCheck(cfg *Config) *HealthCheck {
check := &HealthCheck{
Name: cfg.Name,
Expand All @@ -34,7 +34,7 @@ func NewHealthCheck(cfg *Config) *HealthCheck {
return check
}

// FromConfigs ...
// FromConfigs creates HealthChecks from a slice of validated Configs
func FromConfigs(cfgs []*Config) []*HealthCheck {
checks := []*HealthCheck{}
for _, cfg := range cfgs {
Expand Down Expand Up @@ -86,5 +86,5 @@ func (check *HealthCheck) Run(bus *events.EventBus) {

// String implements the stdlib fmt.Stringer interface for pretty-printing
func (check *HealthCheck) String() string {
return "HealthCheck[%v]" + check.Name // TODO: is there a better representation???
return "HealthCheck[%v]" + check.Name
}
8 changes: 4 additions & 4 deletions checks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Config struct {
timeout time.Duration
serviceName string // for now always the same as the Name

/* TODO:
/* TODO v3:
These fields are here *only* so we can reuse the config map we use
in the services package here too. this package ignores them. when we
move on to the v3 configuration syntax these will be dropped.
Expand Down Expand Up @@ -51,7 +51,7 @@ func NewConfigs(raw []interface{}) ([]*Config, error) {
return nil, fmt.Errorf("HealthCheck configuration error: %v", err)
}
for _, check := range unvalidatedChecks {
// TODO: we'll remove this check when we split the check
// TODO v3: we'll remove this check when we split the check
// from the service config
if check.Exec != nil {
err := check.Validate()
Expand Down Expand Up @@ -88,7 +88,7 @@ func (cfg *Config) Validate() error {
cmd, err := commands.NewCommand(cfg.Exec, cfg.timeout,
log.Fields{"service": cfg.serviceName, "check": cfg.Name})
if err != nil {
// TODO: this is config syntax specific and should be updated
// TODO v3: this is config syntax specific and should be updated
return fmt.Errorf("could not parse `health` in check %s: %s",
cfg.Name, err)
}
Expand All @@ -100,5 +100,5 @@ func (cfg *Config) Validate() error {

// String implements the stdlib fmt.Stringer interface for pretty-printing
func (cfg *Config) String() string {
return "checks.Config[" + cfg.Name + "]" // TODO: is there a better representation???
return "checks.Config[" + cfg.Name + "]"
}
10 changes: 5 additions & 5 deletions commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ func NewCommand(rawArgs interface{}, timeout time.Duration, fields log.Fields) (
lock: &sync.Mutex{},
logger: log.StandardLogger().Writer(),
logFields: fields,
} // cmd created at RunAndWait or RunWithTimeout
} // exec.Cmd created at Run or RunAndWaitForOutput
return cmd, nil
}

// Run ...
// Run creates an exec.Cmd for the Command and runs it asynchronously.
// If the parent context is closed/canceled this will terminate the
// child process and do any cleanup we need.
func (c *Command) Run(pctx context.Context, bus *events.EventBus) {
if c == nil {
// TODO: will this ever get called like this?
log.Debugf("nothing to run for %s", c.Name)
return
}
Expand Down Expand Up @@ -111,11 +112,10 @@ func (c *Command) Run(pctx context.Context, bus *events.EventBus) {

// RunAndWaitForOutput runs the command and blocks until completed, then
// returns a string of the stdout
// TODO: remove this once the control plane is available for Sensors (the
// TODO v3: remove this once the control plane is available for Sensors (the
// only caller) to send metrics to
func (c *Command) RunAndWaitForOutput(pctx context.Context, bus *events.EventBus) string {
if c == nil {
// TODO: will this ever get called like this?
log.Debugf("nothing to run for %s", c.Name)
return ""
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (cfg *Config) InitLogging() error {
return nil
}

// parseStopTimeout ...
// parseStopTimeout makes sure we have a safe default
func (cfg *rawConfig) parseStopTimeout() (int, error) {
if cfg.stopTimeout == 0 {
return defaultStopTimeout, nil
Expand Down
2 changes: 1 addition & 1 deletion core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type App struct {
Watches []*watches.Watch
Telemetry *telemetry.Telemetry
StopTimeout int
maintModeLock *sync.RWMutex // TODO: probably want to move this to Service.Status
maintModeLock *sync.RWMutex // TODO v3: probably want to move this to Service.Status
signalLock *sync.RWMutex
paused bool
ConfigFlag string
Expand Down
54 changes: 1 addition & 53 deletions core/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

/*
a LOT of the these tests should be moved to the config package
TODO v3: a LOT of the these tests should be moved to the config package
*/

// ------------------------------------------
Expand Down Expand Up @@ -59,58 +59,6 @@ var testJSON = `{
}
`

// func TestValidConfigParse(t *testing.T) {
// defer argTestCleanup(argTestSetup())

// os.Setenv("TEST", "HELLO")
// os.Args = []string{"this", "-config", testJSON, "/testdata/test.sh", "valid1", "--debug"}
// app, err := LoadApp()
// if err != nil {
// t.Fatalf("unexpected error in LoadApp: %v", err)
// }

// if len(app.Watches) != 2 || len(app.Services) != 5 {
// t.Fatalf("expected 2 watches and 2 services but got: len(watches)=%d, len(services)=%d", len(app.Watches), len(app.Services))
// }
// args := flag.Args()
// if len(args) != 3 || args[0] != "/testdata/test.sh" {
// t.Errorf("expected 3 args but got unexpected results: %v", args)
// }

// expectedTags := []string{"tag1", "tag2"}
// if !reflect.DeepEqual(app.Services[0].Definition.Tags, expectedTags) {
// t.Errorf("expected tags %s for serviceA, but got: %s", expectedTags, app.Services[0].Definition.Tags)
// }

// if app.Services[1].Definition.Tags != nil {
// t.Errorf("expected no tags for serviceB, but got: %s", app.Services[1].Definition.Tags)
// }

// if app.Services[0].Definition.TTL != 19 {
// t.Errorf("expected ttl=19 for serviceA, but got: %d", app.Services[1].Definition.TTL)
// }

// if app.Services[1].Definition.TTL != 103 {
// t.Errorf("expected ttl=103 for serviceB, but got: %d", app.Services[1].Definition.TTL)
// }

// if app.Watches[0].Tag != "dev" {
// t.Errorf("expected tag %s for upstreamA, but got: %s", "dev", app.Watches[0].Tag)
// }

// if app.Watches[1].Tag != "" {
// t.Errorf("expected no tag for upstreamB, but got: %s", app.Watches[1].Tag)
// }

// // TODO
// // validateCommandParsed(t, "preStart", app.PreStartCmd,
// // "/bin/to/preStart.sh", []string{"arg1", "arg2"})
// // validateCommandParsed(t, "preStop", app.PreStopCmd,
// // "/bin/to/preStop.sh", []string{"arg1", "arg2"})
// // validateCommandParsed(t, "postStop", app.PostStopCmd,
// // "/bin/to/postStop.sh", nil) //[]string{})
// }

func TestServiceConfigRequiredFields(t *testing.T) {
// Missing `name`
var testJSON = `{"consul": "consul:8500", "services": [
Expand Down
80 changes: 80 additions & 0 deletions events/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package events

import (
"sync"

log "github.com/Sirupsen/logrus"
)

// EventBus manages the state of and transmits messages to all its Subscribers
type EventBus struct {
registry map[Subscriber]bool
lock *sync.RWMutex
reload bool
done chan bool
}

// NewEventBus initializes an EventBus. We need this rather than a struct
// literal so that we know our channels are non-nil (which block sends).
func NewEventBus() *EventBus {
lock := &sync.RWMutex{}
reg := make(map[Subscriber]bool)
done := make(chan bool, 1)
bus := &EventBus{registry: reg, lock: lock, done: done, reload: false}
return bus
}

// Register the Subscriber for all Events
func (bus *EventBus) Register(subscriber Subscriber) {
bus.lock.Lock()
defer bus.lock.Unlock()
bus.registry[subscriber] = true
}

// Unregister the Subscriber from all Events
func (bus *EventBus) Unregister(subscriber Subscriber) {
bus.lock.Lock()
defer bus.lock.Unlock()
if _, ok := bus.registry[subscriber]; ok {
delete(bus.registry, subscriber)
}
if len(bus.registry) == 0 {
bus.done <- true
}
}

// Publish an Event to all Subscribers
func (bus *EventBus) Publish(event Event) {
log.Debugf("event: %v", event)
bus.lock.RLock()
defer bus.lock.RUnlock()
for subscriber := range bus.registry {
// sending to an unsubscribed Subscriber shouldn't be a runtime
// error, so this is in intentionally allowed to panic here
subscriber.Receive(event)
}
}

// SetReloadFlag sets the flag that Wait will use to signal to the main
// App that we want to restart rather than be shut down
func (bus *EventBus) SetReloadFlag() {
bus.lock.Lock()
bus.reload = true
bus.lock.Unlock()
}

// Shutdown asks all Subscribers to halt by sending the GlobalShutdown
// message. Subscribers are responsible for handling this message.
func (bus *EventBus) Shutdown() {
bus.Publish(GlobalShutdown)
}

// Wait blocks until the EventBus registry is unpopulated. Returns true
// if the "reload" flag was set.
func (bus *EventBus) Wait() bool {
<-bus.done
close(bus.done)
bus.lock.RLock()
defer bus.lock.RUnlock()
return bus.reload
}
82 changes: 2 additions & 80 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package events

import (
"sync"

log "github.com/Sirupsen/logrus"
)

// Event ...
// Event represents a single message in the EventBus
type Event struct {
Code EventCode
Source string
}

// go:generate stringer -type EventCode

// EventCode ...
// EventCode is an enum for Events
type EventCode int

// EventCode enum
Expand Down Expand Up @@ -43,75 +37,3 @@ var (
QuitByClose = Event{Code: Quit, Source: "closed"}
NonEvent = Event{Code: None, Source: ""}
)

// EventBus ...
type EventBus struct {
registry map[Subscriber]bool
lock *sync.RWMutex
reload bool
done chan bool
}

// NewEventBus ...
func NewEventBus() *EventBus {
lock := &sync.RWMutex{}
reg := make(map[Subscriber]bool)
done := make(chan bool, 1)
bus := &EventBus{registry: reg, lock: lock, done: done, reload: false}
return bus
}

// Register the Subscriber for all Events
func (bus *EventBus) Register(subscriber Subscriber) {
bus.lock.Lock()
defer bus.lock.Unlock()
bus.registry[subscriber] = true
}

// Unregister the Subscriber from all Events
func (bus *EventBus) Unregister(subscriber Subscriber) {
bus.lock.Lock()
defer bus.lock.Unlock()
if _, ok := bus.registry[subscriber]; ok {
delete(bus.registry, subscriber)
}
if len(bus.registry) == 0 {
bus.done <- true
}
}

// Publish an Event to all Subscribers
func (bus *EventBus) Publish(event Event) {
log.Debugf("event: %v", event)
bus.lock.RLock()
defer bus.lock.RUnlock()
for subscriber := range bus.registry {
// sending to an unsubscribed Subscriber shouldn't be a runtime
// error, so this is in intentionally allowed to panic here
subscriber.Receive(event)
}
}

// SetReloadFlag sets the flag that Wait will use to signal to the main
// App that we want to restart rather than be shut down
func (bus *EventBus) SetReloadFlag() {
bus.lock.Lock()
bus.reload = true
bus.lock.Unlock()
}

// Shutdown asks all Subscribers to halt by sending the GlobalShutdown
// message. Subscribers are responsible for handling this message.
func (bus *EventBus) Shutdown() {
bus.Publish(GlobalShutdown)
}

// Wait blocks until the EventBus registry is unpopulated. Returns true
// if the "reload" flag was set.
func (bus *EventBus) Wait() bool {
<-bus.done
close(bus.done)
bus.lock.RLock()
defer bus.lock.RUnlock()
return bus.reload
}
10 changes: 5 additions & 5 deletions events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package events

import "errors"

// EventHandler should be embedded in all Runners so that we can reuse
// the code for registering and unregistering handlers. This is why the
// various fields are (unfortunately) public and we can't use struct
// literals for constructors. All NewRunner functions will need to set
// EventHandler should be embedded in all task runners so that we can
// reuse the code for registering and unregistering handlers. This is why
// the various fields are (unfortunately) public and we can't use struct
// literals for constructors. All task runner constructors will need to set
// these fields explicitly:
// runner.Rx = make(chan Event, n)
// runner.Flush = make(chan bool)
Expand All @@ -32,7 +32,7 @@ func (evh *EventHandler) Unsubscribe(bus *EventBus) {
// Embedding struct should use a non-blocking buffered channel but
// this may be blocking in tests.
func (evh *EventHandler) Receive(e Event) {
// TODO: instrument receives so we can report event throughput
// TODO v3: instrument receives so we can report event throughput
// statistics via Prometheus
evh.Rx <- e
}
Expand Down
Loading

0 comments on commit 182057e

Please sign in to comment.