From d64c17ffc3525101261ad21464f5fe53f6c3f868 Mon Sep 17 00:00:00 2001 From: zah Date: Wed, 10 Aug 2022 15:31:10 +0300 Subject: [PATCH] Minor post-merge cleanups (#3945) https://github.com/status-im/nimbus-eth2/pull/3944 The use of nested `awaitWithRetries` calls would have resulted in an unexpected number of retries (3x3). We now use regular `await` in outer layer to avoid the problem. https://github.com/status-im/nimbus-eth2/pull/3943 The new code has an invariant that the `headMerkleizer` field in the `Eth1Chain` is always kept in sync with the blocks stored in the chain. This invariant is now enforced better by doing the necessary merkleizer updates in the `Eth1Chain.addBlock` function, in the `Eth1Chain.init` function and in the `Eth1Chain.reset` function. --- beacon_chain/eth1/eth1_monitor.nim | 173 +++++++++++++++-------------- research/block_sim.nim | 14 +-- 2 files changed, 96 insertions(+), 91 deletions(-) diff --git a/beacon_chain/eth1/eth1_monitor.nim b/beacon_chain/eth1/eth1_monitor.nim index 37e67c5773..bda709698a 100644 --- a/beacon_chain/eth1/eth1_monitor.nim +++ b/beacon_chain/eth1/eth1_monitor.nim @@ -70,10 +70,19 @@ type GenesisStateRef = ref phase0.BeaconState Eth1Block* = ref object + hash*: Eth2Digest number*: Eth1BlockNumber timestamp*: Eth1BlockTimestamp + ## Basic properties of the block + ## These must be initialized in the constructor + deposits*: seq[DepositData] - voteData*: Eth1Data + ## Deposits inside this particular block + + depositRoot*: Eth2Digest + depositCount*: uint64 + ## Global deposits count and hash tree root of the entire sequence + ## These are computed when the block is added to the chain (see `addBlock`) when hasGenesisDetection: activeValidatorsCount*: uint64 @@ -222,16 +231,16 @@ when hasGenesisDetection: proc createGenesisState(m: Eth1Monitor, eth1Block: Eth1Block): GenesisStateRef = notice "Generating genesis state", blockNum = eth1Block.number, - blockHash = eth1Block.voteData.block_hash, + blockHash = eth1Block.hash, blockTimestamp = eth1Block.timestamp, - totalDeposits = eth1Block.voteData.deposit_count, + totalDeposits = eth1Block.depositCount, activeValidators = eth1Block.activeValidatorsCount - var deposits = m.allGenesisDepositsUpTo(eth1Block.voteData.deposit_count) + var deposits = m.allGenesisDepositsUpTo(eth1Block.depositCount) result = newClone(initialize_beacon_state_from_eth1( m.cfg, - eth1Block.voteData.block_hash, + eth1Block.hash, eth1Block.timestamp.uint64, deposits, {})) @@ -355,7 +364,7 @@ func asEngineExecutionPayload*(executionPayload: bellatrix.ExecutionPayload): func shortLog*(b: Eth1Block): string = try: - &"{b.number}:{shortLog b.voteData.block_hash}(deposits = {b.voteData.deposit_count})" + &"{b.number}:{shortLog b.hash}(deposits = {b.depositCount})" except ValueError as exc: raiseAssert exc.msg template findBlock(chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block = @@ -364,12 +373,9 @@ template findBlock(chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block = func makeSuccessorWithoutDeposits(existingBlock: Eth1Block, successor: BlockObject): Eth1Block = result = Eth1Block( + hash: successor.hash.asEth2Digest, number: Eth1BlockNumber successor.number, - timestamp: Eth1BlockTimestamp successor.timestamp, - voteData: Eth1Data( - block_hash: successor.hash.asEth2Digest, - deposit_count: existingBlock.voteData.deposit_count, - deposit_root: existingBlock.voteData.deposit_root)) + timestamp: Eth1BlockTimestamp successor.timestamp) when hasGenesisDetection: result.activeValidatorsCount = existingBlock.activeValidatorsCount @@ -382,14 +388,30 @@ func latestCandidateBlock(chain: Eth1Chain, periodStart: uint64): Eth1Block = proc popFirst(chain: var Eth1Chain) = let removed = chain.blocks.popFirst - chain.blocksByHash.del removed.voteData.block_hash.asBlockHash + chain.blocksByHash.del removed.hash.asBlockHash eth1_chain_len.set chain.blocks.len.int64 +func getDepositsRoot*(m: DepositsMerkleizer): Eth2Digest = + mixInLength(m.getFinalHash, int m.totalChunks) + proc addBlock*(chain: var Eth1Chain, newBlock: Eth1Block) = + for deposit in newBlock.deposits: + chain.headMerkleizer.addChunk hash_tree_root(deposit).data + + newBlock.depositCount = chain.headMerkleizer.getChunkCount + newBlock.depositRoot = chain.headMerkleizer.getDepositsRoot + chain.blocks.addLast newBlock - chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock + chain.blocksByHash[newBlock.hash.asBlockHash] = newBlock + eth1_chain_len.set chain.blocks.len.int64 +func toVoteData(blk: Eth1Block): Eth1Data = + Eth1Data( + deposit_root: blk.depositRoot, + deposit_count: blk.depositCount, + block_hash: blk.hash) + func hash*(x: Eth1Data): Hash = hash(x.block_hash) @@ -580,6 +602,12 @@ template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped template init[N: static int](T: type DynamicBytes[N, N]): T = T newSeq[byte](N) +proc fetchTimestampWithRetries(blkParam: Eth1Block, p: Web3DataProviderRef) {.async.} = + let blk = blkParam + let web3block = awaitWithRetries( + p.getBlockByHash(blk.hash.asBlockHash)) + blk.timestamp = Eth1BlockTimestamp web3block.timestamp + func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {. raises: [Defect, CatchableError].} = if depositsList.kind != JArray: @@ -596,8 +624,12 @@ func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {. if lastEth1Block == nil or lastEth1Block.number != blockNumber: lastEth1Block = Eth1Block( - number: blockNumber, - voteData: Eth1Data(block_hash: blockHash.asEth2Digest)) + hash: blockHash.asEth2Digest, + number: blockNumber + # The `timestamp` is set in `syncBlockRange` immediately + # after calling this function, because we don't want to + # make this function `async` + ) result.add lastEth1Block @@ -628,11 +660,6 @@ func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {. amount: bytes_to_uint64(amount.toArray), signature: ValidatorSig.init(signature.toArray)) -proc fetchTimestamp(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = - let web3block = awaitWithRetries( - p.getBlockByHash(blk.voteData.block_hash.asBlockHash)) - blk.timestamp = Eth1BlockTimestamp web3block.timestamp - type DepositContractDataStatus = enum Fetched @@ -660,10 +687,10 @@ when hasDepositRootChecks: try: let fetchedRoot = asEth2Digest( awaitOrRaiseOnTimeout(depositRoot, contractCallTimeout)) - if blk.voteData.deposit_root.isZero: - blk.voteData.deposit_root = fetchedRoot + if blk.depositRoot.isZero: + blk.depositRoot = fetchedRoot result = Fetched - elif blk.voteData.deposit_root == fetchedRoot: + elif blk.depositRoot == fetchedRoot: result = VerifiedCorrect else: result = DepositRootIncorrect @@ -676,9 +703,9 @@ when hasDepositRootChecks: try: let fetchedCount = bytes_to_uint64( awaitOrRaiseOnTimeout(rawCount, contractCallTimeout).toArray) - if blk.voteData.deposit_count == 0: - blk.voteData.deposit_count = fetchedCount - elif blk.voteData.deposit_count != fetchedCount: + if blk.depositCount == 0: + blk.depositCount = fetchedCount + elif blk.depositCount != fetchedCount: result = DepositCountIncorrect except CatchableError as err: debug "Failed to fetch deposits count", @@ -694,23 +721,13 @@ proc onBlockHeaders(p: Web3DataProviderRef, p.blockHeadersSubscription = awaitWithRetries( p.web3.subscribeForBlockHeaders(blockHeaderHandler, errorHandler)) -func getDepositsRoot*(m: DepositsMerkleizer): Eth2Digest = - mixInLength(m.getFinalHash, int m.totalChunks) - -func eth1DataFromMerkleizer(eth1Block: Eth2Digest, - merkleizer: DepositsMerkleizer): Eth1Data = - Eth1Data( - block_hash: eth1Block, - deposit_count: merkleizer.getChunkCount, - deposit_root: merkleizer.getDepositsRoot) - proc pruneOldBlocks(chain: var Eth1Chain, depositIndex: uint64) = let initialChunks = chain.finalizedDepositsMerkleizer.getChunkCount var lastBlock: Eth1Block while chain.blocks.len > 0: let blk = chain.blocks.peekFirst - if blk.voteData.deposit_count >= depositIndex: + if blk.depositCount >= depositIndex: break else: for deposit in blk.deposits: @@ -719,17 +736,17 @@ proc pruneOldBlocks(chain: var Eth1Chain, depositIndex: uint64) = lastBlock = blk if chain.finalizedDepositsMerkleizer.getChunkCount > initialChunks: - chain.finalizedBlockHash = lastBlock.voteData.block_hash + chain.finalizedBlockHash = lastBlock.hash chain.db.putEth2FinalizedTo DepositContractSnapshot( - eth1Block: lastBlock.voteData.block_hash, + eth1Block: lastBlock.hash, depositContractState: chain.finalizedDepositsMerkleizer.toDepositContractState) eth1_finalized_head.set lastBlock.number.toGaugeValue - eth1_finalized_deposits.set lastBlock.voteData.deposit_count.toGaugeValue + eth1_finalized_deposits.set lastBlock.depositCount.toGaugeValue debug "Eth1 blocks pruned", - newTailBlock = lastBlock.voteData.block_hash, - depositsCount = lastBlock.voteData.deposit_count + newTailBlock = lastBlock.hash, + depositsCount = lastBlock.depositCount func advanceMerkleizer(chain: Eth1Chain, merkleizer: var DepositsMerkleizer, @@ -737,12 +754,12 @@ func advanceMerkleizer(chain: Eth1Chain, if chain.blocks.len == 0: return depositIndex == merkleizer.getChunkCount - if chain.blocks.peekLast.voteData.deposit_count < depositIndex: + if chain.blocks.peekLast.depositCount < depositIndex: return false let firstBlock = chain.blocks[0] - depositsInLastPrunedBlock = firstBlock.voteData.deposit_count - + depositsInLastPrunedBlock = firstBlock.depositCount - firstBlock.deposits.lenu64 # advanceMerkleizer should always be called shortly after prunning the chain @@ -764,10 +781,10 @@ func getDepositsRange(chain: Eth1Chain, first, last: uint64): seq[DepositData] = # in the Eth1Chain. This should hold true at the single call site right # now, but we need to guard the pre-conditions better. for blk in chain.blocks: - if blk.voteData.deposit_count <= first: + if blk.depositCount <= first: continue - let firstDepositIdxInBlk = blk.voteData.deposit_count - blk.deposits.lenu64 + let firstDepositIdxInBlk = blk.depositCount - blk.deposits.lenu64 if firstDepositIdxInBlk >= last: return @@ -781,7 +798,7 @@ func lowerBound(chain: Eth1Chain, depositCount: uint64): Eth1Block = # future, but the `algorithm` module currently requires an # `openArray`, which the `deques` module can't provide yet. for eth1Block in chain.blocks: - if eth1Block.voteData.deposit_count > depositCount: + if eth1Block.depositCount > depositCount: return result = eth1Block @@ -789,28 +806,30 @@ proc trackFinalizedState(chain: var Eth1Chain, finalizedEth1Data: Eth1Data, finalizedStateDepositIndex: uint64, blockProposalExpected = false): bool = - # Returns true if the Eth1Monitor is synced to the finalization point + ## This function will return true if the Eth1Monitor is synced + ## to the finalization point. + if chain.blocks.len == 0: debug "Eth1 chain not initialized" return false let latest = chain.blocks.peekLast - if latest.voteData.deposit_count < finalizedEth1Data.deposit_count: + if latest.depositCount < finalizedEth1Data.deposit_count: if blockProposalExpected: error "The Eth1 chain is not synced", - ourDepositsCount = latest.voteData.deposit_count, + ourDepositsCount = latest.depositCount, targetDepositsCount = finalizedEth1Data.deposit_count return false let matchingBlock = chain.lowerBound(finalizedEth1Data.deposit_count) result = if matchingBlock != nil: - if matchingBlock.voteData.deposit_root == finalizedEth1Data.deposit_root: + if matchingBlock.depositRoot == finalizedEth1Data.deposit_root: true else: error "Corrupted deposits history detected", - ourDepositsCount = matchingBlock.voteData.deposit_count, + ourDepositsCount = matchingBlock.depositCount, taretDepositsCount = finalizedEth1Data.deposit_count, - ourDepositsRoot = matchingBlock.voteData.deposit_root, + ourDepositsRoot = matchingBlock.depositRoot, targetDepositsRoot = finalizedEth1Data.deposit_root chain.hasConsensusViolation = true false @@ -846,7 +865,7 @@ proc getBlockProposalData*(chain: var Eth1Chain, for vote in getStateField(state, eth1_data_votes): let eth1Block = chain.findBlock(vote) if eth1Block != nil and - eth1Block.voteData.deposit_root == vote.deposit_root and + eth1Block.depositRoot == vote.deposit_root and vote.deposit_count >= getStateField(state, eth1_data).deposit_count and is_candidate_block(chain.cfg, eth1Block, periodStart): otherVotesCountTable.inc vote @@ -882,7 +901,7 @@ proc getBlockProposalData*(chain: var Eth1Chain, result.vote = getStateField(state, eth1_data) else: debug "No acceptable eth1 votes. Voting for latest candidate" - result.vote = latestBlock.voteData + result.vote = latestBlock.toVoteData if pendingDepositsCount > 0: if hasLatestDeposits: @@ -968,7 +987,8 @@ proc init*(T: type Eth1Chain, cfg: RuntimeConfig, db: BeaconChainDB): T = T(db: db, cfg: cfg, finalizedBlockHash: finalizedDeposits.eth1Block, - finalizedDepositsMerkleizer: m) + finalizedDepositsMerkleizer: m, + headMerkleizer: copy m) proc createInitialDepositSnapshot*( depositContractAddress: Eth1Address, @@ -1037,7 +1057,7 @@ proc safeCancel(fut: var Future[void]) = func clear(chain: var Eth1Chain) = chain.blocks.clear() chain.blocksByHash.clear() - chain.headMerkleizer.reset() + chain.headMerkleizer = copy chain.finalizedDepositsMerkleizer chain.hasConsensusViolation = false proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} = @@ -1174,7 +1194,7 @@ proc syncBlockRange(m: Eth1Monitor, for i in 0 ..< blocksWithDeposits.len: let blk = blocksWithDeposits[i] - awaitWithRetries m.dataProvider.fetchTimestamp(blk) + await blk.fetchTimestampWithRetries(m.dataProvider) if blk.number > fullSyncFromBlock: let lastBlock = m.depositsChain.blocks.peekLast @@ -1187,11 +1207,6 @@ proc syncBlockRange(m: Eth1Monitor, lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits)) eth1_synced_head.set blockWithoutDeposits.number.toGaugeValue - for deposit in blk.deposits: - m.headMerkleizer.addChunk hash_tree_root(deposit).data - blk.voteData.deposit_count = m.headMerkleizer.getChunkCount - blk.voteData.deposit_root = m.headMerkleizer.getDepositsRoot - m.depositsChain.addBlock blk eth1_synced_head.set blk.number.toGaugeValue @@ -1207,8 +1222,8 @@ proc syncBlockRange(m: Eth1Monitor, when hasDepositRootChecks: debug "Deposit contract state verified", status = $status, - ourCount = lastBlock.voteData.deposit_count, - ourRoot = lastBlock.voteData.deposit_root + ourCount = lastBlock.depositCount, + ourRoot = lastBlock.depositRoot case status of DepositRootIncorrect, DepositCountIncorrect: @@ -1219,7 +1234,7 @@ proc syncBlockRange(m: Eth1Monitor, info "Eth1 sync progress", blockNumber = lastBlock.number, - depositsProcessed = lastBlock.voteData.deposit_count + depositsProcessed = lastBlock.depositCount when hasGenesisDetection: if blocksWithDeposits.len > 0: @@ -1229,7 +1244,7 @@ proc syncBlockRange(m: Eth1Monitor, blk.activeValidatorsCount = m.genesisValidators.lenu64 let depositContractState = DepositContractSnapshot( - eth1Block: blocksWithDeposits[^1].voteData.block_hash, + eth1Block: blocksWithDeposits[^1].hash, depositContractState: m.headMerkleizer.toDepositContractState) m.depositsChain.db.putEth2FinalizedTo depositContractState @@ -1249,14 +1264,14 @@ proc syncBlockRange(m: Eth1Monitor, m.depositsChain.addBlock lastBlock.makeSuccessorWithoutDeposits(web3Block) else: - awaitWithRetries m.dataProvider.fetchTimestamp(lastBlock) + await lastBlock.fetchTimestampWithRetries(m.dataProvider) var genesisBlockIdx = m.depositsChain.blocks.len - 1 if m.isAfterMinGenesisTime(m.depositsChain.blocks[genesisBlockIdx]): for i in 1 ..< blocksWithDeposits.len: let idx = (m.depositsChain.blocks.len - 1) - i let blk = m.depositsChain.blocks[idx] - awaitWithRetries m.dataProvider.fetchTimestamp(blk) + await blk.fetchTimestampWithRetries(m.dataProvider) if m.isGenesisCandidate(blk): genesisBlockIdx = idx else: @@ -1278,7 +1293,7 @@ proc syncBlockRange(m: Eth1Monitor, if genesisBlockIdx > 0: let genesisParent = m.depositsChain.blocks[genesisBlockIdx - 1] if genesisParent.timestamp == 0: - awaitWithRetries m.dataProvider.fetchTimestamp(genesisParent) + await genesisParent.fetchTimestampWithRetries(m.dataProvider) if m.hasEnoughValidators(genesisParent) and genesisBlock.number - genesisParent.number > 1: genesisBlock = awaitWithRetries( @@ -1330,7 +1345,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = matchingBlockAtNewProvider = awaitWithRetries( m.dataProvider.getBlockByNumber lastKnownBlock.number) - lastKnownBlock.voteData.block_hash.asBlockHash != matchingBlockAtNewProvider.hash) + lastKnownBlock.hash.asBlockHash != matchingBlockAtNewProvider.hash) if needsReset: m.depositsChain.clear() @@ -1401,11 +1416,9 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = m.depositsChain.finalizedBlockHash.asBlockHash)) m.depositsChain.addBlock Eth1Block( + hash: m.depositsChain.finalizedBlockHash, number: Eth1BlockNumber startBlock.number, - timestamp: Eth1BlockTimestamp startBlock.timestamp, - voteData: eth1DataFromMerkleizer( - m.depositsChain.finalizedBlockHash, - m.depositsChain.finalizedDepositsMerkleizer)) + timestamp: Eth1BlockTimestamp startBlock.timestamp) eth1SyncedTo = Eth1BlockNumber startBlock.number @@ -1414,8 +1427,6 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = eth1_finalized_deposits.set( m.depositsChain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue) - m.depositsChain.headMerkleizer = copy m.finalizedDepositsMerkleizer - debug "Starting Eth1 syncing", `from` = shortLog(m.depositsChain.blocks[0]) while true: @@ -1596,7 +1607,6 @@ when hasGenesisDetection: var startBlock = startBlock endBlock = endBlock - depositData = startBlock.voteData activeValidatorsCountDuringRange = startBlock.activeValidatorsCount while startBlock.number + 1 < endBlock.number: @@ -1610,10 +1620,9 @@ when hasGenesisDetection: candidateBlock = awaitWithRetries( m.dataProvider.getBlockByNumber(candidateNumber)) - var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64, - timestamp: candidateBlock.timestamp.uint64, - voteData: depositData) - candidateAsEth1Block.voteData.block_hash = candidateBlock.hash.asEth2Digest + var candidateAsEth1Block = Eth1Block(hash: candidateBlock.hash.asEth2Digest, + number: candidateBlock.number.uint64, + timestamp: candidateBlock.timestamp.uint64) let candidateGenesisTime = genesis_time_from_eth1_timestamp( m.cfg, candidateBlock.timestamp.uint64) diff --git a/research/block_sim.nim b/research/block_sim.nim index feb6ea540b..a9a9930501 100644 --- a/research/block_sim.nim +++ b/research/block_sim.nim @@ -101,10 +101,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6, eth1Chain.addBlock Eth1Block( number: Eth1BlockNumber 1, - timestamp: Eth1BlockTimestamp genesisTime, - voteData: Eth1Data( - deposit_root: merkleizer.getDepositsRoot, - deposit_count: merkleizer.getChunkCount)) + timestamp: Eth1BlockTimestamp genesisTime) let replayState = assignClone(dag.headState) @@ -385,10 +382,9 @@ cli do(slots = SLOTS_PER_EPOCH * 6, inc eth1BlockNum var eth1Block = Eth1Block( + hash: makeFakeHash(eth1BlockNum), number: Eth1BlockNumber eth1BlockNum, - timestamp: Eth1BlockTimestamp nextBlockTime, - voteData: Eth1Data( - block_hash: makeFakeHash(eth1BlockNum))) + timestamp: Eth1BlockTimestamp nextBlockTime) let newDeposits = int clamp(gauss(r, 5.0, 8.0), 0.0, 1000.0) for i in 0 ..< newDeposits: @@ -397,8 +393,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6, eth1Block.deposits.add d merkleizer.addChunk hash_tree_root(d).data - eth1Block.voteData.deposit_root = merkleizer.getDepositsRoot - eth1Block.voteData.deposit_count = merkleizer.getChunkCount + eth1Block.depositRoot = merkleizer.getDepositsRoot + eth1Block.depositCount = merkleizer.getChunkCount eth1Chain.addBlock eth1Block lastEth1BlockAt = nextBlockTime