Skip to content

Commit

Permalink
avoid unnecessary async copies in broadcast (#3830)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnetheduck authored Jul 1, 2022
1 parent 4fbbbfd commit 6a3bd89
Showing 1 changed file with 20 additions and 21 deletions.
41 changes: 20 additions & 21 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2419,31 +2419,30 @@ proc addAsyncValidator*[MsgType](node: Eth2Node,
proc unsubscribe*(node: Eth2Node, topic: string) =
node.pubsub.unsubscribeAll(topic)

proc broadcast(node: Eth2Node, topic: string, msg: auto):
Future[Result[void, cstring]] {.async.} =
try:
let uncompressed = SSZ.encode(msg)
proc gossipEncode(msg: auto): seq[byte] =
let uncompressed = SSZ.encode(msg)
# This function only for messages we create. A message this large amounts to
# an internal logic error.
doAssert uncompressed.len <= maxGossipMaxSize()

# This is only for messages we create. A message this large amounts to an
# internal logic error.
doAssert uncompressed.len <= maxGossipMaxSize()
snappy.encode(uncompressed)

let compressed =
try: snappy.encode(uncompressed)
except InputTooLarge:
raiseAssert "More than 4gb? not likely.."
proc broadcast(node: Eth2Node, topic: string, msg: seq[byte]):
Future[Result[void, cstring]] {.async.} =
let peers = await node.pubsub.publish(topic, msg)

let peers = await node.pubsub.publish(topic, compressed)
# TODO remove workaround for sync committee BN/VC log spam
if peers > 0 or find(topic, "sync_committee_") != -1:
inc nbc_gossip_messages_sent
return ok()
else:
# Increments libp2p_gossipsub_failed_publish metric
return err("No peers on libp2p topic")

# TODO remove workaround for sync committee BN/VC log spam
if peers > 0 or find(topic, "sync_committee_") != -1:
inc nbc_gossip_messages_sent
return ok()
else:
# Increments libp2p_gossipsub_failed_publish metric
return err("No peers on libp2p topic")
except IOError as exc:
raiseAssert exc.msg # TODO in-memory compression shouldn't fail
proc broadcast(node: Eth2Node, topic: string, msg: auto):
Future[Result[void, cstring]] =
# Avoid {.async.} copies of message while broadcasting
broadcast(node, topic, gossipEncode(msg))

proc subscribeAttestationSubnets*(
node: Eth2Node, subnets: AttnetBits, forkDigest: ForkDigest) =
Expand Down

0 comments on commit 6a3bd89

Please sign in to comment.