Skip to content

Commit

Permalink
Merge pull request #1331 from slackpad/f-network-tomography
Browse files Browse the repository at this point in the history
Adds network tomography features to Consul.
  • Loading branch information
slackpad committed Oct 23, 2015
2 parents 1585913 + ecd3a1d commit 798d49c
Show file tree
Hide file tree
Showing 55 changed files with 4,516 additions and 103 deletions.
9 changes: 9 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ type QueryOptions struct {
// Token is used to provide a per-request ACL token
// which overrides the agent's default token.
Token string

// Near is used to provide a node name that will sort the results
// in ascending order based on the estimated round trip time from
// that node. Setting this to "_agent" will use the agent's node
// for the sort.
Near string
}

// WriteOptions are used to parameterize a write
Expand Down Expand Up @@ -250,6 +256,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Token != "" {
r.params.Set("token", q.Token)
}
if q.Near != "" {
r.params.Set("near", q.Near)
}
}

// durToMsec converts a duration to a millisecond specified string
Expand Down
4 changes: 4 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func TestSetQueryOptions(t *testing.T) {
WaitIndex: 1000,
WaitTime: 100 * time.Second,
Token: "12345",
Near: "nodex",
}
r.setQueryOptions(q)

Expand All @@ -148,6 +149,9 @@ func TestSetQueryOptions(t *testing.T) {
if r.params.Get("token") != "12345" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("near") != "nodex" {
t.Fatalf("bad: %v", r.params)
}
}

func TestSetWriteOptions(t *testing.T) {
Expand Down
66 changes: 66 additions & 0 deletions api/coordinate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package api

import (
"github.com/hashicorp/serf/coordinate"
)

// CoordinateEntry represents a node and its associated network coordinate.
// Slices of pointers to this type are what Coordinate.Nodes returns, and
// slices of values are embedded in CoordinateDatacenterMap.
type CoordinateEntry struct {
// Node identifies the node this entry describes (presumably the node
// name; confirm against the server response).
Node string
// Coord is the network coordinate associated with Node.
Coord *coordinate.Coordinate
}

// CoordinateDatacenterMap represents a datacenter and its associated WAN
// nodes and their associated coordinates.
type CoordinateDatacenterMap struct {
// Datacenter is the datacenter the entries below belong to.
Datacenter string
// Coordinates holds one entry per WAN node in Datacenter.
Coordinates []CoordinateEntry
}

// Coordinate can be used to query the coordinate endpoints. Obtain one via
// Client.Coordinate rather than constructing it directly.
type Coordinate struct {
// c is the client used to build and issue the HTTP requests.
c *Client
}

// Coordinate hands back a handle, bound to this client, for talking to the
// coordinate endpoints.
func (c *Client) Coordinate() *Coordinate {
	handle := &Coordinate{c: c}
	return handle
}

// Datacenters fetches the coordinates of all the servers in the WAN pool by
// issuing a GET against /v1/coordinate/datacenters and decoding the body.
func (c *Coordinate) Datacenters() ([]*CoordinateDatacenterMap, error) {
	client := c.c
	req := client.newRequest("GET", "/v1/coordinate/datacenters")

	_, resp, err := requireOK(client.doRequest(req))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var dcs []*CoordinateDatacenterMap
	if decodeErr := decodeBody(resp, &dcs); decodeErr != nil {
		return nil, decodeErr
	}
	return dcs, nil
}

// Nodes fetches the coordinates of all the nodes in the LAN pool. The query
// options in q are applied to the request, and the returned QueryMeta is
// filled from the response with RequestTime set to the measured request time.
func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
	req := c.c.newRequest("GET", "/v1/coordinate/nodes")
	req.setQueryOptions(q)

	elapsed, resp, err := requireOK(c.c.doRequest(req))
	if err != nil {
		return nil, nil, err
	}
	defer resp.Body.Close()

	meta := new(QueryMeta)
	parseQueryMeta(resp, meta)
	meta.RequestTime = elapsed

	var entries []*CoordinateEntry
	if decodeErr := decodeBody(resp, &entries); decodeErr != nil {
		return nil, nil, decodeErr
	}
	return entries, meta, nil
}
54 changes: 54 additions & 0 deletions api/coordinate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package api

import (
"fmt"
"testing"

"github.com/hashicorp/consul/testutil"
)

