Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reload/leave http endpoints #2516

Merged
merged 7 commits into from
Nov 30, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
return out, nil
}

// Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}

// NodeName is used to get the node name of the agent
func (a *Agent) NodeName() (string, error) {
if a.nodeName != "" {
Expand Down Expand Up @@ -348,6 +359,17 @@ func (a *Agent) Join(addr string, wan bool) error {
return nil
}

// Leave is used to have the agent gracefully leave the cluster and shutdown
func (a *Agent) Leave() error {
r := a.c.newRequest("PUT", "/v1/agent/leave")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}

// ForceLeave is used to have the agent eject a failed node
func (a *Agent) ForceLeave(node string) error {
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)
Expand Down
83 changes: 83 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package api

import (
"io/ioutil"
"strings"
"testing"

"time"

"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
)

func TestAgent_Self(t *testing.T) {
Expand All @@ -24,6 +29,51 @@ func TestAgent_Self(t *testing.T) {
}
}

func TestAgent_Reload(t *testing.T) {
t.Parallel()

// Create our initial empty config file, to be overwritten later
configFile, err := ioutil.TempFile("", "reload")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, err := configFile.Write([]byte("{}")); err != nil {
t.Fatalf("err: %s", err)
}
configFile.Close()

c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Args = []string{"-config-file", configFile.Name()}
})
defer s.Stop()

agent := c.Agent()

// Update the config file with a service definition
config := `{"service":{"name":"redis", "port":1234}}`
err = ioutil.WriteFile(configFile.Name(), []byte(config), 0644)
if err != nil {
t.Fatalf("err: %v", err)
}

if err = agent.Reload(); err != nil {
t.Fatalf("err: %v", err)
}

services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}

service, ok := services["redis"]
if !ok {
t.Fatalf("bad: %v", ok)
}
if service.Port != 1234 {
t.Fatalf("bad: %v", service.Port)
}
}

func TestAgent_Members(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down Expand Up @@ -545,6 +595,39 @@ func TestAgent_Join(t *testing.T) {
}
}

func TestAgent_Leave(t *testing.T) {
t.Parallel()
c1, s1 := makeClient(t)
defer s1.Stop()

c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Server = false
conf.Bootstrap = false
})
defer s2.Stop()

if err := c2.Agent().Join(s1.LANAddr, false); err != nil {
t.Fatalf("err: %v", err)
}

if err := c2.Agent().Leave(); err != nil {
t.Fatalf("err: %v", err)
}

// Make sure the second agent's status is 'Left'
members, err := c1.Agent().Members(false)
if err != nil {
t.Fatalf("err: %v", err)
}
member := members[0]
if member.Name == s1.Config.NodeName {
member = members[1]
}
if member.Status != int(serf.StatusLeft) {
t.Fatalf("bad: %v", *member)
}
}

func TestAgent_ForceLeave(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down
9 changes: 8 additions & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Agent struct {
server *consul.Server
client *consul.Client

// The command used to launch this agent
command *Command
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can drop this now, right?


// state stores a local representation of the node,
// services and checks. Used for anti-entropy.
state localState
Expand Down Expand Up @@ -109,6 +112,8 @@ type Agent struct {
eventLock sync.RWMutex
eventNotify state.NotifyGroup

reloadCh chan chan error

shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
Expand All @@ -121,7 +126,8 @@ type Agent struct {

// Create is used to create a new Agent. Returns
// the agent or potentially an error.
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) {
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter,
reloadCh chan chan error) (*Agent, error) {
// Ensure we have a log sink
if logOutput == nil {
logOutput = os.Stderr
Expand Down Expand Up @@ -184,6 +190,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*
checkDockers: make(map[types.CheckID]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
reloadCh: reloadCh,
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
}
Expand Down
24 changes: 24 additions & 0 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
}, nil
}

func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make these new ones require a PUT otherwise give a 405. Might as well start these off in a better spot :-)

if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}

errCh := make(chan error, 0)
s.agent.reloadCh <- errCh

return nil, <-errCh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should wait on the shutdown channel too, and return some kind of error.

}

func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
services := s.agent.state.Services()
return services, nil
Expand Down Expand Up @@ -80,6 +92,18 @@ func (s *HTTPServer) AgentJoin(resp http.ResponseWriter, req *http.Request) (int
}
}

func (s *HTTPServer) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}

if err := s.agent.Leave(); err != nil {
return nil, err
}
return nil, s.agent.Shutdown()
}

func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
return nil, s.agent.ForceLeave(addr)
Expand Down
120 changes: 119 additions & 1 deletion command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
)

func TestHTTPAgentServices(t *testing.T) {
Expand Down Expand Up @@ -119,6 +120,81 @@ func TestHTTPAgentSelf(t *testing.T) {
}
}

func TestHTTPAgentReload(t *testing.T) {
conf := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)

// Write initial config, to be reloaded later
tmpFile, err := ioutil.TempFile(tmpDir, "config")
if err != nil {
t.Fatalf("err: %s", err)
}
_, err = tmpFile.WriteString(`{"service":{"name":"redis"}}`)
if err != nil {
t.Fatalf("err: %s", err)
}
tmpFile.Close()

doneCh := make(chan struct{})
shutdownCh := make(chan struct{})

defer func() {
close(shutdownCh)
<-doneCh
}()

cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}

args := []string{
"-server",
"-data-dir", tmpDir,
"-http-port", fmt.Sprintf("%d", conf.Ports.HTTP),
"-config-file", tmpFile.Name(),
}

go func() {
cmd.Run(args)
close(doneCh)
}()

testutil.WaitForResult(func() (bool, error) {
return len(cmd.httpServers) == 1, nil
}, func(err error) {
t.Fatalf("should have an http server")
})

if _, ok := cmd.agent.state.services["redis"]; !ok {
t.Fatalf("missing redis service")
}

err = ioutil.WriteFile(tmpFile.Name(), []byte(`{"service":{"name":"redis-reloaded"}}`), 0644)
if err != nil {
t.Fatalf("err: %v", err)
}

srv := cmd.httpServers[0]
req, err := http.NewRequest("PUT", "/v1/agent/reload", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

_, err = srv.AgentReload(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
}

if _, ok := cmd.agent.state.services["redis-reloaded"]; !ok {
t.Fatalf("missing redis-reloaded service")
}
}

func TestHTTPAgentMembers(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -239,6 +315,49 @@ func TestHTTPAgentJoin_WAN(t *testing.T) {
})
}

func TestHTTPAgentLeave(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()

dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) {
c.Server = false
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()

// Join first
addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan)
_, err := srv.agent.JoinLAN([]string{addr})
if err != nil {
t.Fatalf("err: %v", err)
}

// Graceful leave now
req, err := http.NewRequest("PUT", "/v1/agent/leave", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

obj, err := srv2.AgentLeave(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
}
if obj != nil {
t.Fatalf("Err: %v", obj)
}

testutil.WaitForResult(func() (bool, error) {
m := srv.agent.LANMembers()
success := m[1].Status == serf.StatusLeft
return success, errors.New(m[1].Status.String())
}, func(err error) {
t.Fatalf("member status is %v, should be left", err)
})
}

func TestHTTPAgentForceLeave(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have a reload test as well.

dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -1030,7 +1149,6 @@ func TestHTTPAgent_Monitor(t *testing.T) {
logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter)

dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter)
srv.agent.logWriter = logWriter
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
Expand Down
Loading