Peer dialing/kicking system overhaul #3346

Merged
merged 18 commits on Mar 11, 2022
Changes from 16 commits
6 changes: 5 additions & 1 deletion beacon_chain/conf.nim
@@ -213,10 +213,14 @@ type
name: "udp-port" }: Port

maxPeers* {.
desc: "The maximum number of peers to connect to"
desc: "The target number of peers to connect to"
defaultValue: 160 # 5 (fanout) * 64 (subnets) / 2 (subs) for a healthy mesh
name: "max-peers" }: int

hardMaxPeers* {.
desc: "The maximum number of peers to connect to. Defaults to maxPeers * 1.5"
name: "hard-max-peers" }: Option[int]

nat* {.
desc: "Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>"
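
The default for the new option is resolved further down in eth2_network.nim as hardMaxPeers.get(config.maxPeers * 3 div 2). A minimal standalone sketch of that interaction (the proc name below is invented for illustration and is not part of this diff):

import std/options

proc resolveHardMaxPeers(maxPeers: int, hardMaxPeers: Option[int]): int =
  # Mirrors the default used in eth2_network.nim: maxPeers * 1.5
  hardMaxPeers.get(maxPeers * 3 div 2)

echo resolveHardMaxPeers(160, none(int))  # 240, when --hard-max-peers is not given
echo resolveHardMaxPeers(160, some(200))  # 200, when --hard-max-peers=200
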
163 changes: 112 additions & 51 deletions beacon_chain/networking/eth2_network.nim
@@ -23,7 +23,6 @@ import
libp2p/protocols/pubsub/[
pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer],
libp2p/stream/connection,
libp2p/utils/semaphore,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[version, conf, beacon_clock],
@@ -65,6 +64,7 @@ type
discovery*: Eth2DiscoveryProtocol
discoveryEnabled*: bool
wantedPeers*: int
hardMaxPeers*: int
peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef]
metadata*: altair.MetaData
@@ -81,6 +81,7 @@ type
peers*: Table[PeerID, Peer]
validTopics: HashSet[string]
peerPingerHeartbeatFut: Future[void]
peerTrimmerHeartbeatFut: Future[void]
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn

@@ -867,7 +868,11 @@ proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =

debug "Connecting to discovered peer"
var deadline = sleepAsync(node.connectTimeout)
var workfut = node.switch.connect(peerAddr.peerId, peerAddr.addrs)
var workfut = node.switch.connect(
peerAddr.peerId,
peerAddr.addrs,
forceDial = true
)

try:
# `or` operation will only raise exception of `workfut`, because `deadline`
@@ -896,7 +901,8 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} =
# Previous worker dial might have hit the maximum peers.
# TODO: could clear the whole connTable and connQueue here also, best
# would be to have this event based coming from peer pool or libp2p.
if node.switch.connManager.outSema.count > 0:

if node.peerPool.len < node.hardMaxPeers:
await node.dialPeer(remotePeerAddr, index)
# Peer was added to `connTable` before adding it to `connQueue`, so we
# exclude the peer here after processing.
@@ -927,7 +933,8 @@ proc queryRandom*(
d: Eth2DiscoveryProtocol,
forkId: ENRForkID,
wantedAttnets: AttnetBits,
wantedSyncnets: SyncnetBits): Future[seq[Node]] {.async.} =
wantedSyncnets: SyncnetBits,
minScore: int): Future[seq[Node]] {.async.} =
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.

@@ -979,25 +986,31 @@
if wantedSyncnets[i] and syncnetsNode[i]:
score += 10 # connecting to the right syncnet is urgent

if score > 0:
if score >= minScore:
filtered.add((score, n))

d.rng[].shuffle(filtered)
return filtered.sortedByIt(-it[0]).mapIt(it[1])

proc trimConnections(node: Eth2Node, count: int) {.async.} =
proc trimConnections(node: Eth2Node, count: int) =
# Kill `count` peers, scoring them to remove the least useful ones

var scores = initOrderedTable[PeerID, int]()

# Take into account the stabilitySubnets
# During sync, only this will be used to score peers
# since gossipsub is not running yet
#
# A peer subscribed to all stabilitySubnets will
# have 640 points
var peersInGracePeriod = 0
for peer in node.peers.values:
if peer.connectionState != Connected: continue
if peer.metadata.isNone: continue

# Metadata pinger is used as grace period
if peer.metadata.isNone:
peersInGracePeriod.inc()
continue

let
stabilitySubnets = peer.metadata.get().attnets
@@ -1006,25 +1019,53 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =

scores[peer.peerId] = thisPeersScore


# Safeguard: if we have too many peers in the grace
# period, don't kick anyone. Otherwise, they will be
# preferred over long-standing peers
if peersInGracePeriod > scores.len div 2:
return

# Split a 1000 points for each topic's peers
# + 10 000 points for each subbed topic
# + 5 000 points for each subbed topic
# This gives priority to peers in topics with few peers
# For instance, a topic with `dHigh` peers will give 80 points to each peer
# Whereas a topic with `dLow` peers will give 250 points to each peer
#
# Then, use the average of all topics per peers, to avoid giving too much
# point to big peers

var gossipScores = initTable[PeerID, tuple[sum: int, count: int]]()
for topic, _ in node.pubsub.gossipsub:
let
peersInMesh = node.pubsub.mesh.peers(topic)
peersSubbed = node.pubsub.gossipsub.peers(topic)
scorePerMeshPeer = 10_000 div max(peersInMesh, 1)
scorePerMeshPeer = 5_000 div max(peersInMesh, 1)
scorePerSubbedPeer = 1_000 div max(peersSubbed, 1)

for peer in node.pubsub.mesh.getOrDefault(topic):
for peer in node.pubsub.gossipsub.getOrDefault(topic):
if peer.peerId notin scores: continue
scores[peer.peerId] = scores[peer.peerId] + scorePerSubbedPeer
let currentVal = gossipScores.getOrDefault(peer.peerId)
gossipScores[peer.peerId] = (
currentVal.sum + scorePerSubbedPeer,
currentVal.count + 1
)

for peer in node.pubsub.gossipsub.getOrDefault(topic):
# Avoid global topics (>75% of peers), which would greatly reduce
# the average score for small peers
if peersSubbed > scores.len div 4 * 3: continue

for peer in node.pubsub.mesh.getOrDefault(topic):
if peer.peerId notin scores: continue
Review thread:

Member: why do we skip non-stability peers here?

Member: ok, I read the code wrong - we're looking at peers that nimbus knows about, and filtering out peers that are in gossipsub and not in nimbus - if this happens, it smells of a bug, but moving on: we don't give score to syncnet peers in the initial loop - this needs addressing as well. Also, we sort of prefer "large" peers with lots of subnets over smaller peers with fewer subnets - I suspect this ends up being wrong over time because it gives an unfair advantage to large peers, which are in no way guaranteed to be "better" - it would be good to strive for a random mix of both kinds - perhaps "has_some_interesting_attnet" and "has_some_interesting_syncnet" would be better conditions?

Contributor Author: Another idea I had: instead of scoring on SUM(topicsPoints), we could score on MAX(topicPoints), wdyt? The first loop is only here to score peers during sync at this point; ideally I would like to get rid of it, not sure how atm.

Contributor Author: > and filtering out peers that are in gossipsub and not in nimbus - if this happens, it smells of a bug
It can happen if we don't have metadata for them yet (i.e., they are in their grace period).

Contributor Author: Last commit switches to AVG(nonGlobalTopicsPoints). Global topics are not counted because that would be unfair towards small nodes: they always have low value, and would bring the average down.

scores[peer.peerId] = scores[peer.peerId] + scorePerMeshPeer
let currentVal = gossipScores.getOrDefault(peer.peerId)
gossipScores[peer.peerId] = (
currentVal.sum + scorePerMeshPeer,
currentVal.count + 1
)

for peerId, gScore in gossipScores.pairs:
scores[peerId] =
scores.getOrDefault(peerId) + (gScore.sum div gScore.count)

proc sortPerScore(a, b: (PeerID, int)): int =
system.cmp(a[1], b[1])
@@ -1035,7 +1076,7 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =

for peerId in scores.keys:
debug "kicking peer", peerId, score=scores[peerId]
await node.switch.disconnect(peerId)
asyncSpawn node.getPeer(peerId).disconnect(PeerScoreLow)
dec toKick
inc(nbc_cycling_kicked_peers)
if toKick <= 0: return
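
As a rough worked example of the scoring above (a standalone sketch: the helper names are invented, and the 10-points-per-stability-subnet figure is inferred from the 640-point comment in the code):

proc stabilityScore(attnetsSubscribed: int): int =
  # 10 points per stability subnet in the peer's metadata; a peer subscribed
  # to all 64 attnets ends up with the 640 points mentioned above.
  attnetsSubscribed * 10

proc perTopicScores(peersInMesh, peersSubbed: int): (int, int) =
  # 5_000 points are split among a topic's mesh members and 1_000 among its
  # subscribers, so sparsely populated topics make each of their peers count more.
  (5_000 div max(peersInMesh, 1), 1_000 div max(peersSubbed, 1))

echo stabilityScore(64)       # 640
echo perTopicScores(4, 20)    # (1250, 50) for a sparse topic
echo perTopicScores(12, 200)  # (416, 5) for a well-populated topic
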
@@ -1117,10 +1158,20 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
(wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch)
wantedAttnetsCount = wantedAttnets.countOnes()
wantedSyncnetsCount = wantedSyncnets.countOnes()
outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing})
targetOutgoingPeers = max(node.wantedPeers div 10, 3)

if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
outgoingPeers < targetOutgoingPeers:

if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
let discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets)
let
minScore =
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
1
else:
0
discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore)

let newPeers = block:
var np = newSeq[PeerAddr]()
@@ -1137,39 +1188,27 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
np.add(peerAddr)
np

# We have to be careful to kick enough peers to make room for new ones
# (If we are here, we have an unhealthy mesh, so if we're full, we have bad peers)
# But don't kick too many peers, because with a low max-peers that can cause disruption
# Also keep in mind that a lot of dials fail, and that we can have incoming peers waiting
let
roomRequired = 1 + newPeers.len()
roomCurrent = node.peerPool.lenSpace({PeerType.Outgoing})
roomDelta = roomRequired - roomCurrent
roomCurrent = node.hardMaxPeers - len(node.peerPool)
peersToKick = min(newPeers.len - roomCurrent, node.hardMaxPeers div 5)

maxPeersToKick = len(node.peerPool) div 5
peersToKick = min(roomDelta, maxPeersToKick)

if peersToKick > 0 and newPeers.len() > 0:
await node.trimConnections(peersToKick)
if peersToKick > 0 and newPeers.len > 0:
node.trimConnections(peersToKick)

for peerAddr in newPeers:
# We add the peer to the pending connections table here, but will only
# remove it in `connectWorker`.
node.connTable.incl(peerAddr.peerId)
await node.connQueue.addLast(peerAddr)

debug "Discovery tick", wanted_peers = node.wantedPeers,
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool),
debug "Discovery tick",
wanted_peers = node.wantedPeers,
current_peers = len(node.peerPool),
discovered_nodes = len(discoveredNodes),
kicked_peers = max(0, peersToKick),
new_peers = len(newPeers)

if len(newPeers) == 0:
let currentPeers = node.peerPool.lenCurrent()
let currentPeers = len(node.peerPool)
if currentPeers <= node.wantedPeers shr 2: # 25%
warn "Peer count low, no new peers discovered",
discovered_nodes = len(discoveredNodes), new_peers = newPeers,
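
To make the kick budget above concrete, a minimal sketch assuming hardMaxPeers = 240 (the default for maxPeers = 160); kickBudget is an invented name, not a proc in this PR:

proc kickBudget(currentPeers, hardMaxPeers, newPeers: int): int =
  let roomCurrent = hardMaxPeers - currentPeers
  # Kick only enough peers to make room for the freshly discovered candidates,
  # and never more than a fifth of the hard cap per discovery tick.
  min(newPeers - roomCurrent, hardMaxPeers div 5)

echo kickBudget(235, 240, 20)  # 15: trim 15 peers before dialing the new ones
echo kickBudget(100, 240, 20)  # -120: plenty of room, so nothing gets kicked
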
@@ -1349,8 +1388,9 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
switch: switch,
pubsub: pubsub,
wantedPeers: config.maxPeers,
hardMaxPeers: config.hardMaxPeers.get(config.maxPeers * 3 div 2), #*1.5
cfg: runtimeCfg,
peerPool: newPeerPool[Peer, PeerID](maxPeers = config.maxPeers),
peerPool: newPeerPool[Peer, PeerID](),
# It's important here to create an AsyncQueue with limited size, otherwise
# it could produce HIGH cpu usage.
connQueue: newAsyncQueue[PeerAddr](ConcurrentConnections),
@@ -1432,16 +1472,12 @@ proc startListening*(node: Eth2Node) {.async.} =
await node.pubsub.start()

proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}

proc start*(node: Eth2Node) {.async.} =

proc onPeerCountChanged() =
trace "Number of peers has been changed",
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool)
trace "Number of peers has been changed", length = len(node.peerPool)
nbc_peers.set int64(len(node.peerPool))

node.peerPool.setPeerCounter(onPeerCountChanged)
@@ -1462,15 +1498,22 @@ proc start*(node: Eth2Node) {.async.} =
if pa.isOk():
await node.connQueue.addLast(pa.get())
node.peerPingerHeartbeatFut = node.peerPingerHeartbeat()
node.peerTrimmerHeartbeatFut = node.peerTrimmerHeartbeat()

proc stop*(node: Eth2Node) {.async.} =
# Ignore errors in futures, since we're shutting down (but log them on the
# TRACE level, if a timeout is reached).
var waitedFutures =
@[
node.switch.stop(),
node.peerPingerHeartbeat.cancelAndWait(),
node.peerTrimmerHeartbeatFut.cancelAndWait(),
]

if node.discoveryEnabled:
waitedFutures &= node.discovery.closeWait()

let
waitedFutures = if node.discoveryEnabled:
@[node.discovery.closeWait(), node.switch.stop()]
else:
@[node.switch.stop()]
timeout = 5.seconds
completed = await withTimeout(allFutures(waitedFutures), timeout)
if not completed:
@@ -1671,6 +1714,24 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =

await sleepAsync(5.seconds)

proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} =
while true:
# Peer trimmer

# Only count Connected peers
# (to avoid counting Disconnecting ones)
var connectedPeers = 0
for peer in node.peers.values:
if peer.connectionState == Connected:
inc connectedPeers

let excessPeers = connectedPeers - node.wantedPeers
if excessPeers > 0:
# Let chronos take back control every kick
node.trimConnections(1)

await sleepAsync(1.seconds div max(1, excessPeers))

func asLibp2pKey*(key: keys.PublicKey): PublicKey =
PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
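
A short sketch of the pacing used by peerTrimmerHeartbeat above (illustrative only; trimIntervalMs does not exist in this PR):

proc trimIntervalMs(excessPeers: int): int =
  # One peer is kicked per iteration, and the one-second pause shrinks as the
  # excess grows, so e.g. ten excess peers are trimmed in roughly one second.
  1000 div max(1, excessPeers)

echo trimIntervalMs(1)   # 1000 ms between kicks
echo trimIntervalMs(10)  # 100 ms between kicks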

@@ -1900,17 +1961,17 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,

let altairPrefix = "/eth2/" & $forkDigests.altair

func msgIdProvider(m: messages.Message): seq[byte] =
func msgIdProvider(m: messages.Message): Result[seq[byte], ValidationResult] =
template topic: untyped =
if m.topicIDs.len > 0: m.topicIDs[0] else: ""

try:
# This doesn't have to be a tight bound, just enough to avoid denial of
# service attacks.
let decoded = snappy.decode(m.data, maxGossipMaxSize())
gossipId(decoded, altairPrefix, topic, true)
ok(gossipId(decoded, altairPrefix, topic, true))
except CatchableError:
gossipId(m.data, altairPrefix, topic, false)
ok(gossipId(m.data, altairPrefix, topic, false))

let
params = GossipSubParams(
8 changes: 4 additions & 4 deletions tests/test_discovery.nim
@@ -58,7 +58,7 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(34)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
@@ -96,7 +96,7 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(42)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
@@ -124,7 +124,7 @@

block:
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 0

block:
@@ -139,7 +139,7 @@
discard node1.addNode(nodes[][0])

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
2 changes: 1 addition & 1 deletion vendor/nim-libp2p