Skip to content

Commit

Permalink
allow execution clients several seconds to construct blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
tersec committed Aug 22, 2022
1 parent f1ddcff commit e7f6aac
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 21 deletions.
94 changes: 92 additions & 2 deletions beacon_chain/consensus_object_pools/consensus_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@ import
../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],
../eth1/eth1_monitor

from ../spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, getDynamicFeeRecipient
from ../validators/keystore_management import
KeymanagerHost, getSuggestedFeeRecipient

type
ForkChoiceUpdatedInformation* = object
payloadId*: PayloadID
headBlockRoot*: Eth2Digest
safeBlockRoot*: Eth2Digest
finalizedBlockRoot*: Eth2Digest
feeRecipient*: Eth1Address

ConsensusManager* = object
expectedSlot: Slot
expectedBlockReceived: Future[bool]
Expand All @@ -34,20 +46,33 @@ type
# ----------------------------------------------------------------
eth1Monitor*: Eth1Monitor

# Allow determination of preferred fee recipient during proposals
# ----------------------------------------------------------------
dynamicFeeRecipientsStore: DynamicFeeRecipientsStore
keymanagerHost: ref KeymanagerHost

# Tracking last proposal forkchoiceUpdated payload information
# ----------------------------------------------------------------
forkchoiceUpdatedInfo*: ForkchoiceUpdatedInformation

# Initialization
# ------------------------------------------------------------------------------

func new*(T: type ConsensusManager,
dag: ChainDAGRef,
attestationPool: ref AttestationPool,
quarantine: ref Quarantine,
eth1Monitor: Eth1Monitor
eth1Monitor: Eth1Monitor,
dynamicFeeRecipientsStore: DynamicFeeRecipientsStore,
keymanagerHost: ref KeymanagerHost
): ref ConsensusManager =
(ref ConsensusManager)(
dag: dag,
attestationPool: attestationPool,
quarantine: quarantine,
eth1Monitor: eth1Monitor
eth1Monitor: eth1Monitor,
dynamicFeeRecipientsStore: dynamicFeeRecipientsStore,
keymanagerHost: keymanagerHost
)

# Consensus Management
Expand Down Expand Up @@ -182,6 +207,67 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =

self.updateHead(newHead)

proc checkNextProposer(dag: ChainDAGRef, slot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
let proposer = dag.getProposer(dag.head, slot + 1)
if proposer.isNone():
return Opt.none((ValidatorIndex, ValidatorPubKey))
Opt.some((proposer.get, dag.validatorKey(proposer.get).get().toPubKey))

proc getFeeRecipient*(
self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex,
epoch: Epoch): Eth1Address =
self.dynamicFeeRecipientsStore.getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
if self.keymanagerHost != nil:
self.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr:
self.keymanagerHost[].defaultFeeRecipient
else:
self.keymanagerHost[].defaultFeeRecipient

from ../spec/datatypes/bellatrix import PayloadID

proc runProposalForkchoiceUpdated(self: ref ConsensusManager) {.async.} =
withState(self.dag.headState):
let
nextSlot = state.data.slot + 1
(validatorIndex, nextProposer) =
self.dag.checkNextProposer(nextSlot).valueOr:
return

# Approximately lines up with validator_duties version. Used optimistcally/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = compute_timestamp_at_slot(state.data, nextSlot)
randomData =
get_randao_mix(state.data, get_current_epoch(state.data)).data
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextSlot.epoch)
headBlockRoot = self.dag.loadExecutionBlockRoot(self.dag.head)
finalizedBlockRoot =
self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck)

try:
let fcResult = awaitWithTimeout(
forkchoiceUpdated(
self.eth1Monitor, headBlockRoot, finalizedBlockRoot, timestamp,
randomData, feeRecipient),
FORKCHOICEUPDATED_TIMEOUT):
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
ForkchoiceUpdatedResponse(
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))

if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
fcResult.payloadId.isNone:
return

self.forkchoiceUpdatedInfo = ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
finalizedBlockRoot: finalizedBlockRoot,
feeRecipient: feeRecipient)
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg

proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef)
{.async.} =
## Trigger fork choice and update the DAG with the new head block
Expand All @@ -197,6 +283,10 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef)
# justified and finalized
self.dag.updateHead(newHead, self.quarantine[])

# TODO after things stabilize with this, check for upcoming proposal and
# don't bother sending first fcU, but initially, keep both in place
asyncSpawn self.runProposalForkchoiceUpdated()

