diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index b046ec2595..60fffc8e41 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -77,7 +77,6 @@ type syncManager*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] genesisSnapshotContent*: string - actionTracker*: ActionTracker processor*: ref Eth2Processor blockProcessor*: ref BlockProcessor consensusManager*: ref ConsensusManager diff --git a/beacon_chain/consensus_object_pools/consensus_manager.nim b/beacon_chain/consensus_object_pools/consensus_manager.nim index ff23c8846f..8ebcc9eb09 100644 --- a/beacon_chain/consensus_object_pools/consensus_manager.nim +++ b/beacon_chain/consensus_object_pools/consensus_manager.nim @@ -20,6 +20,7 @@ from ../spec/eth2_apis/dynamic_fee_recipients import DynamicFeeRecipientsStore, getDynamicFeeRecipient from ../validators/keystore_management import KeymanagerHost, getSuggestedFeeRecipient +from ../validators/action_tracker import ActionTracker, getNextProposalSlot type ForkChoiceUpdatedInformation* = object @@ -47,6 +48,10 @@ type # ---------------------------------------------------------------- eth1Monitor*: Eth1Monitor + # Allow determination of whether there's an upcoming proposal + # ---------------------------------------------------------------- + actionTracker*: ActionTracker + # Allow determination of preferred fee recipient during proposals # ---------------------------------------------------------------- dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore @@ -66,6 +71,7 @@ func new*(T: type ConsensusManager, attestationPool: ref AttestationPool, quarantine: ref Quarantine, eth1Monitor: Eth1Monitor, + actionTracker: ActionTracker, dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore, keymanagerHost: ref KeymanagerHost, defaultFeeRecipient: Eth1Address @@ -75,6 +81,7 @@ func new*(T: type ConsensusManager, attestationPool: attestationPool, quarantine: quarantine, eth1Monitor: eth1Monitor, + actionTracker: actionTracker, dynamicFeeRecipientsStore: dynamicFeeRecipientsStore, keymanagerHost: keymanagerHost, forkchoiceUpdatedInfo: Opt.none ForkchoiceUpdatedInformation, @@ -260,12 +267,26 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) = self.updateHead(newHead.blck) -proc checkNextProposer(dag: ChainDAGRef, slot: Slot): +proc checkNextProposer( + dag: ChainDAGRef, actionTracker: ActionTracker, + dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore, + slot: Slot): Opt[(ValidatorIndex, ValidatorPubKey)] = - let proposer = dag.getProposer(dag.head, slot + 1) + let nextSlot = slot + 1 + let proposer = dag.getProposer(dag.head, nextSlot) if proposer.isNone(): return Opt.none((ValidatorIndex, ValidatorPubKey)) - Opt.some((proposer.get, dag.validatorKey(proposer.get).get().toPubKey)) + if actionTracker.getNextProposalSlot(slot) != nextSlot and + dynamicFeeRecipientsStore[].getDynamicFeeRecipient( + proposer.get, nextSlot.epoch).isNone: + return Opt.none((ValidatorIndex, ValidatorPubKey)) + let proposerKey = dag.validatorKey(proposer.get).get().toPubKey + Opt.some((proposer.get, proposerKey)) + +proc checkNextProposer*(self: ref ConsensusManager, wallSlot: Slot): + Opt[(ValidatorIndex, ValidatorPubKey)] = + self.dag.checkNextProposer( + self.actionTracker, self.dynamicFeeRecipientsStore, wallSlot) proc getFeeRecipient*( self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex, @@ -279,56 +300,59 @@ proc getFeeRecipient*( from ../spec/datatypes/bellatrix import PayloadID -proc runProposalForkchoiceUpdated*(self: ref ConsensusManager) {.async.} = - withState(self.dag.headState): - let - nextSlot = state.data.slot + 1 - (validatorIndex, nextProposer) = - self.dag.checkNextProposer(nextSlot).valueOr: - return - - # Approximately lines up with validator_duties version. Used optimistcally/ - # opportunistically, so mismatches are fine if not too frequent. - let - timestamp = compute_timestamp_at_slot(state.data, nextSlot) - randomData = - get_randao_mix(state.data, get_current_epoch(state.data)).data - feeRecipient = self.getFeeRecipient( - nextProposer, validatorIndex, nextSlot.epoch) - beaconHead = self.attestationPool[].getBeaconHead(self.dag.head) - headBlockRoot = self.dag.loadExecutionBlockRoot(beaconHead.blck) - - if headBlockRoot.isZero: +proc runProposalForkchoiceUpdated*( + self: ref ConsensusManager, wallSlot: Slot) {.async.} = + let + nextWallSlot = wallSlot + 1 + (validatorIndex, nextProposer) = self.checkNextProposer(wallSlot).valueOr: return + debug "runProposalForkchoiceUpdated: expected to be proposing next slot", + nextWallSlot, validatorIndex, nextProposer + + # Approximately lines up with validator_duties version. Used optimistcally/ + # opportunistically, so mismatches are fine if not too frequent. + let + timestamp = withState(self.dag.headState): + compute_timestamp_at_slot(state.data, nextWallSlot) + randomData = withState(self.dag.headState): + get_randao_mix(state.data, get_current_epoch(state.data)).data + feeRecipient = self.getFeeRecipient( + nextProposer, validatorIndex, nextWallSlot.epoch) + beaconHead = self.attestationPool[].getBeaconHead(self.dag.head) + headBlockRoot = self.dag.loadExecutionBlockRoot(beaconHead.blck) + + if headBlockRoot.isZero: + return + + try: + let fcResult = awaitWithTimeout( + forkchoiceUpdated( + self.eth1Monitor, + headBlockRoot, + beaconHead.safeExecutionPayloadHash, + beaconHead.finalizedExecutionPayloadHash, + timestamp, randomData, feeRecipient), + FORKCHOICEUPDATED_TIMEOUT): + debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out" + ForkchoiceUpdatedResponse( + payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing)) + + if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or + fcResult.payloadId.isNone: + return + + self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation( + payloadId: bellatrix.PayloadID(fcResult.payloadId.get), + headBlockRoot: headBlockRoot, + safeBlockRoot: beaconHead.safeExecutionPayloadHash, + finalizedBlockRoot: beaconHead.finalizedExecutionPayloadHash, + timestamp: timestamp, + feeRecipient: feeRecipient) + except CatchableError as err: + error "Engine API fork-choice update failed", err = err.msg - try: - let fcResult = awaitWithTimeout( - forkchoiceUpdated( - self.eth1Monitor, - headBlockRoot, - beaconHead.safeExecutionPayloadHash, - beaconHead.finalizedExecutionPayloadHash, - timestamp, randomData, feeRecipient), - FORKCHOICEUPDATED_TIMEOUT): - debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out" - ForkchoiceUpdatedResponse( - payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing)) - - if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or - fcResult.payloadId.isNone: - return - - self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation( - payloadId: bellatrix.PayloadID(fcResult.payloadId.get), - headBlockRoot: headBlockRoot, - safeBlockRoot: beaconHead.safeExecutionPayloadHash, - finalizedBlockRoot: beaconHead.finalizedExecutionPayloadHash, - timestamp: timestamp, - feeRecipient: feeRecipient) - except CatchableError as err: - error "Engine API fork-choice update failed", err = err.msg - -proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BeaconHead) +proc updateHeadWithExecution*( + self: ref ConsensusManager, newHead: BeaconHead, wallSlot: Slot) {.async.} = ## Trigger fork choice and update the DAG with the new head block ## This does not automatically prune the DAG after finalization @@ -343,9 +367,13 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BeaconHead) # justified and finalized self.dag.updateHead(newHead.blck, self.quarantine[]) - # TODO after things stabilize with this, check for upcoming proposal and - # don't bother sending first fcU, but initially, keep both in place - asyncSpawn self.runProposalForkchoiceUpdated() + # If this node should propose next slot, start preparing payload. Both + # fcUs are useful: the updateExecutionClientHead(newHead) call updates + # the head state (including optimistic status) that self.dagUpdateHead + # needs while runProposalForkchoiceUpdated requires RANDAO information + # from the head state corresponding to the `newHead` block, which only + # self.dag.updateHead(...) sets up. + await self.runProposalForkchoiceUpdated(wallSlot) self[].checkExpectedBlock() except CatchableError as exc: diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index eda73987e5..993e9a40b1 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -17,9 +17,10 @@ import ../sszdump from ../consensus_object_pools/consensus_manager import - ConsensusManager, optimisticExecutionPayloadHash, runForkchoiceUpdated, - runForkchoiceUpdatedDiscardResult, runProposalForkchoiceUpdated, - shouldSyncOptimistically, updateHead, updateHeadWithExecution + ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash, + runForkchoiceUpdated, runForkchoiceUpdatedDiscardResult, + runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead, + updateHeadWithExecution from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds from ../consensus_object_pools/block_dag import BlockRef, root, slot from ../consensus_object_pools/block_pools_types import BlockError, EpochRef @@ -318,8 +319,10 @@ proc storeBlock*( safeBlockRoot = newHead.get.safeExecutionPayloadHash, finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash) else: - let headExecutionPayloadHash = - self.consensusManager.dag.loadExecutionBlockRoot(newHead.get.blck) + let + headExecutionPayloadHash = + self.consensusManager.dag.loadExecutionBlockRoot(newHead.get.blck) + wallSlot = self.getBeaconTime().slotOrZero if headExecutionPayloadHash.isZero: # Blocks without execution payloads can't be optimistic. self.consensusManager[].updateHead(newHead.get.blck) @@ -328,15 +331,22 @@ proc storeBlock*( # be selected as head, so `VALID`. `forkchoiceUpdated` necessary for EL # client only. self.consensusManager[].updateHead(newHead.get.blck) - asyncSpawn eth1Monitor.expectValidForkchoiceUpdated( - headBlockRoot = headExecutionPayloadHash, - safeBlockRoot = newHead.get.safeExecutionPayloadHash, - finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash) - # TODO remove redundant fcU in case of proposal - asyncSpawn self.consensusManager.runProposalForkchoiceUpdated() + if self.consensusManager.checkNextProposer(wallSlot).isNone: + # No attached validator is next proposer, so use non-proposal fcU + asyncSpawn eth1Monitor.expectValidForkchoiceUpdated( + headBlockRoot = headExecutionPayloadHash, + safeBlockRoot = newHead.get.safeExecutionPayloadHash, + finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash) + else: + # Some attached validator is next proposer, so prepare payload. As + # updateHead() updated the DAG head, runProposalForkchoiceUpdated, + # which needs the state corresponding to that head block, can run. + asyncSpawn self.consensusManager.runProposalForkchoiceUpdated( + wallSlot) else: - asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get) + asyncSpawn self.consensusManager.updateHeadWithExecution( + newHead.get, wallSlot) else: warn "Head selection failed, using previous head", head = shortLog(self.consensusManager.dag.head), wallSlot diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 0c99def070..06a85c741f 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -292,6 +292,7 @@ proc initFullNode( ExitPool.init(dag, attestationPool, onVoluntaryExitAdded)) consensusManager = ConsensusManager.new( dag, attestationPool, quarantine, node.eth1Monitor, + ActionTracker.init(rng, config.subscribeAllSubnets), node.dynamicFeeRecipientsStore, node.keymanagerHost, config.defaultFeeRecipient) blockProcessor = BlockProcessor.new( @@ -371,9 +372,10 @@ proc initFullNode( node.validatorMonitor[].addMonitor(validator.pubkey, validator.index) if validator.index.isSome(): - node.actionTracker.knownValidators[validator.index.get()] = wallSlot - let - stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot) + node.consensusManager[].actionTracker.knownValidators[ + validator.index.get()] = wallSlot + let stabilitySubnets = + node.consensusManager[].actionTracker.stabilitySubnets(wallSlot) # Here, we also set the correct ENR should we be in all subnets mode! node.network.updateStabilitySubnetMetadata(stabilitySubnets) @@ -735,7 +737,6 @@ proc init*(T: type BeaconNode, keymanagerHost: keymanagerHost, keymanagerServer: keymanagerInitResult.server, eventBus: eventBus, - actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets), gossipState: {}, blocksGossipState: {}, beaconClock: beaconClock, @@ -791,20 +792,22 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) = return let - aggregateSubnets = node.actionTracker.aggregateSubnets(slot) - stabilitySubnets = node.actionTracker.stabilitySubnets(slot) + aggregateSubnets = + node.consensusManager[].actionTracker.aggregateSubnets(slot) + stabilitySubnets = + node.consensusManager[].actionTracker.stabilitySubnets(slot) subnets = aggregateSubnets + stabilitySubnets node.network.updateStabilitySubnetMetadata(stabilitySubnets) # Now we know what we should be subscribed to - make it so let - prevSubnets = node.actionTracker.subscribedSubnets + prevSubnets = node.consensusManager[].actionTracker.subscribedSubnets unsubscribeSubnets = prevSubnets - subnets subscribeSubnets = subnets - prevSubnets # Remember what we subscribed to, so we can unsubscribe later - node.actionTracker.subscribedSubnets = subnets + node.consensusManager[].actionTracker.subscribedSubnets = subnets let forkDigests = node.forkDigests() @@ -888,7 +891,7 @@ proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = for subnet_id in SubnetId: node.network.unsubscribe(getAttestationTopic(forkDigest, subnet_id)) - node.actionTracker.subscribedSubnets = default(AttnetBits) + node.consensusManager[].actionTracker.subscribedSubnets = default(AttnetBits) func hasSyncPubKey(node: BeaconNode, epoch: Epoch): auto = # Only used to determine which gossip topics to which to subscribe @@ -1103,15 +1106,16 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = # We "know" the actions for the current and the next epoch withState(node.dag.headState): - if node.actionTracker.needsUpdate(state, slot.epoch): + if node.consensusManager[].actionTracker.needsUpdate(state, slot.epoch): let epochRef = node.dag.getEpochRef(head, slot.epoch, false).expect( "Getting head EpochRef should never fail") - node.actionTracker.updateActions(epochRef) + node.consensusManager[].actionTracker.updateActions(epochRef) - if node.actionTracker.needsUpdate(state, slot.epoch + 1): + if node.consensusManager[].actionTracker.needsUpdate( + state, slot.epoch + 1): let epochRef = node.dag.getEpochRef(head, slot.epoch + 1, false).expect( "Getting head EpochRef should never fail") - node.actionTracker.updateActions(epochRef) + node.consensusManager[].actionTracker.updateActions(epochRef) if node.gossipState.card > 0 and targetGossipState.card == 0: debug "Disabling topic subscriptions", @@ -1192,14 +1196,17 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = let head = node.dag.head if node.isSynced(head): withState(node.dag.headState): - if node.actionTracker.needsUpdate(state, slot.epoch + 1): + if node.consensusManager[].actionTracker.needsUpdate( + state, slot.epoch + 1): let epochRef = node.dag.getEpochRef(head, slot.epoch + 1, false).expect( "Getting head EpochRef should never fail") - node.actionTracker.updateActions(epochRef) + node.consensusManager[].actionTracker.updateActions(epochRef) let - nextAttestationSlot = node.actionTracker.getNextAttestationSlot(slot) - nextProposalSlot = node.actionTracker.getNextProposalSlot(slot) + nextAttestationSlot = + node.consensusManager[].actionTracker.getNextAttestationSlot(slot) + nextProposalSlot = + node.consensusManager[].actionTracker.getNextProposalSlot(slot) nextActionWaitTime = saturate(fromNow( node.beaconClock, min(nextAttestationSlot, nextProposalSlot))) @@ -1263,7 +1270,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = node.dag.advanceClearanceState() # Prepare action tracker for the next slot - node.actionTracker.updateSlot(slot + 1) + node.consensusManager[].actionTracker.updateSlot(slot + 1) # The last thing we do is to perform the subscriptions and unsubscriptions for # the next slot, just before that slot starts - because of the advance cuttoff diff --git a/beacon_chain/rpc/rest_validator_api.nim b/beacon_chain/rpc/rest_validator_api.nim index e76bf33352..10bb846c1a 100644 --- a/beacon_chain/rpc/rest_validator_api.nim +++ b/beacon_chain/rpc/rest_validator_api.nim @@ -605,7 +605,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = get_committee_count_per_slot(shufflingRef), request.slot, request.committee_index) - node.actionTracker.registerDuty( + node.consensusManager[].actionTracker.registerDuty( request.slot, subnet_id, request.validator_index, request.is_aggregator) diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 3c6e3ccd5c..95f1b7dc7a 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -1384,8 +1384,10 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} = doppelgangerDetection.nodeLaunchSlot > GENESIS_SLOT and node.config.doppelgangerDetection: let - nextAttestationSlot = node.actionTracker.getNextAttestationSlot(slot - 1) - nextProposalSlot = node.actionTracker.getNextProposalSlot(slot - 1) + nextAttestationSlot = + node.consensusManager[].actionTracker.getNextAttestationSlot(slot - 1) + nextProposalSlot = + node.consensusManager[].actionTracker.getNextProposalSlot(slot - 1) if slot in [nextAttestationSlot, nextProposalSlot]: notice "Doppelganger detection active - skipping validator duties while observing activity on the network", @@ -1544,5 +1546,5 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} = continue let isAggregator = is_aggregator(committee.lenu64, slotSigRes.get()) - node.actionTracker.registerDuty( + node.consensusManager[].actionTracker.registerDuty( slot, subnet_id, validator_index, isAggregator) diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 1a397a25fe..4fc60470b8 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -23,6 +23,7 @@ import from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import DynamicFeeRecipientsStore, init +from ../beacon_chain/validators/action_tracker import ActionTracker from ../beacon_chain/validators/keystore_management import KeymanagerHost proc pruneAtFinalization(dag: ChainDAGRef) = @@ -40,9 +41,10 @@ suite "Block processor" & preset(): quarantine = newClone(Quarantine.init()) attestationPool = newClone(AttestationPool.init(dag, quarantine)) eth1Monitor = new Eth1Monitor + actionTracker: ActionTracker keymanagerHost: ref KeymanagerHost consensusManager = ConsensusManager.new( - dag, attestationPool, quarantine, eth1Monitor, + dag, attestationPool, quarantine, eth1Monitor, actionTracker, newClone(DynamicFeeRecipientsStore.init()), keymanagerHost, default(Eth1Address)) state = newClone(dag.headState)