Skip to content

Commit

Permalink
Cluster: Share node HTTP address with peers
Browse files Browse the repository at this point in the history
Allow each node to share its HTTP address with the rest of the
cluster using node metadata that is broadcast by the memberlist library.

Previously, the cluster was communicating over HTTP using the peer names
(as determined automatically by their hostnames) and a hardcoded port
number.

Adds additional `*-advertise-addr` command-line flags, which are necessary
for other cluster members to know what address to contact peers on,
e.g. when binding to all interfaces or when using NAT.

For simplicity, use `localhost` as the default host for all address
flags, which provides a secure default. Some clustered data stores, such
as Hashicorp's Consul (which uses the memberlist library) will try to
detect the first available private IP when no host is specified, but I
found that too magic and prefer to make this configuration explicit.

The memberlist library itself will try to determine the first available
private IP to advertise to peers if no host is specified; I have coded
defensively to prevent that from occurring as I don't wish for that
behaviour to bleed into AthensDB by accident. Users must specify a
hostname or IP address to advertise to other members of the cluster.

The Kingpin flag library resolves the host in the `host:port` address
pairs to IP addresses (as `net.TCPAddr`) and passes `tcp` as the
protocol to `net.ResolveTCPAddr()`, meaning that it will try to resolve
to the first available IPv4 or IPv6 address as determined by Go's
default DNS resolver.
  • Loading branch information
