Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes watch tracking during reloads and fixes address issue. #3189

Merged
merged 5 commits into from
Jun 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 30 additions & 43 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/watch"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
Expand Down Expand Up @@ -174,6 +173,10 @@ type Agent struct {

// wgServers is the wait group for all HTTP and DNS servers
wgServers sync.WaitGroup

// watchPlans tracks all the currently-running watch plans for the
// agent.
watchPlans []*watch.Plan
}

func New(c *Config) (*Agent, error) {
Expand Down Expand Up @@ -317,7 +320,7 @@ func (a *Agent) Start() error {
}

// register watches
if err := a.registerWatches(); err != nil {
if err := a.reloadWatches(a.config); err != nil {
return err
}

Expand Down Expand Up @@ -496,19 +499,27 @@ func (a *Agent) serveHTTP(l net.Listener, srv *HTTPServer) error {
}
}

func (a *Agent) registerWatches() error {
if len(a.config.WatchPlans) == 0 {
return nil
}
addrs, err := a.config.HTTPAddrs()
// reloadWatches stops any existing watch plans and attempts to load the given
// set of watches.
func (a *Agent) reloadWatches(cfg *Config) error {
// Watches use the API to talk to this agent, so that must be enabled.
addrs, err := cfg.HTTPAddrs()
if err != nil {
return err
}
if len(addrs) == 0 {
return fmt.Errorf("watch plans require an HTTP or HTTPS endpoint")
}

for _, wp := range a.config.WatchPlans {
// Stop the current watches.
for _, wp := range a.watchPlans {
wp.Stop()
}
a.watchPlans = nil

// Fire off a goroutine for each new watch plan.
for _, wp := range cfg.WatchPlans {
a.watchPlans = append(a.watchPlans, wp)
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(a.LogOutput, wp.Exempt["handler"])
wp.LogOutput = a.LogOutput
Expand All @@ -517,7 +528,7 @@ func (a *Agent) registerWatches() error {
addr = "unix://" + addr
}
if err := wp.Run(addr); err != nil {
a.logger.Println("[ERR] Failed to run watch: %v", err)
a.logger.Printf("[ERR] Failed to run watch: %v", err)
}
}(wp)
}
Expand Down Expand Up @@ -2302,9 +2313,7 @@ func (a *Agent) DisableNodeMaintenance() {
a.logger.Printf("[INFO] agent: Node left maintenance mode")
}

func (a *Agent) ReloadConfig(newCfg *Config) (bool, error) {
var errs error

func (a *Agent) ReloadConfig(newCfg *Config) error {
// Bulk update the services and checks
a.PauseSync()
defer a.ResumeSync()
Expand All @@ -2316,50 +2325,28 @@ func (a *Agent) ReloadConfig(newCfg *Config) (bool, error) {
// First unload all checks, services, and metadata. This lets us begin the reload
// with a clean slate.
if err := a.unloadServices(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err))
return false, errs
return fmt.Errorf("Failed unloading services: %s", err)
}
if err := a.unloadChecks(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err))
return false, errs
return fmt.Errorf("Failed unloading checks: %s", err)
}
a.unloadMetadata()

// Reload service/check definitions and metadata.
if err := a.loadServices(newCfg); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err))
return false, errs
return fmt.Errorf("Failed reloading services: %s", err)
}
if err := a.loadChecks(newCfg); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err))
return false, errs
return fmt.Errorf("Failed reloading checks: %s", err)
}
if err := a.loadMetadata(newCfg); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err))
return false, errs
return fmt.Errorf("Failed reloading metadata: %s", err)
}

// Get the new client listener addr
httpAddr, err := newCfg.ClientListener(a.config.Addresses.HTTP, a.config.Ports.HTTP)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to determine HTTP address: %v", err))
// Reload the watches.
if err := a.reloadWatches(newCfg); err != nil {
return fmt.Errorf("Failed reloading watches: %v", err)
}

// Deregister the old watches
for _, wp := range a.config.WatchPlans {
wp.Stop()
}

// Register the new watches
for _, wp := range newCfg.WatchPlans {
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(a.LogOutput, wp.Exempt["handler"])
wp.LogOutput = a.LogOutput
if err := wp.Run(httpAddr.String()); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Error running watch: %v", err))
}
}(wp)
}

return true, errs
return nil
}
27 changes: 22 additions & 5 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -237,6 +238,19 @@ func TestAgent_Reload(t *testing.T) {
cfg.Services = []*structs.ServiceDefinition{
&structs.ServiceDefinition{Name: "redis"},
}

params := map[string]interface{}{
"datacenter": "dc1",
"type": "key",
"key": "test",
"handler": "true",
}
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
t.Fatalf("Expected watch.Parse to succeed %v", err)
}
cfg.WatchPlans = append(cfg.WatchPlans, wp)

a := NewTestAgent(t.Name(), cfg)
defer a.Shutdown()

Expand All @@ -250,16 +264,19 @@ func TestAgent_Reload(t *testing.T) {
&structs.ServiceDefinition{Name: "redis-reloaded"},
}

ok, err := a.ReloadConfig(cfg2)
if err != nil {
if err := a.ReloadConfig(cfg2); err != nil {
t.Fatalf("got error %v want nil", err)
}
if !ok {
t.Fatalf("got ok %v want true")
}
if _, ok := a.state.services["redis-reloaded"]; !ok {
t.Fatalf("missing redis-reloaded service")
}

// Verify that previous config's watch plans were stopped.
for _, wp := range cfg.WatchPlans {
if !wp.IsStopped() {
t.Fatalf("Reloading configs should stop watch plans of the previous configuration")
}
}
}

func TestAgent_Reload_ACLDeny(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ type ProtoAddr struct {
}

func (p ProtoAddr) String() string {
return p.Proto + "+" + p.Net + "://" + p.Addr
return p.Proto + "://" + p.Addr
}

func (c *Config) DNSAddrs() ([]ProtoAddr, error) {
Expand Down
7 changes: 4 additions & 3 deletions command/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,10 +841,11 @@ func (cmd *AgentCommand) handleReload(agent *agent.Agent, cfg *agent.Config) (*a
newCfg.LogLevel = cfg.LogLevel
}

ok, errs := agent.ReloadConfig(newCfg)
if ok {
return newCfg, errs
if err := agent.ReloadConfig(newCfg); err != nil {
errs = multierror.Append(fmt.Errorf(
"Failed to reload configs: %v", err))
}

return cfg, errs
}

Expand Down
6 changes: 6 additions & 0 deletions watch/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,9 @@ func (p *Plan) shouldStop() bool {
return false
}
}

// IsStopped reports the current value of the plan's stop flag. The flag is
// read while holding stopLock and the lock is released before returning.
func (p *Plan) IsStopped() bool {
	p.stopLock.Lock()
	stopped := p.stop
	p.stopLock.Unlock()
	return stopped
}