Skip to content

Commit

Permalink
Add CLI/API endpoints for removing peer by ID
Browse files Browse the repository at this point in the history
  • Loading branch information
kyhavlov committed Mar 30, 2017
1 parent 73f0e6f commit da9c825
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 29 deletions.
19 changes: 17 additions & 2 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,6 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
r.setWriteOptions(q)

// TODO (slackpad) Currently we made address a query parameter. Once
// IDs are in place this will be DELETE /v1/operator/raft/peer/<id>.
r.params.Set("address", string(address))

_, resp, err := requireOK(op.c.doRequest(r))
Expand All @@ -240,6 +238,23 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err
return nil
}

// RaftRemovePeerByID kicks a stale peer (one that is in the Raft quorum but
// is no longer known to Serf or the catalog), identifying it by its ID.
//
// It issues a DELETE to /v1/operator/raft/peer with the peer ID passed in the
// "id" parameter, applying the write options in q to the request. Any error
// reported by requireOK for the request is returned as-is; on success the
// response body is closed and nil is returned.
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
	req := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
	req.setWriteOptions(q)
	req.params.Set("id", id)

	_, httpResp, err := requireOK(op.c.doRequest(req))
	if err != nil {
		return err
	}

	// The response carries nothing we need; just release it.
	httpResp.Body.Close()
	return nil
}

// KeyringInstall is used to install a new gossip encryption key into the cluster
func (op *Operator) KeyringInstall(key string, q *WriteOptions) error {
r := op.c.newRequest("POST", "/v1/operator/keyring")
Expand Down
27 changes: 22 additions & 5 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,40 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques
return nil, nil
}

var args structs.RaftPeerByAddressRequest
var args structs.RaftRemovePeerRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)

params := req.URL.Query()
if _, ok := params["address"]; ok {
_, hasID := params["id"]
if hasID {
args.ID = raft.ServerID(params.Get("id"))
}
_, hasAddress := params["address"]
if hasAddress {
args.Address = raft.ServerAddress(params.Get("address"))
} else {
}

if !hasID && !hasAddress {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte("Must specify either ?id with the server's ID or ?address with IP:port of peer to remove"))
return nil, nil
}
if hasID && hasAddress {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte("Must specify ?address with IP:port of peer to remove"))
resp.Write([]byte("Must specify only one of ?id or ?address"))
return nil, nil
}

var reply struct{}
if err := s.agent.RPC("Operator.RaftRemovePeerByAddress", &args, &reply); err != nil {
method := "Operator.RaftRemovePeerByID"
if hasAddress {
method = "Operator.RaftRemovePeerByAddress"
}
if err := s.agent.RPC(method, &args, &reply); err != nil {
return nil, err
}

return nil, nil
}

Expand Down
17 changes: 17 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ func TestOperator_RaftPeer(t *testing.T) {
t.Fatalf("err: %v", err)
}
})

httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("DELETE", "/v1/operator/raft/peer?id=nope", body)
if err != nil {
t.Fatalf("err: %v", err)
}

// If we get this error, it proves we sent the ID all the
// way through.
resp := httptest.NewRecorder()
_, err = srv.OperatorRaftPeer(resp, req)
if err == nil || !strings.Contains(err.Error(),
"id \"nope\" was not found in the Raft configuration") {
t.Fatalf("err: %v", err)
}
})
}

