Skip to content

Commit

Permalink
NetworkDB create NodeID for cluster nodes
Browse files Browse the repository at this point in the history
Separate the hostname from the node identifier. All the messages
that are exchanged on the network are containing a nodeName field
that today was hostname-uniqueid. Now being encoded as strings in
the protobuf without any length restriction they plays a role
on the effieciency of protocol itself. If the hostname is very long
the overhead will increase and will degradate the performance of
the database itself that each single cycle by default allows 1400
bytes payload

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Sep 11, 2017
1 parent 8d55670 commit e940235
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 62 deletions.
6 changes: 0 additions & 6 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"encoding/json"
"fmt"
"net"
"os"
"sort"
"sync"

"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/datastore"
Expand Down Expand Up @@ -282,12 +280,8 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
}

keys, _ := c.getKeys(subsysGossip)
hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)

netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
Expand Down
12 changes: 6 additions & 6 deletions networkdb/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
nEvent := NetworkEvent{
Type: event,
LTime: ltime,
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
NetworkID: nid,
}

Expand All @@ -44,7 +44,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
msg: raw,
id: nid,
node: nDB.config.NodeName,
node: nDB.config.NodeID,
})
return nil
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
nEvent := NodeEvent{
Type: event,
LTime: nDB.networkClock.Increment(),
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
}

raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
tEvent := TableEvent{
Type: event,
LTime: entry.ltime,
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
NetworkID: nid,
TableName: tname,
Key: key,
Expand All @@ -143,7 +143,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st

var broadcastQ *memberlist.TransmitLimitedQueue
nDB.RLock()
thisNodeNetworks, ok := nDB.networks[nDB.config.NodeName]
thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID]
if ok {
// The network may have been removed
network, networkOk := thisNodeNetworks[nid]
Expand All @@ -166,7 +166,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
id: nid,
tname: tname,
key: key,
node: nDB.config.NodeName,
node: nDB.config.NodeID,
})
return nil
}
24 changes: 13 additions & 11 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (nDB *NetworkDB) clusterInit() error {
nDB.lastHealthTimestamp = nDB.lastStatsTimestamp

config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.Name = nDB.config.NodeID
config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr
config.UDPBufferSize = nDB.config.PacketBufferSize
Expand Down Expand Up @@ -363,7 +363,7 @@ func (nDB *NetworkDB) reapTableEntries() {
func (nDB *NetworkDB) gossip() {
networkNodes := make(map[string][]string)
nDB.RLock()
thisNodeNetworks := nDB.networks[nDB.config.NodeName]
thisNodeNetworks := nDB.networks[nDB.config.NodeID]
for nid := range thisNodeNetworks {
networkNodes[nid] = nDB.networkNodes[nid]

Expand All @@ -375,7 +375,7 @@ func (nDB *NetworkDB) gossip() {
if printHealth {
healthScore := nDB.memberlist.GetHealthScore()
if healthScore != 0 {
logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore)
logrus.Warnf("NetworkDB stats %v(%v) - healthscore:%d (connectivity issues)", nDB.config.Hostname, nDB.config.NodeID, healthScore)
}
nDB.lastHealthTimestamp = time.Now()
}
Expand Down Expand Up @@ -406,7 +406,8 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
logrus.Infof("NetworkDB stats %v(%v) - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nDB.config.Hostname, nDB.config.NodeID,
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}
Expand Down Expand Up @@ -442,7 +443,7 @@ func (nDB *NetworkDB) gossip() {
func (nDB *NetworkDB) bulkSyncTables() {
var networks []string
nDB.RLock()
for nid, network := range nDB.networks[nDB.config.NodeName] {
for nid, network := range nDB.networks[nDB.config.NodeID] {
if network.leaving {
continue
}
Expand Down Expand Up @@ -508,10 +509,10 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
var err error
var networks []string
for _, node := range nodes {
if node == nDB.config.NodeName {
if node == nDB.config.NodeID {
continue
}
logrus.Debugf("%s: Initiating bulk sync with node %v", nDB.config.NodeName, node)
logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
networks = nDB.findCommonNetworks(node)
err = nDB.bulkSyncNode(networks, node, true)
// if its periodic bulksync stop after the first successful sync
Expand Down Expand Up @@ -542,7 +543,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
unsolMsg = "unsolicited"
}

logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node)
logrus.Debugf("%v(%v): Initiating %s bulk sync for networks %v with node %s",
nDB.config.Hostname, nDB.config.NodeID, unsolMsg, networks, node)

nDB.RLock()
mnode := nDB.nodes[node]
Expand Down Expand Up @@ -592,7 +594,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
bsm := BulkSyncMessage{
LTime: nDB.tableClock.Time(),
Unsolicited: unsolicited,
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
Networks: networks,
Payload: compound,
}
Expand Down Expand Up @@ -624,7 +626,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
logrus.Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime))
}
t.Stop()
}
Expand Down Expand Up @@ -661,7 +663,7 @@ OUTER:
idx := randomOffset(n)
node := nodes[idx]

if node == nDB.config.NodeName {
if node == nDB.config.NodeID {
continue
}

Expand Down
14 changes: 7 additions & 7 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
nDB.Lock()
defer nDB.Unlock()

if nEvent.NodeName == nDB.config.NodeName {
if nEvent.NodeName == nDB.config.NodeID {
return false
}

Expand Down Expand Up @@ -204,7 +204,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {

// Ignore the table events for networks that are in the process of going away
nDB.RLock()
networks := nDB.networks[nDB.config.NodeName]
networks := nDB.networks[nDB.config.NodeID]
network, ok := networks[tEvent.NetworkID]
// Check if the owner of the event is still part of the network
nodes := nDB.networkNodes[tEvent.NetworkID]
Expand Down Expand Up @@ -287,7 +287,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
}

// Ignore messages that this node generated.
if tEvent.NodeName == nDB.config.NodeName {
if tEvent.NodeName == nDB.config.NodeID {
return
}

Expand All @@ -300,7 +300,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
}

nDB.RLock()
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
nDB.RUnlock()

if !ok {
Expand All @@ -318,7 +318,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: nDB.config.NodeName,
node: nDB.config.NodeID,
})
}
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
case MessageTypeCompound:
nDB.handleCompound(data, isBulkSync)
default:
logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
}
}

Expand Down Expand Up @@ -457,7 +457,7 @@ func (d *delegate) LocalState(join bool) []byte {

pp := NetworkPushPull{
LTime: d.nDB.networkClock.Time(),
NodeName: d.nDB.config.NodeName,
NodeName: d.nDB.config.NodeID,
}

for name, nn := range d.nDB.networks {
Expand Down
40 changes: 23 additions & 17 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/armon/go-radix"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/docker/libnetwork/types"
"github.com/hashicorp/memberlist"
Expand Down Expand Up @@ -146,8 +147,11 @@ type network struct {
// Config represents the configuration of the networdb instance and
// can be passed by the caller.
type Config struct {
// NodeName is the cluster wide unique name for this node.
NodeName string
// NodeID is the node unique identifier of the node when is part of the cluster
NodeID string

// Hostname is the node hostname.
Hostname string

// BindAddr is the IP on which networkdb listens. It can be
// 0.0.0.0 to listen on all addresses on the host.
Expand Down Expand Up @@ -205,7 +209,8 @@ type entry struct {
func DefaultConfig() *Config {
hostname, _ := os.Hostname()
return &Config{
NodeName: hostname,
NodeID: stringid.TruncateID(stringid.GenerateRandomID()),
Hostname: hostname,
BindAddr: "0.0.0.0",
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Minute,
Expand All @@ -231,6 +236,7 @@ func New(c *Config) (*NetworkDB, error) {
nDB.indexes[byTable] = radix.New()
nDB.indexes[byNetwork] = radix.New()

logrus.Debugf("New memberlist node - Node:%v will use memberlist nodeID:%v", c.Hostname, c.NodeID)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
Expand All @@ -254,7 +260,7 @@ func (nDB *NetworkDB) Join(members []string) error {
// stopping timers, canceling goroutines etc.
func (nDB *NetworkDB) Close() {
if err := nDB.clusterLeave(); err != nil {
logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
logrus.Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err)
}
}

Expand Down Expand Up @@ -329,7 +335,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {

entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
node: nDB.config.NodeID,
value: value,
}

Expand All @@ -356,7 +362,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {

entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
node: nDB.config.NodeID,
value: value,
}

Expand Down Expand Up @@ -399,7 +405,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {

entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
node: nDB.config.NodeID,
value: value,
deleting: true,
reapTime: reapInterval,
Expand Down Expand Up @@ -449,7 +455,7 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
// entries owned by remote nodes, we will accept them and we notify the application
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
// Indicates if the delete is triggered for the local node
isNodeLocal := node == nDB.config.NodeName
isNodeLocal := node == nDB.config.NodeID

nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
func(path string, v interface{}) bool {
Expand Down Expand Up @@ -553,10 +559,10 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
ltime := nDB.networkClock.Increment()

nDB.Lock()
nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
if !ok {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
nDB.networks[nDB.config.NodeID] = nodeNetworks
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
Expand All @@ -567,15 +573,15 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
},
RetransmitMult: 4,
}
nDB.addNetworkNode(nid, nDB.config.NodeName)
nDB.addNetworkNode(nid, nDB.config.NodeID)
networkNodes := nDB.networkNodes[nid]
nDB.Unlock()

if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
}

logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
logrus.Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
if _, err := nDB.bulkSync(networkNodes, true); err != nil {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
}
Expand All @@ -599,12 +605,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
defer nDB.Unlock()

// Remove myself from the list of the nodes participating to the network
nDB.deleteNetworkNode(nid, nDB.config.NodeName)
nDB.deleteNetworkNode(nid, nDB.config.NodeID)

// Update all the local entries marking them for deletion and delete all the remote entries
nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName)
nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)

nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
if !ok {
return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
}
Expand Down Expand Up @@ -659,7 +665,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
defer nDB.RUnlock()

var networks []string
for nid := range nDB.networks[nDB.config.NodeName] {
for nid := range nDB.networks[nDB.config.NodeID] {
if n, ok := nDB.networks[nodeName][nid]; ok {
if !n.leaving {
networks = append(networks, nid)
Expand All @@ -675,7 +681,7 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
defer nDB.Unlock()

ltime := nDB.networkClock.Increment()
for _, n := range nDB.networks[nDB.config.NodeName] {
for _, n := range nDB.networks[nDB.config.NodeID] {
n.ltime = ltime
}
}
Loading

0 comments on commit e940235

Please sign in to comment.