Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow forwarding of some status RPCs #6198

Merged
merged 3 commits into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *Server) maybeBootstrap() {
// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version,
"Status.Peers", server.UseTLS, &struct{}{}, &peers); err != nil {
"Status.Peers", server.UseTLS, &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil {
nextRetry := time.Duration((1 << attempt) * peerRetryBase)
s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+
"%v...", server.Name, err, nextRetry.String())
Expand Down
19 changes: 17 additions & 2 deletions agent/consul/status_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"

"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
)

// Status endpoint is used to check on server status
Expand All @@ -18,7 +19,14 @@ func (s *Status) Ping(args struct{}, reply *struct{}) error {
}

// Leader is used to get the address of the leader
func (s *Status) Leader(args struct{}, reply *string) error {
func (s *Status) Leader(args *structs.DCSpecificRequest, reply *string) error {
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
// not using the regular forward function as it does a bunch of stuff we
// dont want like verifying consistency etc. We just want to enable DC
// forwarding
if args.Datacenter != "" && args.Datacenter != s.server.config.Datacenter {
return s.server.forwardDC("Status.Leader", args.Datacenter, args, reply)
}

leader := string(s.server.raft.Leader())
if leader != "" {
*reply = leader
Expand All @@ -29,7 +37,14 @@ func (s *Status) Leader(args struct{}, reply *string) error {
}

// Peers is used to get all the Raft peers
func (s *Status) Peers(args struct{}, reply *[]string) error {
func (s *Status) Peers(args *structs.DCSpecificRequest, reply *[]string) error {
// not using the regular forward function as it does a bunch of stuff we
// dont want like verifying consistency etc. We just want to enable DC
// forwarding
if args.Datacenter != "" && args.Datacenter != s.server.config.Datacenter {
return s.server.forwardDC("Status.Peers", args.Datacenter, args, reply)
}

future := s.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
Expand Down
54 changes: 54 additions & 0 deletions agent/consul/status_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"time"

"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
)

func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
Expand Down Expand Up @@ -69,6 +71,32 @@ func TestStatusLeader(t *testing.T) {
}
}

func TestStatusLeader_ForwardDC(t *testing.T) {
t.Parallel()
dir1, s1 := testServerDC(t, "primary")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

dir2, s2 := testServerDC(t, "secondary")
defer os.RemoveAll(dir2)
defer s2.Shutdown()

joinWAN(t, s2, s1)

testrpc.WaitForLeader(t, s1.RPC, "secondary")
testrpc.WaitForLeader(t, s2.RPC, "primary")

args := structs.DCSpecificRequest{
Datacenter: "secondary",
}

var out string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.Leader", &args, &out))
require.Equal(t, s2.config.RPCAdvertise.String(), out)
}

func TestStatusPeers(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
Expand All @@ -86,3 +114,29 @@ func TestStatusPeers(t *testing.T) {
t.Fatalf("no peers: %v", peers)
}
}

func TestStatusPeers_ForwardDC(t *testing.T) {
t.Parallel()
dir1, s1 := testServerDC(t, "primary")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

dir2, s2 := testServerDC(t, "secondary")
defer os.RemoveAll(dir2)
defer s2.Shutdown()

joinWAN(t, s2, s1)

testrpc.WaitForLeader(t, s1.RPC, "secondary")
testrpc.WaitForLeader(t, s2.RPC, "primary")

args := structs.DCSpecificRequest{
Datacenter: "secondary",
}

var out []string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.Peers", &args, &out))
require.Equal(t, []string{s2.config.RPCAdvertise.String()}, out)
}
16 changes: 14 additions & 2 deletions agent/status_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,31 @@ package agent

import (
"net/http"

"github.com/hashicorp/consul/agent/structs"
)

func (s *HTTPServer) StatusLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

var out string
if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil {
if err := s.agent.RPC("Status.Leader", &args, &out); err != nil {
return nil, err
}
return out, nil
}

func (s *HTTPServer) StatusPeers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

var out []string
if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil {
if err := s.agent.RPC("Status.Peers", &args, &out); err != nil {
return nil, err
}
return out, nil
Expand Down
75 changes: 75 additions & 0 deletions agent/status_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package agent

import (
"fmt"
"net/http"
"testing"

"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)

func TestStatusLeader(t *testing.T) {
Expand All @@ -24,6 +27,42 @@ func TestStatusLeader(t *testing.T) {
}
}

func TestStatusLeaderSecondary(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t, t.Name(), "datacenter = \"primary\"")
defer a1.Shutdown()
a2 := NewTestAgent(t, t.Name(), "datacenter = \"secondary\"")
defer a2.Shutdown()

testrpc.WaitForTestAgent(t, a1.RPC, "primary")
testrpc.WaitForTestAgent(t, a2.RPC, "secondary")

a1SerfAddr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
a1Addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.ServerPort)
a2Addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.ServerPort)
_, err := a2.JoinWAN([]string{a1SerfAddr})
require.NoError(t, err)

retry.Run(t, func(r *retry.R) {
require.Len(r, a1.WANMembers(), 2)
require.Len(r, a2.WANMembers(), 2)
})

req, _ := http.NewRequest("GET", "/v1/status/leader?dc=secondary", nil)
obj, err := a1.srv.StatusLeader(nil, req)
require.NoError(t, err)
leader, ok := obj.(string)
require.True(t, ok)
require.Equal(t, a2Addr, leader)

req, _ = http.NewRequest("GET", "/v1/status/leader?dc=primary", nil)
obj, err = a2.srv.StatusLeader(nil, req)
require.NoError(t, err)
leader, ok = obj.(string)
require.True(t, ok)
require.Equal(t, a1Addr, leader)
}

func TestStatusPeers(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
Expand All @@ -40,3 +79,39 @@ func TestStatusPeers(t *testing.T) {
t.Fatalf("bad peers: %v", peers)
}
}

func TestStatusPeersSecondary(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t, t.Name(), "datacenter = \"primary\"")
defer a1.Shutdown()
a2 := NewTestAgent(t, t.Name(), "datacenter = \"secondary\"")
defer a2.Shutdown()

testrpc.WaitForTestAgent(t, a1.RPC, "primary")
testrpc.WaitForTestAgent(t, a2.RPC, "secondary")

a1SerfAddr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
a1Addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.ServerPort)
a2Addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.ServerPort)
_, err := a2.JoinWAN([]string{a1SerfAddr})
require.NoError(t, err)