self[].checkExpectedBlock()
except CatchableError as exc:
debug "updateHeadWithExecution error",
Expand Down
3 changes: 2 additions & 1 deletion beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ proc initFullNode(
exitPool = newClone(
ExitPool.init(dag, attestationPool, onVoluntaryExitAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.eth1Monitor)
dag, attestationPool, quarantine, node.eth1Monitor,
node.dynamicFeeRecipientsStore, node.keymanagerHost)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime,
Expand Down
38 changes: 21 additions & 17 deletions beacon_chain/validators/validator_duties.nim
Original file line number Diff line number Diff line change
Expand Up @@ -354,17 +354,6 @@ proc get_execution_payload(
asConsensusExecutionPayload(
await execution_engine.getPayload(payload_id.get))

proc getFeeRecipient(node: BeaconNode,
pubkey: ValidatorPubKey,
validatorIdx: ValidatorIndex,
epoch: Epoch): Eth1Address =
node.dynamicFeeRecipientsStore.getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
if node.keymanagerHost != nil:
node.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr:
node.config.defaultFeeRecipient
else:
node.config.defaultFeeRecipient

from web3/engine_api_types import PayloadExecutionStatus

proc getExecutionPayload(
Expand Down Expand Up @@ -405,11 +394,25 @@ proc getExecutionPayload(
terminalBlockHash
latestFinalized =
node.dag.loadExecutionBlockRoot(node.dag.finalizedHead.blck)
feeRecipient = node.getFeeRecipient(pubkey, validator_index, epoch)
payload_id = (await forkchoice_updated(
proposalState.bellatrixData.data, latestHead, latestFinalized,
feeRecipient,
node.consensusManager.eth1Monitor))
feeRecipient = node.consensusManager.getFeeRecipient(pubkey, validator_index, epoch)
lastFcU = node.consensusManager.forkchoiceUpdatedInfo
payload_id =
if lastFcU.headBlockRoot == latestHead and
lastFcU.finalizedBlockRoot == latestFinalized and
lastFcU.feeRecipient == feeRecipient:
some bellatrix.PayloadID(lastFcU.payloadId)
else:
debug "getExecutionPayload: didn't find payloadId, re-querying",
latestHead,
latestFinalized,
feeRecipient,
cachedHeadBlockRoot = lastFcU.headBlockRoot,
cachedFinalizedBlockRoot = lastFcU.finalizedBlockRoot,
cachedFeeRecipient = lastFcU.feeRecipient

(await forkchoice_updated(
proposalState.bellatrixData.data, latestHead, latestFinalized,
feeRecipient, node.consensusManager.eth1Monitor))
payload = try:
awaitWithTimeout(
get_execution_payload(payload_id, node.consensusManager.eth1Monitor),
Expand Down Expand Up @@ -1219,7 +1222,8 @@ proc getValidatorRegistration(
# activated for duties yet. We can safely skip the registration then.
return

let feeRecipient = node.getFeeRecipient(validator.pubkey, validatorIdx, epoch)
let feeRecipient = node.consensusManager.getFeeRecipient(
validator.pubkey, validatorIdx, epoch)
var validatorRegistration = SignedValidatorRegistrationV1(
message: ValidatorRegistrationV1(
fee_recipient: ExecutionAddress(data: distinctBase(feeRecipient)),
Expand Down
9 changes: 8 additions & 1 deletion tests/test_block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import
../beacon_chain/eth1/eth1_monitor,
./testutil, ./testdbutil, ./testblockutil

from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, init
from ../beacon_chain/validators/keystore_management import KeymanagerHost

proc pruneAtFinalization(dag: ChainDAGRef) =
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
Expand All @@ -36,8 +40,11 @@ suite "Block processor" & preset():
quarantine = newClone(Quarantine.init())
attestationPool = newClone(AttestationPool.init(dag, quarantine))
eth1Monitor = new Eth1Monitor
dynamicFeeRecipientsStore = DynamicFeeRecipientsStore.init()
keymanagerHost: ref KeymanagerHost
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, eth1Monitor)
dag, attestationPool, quarantine, eth1Monitor,
dynamicFeeRecipientsStore, keymanagerHost)
state = newClone(dag.headState)
cache = StateCache()
b1 = addTestBlock(state[], cache).phase0Data
Expand Down

0 comments on commit e7f6aac

Please sign in to comment.