// TestCoordinate_Datacenters polls the datacenters endpoint until it returns
// at least one datacenter without error.
func TestCoordinate_Datacenters(t *testing.T) {
	t.Parallel()
	client, server := makeClient(t)
	defer server.Stop()

	api := client.Coordinate()

	// probe reports success once the endpoint yields a non-empty list.
	probe := func() (bool, error) {
		dcs, err := api.Datacenters()
		switch {
		case err != nil:
			return false, err
		case len(dcs) == 0:
			return false, fmt.Errorf("Bad: %v", dcs)
		}
		return true, nil
	}

	// fail is invoked by WaitForResult if probe never succeeds.
	fail := func(err error) {
		t.Fatalf("err: %s", err)
	}

	testutil.WaitForResult(probe, fail)
}

// TestCoordinate_Nodes polls the nodes endpoint until a call completes
// without error.
func TestCoordinate_Nodes(t *testing.T) {
	t.Parallel()
	client, server := makeClient(t)
	defer server.Stop()

	api := client.Coordinate()

	probe := func() (bool, error) {
		// Coordinates can only be populated by waiting for them to be
		// calculated and updated, so the response contents are not
		// inspected; a call that returns no error is all we require.
		if _, _, err := api.Nodes(nil); err != nil {
			return false, err
		}
		return true, nil
	}

	fail := func(err error) {
		t.Fatalf("err: %s", err)
	}

	testutil.WaitForResult(probe, fail)
}
74 changes: 74 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -191,6 +192,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
// Start handling events
go agent.handleEvents()

// Start sending network coordinate to the server.
if !config.DisableCoordinates {
go agent.sendCoordinate()
}

// Write out the PID file if necessary
err = agent.storePid()
if err != nil {
Expand Down Expand Up @@ -539,6 +545,22 @@ func (a *Agent) WANMembers() []serf.Member {
}
}

// CanServersUnderstandProtocol reports whether every server among the LAN
// members supports the given protocol version. A member counts as a server
// when its "role" tag is "consul". The result is false when no servers are
// present at all.
func (a *Agent) CanServersUnderstandProtocol(version uint8) bool {
	sawServer := false
	for _, m := range a.LANMembers() {
		if m.Tags["role"] != "consul" {
			// Not a server; its protocol range is irrelevant here.
			continue
		}
		if m.ProtocolMax < version {
			// One server that can't speak the version settles it.
			return false
		}
		sawServer = true
	}
	return sawServer
}

// StartSync is called once Services and Checks are registered.
// This is called to prevent a race between clients and the anti-entropy routines
func (a *Agent) StartSync() {
Expand All @@ -556,6 +578,58 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}

// GetCoordinate returns the coordinate of this node in the local pool. It
// assumes coordinates are enabled, so callers should check that first. The
// lookup is delegated to the client or the server, depending on whether the
// agent is configured as a server.
func (a *Agent) GetCoordinate() (*coordinate.Coordinate, error) {
	if !a.config.Server {
		return a.client.GetCoordinate()
	}
	return a.server.GetLANCoordinate()
}

// sendCoordinate loops for the life of the agent, periodically pushing this
// node's coordinate to the servers via the Coordinate.Update RPC. It returns
// once the agent's shutdown channel is closed.
func (a *Agent) sendCoordinate() {
	for {
		// The wait is recomputed on every pass from the configured rate
		// target, the minimum interval and the current LAN member count,
		// with a random stagger added on top.
		wait := rateScaledInterval(
			a.config.SyncCoordinateRateTarget,
			a.config.SyncCoordinateIntervalMin,
			len(a.LANMembers()),
		)
		wait += randomStagger(wait)

		select {
		case <-a.shutdownCh:
			return
		case <-time.After(wait):
		}

		// Hold off until every server speaks protocol version 3.
		if !a.CanServersUnderstandProtocol(3) {
			continue
		}

		coord, err := a.GetCoordinate()
		if err != nil {
			a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err)
			continue
		}

		// TODO - Consider adding a distance check so we don't send
		// an update if the position hasn't changed by more than a
		// threshold.
		args := structs.CoordinateUpdateRequest{
			Datacenter:   a.config.Datacenter,
			Node:         a.config.NodeName,
			Coord:        coord,
			WriteRequest: structs.WriteRequest{Token: a.config.ACLToken},
		}
		var out struct{}
		if rpcErr := a.RPC("Coordinate.Update", &args, &out); rpcErr != nil {
			a.logger.Printf("[ERR] agent: coordinate update error: %s", rpcErr)
		}
	}
}

// persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
Expand Down
11 changes: 11 additions & 0 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"net/http"
"strconv"
Expand All @@ -11,12 +12,22 @@ import (

// AgentSelf is the value returned by the HTTPServer.AgentSelf handler,
// describing the local agent.
type AgentSelf struct {
// Config is the agent's configuration.
Config *Config
// Coord is the agent's network coordinate; the handler leaves it nil when
// coordinates are disabled in the configuration.
Coord *coordinate.Coordinate
// Member is the agent's own member entry, as reported by LocalMember.
Member serf.Member
}

// AgentSelf builds the AgentSelf description of the local agent: its
// configuration, its member entry, and its network coordinate. The coordinate
// is left nil when coordinates are disabled; a failure to fetch it is
// returned as the handler's error.
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	agent := s.agent

	var coord *coordinate.Coordinate
	if enabled := !agent.config.DisableCoordinates; enabled {
		fetched, err := agent.GetCoordinate()
		if err != nil {
			return nil, err
		}
		coord = fetched
	}

	self := AgentSelf{
		Config: agent.config,
		Coord:  coord,
		Member: agent.LocalMember(),
	}
	return self, nil
}
Expand Down
21 changes: 20 additions & 1 deletion command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -81,7 +82,7 @@ func TestHTTPAgentSelf(t *testing.T) {

obj, err := srv.AgentSelf(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
t.Fatalf("err: %v", err)
}

val := obj.(AgentSelf)
Expand All @@ -92,6 +93,24 @@ func TestHTTPAgentSelf(t *testing.T) {
if int(val.Config.Ports.SerfLan) != srv.agent.config.Ports.SerfLan {
t.Fatalf("incorrect port: %v", obj)
}

c, err := srv.agent.server.GetLANCoordinate()
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
}

srv.agent.config.DisableCoordinates = true
obj, err = srv.AgentSelf(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
val = obj.(AgentSelf)
if val.Coord != nil {
t.Fatalf("should have been nil: %v", val.Coord)
}
}

func TestHTTPAgentMembers(t *testing.T) {
Expand Down
50 changes: 50 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func nextConfig() *Config {
cons.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
cons.RaftConfig.ElectionTimeout = 40 * time.Millisecond

cons.DisableCoordinates = false
cons.CoordinateUpdatePeriod = 100 * time.Millisecond
return conf
}

Expand Down Expand Up @@ -1579,3 +1581,51 @@ func TestAgent_purgeCheckState(t *testing.T) {
t.Fatalf("should have removed file")
}
}

// TestAgent_GetCoordinate starts an agent first as a server and then as a
// client, and checks that GetCoordinate succeeds in both modes.
func TestAgent_GetCoordinate(t *testing.T) {
	for _, asServer := range []bool{true, false} {
		// Run each mode in its own closure so the deferred cleanup fires
		// before the next agent is created.
		func() {
			conf := nextConfig()
			conf.Server = asServer
			dir, agent := makeAgent(t, conf)
			defer os.RemoveAll(dir)
			defer agent.Shutdown()

			// The returned coordinate itself isn't verified. The point
			// is that the agent picks the right Serf instance for its
			// client/server mode; picking the wrong one would crash.
			_, err := agent.GetCoordinate()
			if err != nil {
				t.Fatalf("err: %s", err)
			}
		}()
	}
}

// TestAgent_CanServersUnderstandProtocol checks that the minimum, maximum and
// configured protocol versions are all understood, and that the version one
// past the maximum is not.
func TestAgent_CanServersUnderstandProtocol(t *testing.T) {
	config := nextConfig()
	dir, agent := makeAgent(t, config)
	defer os.RemoveAll(dir)
	defer agent.Shutdown()

	highest := uint8(consul.ProtocolVersionMax)
	supported := []uint8{
		uint8(consul.ProtocolVersionMin),
		highest,
		uint8(config.Protocol),
	}
	for _, version := range supported {
		if !agent.CanServersUnderstandProtocol(version) {
			t.Fatalf("should grok %d", version)
		}
	}

	if future := highest + 1; agent.CanServersUnderstandProtocol(future) {
		t.Fatalf("should not grok %d", future)
	}
}
Loading

0 comments on commit 798d49c

Please sign in to comment.