retry.Run(t, func(r *retry.R) {
require.Len(r, a1.WANMembers(), 2)
require.Len(r, a2.WANMembers(), 2)
})

req, _ := http.NewRequest("GET", "/v1/status/peers?dc=secondary", nil)
obj, err := a1.srv.StatusPeers(nil, req)
require.NoError(t, err)
peers, ok := obj.([]string)
require.True(t, ok)
require.Equal(t, []string{a2Addr}, peers)

req, _ = http.NewRequest("GET", "/v1/status/peers?dc=primary", nil)
obj, err = a2.srv.StatusPeers(nil, req)
require.NoError(t, err)
peers, ok = obj.([]string)
require.True(t, ok)
require.Equal(t, []string{a1Addr}, peers)
}
12 changes: 12 additions & 0 deletions website/source/api/status.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ The table below shows this endpoint's support for
| ---------------- | ----------------- | ------------- | ------------ |
| `NO` | `none` | `none` | `none` |

### Parameters

- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried. This is specified as part of the
URL as a query parameter.

### Sample Request

```text
Expand Down Expand Up @@ -65,6 +71,12 @@ The table below shows this endpoint's support for
| ---------------- | ----------------- | ------------- | ------------ |
| `NO` | `none` | `none` | `none` |

### Parameters

- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried. This is specified as part of the
URL as a query parameter.

### Sample Request

```text
Expand Down