more efficient forkchoiceUpdated usage (#4055)
* more efficient forkchoiceUpdated usage

* await rather than asyncSpawn; ensure head update before dag.updateHead

* use the action tracker rather than attached validators to check for a next-slot proposal; use wall slot + 1 rather than state slot + 1 so the check remains correct when blocks are missing (see the sketch after this list)

* re-add two-fcU case for when newPayload not VALID

* check dynamicFeeRecipientsStore for potential proposal

* remove duplicate checks for whether next proposer
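
For orientation only (not part of this commit), the gating it introduces in block_processor.storeBlock can be sketched roughly as below. This is a minimal, self-contained Nim sketch: chooseFcu and hasProposalAt are hypothetical names standing in for the real checkNextProposer/runProposalForkchoiceUpdated path, which additionally requires dag.getProposer to resolve the next slot's proposer.

# Illustrative sketch only, not part of this commit: models the fcU choice
# made in block_processor.storeBlock after this change. `chooseFcu` and
# `hasProposalAt` are hypothetical names; the real code path goes through
# checkNextProposer and also requires dag.getProposer to find a proposer.
import std/options

type
  Slot = uint64
  FcuKind = enum
    fcuPlain     # expectValidForkchoiceUpdated: no payload attributes
    fcuProposal  # runProposalForkchoiceUpdated: payload attributes included

proc hasProposalAt(nextProposalSlot: Option[Slot], slot: Slot): bool =
  # Mirrors actionTracker.getNextProposalSlot(wallSlot) == wallSlot + 1
  nextProposalSlot.isSome and nextProposalSlot.get == slot

proc chooseFcu(wallSlot: Slot,
               nextProposalSlot: Option[Slot],
               dynamicFeeRecipientRegistered: bool): FcuKind =
  # The check keys off wall slot + 1 rather than state slot + 1, so it stays
  # correct even when recent slots have no blocks.
  let nextSlot = wallSlot + 1
  if hasProposalAt(nextProposalSlot, nextSlot) or
      dynamicFeeRecipientRegistered:
    fcuProposal
  else:
    fcuPlain

when isMainModule:
  # An attached validator proposes next slot: prepare a payload.
  doAssert chooseFcu(100, some(Slot(101)), false) == fcuProposal
  # No attached validator and no registered external proposer: plain fcU.
  doAssert chooseFcu(100, none(Slot), false) == fcuPlain
  # An external proposer is registered via the dynamic fee recipient store.
  doAssert chooseFcu(100, none(Slot), true) == fcuProposal

The point of the gate is that the payload-building forkchoiceUpdated (with payload attributes) is only sent when an attached validator, or an externally registered proposer from the dynamic fee recipient store, will propose at wall slot + 1; otherwise a single plain forkchoiceUpdated suffices.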
tersec authored Sep 7, 2022
1 parent 324e021 commit bf3a014
Showing 7 changed files with 138 additions and 90 deletions.
1 change: 0 additions & 1 deletion beacon_chain/beacon_node.nim
@@ -77,7 +77,6 @@ type
syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId]
genesisSnapshotContent*: string
actionTracker*: ActionTracker
processor*: ref Eth2Processor
blockProcessor*: ref BlockProcessor
consensusManager*: ref ConsensusManager
136 changes: 82 additions & 54 deletions beacon_chain/consensus_object_pools/consensus_manager.nim
@@ -20,6 +20,7 @@ from ../spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, getDynamicFeeRecipient
from ../validators/keystore_management import
KeymanagerHost, getSuggestedFeeRecipient
from ../validators/action_tracker import ActionTracker, getNextProposalSlot

type
ForkChoiceUpdatedInformation* = object
@@ -47,6 +48,10 @@ type
# ----------------------------------------------------------------
eth1Monitor*: Eth1Monitor

# Allow determination of whether there's an upcoming proposal
# ----------------------------------------------------------------
actionTracker*: ActionTracker

# Allow determination of preferred fee recipient during proposals
# ----------------------------------------------------------------
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore
@@ -66,6 +71,7 @@ func new*(T: type ConsensusManager,
attestationPool: ref AttestationPool,
quarantine: ref Quarantine,
eth1Monitor: Eth1Monitor,
actionTracker: ActionTracker,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
keymanagerHost: ref KeymanagerHost,
defaultFeeRecipient: Eth1Address
@@ -75,6 +81,7 @@ func new*(T: type ConsensusManager,
attestationPool: attestationPool,
quarantine: quarantine,
eth1Monitor: eth1Monitor,
actionTracker: actionTracker,
dynamicFeeRecipientsStore: dynamicFeeRecipientsStore,
keymanagerHost: keymanagerHost,
forkchoiceUpdatedInfo: Opt.none ForkchoiceUpdatedInformation,
@@ -260,12 +267,26 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =

self.updateHead(newHead.blck)

proc checkNextProposer(dag: ChainDAGRef, slot: Slot):
proc checkNextProposer(
dag: ChainDAGRef, actionTracker: ActionTracker,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
slot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
let proposer = dag.getProposer(dag.head, slot + 1)
let nextSlot = slot + 1
let proposer = dag.getProposer(dag.head, nextSlot)
if proposer.isNone():
return Opt.none((ValidatorIndex, ValidatorPubKey))
Opt.some((proposer.get, dag.validatorKey(proposer.get).get().toPubKey))
if actionTracker.getNextProposalSlot(slot) != nextSlot and
dynamicFeeRecipientsStore[].getDynamicFeeRecipient(
proposer.get, nextSlot.epoch).isNone:
return Opt.none((ValidatorIndex, ValidatorPubKey))
let proposerKey = dag.validatorKey(proposer.get).get().toPubKey
Opt.some((proposer.get, proposerKey))

proc checkNextProposer*(self: ref ConsensusManager, wallSlot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
self.dag.checkNextProposer(
self.actionTracker, self.dynamicFeeRecipientsStore, wallSlot)

proc getFeeRecipient*(
self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex,
@@ -279,56 +300,59 @@ proc getFeeRecipient*(

from ../spec/datatypes/bellatrix import PayloadID

proc runProposalForkchoiceUpdated*(self: ref ConsensusManager) {.async.} =
withState(self.dag.headState):
let
nextSlot = state.data.slot + 1
(validatorIndex, nextProposer) =
self.dag.checkNextProposer(nextSlot).valueOr:
return

# Approximately lines up with validator_duties version. Used optimistically/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = compute_timestamp_at_slot(state.data, nextSlot)
randomData =
get_randao_mix(state.data, get_current_epoch(state.data)).data
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextSlot.epoch)
beaconHead = self.attestationPool[].getBeaconHead(self.dag.head)
headBlockRoot = self.dag.loadExecutionBlockRoot(beaconHead.blck)

if headBlockRoot.isZero:
proc runProposalForkchoiceUpdated*(
self: ref ConsensusManager, wallSlot: Slot) {.async.} =
let
nextWallSlot = wallSlot + 1
(validatorIndex, nextProposer) = self.checkNextProposer(wallSlot).valueOr:
return
debug "runProposalForkchoiceUpdated: expected to be proposing next slot",
nextWallSlot, validatorIndex, nextProposer

# Approximately lines up with validator_duties version. Used optimistically/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = withState(self.dag.headState):
compute_timestamp_at_slot(state.data, nextWallSlot)
randomData = withState(self.dag.headState):
get_randao_mix(state.data, get_current_epoch(state.data)).data
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextWallSlot.epoch)
beaconHead = self.attestationPool[].getBeaconHead(self.dag.head)
headBlockRoot = self.dag.loadExecutionBlockRoot(beaconHead.blck)

if headBlockRoot.isZero:
return

try:
let fcResult = awaitWithTimeout(
forkchoiceUpdated(
self.eth1Monitor,
headBlockRoot,
beaconHead.safeExecutionPayloadHash,
beaconHead.finalizedExecutionPayloadHash,
timestamp, randomData, feeRecipient),
FORKCHOICEUPDATED_TIMEOUT):
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
ForkchoiceUpdatedResponse(
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))

if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
fcResult.payloadId.isNone:
return

self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
safeBlockRoot: beaconHead.safeExecutionPayloadHash,
finalizedBlockRoot: beaconHead.finalizedExecutionPayloadHash,
timestamp: timestamp,
feeRecipient: feeRecipient)
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg

try:
let fcResult = awaitWithTimeout(
forkchoiceUpdated(
self.eth1Monitor,
headBlockRoot,
beaconHead.safeExecutionPayloadHash,
beaconHead.finalizedExecutionPayloadHash,
timestamp, randomData, feeRecipient),
FORKCHOICEUPDATED_TIMEOUT):
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
ForkchoiceUpdatedResponse(
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))

if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
fcResult.payloadId.isNone:
return

self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
safeBlockRoot: beaconHead.safeExecutionPayloadHash,
finalizedBlockRoot: beaconHead.finalizedExecutionPayloadHash,
timestamp: timestamp,
feeRecipient: feeRecipient)
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg

proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BeaconHead)
proc updateHeadWithExecution*(
self: ref ConsensusManager, newHead: BeaconHead, wallSlot: Slot)
{.async.} =
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
@@ -343,9 +367,13 @@
# justified and finalized
self.dag.updateHead(newHead.blck, self.quarantine[])

# TODO after things stabilize with this, check for upcoming proposal and
# don't bother sending first fcU, but initially, keep both in place
asyncSpawn self.runProposalForkchoiceUpdated()
# If this node should propose next slot, start preparing payload. Both
# fcUs are useful: the updateExecutionClientHead(newHead) call updates
# the head state (including optimistic status) that self.dag.updateHead
# needs while runProposalForkchoiceUpdated requires RANDAO information
# from the head state corresponding to the `newHead` block, which only
# self.dag.updateHead(...) sets up.
await self.runProposalForkchoiceUpdated(wallSlot)

self[].checkExpectedBlock()
except CatchableError as exc:
34 changes: 22 additions & 12 deletions beacon_chain/gossip_processing/block_processor.nim
@@ -17,9 +17,10 @@ import
../sszdump

from ../consensus_object_pools/consensus_manager import
ConsensusManager, optimisticExecutionPayloadHash, runForkchoiceUpdated,
runForkchoiceUpdatedDiscardResult, runProposalForkchoiceUpdated,
shouldSyncOptimistically, updateHead, updateHeadWithExecution
ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash,
runForkchoiceUpdated, runForkchoiceUpdatedDiscardResult,
runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead,
updateHeadWithExecution
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, slot
from ../consensus_object_pools/block_pools_types import BlockError, EpochRef
@@ -318,8 +319,10 @@ proc storeBlock*(
safeBlockRoot = newHead.get.safeExecutionPayloadHash,
finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash)
else:
let headExecutionPayloadHash =
self.consensusManager.dag.loadExecutionBlockRoot(newHead.get.blck)
let
headExecutionPayloadHash =
self.consensusManager.dag.loadExecutionBlockRoot(newHead.get.blck)
wallSlot = self.getBeaconTime().slotOrZero
if headExecutionPayloadHash.isZero:
# Blocks without execution payloads can't be optimistic.
self.consensusManager[].updateHead(newHead.get.blck)
@@ -328,15 +331,22 @@
# be selected as head, so `VALID`. `forkchoiceUpdated` necessary for EL
# client only.
self.consensusManager[].updateHead(newHead.get.blck)
asyncSpawn eth1Monitor.expectValidForkchoiceUpdated(
headBlockRoot = headExecutionPayloadHash,
safeBlockRoot = newHead.get.safeExecutionPayloadHash,
finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash)

# TODO remove redundant fcU in case of proposal
asyncSpawn self.consensusManager.runProposalForkchoiceUpdated()
if self.consensusManager.checkNextProposer(wallSlot).isNone:
# No attached validator is next proposer, so use non-proposal fcU
asyncSpawn eth1Monitor.expectValidForkchoiceUpdated(
headBlockRoot = headExecutionPayloadHash,
safeBlockRoot = newHead.get.safeExecutionPayloadHash,
finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash)
else:
# Some attached validator is next proposer, so prepare payload. As
# updateHead() updated the DAG head, runProposalForkchoiceUpdated,
# which needs the state corresponding to that head block, can run.
asyncSpawn self.consensusManager.runProposalForkchoiceUpdated(
wallSlot)
else:
asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get)
asyncSpawn self.consensusManager.updateHeadWithExecution(
newHead.get, wallSlot)
else:
warn "Head selection failed, using previous head",
head = shortLog(self.consensusManager.dag.head), wallSlot
43 changes: 25 additions & 18 deletions beacon_chain/nimbus_beacon_node.nim
@@ -292,6 +292,7 @@ proc initFullNode(
ExitPool.init(dag, attestationPool, onVoluntaryExitAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.eth1Monitor,
ActionTracker.init(rng, config.subscribeAllSubnets),
node.dynamicFeeRecipientsStore, node.keymanagerHost,
config.defaultFeeRecipient)
blockProcessor = BlockProcessor.new(
@@ -371,9 +372,10 @@ proc initFullNode(
node.validatorMonitor[].addMonitor(validator.pubkey, validator.index)

if validator.index.isSome():
node.actionTracker.knownValidators[validator.index.get()] = wallSlot
let
stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot)
node.consensusManager[].actionTracker.knownValidators[
validator.index.get()] = wallSlot
let stabilitySubnets =
node.consensusManager[].actionTracker.stabilitySubnets(wallSlot)
# Here, we also set the correct ENR should we be in all subnets mode!
node.network.updateStabilitySubnetMetadata(stabilitySubnets)

@@ -735,7 +737,6 @@ proc init*(T: type BeaconNode,
keymanagerHost: keymanagerHost,
keymanagerServer: keymanagerInitResult.server,
eventBus: eventBus,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
gossipState: {},
blocksGossipState: {},
beaconClock: beaconClock,
@@ -791,20 +792,22 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
return

let
aggregateSubnets = node.actionTracker.aggregateSubnets(slot)
stabilitySubnets = node.actionTracker.stabilitySubnets(slot)
aggregateSubnets =
node.consensusManager[].actionTracker.aggregateSubnets(slot)
stabilitySubnets =
node.consensusManager[].actionTracker.stabilitySubnets(slot)
subnets = aggregateSubnets + stabilitySubnets

node.network.updateStabilitySubnetMetadata(stabilitySubnets)

# Now we know what we should be subscribed to - make it so
let
prevSubnets = node.actionTracker.subscribedSubnets
prevSubnets = node.consensusManager[].actionTracker.subscribedSubnets
unsubscribeSubnets = prevSubnets - subnets
subscribeSubnets = subnets - prevSubnets

# Remember what we subscribed to, so we can unsubscribe later
node.actionTracker.subscribedSubnets = subnets
node.consensusManager[].actionTracker.subscribedSubnets = subnets

let forkDigests = node.forkDigests()

@@ -888,7 +891,7 @@ proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
for subnet_id in SubnetId:
node.network.unsubscribe(getAttestationTopic(forkDigest, subnet_id))

node.actionTracker.subscribedSubnets = default(AttnetBits)
node.consensusManager[].actionTracker.subscribedSubnets = default(AttnetBits)

func hasSyncPubKey(node: BeaconNode, epoch: Epoch): auto =
# Only used to determine which gossip topics to which to subscribe
@@ -1103,15 +1106,16 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =

# We "know" the actions for the current and the next epoch
withState(node.dag.headState):
if node.actionTracker.needsUpdate(state, slot.epoch):
if node.consensusManager[].actionTracker.needsUpdate(state, slot.epoch):
let epochRef = node.dag.getEpochRef(head, slot.epoch, false).expect(
"Getting head EpochRef should never fail")
node.actionTracker.updateActions(epochRef)
node.consensusManager[].actionTracker.updateActions(epochRef)

if node.actionTracker.needsUpdate(state, slot.epoch + 1):
if node.consensusManager[].actionTracker.needsUpdate(
state, slot.epoch + 1):
let epochRef = node.dag.getEpochRef(head, slot.epoch + 1, false).expect(
"Getting head EpochRef should never fail")
node.actionTracker.updateActions(epochRef)
node.consensusManager[].actionTracker.updateActions(epochRef)

if node.gossipState.card > 0 and targetGossipState.card == 0:
debug "Disabling topic subscriptions",
@@ -1192,14 +1196,17 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
let head = node.dag.head
if node.isSynced(head):
withState(node.dag.headState):
if node.actionTracker.needsUpdate(state, slot.epoch + 1):
if node.consensusManager[].actionTracker.needsUpdate(
state, slot.epoch + 1):
let epochRef = node.dag.getEpochRef(head, slot.epoch + 1, false).expect(
"Getting head EpochRef should never fail")
node.actionTracker.updateActions(epochRef)
node.consensusManager[].actionTracker.updateActions(epochRef)

let
nextAttestationSlot = node.actionTracker.getNextAttestationSlot(slot)
nextProposalSlot = node.actionTracker.getNextProposalSlot(slot)
nextAttestationSlot =
node.consensusManager[].actionTracker.getNextAttestationSlot(slot)
nextProposalSlot =
node.consensusManager[].actionTracker.getNextProposalSlot(slot)
nextActionWaitTime = saturate(fromNow(
node.beaconClock, min(nextAttestationSlot, nextProposalSlot)))

@@ -1263,7 +1270,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
node.dag.advanceClearanceState()

# Prepare action tracker for the next slot
node.actionTracker.updateSlot(slot + 1)
node.consensusManager[].actionTracker.updateSlot(slot + 1)

# The last thing we do is to perform the subscriptions and unsubscriptions for
# the next slot, just before that slot starts - because of the advance cutoff
2 changes: 1 addition & 1 deletion beacon_chain/rpc/rest_validator_api.nim
@@ -605,7 +605,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
get_committee_count_per_slot(shufflingRef), request.slot,
request.committee_index)

node.actionTracker.registerDuty(
node.consensusManager[].actionTracker.registerDuty(
request.slot, subnet_id, request.validator_index,
request.is_aggregator)
