diff --git a/acceptance_test.go b/acceptance_test.go index 32aa3c95..3c2080f7 100644 --- a/acceptance_test.go +++ b/acceptance_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "net" "net/http" "net/url" "os" @@ -94,10 +95,10 @@ func TestRemoteWriteThenQueryBack(t *testing.T) { func TestMain(m *testing.M) { // Use localhost to avoid firewall warnings when running tests under OS X. - config.listenAddr = "localhost:9080" - config.peerAddr = "localhost:7946" + config.httpBindAddr, _ = net.ResolveTCPAddr("tcp", defaultHTTPAddr) + config.peerBindAddr, _ = net.ResolveTCPAddr("tcp", defaultPeerAddr) - httpBaseURL = fmt.Sprintf("http://%s", config.listenAddr) + httpBaseURL = fmt.Sprintf("http://%s", config.httpBindAddr) go main() err := waitForServer(httpBaseURL) diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index fafcc45a..e4c0d8c2 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -1,15 +1,16 @@ package cluster import ( + "encoding/json" "fmt" "io/ioutil" "net" - "strconv" "github.com/hashicorp/memberlist" ) var c = struct { + c *Config d *delegate ml *memberlist.Memberlist }{ @@ -17,19 +18,25 @@ var c = struct { } type Config struct { - AdvertiseAddr string - BindAddr string - Peers []string + HTTPAdvertiseAddr net.TCPAddr + HTTPBindAddr net.TCPAddr + PeerAdvertiseAddr net.TCPAddr + PeerBindAddr net.TCPAddr + Peers []string } func Join(config *Config) error { // FIXME(mbostock): Consider using a non-local config for memberlist memberConf := memberlist.DefaultLocalConfig() - advHost, advPort, _ := net.SplitHostPort(config.BindAddr) - bindPort, _ := strconv.Atoi(advPort) - memberConf.BindAddr = advHost - memberConf.BindPort = bindPort + + memberConf.AdvertiseAddr = config.PeerAdvertiseAddr.IP.String() + memberConf.AdvertisePort = config.PeerAdvertiseAddr.Port + memberConf.BindAddr = config.PeerBindAddr.IP.String() + memberConf.BindPort = config.PeerBindAddr.Port + + memberConf.Delegate = c.d memberConf.LogOutput = ioutil.Discard + c.c = config var err error if c.ml, err = memberlist.Create(memberConf); err != nil { @@ -43,11 +50,23 @@ type Node struct { mln *memberlist.Node } +func (n *Node) meta() (m nodeMeta, err error) { + err = json.Unmarshal(n.mln.Meta, &m) + return +} + func (n *Node) Name() string { return n.mln.Name } func (n *Node) Addr() string { - return fmt.Sprintf("%s:%d", n.mln.Addr.String(), n.mln.Port) + return n.mln.Address() +} +func (n *Node) HTTPAddr() (string, error) { + m, err := n.meta() + if err != nil { + return "", err + } + return m.HTTPAddr, nil } func (n *Node) String() string { return n.Name() @@ -73,21 +92,25 @@ func Nodes() (nodes []*Node) { type delegate struct{} func (d *delegate) NodeMeta(limit int) []byte { - panic("not implemented") + // FIXME respect limit + j, _ := json.Marshal(&nodeMeta{ + HTTPAddr: c.c.HTTPAdvertiseAddr.String(), + }) + return j } -func (d *delegate) NotifyMsg([]byte) { - panic("not implemented") -} +func (d *delegate) NotifyMsg([]byte) {} func (d *delegate) GetBroadcasts(overhead int, limit int) [][]byte { - panic("not implemented") + return [][]byte{} } func (d *delegate) LocalState(join bool) []byte { - panic("not implemented") + return []byte{} } -func (d *delegate) MergeRemoteState(buf []byte, join bool) { - panic("not implemented") +func (d *delegate) MergeRemoteState(buf []byte, join bool) {} + +type nodeMeta struct { + HTTPAddr string `json:http_addr` } diff --git a/internal/integration/docker-compose.yml b/internal/integration/docker-compose.yml index e19b828b..e21335a1 100644 --- a/internal/integration/docker-compose.yml +++ b/internal/integration/docker-compose.yml @@ -8,6 +8,10 @@ services: - athensdb_3 command: - '--log-level=debug' + - '--http-advertise-addr=athensdb_1:9080' + - '--http-bind-addr=:9080' + - '--peer-advertise-addr=athensdb_1:7946' + - '--peer-bind-addr=:7946' - '--peers=athensdb_2' - '--peers=athensdb_3' athensdb_2: @@ -15,11 +19,19 @@ services: hostname: athensdb_2 command: - '--log-level=debug' + - '--http-advertise-addr=athensdb_2:9080' + - '--http-bind-addr=:9080' + - '--peer-advertise-addr=athensdb_2:7946' + - '--peer-bind-addr=:7946' athensdb_3: build: ../../ hostname: athensdb_3 command: - '--log-level=debug' + - '--http-advertise-addr=athensdb_3:9080' + - '--http-bind-addr=:9080' + - '--peer-advertise-addr=athensdb_3:7946' + - '--peer-bind-addr=:7946' - '--peers=athensdb_1' - '--peers=athensdb_3' prometheus: diff --git a/main.go b/main.go index 3f94a8a8..77635443 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,9 @@ import ( "bytes" "fmt" "io/ioutil" + "net" "net/http" + "os" "sort" "sync" "time" @@ -25,8 +27,8 @@ import ( ) const ( - defaultAPIAddr = ":9080" - defaultPeerAddr = ":7946" + defaultHTTPAddr = "localhost:9080" + defaultPeerAddr = "localhost:7946" apiRoute = "/api/v1" writeRoute = "/receive" @@ -36,26 +38,41 @@ const ( var ( config struct { - listenAddr string - peerAddr string - peers []string + httpAdvertiseAddr *net.TCPAddr + httpBindAddr *net.TCPAddr + peerAdvertiseAddr *net.TCPAddr + peerBindAddr *net.TCPAddr + peers []string } version = "undefined" ) func init() { kingpin.Flag( - "api-bind-addr", - "host:port to bind to for HTTP API", - ).Default(defaultAPIAddr).StringVar(&config.listenAddr) + "http-advertise-addr", + "host:port to advertise to other nodes for HTTP", + ).Default(defaultHTTPAddr).TCPVar(&config.httpAdvertiseAddr) + + kingpin.Flag( + "http-bind-addr", + "host:port to bind to for HTTP", + ).Default(defaultHTTPAddr).TCPVar(&config.httpBindAddr) + + kingpin.Flag( + "peer-advertise-addr", + "host:port to advertise to other nodes for cluster communication", + ).Default(defaultPeerAddr).TCPVar(&config.peerAdvertiseAddr) + kingpin.Flag( "peer-bind-addr", "host:port to bind to for cluster communication", - ).Default(defaultPeerAddr).StringVar(&config.peerAddr) + ).Default(defaultPeerAddr).TCPVar(&config.peerBindAddr) + kingpin.Flag( "peers", "List of peers to connect to", ).StringsVar(&config.peers) + level := kingpin.Flag( "log-level", "Log level", @@ -68,6 +85,13 @@ func init() { logFlagFatal(err) } + if config.httpAdvertiseAddr.IP == nil || config.httpAdvertiseAddr.IP.IsUnspecified() { + logFlagFatal("must specify host or IP for --http--advertise-addr") + } + if config.peerAdvertiseAddr.IP == nil || config.peerAdvertiseAddr.IP.IsUnspecified() { + logFlagFatal("must specify host or IP for --peer--advertise-addr") + } + lvl, err := log.ParseLevel(*level) if err != nil { kingpin.Fatalf("could not parse log level %q", *level) @@ -166,8 +190,13 @@ func main() { log.Debugf("Writing %d samples to %s", samples, n) - // FIXME Remove hardcoded port, use advertised host from shared state - nodeReq, err := http.NewRequest("POST", "http://"+n.Name()+":9080"+writeRoute, bytes.NewBuffer(compressed)) + httpAddr, err := n.HTTPAddr() + if err != nil { + wgErrChan <- err + return + } + apiURL := fmt.Sprintf("%s%s%s", "http://", httpAddr, writeRoute) + nodeReq, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(compressed)) if err != nil { wgErrChan <- err return @@ -198,15 +227,20 @@ func main() { }) if err := cluster.Join(&cluster.Config{ - BindAddr: config.peerAddr, - Peers: config.peers, + HTTPAdvertiseAddr: *config.httpAdvertiseAddr, + HTTPBindAddr: *config.httpBindAddr, + PeerAdvertiseAddr: *config.peerAdvertiseAddr, + PeerBindAddr: *config.peerBindAddr, + Peers: config.peers, }); err != nil { log.Fatal("Failed to join the cluster: ", err) } - log.Infof("Starting AthensDB node %s; peer address %s; API address %s", cluster.LocalNode(), config.peerAddr, config.listenAddr) + log.Infof("starting AthensDB node %s", cluster.LocalNode()) + log.Infof("binding to %s for peer gossip; %s for HTTP", config.peerBindAddr, config.httpBindAddr) + log.Infof("advertising to cluster as %s for peer gossip; %s for HTTP", config.peerAdvertiseAddr, config.httpAdvertiseAddr) log.Infof("%d nodes in cluster: %s", len(cluster.Nodes()), cluster.Nodes()) - log.Fatal(http.ListenAndServe(config.listenAddr, router)) + log.Fatal(http.ListenAndServe(config.httpBindAddr.String(), router)) } func logFlagFatal(v ...interface{}) {