diff --git a/agent.go b/agent.go index b0d66476e6..f12006583b 100644 --- a/agent.go +++ b/agent.go @@ -6,11 +6,9 @@ import ( "encoding/json" "fmt" "net" - "os" "sort" "sync" - "github.com/docker/docker/pkg/stringid" "github.com/docker/go-events" "github.com/docker/libnetwork/cluster" "github.com/docker/libnetwork/datastore" @@ -282,12 +280,8 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d } keys, _ := c.getKeys(subsysGossip) - hostname, _ := os.Hostname() - nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID()) - logrus.Info("Gossip cluster hostname ", nodeName) netDBConf := networkdb.DefaultConfig() - netDBConf.NodeName = nodeName netDBConf.BindAddr = listenAddr netDBConf.AdvertiseAddr = advertiseAddr netDBConf.Keys = keys diff --git a/networkdb/broadcast.go b/networkdb/broadcast.go index 52e96ec639..a63687f1f2 100644 --- a/networkdb/broadcast.go +++ b/networkdb/broadcast.go @@ -32,7 +32,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim nEvent := NetworkEvent{ Type: event, LTime: ltime, - NodeName: nDB.config.NodeName, + NodeName: nDB.config.NodeID, NetworkID: nid, } @@ -44,7 +44,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{ msg: raw, id: nid, - node: nDB.config.NodeName, + node: nDB.config.NodeID, }) return nil } @@ -72,7 +72,7 @@ func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error { nEvent := NodeEvent{ Type: event, LTime: nDB.networkClock.Increment(), - NodeName: nDB.config.NodeName, + NodeName: nDB.config.NodeID, } raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent) @@ -129,7 +129,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st tEvent := TableEvent{ Type: event, LTime: entry.ltime, - NodeName: nDB.config.NodeName, + NodeName: nDB.config.NodeID, NetworkID: nid, TableName: tname, Key: key, @@ -143,7 +143,7 @@ func 
(nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st var broadcastQ *memberlist.TransmitLimitedQueue nDB.RLock() - thisNodeNetworks, ok := nDB.networks[nDB.config.NodeName] + thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID] if ok { // The network may have been removed network, networkOk := thisNodeNetworks[nid] @@ -166,7 +166,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st id: nid, tname: tname, key: key, - node: nDB.config.NodeName, + node: nDB.config.NodeID, }) return nil } diff --git a/networkdb/cluster.go b/networkdb/cluster.go index d15a5767ff..98a557a8c6 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -102,7 +102,7 @@ func (nDB *NetworkDB) clusterInit() error { nDB.lastHealthTimestamp = nDB.lastStatsTimestamp config := memberlist.DefaultLANConfig() - config.Name = nDB.config.NodeName + config.Name = nDB.config.NodeID config.BindAddr = nDB.config.BindAddr config.AdvertiseAddr = nDB.config.AdvertiseAddr config.UDPBufferSize = nDB.config.PacketBufferSize @@ -363,7 +363,7 @@ func (nDB *NetworkDB) reapTableEntries() { func (nDB *NetworkDB) gossip() { networkNodes := make(map[string][]string) nDB.RLock() - thisNodeNetworks := nDB.networks[nDB.config.NodeName] + thisNodeNetworks := nDB.networks[nDB.config.NodeID] for nid := range thisNodeNetworks { networkNodes[nid] = nDB.networkNodes[nid] @@ -375,7 +375,7 @@ func (nDB *NetworkDB) gossip() { if printHealth { healthScore := nDB.memberlist.GetHealthScore() if healthScore != 0 { - logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore) + logrus.Warnf("NetworkDB stats %v(%v) - healthscore:%d (connectivity issues)", nDB.config.Hostname, nDB.config.NodeID, healthScore) } nDB.lastHealthTimestamp = time.Now() } @@ -406,7 +406,8 @@ func (nDB *NetworkDB) gossip() { // Collect stats and print the queue info, note this code is here also to have a view of the queues empty network.qMessagesSent += len(msgs) if 
printStats { - logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d", + logrus.Infof("NetworkDB stats %v(%v) - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d", + nDB.config.Hostname, nDB.config.NodeID, nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) network.qMessagesSent = 0 } @@ -442,7 +443,7 @@ func (nDB *NetworkDB) gossip() { func (nDB *NetworkDB) bulkSyncTables() { var networks []string nDB.RLock() - for nid, network := range nDB.networks[nDB.config.NodeName] { + for nid, network := range nDB.networks[nDB.config.NodeID] { if network.leaving { continue } @@ -508,10 +509,10 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { var err error var networks []string for _, node := range nodes { - if node == nDB.config.NodeName { + if node == nDB.config.NodeID { continue } - logrus.Debugf("%s: Initiating bulk sync with node %v", nDB.config.NodeName, node) + logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node) networks = nDB.findCommonNetworks(node) err = nDB.bulkSyncNode(networks, node, true) // if its periodic bulksync stop after the first successful sync @@ -542,7 +543,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b unsolMsg = "unsolicited" } - logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node) + logrus.Debugf("%v(%v): Initiating %s bulk sync for networks %v with node %s", + nDB.config.Hostname, nDB.config.NodeID, unsolMsg, networks, node) nDB.RLock() mnode := nDB.nodes[node] @@ -592,7 +594,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b bsm := BulkSyncMessage{ LTime: nDB.tableClock.Time(), Unsolicited: unsolicited, - NodeName: nDB.config.NodeName, + NodeName: nDB.config.NodeID, Networks: networks, Payload: compound, } @@ -624,7 +626,7 @@ func (nDB 
*NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b case <-t.C: logrus.Errorf("Bulk sync to node %s timed out", node) case <-ch: - logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime)) + logrus.Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime)) } t.Stop() } @@ -661,7 +663,7 @@ OUTER: idx := randomOffset(n) node := nodes[idx] - if node == nDB.config.NodeName { + if node == nDB.config.NodeID { continue } diff --git a/networkdb/delegate.go b/networkdb/delegate.go index ffaf94e8c8..a7193c7ea8 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -140,7 +140,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { nDB.Lock() defer nDB.Unlock() - if nEvent.NodeName == nDB.config.NodeName { + if nEvent.NodeName == nDB.config.NodeID { return false } @@ -204,7 +204,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { // Ignore the table events for networks that are in the process of going away nDB.RLock() - networks := nDB.networks[nDB.config.NodeName] + networks := nDB.networks[nDB.config.NodeID] network, ok := networks[tEvent.NetworkID] // Check if the owner of the event is still part of the network nodes := nDB.networkNodes[tEvent.NetworkID] @@ -287,7 +287,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { } // Ignore messages that this node generated. 
- if tEvent.NodeName == nDB.config.NodeName { + if tEvent.NodeName == nDB.config.NodeID { return } @@ -300,7 +300,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { } nDB.RLock() - n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID] + n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID] nDB.RUnlock() if !ok { @@ -318,7 +318,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { id: tEvent.NetworkID, tname: tEvent.TableName, key: tEvent.Key, - node: nDB.config.NodeName, + node: nDB.config.NodeID, }) } } @@ -424,7 +424,7 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) { case MessageTypeCompound: nDB.handleCompound(data, isBulkSync) default: - logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType) + logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType) } } @@ -457,7 +457,7 @@ func (d *delegate) LocalState(join bool) []byte { pp := NetworkPushPull{ LTime: d.nDB.networkClock.Time(), - NodeName: d.nDB.config.NodeName, + NodeName: d.nDB.config.NodeID, } for name, nn := range d.nDB.networks { diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index 73dd999097..48579c6787 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -11,6 +11,7 @@ import ( "time" "github.com/armon/go-radix" + "github.com/docker/docker/pkg/stringid" "github.com/docker/go-events" "github.com/docker/libnetwork/types" "github.com/hashicorp/memberlist" @@ -146,8 +147,11 @@ type network struct { // Config represents the configuration of the networdb instance and // can be passed by the caller. type Config struct { - // NodeName is the cluster wide unique name for this node. - NodeName string + // NodeID is the unique identifier of this node within the cluster. + NodeID string + + // Hostname is the node hostname. + Hostname string // BindAddr is the IP on which networkdb listens.
It can be // 0.0.0.0 to listen on all addresses on the host. @@ -205,7 +209,8 @@ type entry struct { func DefaultConfig() *Config { hostname, _ := os.Hostname() return &Config{ - NodeName: hostname, + NodeID: stringid.TruncateID(stringid.GenerateRandomID()), + Hostname: hostname, BindAddr: "0.0.0.0", PacketBufferSize: 1400, StatsPrintPeriod: 5 * time.Minute, @@ -231,6 +236,7 @@ func New(c *Config) (*NetworkDB, error) { nDB.indexes[byTable] = radix.New() nDB.indexes[byNetwork] = radix.New() + logrus.Debugf("New memberlist node - Node:%v will use memberlist nodeID:%v", c.Hostname, c.NodeID) if err := nDB.clusterInit(); err != nil { return nil, err } @@ -254,7 +260,7 @@ func (nDB *NetworkDB) Join(members []string) error { // stopping timers, canceling goroutines etc. func (nDB *NetworkDB) Close() { if err := nDB.clusterLeave(); err != nil { - logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err) + logrus.Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err) } } @@ -329,7 +335,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { entry := &entry{ ltime: nDB.tableClock.Increment(), - node: nDB.config.NodeName, + node: nDB.config.NodeID, value: value, } @@ -356,7 +362,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error { entry := &entry{ ltime: nDB.tableClock.Increment(), - node: nDB.config.NodeName, + node: nDB.config.NodeID, value: value, } @@ -399,7 +405,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { entry := &entry{ ltime: nDB.tableClock.Increment(), - node: nDB.config.NodeName, + node: nDB.config.NodeID, value: value, deleting: true, reapTime: reapInterval, @@ -449,7 +455,7 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { // entries owned by remote nodes, we will accept them and we notify the application func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { // Indicates if the delete is triggered 
for the local node - isNodeLocal := node == nDB.config.NodeName + isNodeLocal := node == nDB.config.NodeID nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool { @@ -553,10 +559,10 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { ltime := nDB.networkClock.Increment() nDB.Lock() - nodeNetworks, ok := nDB.networks[nDB.config.NodeName] + nodeNetworks, ok := nDB.networks[nDB.config.NodeID] if !ok { nodeNetworks = make(map[string]*network) - nDB.networks[nDB.config.NodeName] = nodeNetworks + nDB.networks[nDB.config.NodeID] = nodeNetworks } nodeNetworks[nid] = &network{id: nid, ltime: ltime} nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ @@ -567,7 +573,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { }, RetransmitMult: 4, } - nDB.addNetworkNode(nid, nDB.config.NodeName) + nDB.addNetworkNode(nid, nDB.config.NodeID) networkNodes := nDB.networkNodes[nid] nDB.Unlock() @@ -575,7 +581,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { return fmt.Errorf("failed to send leave network event for %s: %v", nid, err) } - logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid) + logrus.Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid) if _, err := nDB.bulkSync(networkNodes, true); err != nil { logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) } @@ -599,12 +605,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { defer nDB.Unlock() // Remove myself from the list of the nodes participating to the network - nDB.deleteNetworkNode(nid, nDB.config.NodeName) + nDB.deleteNetworkNode(nid, nDB.config.NodeID) // Update all the local entries marking them for deletion and delete all the remote entries - nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName) + nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID) - nodeNetworks, ok := nDB.networks[nDB.config.NodeName] + nodeNetworks, ok := nDB.networks[nDB.config.NodeID] if !ok { return 
fmt.Errorf("could not find self node for network %s while trying to leave", nid) } @@ -659,7 +665,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { defer nDB.RUnlock() var networks []string - for nid := range nDB.networks[nDB.config.NodeName] { + for nid := range nDB.networks[nDB.config.NodeID] { if n, ok := nDB.networks[nodeName][nid]; ok { if !n.leaving { networks = append(networks, nid) @@ -675,7 +681,7 @@ func (nDB *NetworkDB) updateLocalNetworkTime() { defer nDB.Unlock() ltime := nDB.networkClock.Increment() - for _, n := range nDB.networks[nDB.config.NodeName] { + for _, n := range nDB.networks[nDB.config.NodeID] { n.ltime = ltime } } diff --git a/networkdb/networkdb_test.go b/networkdb/networkdb_test.go index 326aaadecd..5ad972fda8 100644 --- a/networkdb/networkdb_test.go +++ b/networkdb/networkdb_test.go @@ -31,7 +31,7 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*Netwo var dbs []*NetworkDB for i := 0; i < num; i++ { conf := DefaultConfig() - conf.NodeName = fmt.Sprintf("%s%d", namePrefix, i+1) + conf.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1) conf.BindPort = int(atomic.AddInt32(&dbPort, 1)) db, err := New(conf) require.NoError(t, err) @@ -69,7 +69,7 @@ func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool time.Sleep(50 * time.Millisecond) } - assert.Fail(t, fmt.Sprintf("%s: Node existence verification for node %s failed", db.config.NodeName, node)) + assert.Fail(t, fmt.Sprintf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node)) } func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) { @@ -117,7 +117,7 @@ func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value s time.Sleep(50 * time.Millisecond) } - assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %s", db.config.NodeName)) + assert.Fail(t, fmt.Sprintf("Entry existence verification 
test failed for %v(%v)", db.config.Hostname, db.config.NodeID)) } func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) { @@ -157,12 +157,12 @@ func TestNetworkDBJoinLeaveNetwork(t *testing.T) { err := dbs[0].JoinNetwork("network1") assert.NoError(t, err) - dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) err = dbs[0].LeaveNetwork("network1") assert.NoError(t, err) - dbs[1].verifyNetworkExistence(t, "node1", "network1", false) + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false) closeNetworkDBInstances(dbs) } @@ -181,11 +181,11 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) { } for i := 1; i <= n; i++ { - dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), true) + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true) } for i := 1; i <= n; i++ { - dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), true) + dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true) } for i := 1; i <= n; i++ { @@ -199,11 +199,11 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) { } for i := 1; i <= n; i++ { - dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), false) + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false) } for i := 1; i <= n; i++ { - dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), false) + dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false) } closeNetworkDBInstances(dbs) @@ -215,7 +215,7 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) { err := dbs[0].JoinNetwork("network1") assert.NoError(t, err) - dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) err = dbs[1].JoinNetwork("network1") 
assert.NoError(t, err) @@ -245,7 +245,7 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) { err := dbs[0].JoinNetwork("network1") assert.NoError(t, err) - dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) err = dbs[1].JoinNetwork("network1") assert.NoError(t, err) @@ -361,7 +361,7 @@ func TestNetworkDBBulkSync(t *testing.T) { err := dbs[0].JoinNetwork("network1") assert.NoError(t, err) - dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) n := 1000 for i := 1; i <= n; i++ { @@ -374,7 +374,7 @@ func TestNetworkDBBulkSync(t *testing.T) { err = dbs[1].JoinNetwork("network1") assert.NoError(t, err) - dbs[0].verifyNetworkExistence(t, "node2", "network1", true) + dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true) for i := 1; i <= n; i++ { dbs[1].verifyEntryExistence(t, "test_table", "network1", @@ -397,7 +397,7 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) { continue } - dbs[i].verifyNodeExistence(t, fmt.Sprintf("node%d", j+1), true) + dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true) } } @@ -408,7 +408,7 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) { for i := 0; i < n; i++ { for j := 0; j < n; j++ { - dbs[i].verifyNetworkExistence(t, fmt.Sprintf("node%d", j+1), "network1", true) + dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true) } }