Skip to content

Commit

Permalink
serve libp2p protocol for light client sync
Browse files Browse the repository at this point in the history
This extends the `--serve-light-client-data` launch option to serve
locally collected light client data via libp2p.
See ethereum/consensus-specs#2802
  • Loading branch information
etan-status committed Jan 27, 2022
1 parent e3c47fd commit 74b0601
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 9 deletions.
24 changes: 24 additions & 0 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ declareCounter beacon_sync_committee_contributions_received,
"Number of valid sync committee contributions processed by this node"
declareCounter beacon_sync_committee_contributions_dropped,
"Number of invalid sync committee contributions dropped by this node", labels = ["reason"]
declareCounter beacon_optimistic_light_client_updates_received,
"Number of valid optimistic light client updates processed by this node"
declareCounter beacon_optimistic_light_client_updates_dropped,
"Number of invalid optimistic light client updates dropped by this node", labels = ["reason"]

const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]

Expand Down Expand Up @@ -528,3 +532,23 @@ proc contributionValidator*(
beacon_sync_committee_contributions_dropped.inc(1, [$v.error[0]])

err(v.error())

proc optimisticLightClientUpdateValidator*(
self: ref Eth2Processor, src: MsgSource,
optimistic_update: altair.OptimisticLightClientUpdate
): Result[void, ValidationError] =
logScope:
optimisticUpdate = shortLog(optimistic_update)

debug "Optimistic light client update received"

let v = self.dag.validateOptimisticLightClientUpdate(optimistic_update)
if v.isOk():
trace "Optimistic light client update validated"

beacon_optimistic_light_client_updates_received.inc()
else:
debug "Dropping optimistic light client update", error = v.error
beacon_optimistic_light_client_updates_dropped.inc(1, [$v.error[0]])

v
17 changes: 17 additions & 0 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -984,3 +984,20 @@ proc validateContribution*(
sig.get()

return ok((sig, participants))

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#optimistic_light_client_update
proc validateOptimisticLightClientUpdate*(
dag: ChainDAGRef, optimistic_update: altair.OptimisticLightClientUpdate):
Result[void, ValidationError] =
template latest_local_update(): auto = dag.optimisticLightClientUpdate

if optimistic_update != latest_local_update:
# [IGNORE] The optimistic update is not attesting to the latest block's
# parent block.
if optimistic_update.attested_header != latest_local_update.attested_header:
return errIgnore("OptimisticLightClientUpdate: different attested block")

# [REJECT] The optimistic update does not match the expected value.
return errReject("OptimisticLightClientUpdate: update does not match block")

ok()
14 changes: 14 additions & 0 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2225,3 +2225,17 @@ proc broadcastSignedContributionAndProof*(
node: Eth2Node, msg: SignedContributionAndProof) =
let topic = getSyncCommitteeContributionAndProofTopic(node.forkDigests.altair)
node.broadcast(topic, msg)

proc broadcastOptimisticLightClientUpdate*(
node: Eth2Node, msg: altair.OptimisticLightClientUpdate) =
let
forkDigest =
if msg.fork_version == node.cfg.SHARDING_FORK_VERSION:
node.forkDigests.sharding
elif msg.fork_version == node.cfg.BELLATRIX_FORK_VERSION:
node.forkDigests.bellatrix
else:
doAssert msg.fork_version == node.cfg.ALTAIR_FORK_VERSION:
node.forkDigests.altair
topic = getOptimisticLightClientUpdateTopic(forkDigest)
node.broadcast(topic, msg)
18 changes: 18 additions & 0 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,9 @@ proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Sl
getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
node.network.updateSyncnetsMetadata(syncnets)

if node.config.serveLightClientData:
node.network.subscribe(getOptimisticLightClientUpdateTopic(forkDigest))

proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.removePhase0MessageHandlers(forkDigest)

Expand All @@ -742,6 +745,9 @@ proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(
getSyncCommitteeContributionAndProofTopic(forkDigest))

if node.config.serveLightClientData:
node.network.unsubscribe(getOptimisticLightClientUpdateTopic(forkDigest))

proc trackSyncCommitteeTopics*(node: BeaconNode) =
# TODO
discard
Expand Down Expand Up @@ -1149,6 +1155,18 @@ proc installMessageValidators(node: BeaconNode) =
installSyncCommitteeeValidators(node.dag.forkDigests.altair)
installSyncCommitteeeValidators(node.dag.forkDigests.bellatrix)

if node.config.serveLightClientData:
template installOptimisticLightClientUpdateValidator(digest: auto) =
node.network.addValidator(
getOptimisticLightClientUpdateTopic(digest),
proc(msg: altair.OptimisticLightClientUpdate): ValidationResult =
toValidationResult(
node.processor[].optimisticLightClientUpdateValidator(
MsgSource.gossip, msg)))

installOptimisticLightClientUpdateValidator(node.dag.forkDigests.altair)
installOptimisticLightClientUpdateValidator(node.dag.forkDigests.bellatrix)

proc stop(node: BeaconNode) =
bnStatus = BeaconNodeStatus.Stopping
notice "Graceful shutdown"
Expand Down
7 changes: 6 additions & 1 deletion beacon_chain/spec/beacon_time.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -146,6 +146,9 @@ const
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/validator.md#broadcast-sync-committee-contribution
syncContributionSlotOffset* = TimeDiff(nanoseconds:
NANOSECONDS_PER_SLOT.int64 * 2 div INTERVALS_PER_SLOT)
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#block-proposal
optimisticLightClientUpdateSlotOffset* = TimeDiff(nanoseconds:
NANOSECONDS_PER_SLOT.int64 div INTERVALS_PER_SLOT)

func toFloatSeconds*(t: TimeDiff): float =
float(t.nanoseconds) / 1_000_000_000.0
Expand All @@ -167,6 +170,8 @@ func sync_committee_message_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + syncCommitteeMessageSlotOffset
func sync_contribution_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + syncContributionSlotOffset
func optimistic_light_client_update_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + optimisticLightClientUpdateSlotOffset

func slotOrZero*(time: BeaconTime): Slot =
let exSlot = time.toSlot
Expand Down
6 changes: 6 additions & 0 deletions beacon_chain/spec/datatypes/altair.nim
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,12 @@ chronicles.formatIt SignedContributionAndProof: shortLog(it)
template hash*(x: LightClientUpdate): Hash =
hash(x.header)

func shortLog*(v: OptimisticLightClientUpdate): auto =
(
attested_header: shortLog(v.attested_header),
sync_aggregate: shortLog(v.sync_aggregate)
)

func clear*(info: var EpochInfo) =
info.validators.setLen(0)
info.balances = UnslashedParticipatingBalances()
Expand Down
5 changes: 5 additions & 0 deletions beacon_chain/spec/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func getSyncCommitteeContributionAndProofTopic*(forkDigest: ForkDigest): string
## For subscribing and unsubscribing to/from a subnet.
eth2Prefix(forkDigest) & "sync_committee_contribution_and_proof/ssz_snappy"

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#optimistic_light_client_update
func getOptimisticLightClientUpdateTopic*(forkDigest: ForkDigest): string =
## For broadcasting the latest `OptimisticLightClientUpdate` to light clients.
eth2Prefix(forkDigest) & "optimistic_light_client_update/ssz_snappy"

func getENRForkID*(cfg: RuntimeConfig,
epoch: Epoch,
genesis_validators_root: Eth2Digest): ENRForkID =
Expand Down
37 changes: 29 additions & 8 deletions beacon_chain/validators/validator_duties.nim
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,18 @@ proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =
asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator,
subcommitteeIdx, head)

proc handleOptimisticLightClientUpdates(
node: BeaconNode, head: BlockRef, slot: Slot, didPropose: bool) =
if not didPropose:
return
let msg = node.dag.optimisticLightClientUpdate
if msg.attested_header.slot != slot:
notice "No optimistic light client update for proposed block",
slot = slot, block_root = shortLog(head.root)
return
node.network.broadcastOptimisticLightClientUpdate(msg)
notice "Sent optimistic light client update", message = shortLog(msg)

proc signAndSendContribution(node: BeaconNode,
validator: AttachedValidator,
contribution: SyncCommitteeContribution,
Expand Down Expand Up @@ -841,14 +853,14 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
slot, head, subnet_id = candidateAggregators[i].subcommitteeIdx

proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} =
Future[(head: BlockRef, didPropose: bool)] {.async.} =
## Perform the proposal for the given slot, iff we have a validator attached
## that is supposed to do so, given the shuffling at that slot for the given
## head - to compute the proposer, we need to advance a state to the given
## slot
let proposer = node.dag.getProposer(head, slot)
if proposer.isNone():
return head
return (head: head, didPropose: false)

let
proposerKey = node.dag.validatorKey(proposer.get).get().toPubKey
Expand All @@ -862,9 +874,12 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
proposer_index = proposer.get(),
proposer = shortLog(proposerKey)

head
(head: head, didPropose: false)
else:
await proposeBlock(node, validator, proposer.get(), head, slot)
(
head: await proposeBlock(node, validator, proposer.get(), head, slot),
didPropose: true
)

proc makeAggregateAndProof*(
pool: var AttestationPool, epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
Expand Down Expand Up @@ -1052,6 +1067,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =

# Start by checking if there's work we should have done in the past that we
# can still meaningfully do
var didPropose = false
while curSlot < slot:
notice "Catching up on validator duties",
curSlot = shortLog(curSlot),
Expand All @@ -1061,7 +1077,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
# For every slot we're catching up, we'll propose then send
# attestations - head should normally be advancing along the same branch
# in this case
head = await handleProposal(node, head, curSlot)
(head, didPropose) = await handleProposal(node, head, curSlot)

# For each slot we missed, we need to send out attestations - if we were
# proposing during this time, we'll use the newly proposed head, else just
Expand All @@ -1071,7 +1087,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =

curSlot += 1

head = await handleProposal(node, head, slot)
(head, didPropose) = await handleProposal(node, head, slot)

let
# The latest point in time when we'll be sending out attestations
Expand Down Expand Up @@ -1116,11 +1132,16 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
node.consensusManager[].updateHead(slot)
head = node.dag.head

static: doAssert attestationSlotOffset == syncCommitteeMessageSlotOffset

static:
doAssert attestationSlotOffset == syncCommitteeMessageSlotOffset
handleAttestations(node, head, slot)
handleSyncCommitteeMessages(node, head, slot)

if node.config.serveLightClientData:
static:
doAssert attestationSlotOffset == optimisticLightClientUpdateSlotOffset
handleOptimisticLightClientUpdates(node, slot, didPropose)

updateValidatorMetrics(node) # the important stuff is done, update the vanity numbers

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/validator.md#broadcast-aggregate
Expand Down

0 comments on commit 74b0601

Please sign in to comment.