func TestOperator_KeyringInstall(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion command/operator_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *OperatorRaftCommand) raft(args []string) error {
}
c.Ui.Output(result)
} else if removePeer {
if err := raftRemovePeers(address, operator); err != nil {
if err := raftRemovePeers(address, "", operator); err != nil {
return fmt.Errorf("Error removing peer: %v", err)
}
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
Expand Down
33 changes: 23 additions & 10 deletions command/operator_raft_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ func (c *OperatorRaftRemoveCommand) Synopsis() string {
func (c *OperatorRaftRemoveCommand) Run(args []string) int {
f := c.Command.NewFlagSet(c)

var address string
var address, id string
f.StringVar(&address, "address", "",
"The address to remove from the Raft configuration.")
f.StringVar(&id, "id", "",
"The ID to remove from the Raft configuration.")

if err := c.Command.Parse(args); err != nil {
if err == flag.ErrHelp {
Expand All @@ -58,25 +60,36 @@ func (c *OperatorRaftRemoveCommand) Run(args []string) int {
}

// Fetch the current configuration.
if err := raftRemovePeers(address, client.Operator()); err != nil {
if err := raftRemovePeers(address, id, client.Operator()); err != nil {
c.Ui.Error(fmt.Sprintf("Error removing peer: %v", err))
return 1
}
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
if address != "" {
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
} else {
c.Ui.Output(fmt.Sprintf("Removed peer with id %q", id))
}

return 0
}

func raftRemovePeers(address string, operator *api.Operator) error {
// TODO (slackpad) Once we expose IDs, add support for removing
// by ID, add support for that.
if len(address) == 0 {
return fmt.Errorf("an address is required for the peer to remove")
func raftRemovePeers(address, id string, operator *api.Operator) error {
if len(address) == 0 && len(id) == 0 {
return fmt.Errorf("an address or id is required for the peer to remove")
}
if len(address) > 0 && len(id) > 0 {
return fmt.Errorf("cannot give both an address and id")
}

// Try to kick the peer.
if err := operator.RaftRemovePeerByAddress(address, nil); err != nil {
return err
if len(address) > 0 {
if err := operator.RaftRemovePeerByAddress(address, nil); err != nil {
return err
}
} else {
if err := operator.RaftRemovePeerByID(id, nil); err != nil {
return err
}
}

return nil
Expand Down
23 changes: 23 additions & 0 deletions command/operator_raft_remove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,27 @@ func TestOperator_Raft_RemovePeer(t *testing.T) {
t.Fatalf("bad: %s", output)
}
}

// Test the remove-peer subcommand with -id
{
ui := new(cli.MockUi)
c := OperatorRaftRemoveCommand{
Command: base.Command{
Ui: ui,
Flags: base.FlagSetHTTP,
},
}
args := []string{"-http-addr=" + a1.httpAddr, "-id=nope"}

code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}

// If we get this error, it proves we sent the ID all the way through.
output := strings.TrimSpace(ui.ErrorWriter.String())
if !strings.Contains(output, "id \"nope\" was not found in the Raft configuration") {
t.Fatalf("bad: %s", output)
}
}
}
2 changes: 1 addition & 1 deletion consul/autopilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
// Add s4 to peers directly
s4addr := fmt.Sprintf("127.0.0.1:%d",
s4.config.SerfLANConfig.MemberlistConfig.BindPort)
s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(s4addr),0, 0)
s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(s4addr), 0, 0)

// Verify we have 4 peers
peers, err := s1.numPeers()
Expand Down
82 changes: 80 additions & 2 deletions consul/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error {
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
return err
}
Expand All @@ -99,6 +99,7 @@ func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressReque
}
for _, s := range future.Configuration().Servers {
if s.Address == args.Address {
args.ID = s.ID
goto REMOVE
}
}
Expand All @@ -115,7 +116,17 @@ REMOVE:
// doing if you are calling this. If you remove a peer that's known to
// Serf, for example, it will come back when the leader does a reconcile
// pass.
future := op.srv.raft.RemovePeer(args.Address)
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
if err != nil {
return err
}

var future raft.Future
if minRaftProtocol >= 2 {
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
} else {
future = op.srv.raft.RemovePeer(args.Address)
}
if err := future.Error(); err != nil {
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v",
args.Address, err)
Expand All @@ -126,6 +137,73 @@ REMOVE:
return nil
}

// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog), selecting the peer by
// its Raft server ID (args.ID). The reply argument is not used, but is
// required to fulfill the RPC interface.
//
// The call returns an error if the token lacks operator write access, if the
// ID is not present in the current Raft configuration, or if the removal
// itself fails.
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
	// If forward reports the request as done, its result is our result.
	if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
		return err
	}

	// Removing a peer is a super dangerous operation, so operator write
	// access is mandatory.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorWrite() {
		return permissionDeniedErr
	}

	// This operation is meant for humans, so an ID that is not among the
	// current peers is most likely a mistake and is reported as an error.
	// While looking the server up we also record its address, which the
	// address-based removal path below needs.
	configFuture := op.srv.raft.GetConfiguration()
	if err := configFuture.Error(); err != nil {
		return err
	}
	found := false
	for _, server := range configFuture.Configuration().Servers {
		if server.ID == args.ID {
			args.Address = server.Address
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("id %q was not found in the Raft configuration", args.ID)
	}

	// No further safety checks are layered on here. The Raft library
	// itself guards against various forms of foot-shooting (such as a
	// configuration with no voters), ACLs lock the endpoint down, and an
	// operator calling this is assumed to know what they are doing. Note
	// that a removed peer that is still known to Serf will come back when
	// the leader does a reconcile pass.
	minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
	if err != nil {
		return err
	}

	// Below protocol version 2 the removal goes through the address-based
	// call; otherwise it goes through the ID-based call.
	var removal raft.Future
	if minRaftProtocol < 2 {
		removal = op.srv.raft.RemovePeer(args.Address)
	} else {
		removal = op.srv.raft.RemoveServer(args.ID, 0, 0)
	}

	if err := removal.Error(); err != nil {
		op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v",
			args.ID, err)
		return err
	}

	op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID)
	return nil
}

// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
Expand Down
Loading

0 comments on commit da9c825

Please sign in to comment.