Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds network tomography features to Consul. #1331

Merged
merged 59 commits into from
Oct 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
13f6cd3
Start adding stuff
derekchiang Mar 28, 2015
a9ea503
Adding tests and stuff
derekchiang Apr 9, 2015
b87f122
Change GET API a little bit
derekchiang Apr 9, 2015
9113b26
Some fixes
derekchiang Apr 13, 2015
8a0bb40
Fix tests
derekchiang Apr 13, 2015
98d87b5
Complete logic for sending coordinates
derekchiang Apr 15, 2015
a338e5e
Fix a comment
derekchiang Apr 16, 2015
a1854a7
Some fixes
derekchiang Apr 16, 2015
d8dd8d7
Improve a test
derekchiang Apr 16, 2015
bf5cb75
Use IndexedCoordinate instead
derekchiang Apr 18, 2015
ab9262c
Add a test case
derekchiang Apr 19, 2015
f144d17
Address comments
derekchiang Apr 29, 2015
f657caf
Add more tests
derekchiang Apr 29, 2015
f8d8109
Add state store tests
derekchiang Apr 29, 2015
54a2815
Address comments
derekchiang May 8, 2015
6900331
Fix tests
derekchiang May 14, 2015
1929b1d
Add an endpoint for getting WAN coordinates
derekchiang May 14, 2015
2f11db1
Add test for the GetWAN endpoint
derekchiang May 14, 2015
7255ddd
Address comments
derekchiang May 14, 2015
01d2452
Merges config changes after rebase.
Jun 4, 2015
86b112f
Does a clean up pass on the Consul side.
Jun 6, 2015
9256784
Removes one more WAN leftover.
Jun 19, 2015
b9d5fb0
Flips the sense of the coordinate enable option.
Jun 20, 2015
d12aa2f
Moves batching down into the state store and changes it to fail-fast.
Jun 23, 2015
2cee9f7
Takes the node name out of the coordinate get call.
Jun 27, 2015
7e6d521
Hardens Consul from bad coordinates from other nodes.
Jun 27, 2015
5f754c4
Does some small cleanups based on PR feedback.
Jun 29, 2015
e094f5a
Adds snapshot save and restore of coordinates.
Jun 30, 2015
66a3d29
Simplifies the batching function and adds some comments.
Jun 30, 2015
ad65d95
Scales coordinate sends to hit a fixed aggregate rate across the clus…
Jun 30, 2015
d734697
Turns down the coordinate sync rate a bit more.
Jul 1, 2015
89c7203
Adds coordinate sorting support to catalog queries for nodes and serv…
Jun 30, 2015
36c78f5
Adds sort of DCs in catalog queries based on RTT. Cleans up.
Jul 2, 2015
54ef97b
Adds tests for HTTP interface. Removes a stray mark.
Jul 3, 2015
78b2c2d
Moves disable checks down into the sort routine.
Jul 24, 2015
6d845c7
Adds explicit check for empty node in source parameter.
Jul 24, 2015
e47eea3
Adds a magic "self" node name to distance queries.
Jul 24, 2015
497f678
Switches to the median over all DC nodes with known coordinates.
Jul 24, 2015
9caa5b3
Adds distance sorting to health endpoint. Cleans up unit tests.
Jul 27, 2015
9c91957
Changes ?near=self to a safer ?near=_agent, which is also clearer abo…
Jul 28, 2015
8436131
Fixes bad name for DC forwarding of Coordinate.Get.
Jul 28, 2015
33e3505
Adds endpoints for raw network coordinates.
Jul 29, 2015
ce0e975
Fixes config merge fn for disabling coordinates and adds it to JSON.
Jul 30, 2015
d45fc23
Installs a friendly handler for coordinate endpoints when coordinates…
Jul 30, 2015
6289764
Moves sorting up into coordinate endpoint HTTP handlers.
Jul 30, 2015
f9da231
Adds coordinate of agent to self endpoint.
Jul 30, 2015
e8322ff
Adds a test for the `DisableCoordinate` config.
Jul 31, 2015
e21b450
Runs go fmt after latest rebase.
Oct 15, 2015
384d996
Cleans up after latest rebase.
Oct 15, 2015
80d5a30
Zeroes out the height when testing exact distances.
Oct 15, 2015
660da92
Makes the default protocol 2 and lets 3 interoperate with 2.
Oct 16, 2015
787f946
Adds support for coordinates to client API.
Oct 16, 2015
fb89001
Adds a "consul rtt" command.
Oct 16, 2015
88845f7
Cleans up after code review, adds a -short option to "consul rtt" com…
Oct 16, 2015
99cfbb8
Defaults second node to agent if not given. Removes -short option and…
Oct 16, 2015
439110f
Gives RTT class a more Go-like name.
Oct 22, 2015
de01f96
Fixes configs now that Serf always caches coordinates.
Oct 22, 2015
cef9402
Updates docs for network coordinates.
Oct 22, 2015
ecd3a1d
Completes rebase of network coordinates to new memdb.
Oct 23, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ type QueryOptions struct {
// Token is used to provide a per-request ACL token
// which overrides the agent's default token.
Token string

// Near is used to provide a node name that will sort the results
// in ascending order based on the estimated round trip time from
// that node. Setting this to "_agent" will use the agent's node
// for the sort.
Near string
}