mattbostock committed Jul 22, 2017
1 parent 56703ae commit 089d9fb
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 35 deletions.
7 changes: 4 additions & 3 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -94,10 +95,10 @@ func TestRemoteWriteThenQueryBack(t *testing.T) {

func TestMain(m *testing.M) {
// Use localhost to avoid firewall warnings when running tests under OS X.
config.listenAddr = "localhost:9080"
config.peerAddr = "localhost:7946"
config.httpBindAddr, _ = net.ResolveTCPAddr("tcp", defaultHTTPAddr)
config.peerBindAddr, _ = net.ResolveTCPAddr("tcp", defaultPeerAddr)

httpBaseURL = fmt.Sprintf("http://%s", config.listenAddr)
httpBaseURL = fmt.Sprintf("http://%s", config.httpBindAddr)
go main()

err := waitForServer(httpBaseURL)
Expand Down
57 changes: 40 additions & 17 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
package cluster

import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"strconv"

"github.com/hashicorp/memberlist"
)

// c holds the package-level cluster state shared by Join, the memberlist
// delegate and the node accessors.
var c = struct {
// c is the configuration handed to Join; the delegate reads it to build
// the node metadata.
c *Config
// d is the delegate registered with memberlist.
d *delegate
// ml is the memberlist instance created by Join.
ml *memberlist.Memberlist
}{
d: &delegate{},
}

type Config struct {
AdvertiseAddr string
BindAddr string
Peers []string
HTTPAdvertiseAddr net.TCPAddr
HTTPBindAddr net.TCPAddr
PeerAdvertiseAddr net.TCPAddr
PeerBindAddr net.TCPAddr
Peers []string
}

func Join(config *Config) error {
// FIXME(mbostock): Consider using a non-local config for memberlist
memberConf := memberlist.DefaultLocalConfig()
advHost, advPort, _ := net.SplitHostPort(config.BindAddr)
bindPort, _ := strconv.Atoi(advPort)
memberConf.BindAddr = advHost
memberConf.BindPort = bindPort

memberConf.AdvertiseAddr = config.PeerAdvertiseAddr.IP.String()
memberConf.AdvertisePort = config.PeerAdvertiseAddr.Port
memberConf.BindAddr = config.PeerBindAddr.IP.String()
memberConf.BindPort = config.PeerBindAddr.Port

memberConf.Delegate = c.d
memberConf.LogOutput = ioutil.Discard
c.c = config

var err error
if c.ml, err = memberlist.Create(memberConf); err != nil {
Expand All @@ -43,11 +50,23 @@ type Node struct {
mln *memberlist.Node
}

// meta decodes the JSON metadata attached to the underlying memberlist node
// into a nodeMeta value. The decoded value is returned alongside any
// unmarshalling error.
func (n *Node) meta() (nodeMeta, error) {
	var decoded nodeMeta
	err := json.Unmarshal(n.mln.Meta, &decoded)
	return decoded, err
}

// Name returns the name of the underlying memberlist node.
func (n *Node) Name() string {
	member := n.mln
	return member.Name
}
func (n *Node) Addr() string {
return fmt.Sprintf("%s:%d", n.mln.Addr.String(), n.mln.Port)
return n.mln.Address()
}
// HTTPAddr returns the HTTP address carried in the node's metadata.
// If the metadata cannot be decoded, it returns an empty string and the
// decoding error.
func (n *Node) HTTPAddr() (addr string, err error) {
	var decoded nodeMeta
	if decoded, err = n.meta(); err == nil {
		addr = decoded.HTTPAddr
	}
	return addr, err
}
func (n *Node) String() string {
return n.Name()
Expand All @@ -73,21 +92,25 @@ func Nodes() (nodes []*Node) {
type delegate struct{}

func (d *delegate) NodeMeta(limit int) []byte {
panic("not implemented")
// FIXME respect limit
j, _ := json.Marshal(&nodeMeta{
HTTPAddr: c.c.HTTPAdvertiseAddr.String(),
})
return j
}

func (d *delegate) NotifyMsg([]byte) {
panic("not implemented")
}
func (d *delegate) NotifyMsg([]byte) {}

func (d *delegate) GetBroadcasts(overhead int, limit int) [][]byte {
panic("not implemented")
return [][]byte{}
}

func (d *delegate) LocalState(join bool) []byte {
panic("not implemented")
return []byte{}
}

func (d *delegate) MergeRemoteState(buf []byte, join bool) {
panic("not implemented")
func (d *delegate) MergeRemoteState(buf []byte, join bool) {}

// nodeMeta is the per-node metadata that the delegate's NodeMeta method
// encodes as JSON and that Node.meta decodes again.
type nodeMeta struct {
	// HTTPAddr is the address the node advertises for HTTP.
	//
	// The tag value must be quoted (`json:"http_addr"`); an unquoted tag is
	// not in the conventional key:"value" format, so encoding/json ignores
	// it and falls back to the Go field name. Note that correcting the tag
	// changes the encoded key from "HTTPAddr" to "http_addr", so all nodes
	// in a cluster need to run a build with the same tag.
	HTTPAddr string `json:"http_addr"`
}
12 changes: 12 additions & 0 deletions internal/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,30 @@ services:
- athensdb_3
command:
- '--log-level=debug'
- '--http-advertise-addr=athensdb_1:9080'
- '--http-bind-addr=:9080'
- '--peer-advertise-addr=athensdb_1:7946'
- '--peer-bind-addr=:7946'
- '--peers=athensdb_2'
- '--peers=athensdb_3'
athensdb_2:
build: ../../
hostname: athensdb_2
command:
- '--log-level=debug'
- '--http-advertise-addr=athensdb_2:9080'
- '--http-bind-addr=:9080'
- '--peer-advertise-addr=athensdb_2:7946'
- '--peer-bind-addr=:7946'
# Third AthensDB node in the integration cluster.
athensdb_3:
build: ../../
hostname: athensdb_3
command:
- '--log-level=debug'
# Advertise the service hostname to peers; the bind addresses below give no
# host, only a port.
- '--http-advertise-addr=athensdb_3:9080'
- '--http-bind-addr=:9080'
- '--peer-advertise-addr=athensdb_3:7946'
- '--peer-bind-addr=:7946'
- '--peers=athensdb_1'
# NOTE(review): this node lists itself as a peer; athensdb_2 was presumably
# intended here — confirm.
- '--peers=athensdb_3'
prometheus:
Expand Down
64 changes: 49 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sort"
"sync"
"time"
Expand All @@ -25,8 +27,8 @@ import (
)

const (
defaultAPIAddr = ":9080"
defaultPeerAddr = ":7946"
defaultHTTPAddr = "localhost:9080"
defaultPeerAddr = "localhost:7946"

apiRoute = "/api/v1"
writeRoute = "/receive"
Expand All @@ -36,26 +38,41 @@ const (

var (
config struct {
listenAddr string
peerAddr string
peers []string
httpAdvertiseAddr *net.TCPAddr
httpBindAddr *net.TCPAddr
peerAdvertiseAddr *net.TCPAddr
peerBindAddr *net.TCPAddr
peers []string
}
version = "undefined"
)

func init() {
kingpin.Flag(
"api-bind-addr",
"host:port to bind to for HTTP API",
).Default(defaultAPIAddr).StringVar(&config.listenAddr)
"http-advertise-addr",
"host:port to advertise to other nodes for HTTP",
).Default(defaultHTTPAddr).TCPVar(&config.httpAdvertiseAddr)

kingpin.Flag(
"http-bind-addr",
"host:port to bind to for HTTP",
).Default(defaultHTTPAddr).TCPVar(&config.httpBindAddr)

kingpin.Flag(
"peer-advertise-addr",
"host:port to advertise to other nodes for cluster communication",
).Default(defaultPeerAddr).TCPVar(&config.peerAdvertiseAddr)

kingpin.Flag(
"peer-bind-addr",
"host:port to bind to for cluster communication",
).Default(defaultPeerAddr).StringVar(&config.peerAddr)
).Default(defaultPeerAddr).TCPVar(&config.peerBindAddr)

kingpin.Flag(
"peers",
"List of peers to connect to",
).StringsVar(&config.peers)

level := kingpin.Flag(
"log-level",
"Log level",
Expand All @@ -68,6 +85,13 @@ func init() {
logFlagFatal(err)
}

// The advertised addresses are shared with the other cluster members, so
// each must carry a concrete host or IP: a missing IP or an unspecified
// one (as reported by net.IP.IsUnspecified) is rejected. The messages name
// the flags exactly as they are registered so users can act on them.
if config.httpAdvertiseAddr.IP == nil || config.httpAdvertiseAddr.IP.IsUnspecified() {
	logFlagFatal("must specify host or IP for --http-advertise-addr")
}
if config.peerAdvertiseAddr.IP == nil || config.peerAdvertiseAddr.IP.IsUnspecified() {
	logFlagFatal("must specify host or IP for --peer-advertise-addr")
}

lvl, err := log.ParseLevel(*level)
if err != nil {
kingpin.Fatalf("could not parse log level %q", *level)
Expand Down Expand Up @@ -166,8 +190,13 @@ func main() {

log.Debugf("Writing %d samples to %s", samples, n)

// FIXME Remove hardcoded port, use advertised host from shared state
nodeReq, err := http.NewRequest("POST", "http://"+n.Name()+":9080"+writeRoute, bytes.NewBuffer(compressed))
httpAddr, err := n.HTTPAddr()
if err != nil {
wgErrChan <- err
return
}
apiURL := fmt.Sprintf("%s%s%s", "http://", httpAddr, writeRoute)
nodeReq, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(compressed))
if err != nil {
wgErrChan <- err
return
Expand Down Expand Up @@ -198,15 +227,20 @@ func main() {
})

if err := cluster.Join(&cluster.Config{
BindAddr: config.peerAddr,
Peers: config.peers,
HTTPAdvertiseAddr: *config.httpAdvertiseAddr,
HTTPBindAddr: *config.httpBindAddr,
PeerAdvertiseAddr: *config.peerAdvertiseAddr,
PeerBindAddr: *config.peerBindAddr,
Peers: config.peers,
}); err != nil {
log.Fatal("Failed to join the cluster: ", err)
}

log.Infof("Starting AthensDB node %s; peer address %s; API address %s", cluster.LocalNode(), config.peerAddr, config.listenAddr)
log.Infof("starting AthensDB node %s", cluster.LocalNode())
log.Infof("binding to %s for peer gossip; %s for HTTP", config.peerBindAddr, config.httpBindAddr)
log.Infof("advertising to cluster as %s for peer gossip; %s for HTTP", config.peerAdvertiseAddr, config.httpAdvertiseAddr)
log.Infof("%d nodes in cluster: %s", len(cluster.Nodes()), cluster.Nodes())
log.Fatal(http.ListenAndServe(config.listenAddr, router))
log.Fatal(http.ListenAndServe(config.httpBindAddr.String(), router))
}

func logFlagFatal(v ...interface{}) {
Expand Down

0 comments on commit 089d9fb

Please sign in to comment.