diff --git a/api/agent.go b/api/agent.go index 63648fe33f37..1893d1cf359d 100644 --- a/api/agent.go +++ b/api/agent.go @@ -117,6 +117,17 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) { return out, nil } +// Reload triggers a configuration reload for the agent we are connected to. +func (a *Agent) Reload() error { + r := a.c.newRequest("PUT", "/v1/agent/reload") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + // NodeName is used to get the node name of the agent func (a *Agent) NodeName() (string, error) { if a.nodeName != "" { @@ -348,6 +359,17 @@ func (a *Agent) Join(addr string, wan bool) error { return nil } +// Leave is used to have the agent gracefully leave the cluster and shutdown +func (a *Agent) Leave() error { + r := a.c.newRequest("PUT", "/v1/agent/leave") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + // ForceLeave is used to have the agent eject a failed node func (a *Agent) ForceLeave(node string) error { r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node) diff --git a/api/agent_test.go b/api/agent_test.go index cc16f990fc33..cd48d57087fd 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1,9 +1,14 @@ package api import ( + "io/ioutil" "strings" "testing" + "time" + + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/serf/serf" ) func TestAgent_Self(t *testing.T) { @@ -24,6 +29,51 @@ func TestAgent_Self(t *testing.T) { } } +func TestAgent_Reload(t *testing.T) { + t.Parallel() + + // Create our initial empty config file, to be overwritten later + configFile, err := ioutil.TempFile("", "reload") + if err != nil { + t.Fatalf("err: %s", err) + } + if _, err := configFile.Write([]byte("{}")); err != nil { + t.Fatalf("err: %s", err) + } + configFile.Close() + + c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.Args = []string{"-config-file", configFile.Name()} + }) + defer s.Stop() + + agent := c.Agent() + + // Update the config file with a service definition + config := `{"service":{"name":"redis", "port":1234}}` + err = ioutil.WriteFile(configFile.Name(), []byte(config), 0644) + if err != nil { + t.Fatalf("err: %v", err) + } + + if err = agent.Reload(); err != nil { + t.Fatalf("err: %v", err) + } + + services, err := agent.Services() + if err != nil { + t.Fatalf("err: %v", err) + } + + service, ok := services["redis"] + if !ok { + t.Fatalf("bad: %v", ok) + } + if service.Port != 1234 { + t.Fatalf("bad: %v", service.Port) + } +} + func TestAgent_Members(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -545,6 +595,39 @@ func TestAgent_Join(t *testing.T) { } } +func TestAgent_Leave(t *testing.T) { + t.Parallel() + c1, s1 := makeClient(t) + defer s1.Stop() + + c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.Server = false + conf.Bootstrap = false + }) + defer s2.Stop() + + if err := c2.Agent().Join(s1.LANAddr, false); err != nil { + t.Fatalf("err: %v", err) + } + + if err := c2.Agent().Leave(); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure the second agent's status is 'Left' + members, err := c1.Agent().Members(false) + if err != nil { + t.Fatalf("err: %v", err) + } + member := members[0] + if member.Name == s1.Config.NodeName { + member = members[1] + } + if member.Status != int(serf.StatusLeft) { + t.Fatalf("bad: %v", *member) + } +} + func TestAgent_ForceLeave(t *testing.T) { t.Parallel() c, s := makeClient(t) diff --git a/command/agent/agent.go b/command/agent/agent.go index 11d1670baeca..03e4624f4e77 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -109,6 +109,8 @@ type Agent struct { eventLock sync.RWMutex eventNotify state.NotifyGroup + reloadCh chan chan error + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -121,7 +123,8 @@ type Agent struct { // Create is used to create a new Agent. Returns // the agent or potentially an error. -func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) { +func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, + reloadCh chan chan error) (*Agent, error) { // Ensure we have a log sink if logOutput == nil { logOutput = os.Stderr @@ -184,6 +187,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (* checkDockers: make(map[types.CheckID]*CheckDocker), eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), + reloadCh: reloadCh, shutdownCh: make(chan struct{}), endpoints: make(map[string]string), } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index cb4afbde927f..0067d1b3563b 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -39,6 +39,30 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int }, nil } +func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" { + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil + } + + errCh := make(chan error, 0) + + // Trigger the reload + select { + case <-s.agent.ShutdownCh(): + return nil, fmt.Errorf("Agent was shutdown before reload could be completed") + case s.agent.reloadCh <- errCh: + } + + // Wait for the result of the reload, or for the agent to shutdown + select { + case <-s.agent.ShutdownCh(): + return nil, fmt.Errorf("Agent was shutdown before reload could be completed") + case err := <-errCh: + return nil, err + } +} + func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { services := s.agent.state.Services() return services, nil @@ -80,6 +104,18 @@ func (s *HTTPServer) AgentJoin(resp http.ResponseWriter, req *http.Request) (int } } +func (s *HTTPServer) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" { + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil + } + + if err := s.agent.Leave(); err != nil { + return nil, err + } + return nil, s.agent.Shutdown() +} + func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) { addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/") return nil, s.agent.ForceLeave(addr) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 0de3483270dd..9d93ca8fba67 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" + "github.com/mitchellh/cli" ) func TestHTTPAgentServices(t *testing.T) { @@ -119,6 +120,81 @@ func TestHTTPAgentSelf(t *testing.T) { } } +func TestHTTPAgentReload(t *testing.T) { + conf := nextConfig() + tmpDir, err := ioutil.TempDir("", "consul") + if err != nil { + t.Fatalf("err: %s", err) + } + defer os.RemoveAll(tmpDir) + + // Write initial config, to be reloaded later + tmpFile, err := ioutil.TempFile(tmpDir, "config") + if err != nil { + t.Fatalf("err: %s", err) + } + _, err = tmpFile.WriteString(`{"service":{"name":"redis"}}`) + if err != nil { + t.Fatalf("err: %s", err) + } + tmpFile.Close() + + doneCh := make(chan struct{}) + shutdownCh := make(chan struct{}) + + defer func() { + close(shutdownCh) + <-doneCh + }() + + cmd := &Command{ + ShutdownCh: shutdownCh, + Ui: new(cli.MockUi), + } + + args := []string{ + "-server", + "-data-dir", tmpDir, + "-http-port", fmt.Sprintf("%d", conf.Ports.HTTP), + "-config-file", tmpFile.Name(), + } + + go func() { + cmd.Run(args) + close(doneCh) + }() + + testutil.WaitForResult(func() (bool, error) { + return len(cmd.httpServers) == 1, nil + }, func(err error) { + t.Fatalf("should have an http server") + }) + + if _, ok := cmd.agent.state.services["redis"]; !ok { + t.Fatalf("missing redis service") + } + + err = ioutil.WriteFile(tmpFile.Name(), []byte(`{"service":{"name":"redis-reloaded"}}`), 0644) + if err != nil { + t.Fatalf("err: %v", err) + } + + srv := cmd.httpServers[0] + req, err := http.NewRequest("PUT", "/v1/agent/reload", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + _, err = srv.AgentReload(nil, req) + if err != nil { + t.Fatalf("Err: %v", err) + } + + if _, ok := cmd.agent.state.services["redis-reloaded"]; !ok { + t.Fatalf("missing redis-reloaded service") + } +} + func TestHTTPAgentMembers(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) @@ -239,6 +315,49 @@ func TestHTTPAgentJoin_WAN(t *testing.T) { }) } +func TestHTTPAgentLeave(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) { + c.Server = false + c.Bootstrap = false + }) + defer os.RemoveAll(dir2) + defer srv2.Shutdown() + + // Join first + addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan) + _, err := srv.agent.JoinLAN([]string{addr}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Graceful leave now + req, err := http.NewRequest("PUT", "/v1/agent/leave", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + obj, err := srv2.AgentLeave(nil, req) + if err != nil { + t.Fatalf("Err: %v", err) + } + if obj != nil { + t.Fatalf("Err: %v", obj) + } + + testutil.WaitForResult(func() (bool, error) { + m := srv.agent.LANMembers() + success := m[1].Status == serf.StatusLeft + return success, errors.New(m[1].Status.String()) + }, func(err error) { + t.Fatalf("member status is %v, should be left", err) + }) +} + func TestHTTPAgentForceLeave(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) @@ -1030,7 +1149,6 @@ func TestHTTPAgent_Monitor(t *testing.T) { logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter) dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter) - srv.agent.logWriter = logWriter defer os.RemoveAll(dir) defer srv.Shutdown() defer srv.agent.Shutdown() diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 34819911612c..64d6d05e3123 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -87,7 +87,7 @@ func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWri } conf.DataDir = dir - agent, err := Create(conf, l, writer) + agent, err := Create(conf, l, writer, nil) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) @@ -113,7 +113,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { t.Fatalf("err: %s", err) } - agent, err := Create(conf, nil, nil) + agent, err := Create(conf, nil, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -846,7 +846,7 @@ func TestAgent_PersistService(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil, nil) + agent2, err := Create(config, nil, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -980,7 +980,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { } config.Services = []*ServiceDefinition{svc2} - agent2, err := Create(config, nil, nil) + agent2, err := Create(config, nil, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1073,7 +1073,7 @@ func TestAgent_PersistCheck(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil, nil) + agent2, err := Create(config, nil, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1166,7 +1166,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { } config.Checks = []*CheckDefinition{check2} - agent2, err := Create(config, nil, nil) + agent2, err := Create(config, nil, nil, nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/command.go b/command/agent/command.go index d47537363a67..22f5b1563afa 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -28,6 +28,7 @@ import ( "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/watch" "github.com/hashicorp/go-checkpoint" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/logutils" scada "github.com/hashicorp/scada-client/scada" "github.com/mitchellh/cli" @@ -50,6 +51,7 @@ type Command struct { HumanVersion string Ui cli.Ui ShutdownCh <-chan struct{} + configReloadCh chan chan error args []string logFilter *logutils.LevelFilter logOutput io.Writer @@ -466,7 +468,7 @@ func (c *Config) discoverEc2Hosts(logger *log.Logger) ([]string, error) { // setupAgent is used to start the agent and various interfaces func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error { c.Ui.Output("Starting Consul agent...") - agent, err := Create(config, logOutput, logWriter) + agent, err := Create(config, logOutput, logWriter, c.configReloadCh) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) return err @@ -747,6 +749,9 @@ func (c *Command) Run(args []string) int { c.logFilter = logFilter c.logOutput = logOutput + // Setup the channel for triggering config reloads + c.configReloadCh = make(chan chan error) + /* Setup telemetry Aggregate on 10 second intervals for 1 minute. Expose the metrics over stderr when there is a SIGUSR1 received. @@ -947,11 +952,15 @@ func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}, retry // Wait for a signal WAIT: var sig os.Signal + var reloadErrCh chan error select { case s := <-signalCh: sig = s case <-c.rpcServer.ReloadCh(): sig = syscall.SIGHUP + case ch := <-c.configReloadCh: + sig = syscall.SIGHUP + reloadErrCh = ch case <-c.ShutdownCh: sig = os.Interrupt case <-retryJoin: @@ -966,9 +975,17 @@ WAIT: // Check if this is a SIGHUP if sig == syscall.SIGHUP { - if conf := c.handleReload(config); conf != nil { + conf, err := c.handleReload(config) + if conf != nil { config = conf } + if err != nil { + c.Ui.Error(err.Error()) + } + // Send result back if reload was called via HTTP + if reloadErrCh != nil { + reloadErrCh <- err + } goto WAIT } @@ -1008,12 +1025,13 @@ WAIT: } // handleReload is invoked when we should reload our configs, e.g. SIGHUP -func (c *Command) handleReload(config *Config) *Config { +func (c *Command) handleReload(config *Config) (*Config, error) { c.Ui.Output("Reloading configuration...") + var errs error newConf := c.readConfig() if newConf == nil { - c.Ui.Error(fmt.Sprintf("Failed to reload configs")) - return config + errs = multierror.Append(errs, fmt.Errorf("Failed to reload configs")) + return config, errs } // Change the log level @@ -1021,7 +1039,7 @@ func (c *Command) handleReload(config *Config) *Config { if logger.ValidateLevelFilter(minLevel, c.logFilter) { c.logFilter.SetMinLevel(minLevel) } else { - c.Ui.Error(fmt.Sprintf( + errs = multierror.Append(fmt.Errorf( "Invalid log level: %s. Valid log levels are: %v", minLevel, c.logFilter.Levels)) @@ -1040,28 +1058,28 @@ func (c *Command) handleReload(config *Config) *Config { // First unload all checks and services. This lets us begin the reload // with a clean slate. if err := c.agent.unloadServices(); err != nil { - c.Ui.Error(fmt.Sprintf("Failed unloading services: %s", err)) - return nil + errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err)) + return nil, errs } if err := c.agent.unloadChecks(); err != nil { - c.Ui.Error(fmt.Sprintf("Failed unloading checks: %s", err)) - return nil + errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err)) + return nil, errs } // Reload services and check definitions. if err := c.agent.loadServices(newConf); err != nil { - c.Ui.Error(fmt.Sprintf("Failed reloading services: %s", err)) - return nil + errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err)) + return nil, errs } if err := c.agent.loadChecks(newConf); err != nil { - c.Ui.Error(fmt.Sprintf("Failed reloading checks: %s", err)) - return nil + errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err)) + return nil, errs } // Get the new client listener addr httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) if err != nil { - c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err)) + errs = multierror.Append(errs, fmt.Errorf("Failed to determine HTTP address: %v", err)) } // Deregister the old watches @@ -1075,7 +1093,7 @@ func (c *Command) handleReload(config *Config) *Config { wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"]) wp.LogOutput = c.logOutput if err := wp.Run(httpAddr.String()); err != nil { - c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) + errs = multierror.Append(errs, fmt.Errorf("Error running watch: %v", err)) } }(wp) } @@ -1085,12 +1103,12 @@ func (c *Command) handleReload(config *Config) *Config { newConf.AtlasToken != config.AtlasToken || newConf.AtlasEndpoint != config.AtlasEndpoint { if err := c.setupScadaConn(newConf); err != nil { - c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err)) - return nil + errs = multierror.Append(errs, fmt.Errorf("Failed reloading SCADA client: %s", err)) + return nil, errs } } - return newConf + return newConf, errs } // startScadaClient is used to start a new SCADA provider and listener, diff --git a/command/agent/http.go b/command/agent/http.go index 7ba8a3c0a6af..8f1a1a1db087 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -251,11 +251,13 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { } s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) + s.handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload)) s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) + s.handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave)) s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) diff --git a/command/util_test.go b/command/util_test.go index 1a4a1e892759..646c1a526557 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -74,7 +74,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { } conf.DataDir = dir - a, err := agent.Create(conf, lw, nil) + a, err := agent.Create(conf, lw, nil, nil) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) diff --git a/testutil/server.go b/testutil/server.go index 831fafa9b70c..727f1088e836 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -72,6 +72,7 @@ type TestServerConfig struct { ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` Encrypt string `json:"encrypt,omitempty"` Stdout, Stderr io.Writer `json:"-"` + Args []string `json:"-"` } // ServerConfigCallback is a function interface which can be @@ -201,7 +202,9 @@ func NewTestServerConfig(t TestingT, cb ServerConfigCallback) *TestServer { } // Start the server - cmd := exec.Command("consul", "agent", "-config-file", configFile.Name()) + args := []string{"agent", "-config-file", configFile.Name()} + args = append(args, consulConfig.Args...) + cmd := exec.Command("consul", args...) cmd.Stdout = stdout cmd.Stderr = stderr if err := cmd.Start(); err != nil { diff --git a/website/source/docs/agent/http/agent.html.markdown b/website/source/docs/agent/http/agent.html.markdown index c206f7981d33..c1f6e477594c 100644 --- a/website/source/docs/agent/http/agent.html.markdown +++ b/website/source/docs/agent/http/agent.html.markdown @@ -20,9 +20,11 @@ The following endpoints are supported: * [`/v1/agent/services`](#agent_services) : Returns the services the local agent is managing * [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent * [`/v1/agent/self`](#agent_self) : Returns the local node configuration +* [`/v1/agent/reload`](#agent_reload) : Causes the local agent to reload its configuration * [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode * [`/v1/agent/monitor`](#agent_monitor) : Streams logs from the local agent * [`/v1/agent/join/
`](#agent_join) : Triggers the local agent to join a node +* [`/v1/agent/leave`](#agent_leave): Triggers the local agent to gracefully shutdown and leave the cluster * [`/v1/agent/force-leave/