// WriteOptions are used to parameterize a write
Expand Down Expand Up @@ -250,6 +256,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Token != "" {
r.params.Set("token", q.Token)
}
if q.Near != "" {
r.params.Set("near", q.Near)
}
}

// durToMsec converts a duration to a millisecond specified string
Expand Down
4 changes: 4 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func TestSetQueryOptions(t *testing.T) {
WaitIndex: 1000,
WaitTime: 100 * time.Second,
Token: "12345",
Near: "nodex",
}
r.setQueryOptions(q)

Expand All @@ -148,6 +149,9 @@ func TestSetQueryOptions(t *testing.T) {
if r.params.Get("token") != "12345" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("near") != "nodex" {
t.Fatalf("bad: %v", r.params)
}
}

func TestSetWriteOptions(t *testing.T) {
Expand Down
66 changes: 66 additions & 0 deletions api/coordinate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package api

import (
"github.com/hashicorp/serf/coordinate"
)

// CoordinateEntry represents a node and its associated network coordinate.
type CoordinateEntry struct {
Node string
Coord *coordinate.Coordinate
}

// CoordinateDatacenterMap represents a datacenter and its associated WAN
// nodes and their associates coordinates.
type CoordinateDatacenterMap struct {
Datacenter string
Coordinates []CoordinateEntry
}

// Coordinate can be used to query the coordinate endpoints
type Coordinate struct {
c *Client
}

// Coordinate returns a handle to the coordinate endpoints
func (c *Client) Coordinate() *Coordinate {
return &Coordinate{c}
}

// Datacenters is used to return the coordinates of all the servers in the WAN
// pool.
func (c *Coordinate) Datacenters() ([]*CoordinateDatacenterMap, error) {
r := c.c.newRequest("GET", "/v1/coordinate/datacenters")
_, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()

var out []*CoordinateDatacenterMap
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}

// Nodes is used to return the coordinates of all the nodes in the LAN pool.
func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/coordinate/nodes")
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()

qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt

var out []*CoordinateEntry
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
54 changes: 54 additions & 0 deletions api/coordinate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package api

import (
"fmt"
"testing"

"github.com/hashicorp/consul/testutil"
)

func TestCoordinate_Datacenters(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

coordinate := c.Coordinate()

testutil.WaitForResult(func() (bool, error) {
datacenters, err := coordinate.Datacenters()
if err != nil {
return false, err
}

if len(datacenters) == 0 {
return false, fmt.Errorf("Bad: %v", datacenters)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

func TestCoordinate_Nodes(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

coordinate := c.Coordinate()

testutil.WaitForResult(func() (bool, error) {
_, _, err := coordinate.Nodes(nil)
if err != nil {
return false, err
}

// There's not a good way to populate coordinates without
// waiting for them to calculate and update, so the best
// we can do is call the endpoint and make sure we don't
// get an error.
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
74 changes: 74 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -191,6 +192,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
// Start handling events
go agent.handleEvents()

// Start sending network coordinate to the server.
if !config.DisableCoordinates {
go agent.sendCoordinate()
}

// Write out the PID file if necessary
err = agent.storePid()
if err != nil {
Expand Down Expand Up @@ -539,6 +545,22 @@ func (a *Agent) WANMembers() []serf.Member {
}
}

// CanServersUnderstandProtocol checks to see if all the servers understand the
// given protocol version.
func (a *Agent) CanServersUnderstandProtocol(version uint8) bool {
numServers, numWhoGrok := 0, 0
members := a.LANMembers()
for _, member := range members {
if member.Tags["role"] == "consul" {
numServers++
if member.ProtocolMax >= version {
numWhoGrok++
}
}
}
return (numServers > 0) && (numWhoGrok == numServers)
}

// StartSync is called once Services and Checks are registered.
// This is called to prevent a race between clients and the anti-entropy routines
func (a *Agent) StartSync() {
Expand All @@ -556,6 +578,58 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}

// Returns the coordinate of this node in the local pool (assumes coordinates
// are enabled, so check that before calling).
func (a *Agent) GetCoordinate() (*coordinate.Coordinate, error) {
if a.config.Server {
return a.server.GetLANCoordinate()
} else {
return a.client.GetCoordinate()
}
}

// sendCoordinate is a long-running loop that periodically sends our coordinate
// to the server. Closing the agent's shutdownChannel will cause this to exit.
func (a *Agent) sendCoordinate() {
for {
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
intv := rateScaledInterval(rate, min, len(a.LANMembers()))
intv = intv + randomStagger(intv)

select {
case <-time.After(intv):
if !a.CanServersUnderstandProtocol(3) {
continue
}

var c *coordinate.Coordinate
var err error
if c, err = a.GetCoordinate(); err != nil {
a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err)
continue
}

// TODO - Consider adding a distance check so we don't send
// an update if the position hasn't changed by more than a
// threshold.
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Coord: c,
WriteRequest: structs.WriteRequest{Token: a.config.ACLToken},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
a.logger.Printf("[ERR] agent: coordinate update error: %s", err)
continue
}
case <-a.shutdownCh:
return
}
}
}

// persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
Expand Down
11 changes: 11 additions & 0 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"net/http"
"strconv"
Expand All @@ -11,12 +12,22 @@ import (

type AgentSelf struct {
Config *Config
Coord *coordinate.Coordinate
Member serf.Member
}

func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var c *coordinate.Coordinate
if !s.agent.config.DisableCoordinates {
var err error
if c, err = s.agent.GetCoordinate(); err != nil {
return nil, err
}
}

return AgentSelf{
Config: s.agent.config,
Coord: c,
Member: s.agent.LocalMember(),
}, nil
}
Expand Down
21 changes: 20 additions & 1 deletion command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -81,7 +82,7 @@ func TestHTTPAgentSelf(t *testing.T) {

obj, err := srv.AgentSelf(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
t.Fatalf("err: %v", err)
}

val := obj.(AgentSelf)
Expand All @@ -92,6 +93,24 @@ func TestHTTPAgentSelf(t *testing.T) {
if int(val.Config.Ports.SerfLan) != srv.agent.config.Ports.SerfLan {
t.Fatalf("incorrect port: %v", obj)
}

c, err := srv.agent.server.GetLANCoordinate()
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
}

srv.agent.config.DisableCoordinates = true
obj, err = srv.AgentSelf(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
val = obj.(AgentSelf)
if val.Coord != nil {
t.Fatalf("should have been nil: %v", val.Coord)
}
}

func TestHTTPAgentMembers(t *testing.T) {
Expand Down
50 changes: 50 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func nextConfig() *Config {
cons.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
cons.RaftConfig.ElectionTimeout = 40 * time.Millisecond

cons.DisableCoordinates = false
cons.CoordinateUpdatePeriod = 100 * time.Millisecond
return conf
}

Expand Down Expand Up @@ -1579,3 +1581,51 @@ func TestAgent_purgeCheckState(t *testing.T) {
t.Fatalf("should have removed file")
}
}

func TestAgent_GetCoordinate(t *testing.T) {
check := func(server bool) {
config := nextConfig()
config.Server = server
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()

// This doesn't verify the returned coordinate, but it makes
// sure that the agent chooses the correct Serf instance,
// depending on how it's configured as a client or a server.
// If it chooses the wrong one, this will crash.
if _, err := agent.GetCoordinate(); err != nil {
t.Fatalf("err: %s", err)
}
}

check(true)
check(false)
}

func TestAgent_CanServersUnderstandProtocol(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()

min := uint8(consul.ProtocolVersionMin)
if !agent.CanServersUnderstandProtocol(min) {
t.Fatalf("should grok %d", min)
}

max := uint8(consul.ProtocolVersionMax)
if !agent.CanServersUnderstandProtocol(max) {
t.Fatalf("should grok %d", max)
}

current := uint8(config.Protocol)
if !agent.CanServersUnderstandProtocol(current) {
t.Fatalf("should grok %d", current)
}

future := max + 1
if agent.CanServersUnderstandProtocol(future) {
t.Fatalf("should not grok %d", future)
}
}
Loading