From 4dd1896ddc696f557cb46d760756f7c7b2aabb1d Mon Sep 17 00:00:00 2001
From: kdeme <7857583+kdeme@users.noreply.github.com>
Date: Wed, 4 Dec 2024 17:29:31 +0100
Subject: [PATCH] Quick and dirty PoC for syncing from Portal history network

This PR is not intended to get merged. It is a very quick and dirty
implementation, intended to exercise the Portal network and Fluffy code
and to verify how long block downloads take in comparison with
executing them.

It kind of abuses the current import-from-era code to do this. I think
(?) that in an improved version the block downloads should probably
lead the implementation and trigger execution (it is a bit the reverse
right now, which makes sense for era files). Perhaps that way the
execution could even be offloaded to another thread?

It is also coded without using the JSON-RPC API, as I found that easier
for a quick version, but the getBlock call could be changed to use the
JSON-RPC alternative.
---
 fluffy/network/history/history_network.nim |   6 +-
 nimbus/config.nim                          |   6 +
 nimbus/nimbus_import.nim                   | 208 ++++++++++++++++++++-
 3 files changed, 210 insertions(+), 10 deletions(-)

diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim
index b975e65d5..9fe1a2d57 100644
--- a/fluffy/network/history/history_network.nim
+++ b/fluffy/network/history/history_network.nim
@@ -142,7 +142,7 @@ proc getVerifiedBlockHeader*(
   for i in 0 ..< (1 + n.contentRequestRetries):
     let
       headerContent = (await n.portalProtocol.contentLookup(contentKey, contentId)).valueOr:
-        warn "Failed fetching block header with proof from the network"
+        debug "Failed fetching block header with proof from the network"
         return Opt.none(Header)
 
       header = validateCanonicalHeaderBytes(headerContent.content, id, n.accumulator).valueOr:
@@ -217,7 +217,7 @@ proc getBlock*(
   # also the original type into the network.
   let
     header = (await n.getVerifiedBlockHeader(id)).valueOr:
-      warn "Failed to get header when getting block", id
+      debug "Failed to get header when getting block", id
       return Opt.none(Block)
     hash =
       when id is Hash32:
@@ -225,7 +225,7 @@ proc getBlock*(
       else:
         header.rlpHash()
     body = (await n.getBlockBody(hash, header)).valueOr:
-      warn "Failed to get body when getting block", hash
+      debug "Failed to get body when getting block", hash
       return Opt.none(Block)
 
   Opt.some((header, body))

diff --git a/nimbus/config.nim b/nimbus/config.nim
index be628dc56..ebc5019a2 100644
--- a/nimbus/config.nim
+++ b/nimbus/config.nim
@@ -536,6 +536,12 @@ type
         defaultValue: false
         name: "debug-store-slot-hashes".}: bool
 
+      usePortal* {.
+        hidden
+        desc: "Use portal network instead of era files"
+        defaultValue: false
+        name: "debug-use-portal".}: bool
+
     of `import-rlp`:
       blocksFile* {.
        argument

diff --git a/nimbus/nimbus_import.nim b/nimbus/nimbus_import.nim
index eb15f4cdb..9e7e7f88e 100644
--- a/nimbus/nimbus_import.nim
+++ b/nimbus/nimbus_import.nim
@@ -13,7 +13,8 @@ import
   chronicles,
   metrics,
   chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
   stew/io2,
   beacon_chain/era_db,
   beacon_chain/networking/network_metadata,
@@ -21,7 +22,16 @@ import
   ./common/common,
   ./core/chain,
   ./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version
 
 declareGauge nec_import_block_number, "Latest imported block number"
 
@@ -33,7 +43,165 @@ declareCounter nec_imported_gas, "Gas processed during import"
 
 var running {.volatile.} = true
 
-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc runPortalNode(config: NimbusConf): PortalNode {.
+    raises: [CatchableError]
+.} =
+  let rng = newRng()
+
+  ## Network configuration
+  let
+    bindIp = config.listenAddress
+    udpPort = Port(config.udpPort)
+    # TODO: allow for no TCP port mapping!
+    (extIp, _, extUdpPort) =
+      try:
+        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
+      except CatchableError as exc:
+        raiseAssert exc.msg
+        # raise exc # TODO: Ideally we don't have the Exception here
+      except Exception as exc:
+        raiseAssert exc.msg
+    (netkey, newNetKey) =
+      # if config.netKey.isSome():
+      #   (config.netKey.get(), true)
+      # else:
+      getPersistentNetKey(rng[], config.dataDir / "netkey")
+
+    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
+    previousEnr =
+      if not newNetKey:
+        getPersistentEnr(enrFilePath)
+      else:
+        Opt.none(enr.Record)
+
+  var bootstrapRecords: seq[Record]
+  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
+  # bootstrapRecords.add(config.bootstrapNodes)
+
+  # case config.network
+  # of PortalNetwork.none:
+  #   discard # don't connect to any network bootstrap nodes
+  # of PortalNetwork.mainnet:
+  #   for enrURI in mainnetBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+  # of PortalNetwork.angelfood:
+  #   for enrURI in angelfoodBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+
+  # Only mainnet
+  for enrURI in mainnetBootstrapNodes:
+    let res = enr.Record.fromURI(enrURI)
+    if res.isOk():
+      bootstrapRecords.add(res.value)
+
+  ## Discovery v5 protocol setup
+  let
+    discoveryConfig =
+      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
+    d = newProtocol(
+      netkey,
+      extIp,
+      Opt.none(Port),
+      extUdpPort,
+      # Note: The addition of default clientInfo to the ENR is a temporary
+      # measure to easily identify & debug the clients used in the testnet.
+      # Might make this into a, default off, cli option.
+      localEnrFields = {"c": enrClientInfoShort},
+      bootstrapRecords = bootstrapRecords,
+      previousRecord = previousEnr,
+      bindIp = bindIp,
+      bindPort = udpPort,
+      enrAutoUpdate = true,
+      config = discoveryConfig,
+      rng = rng,
+    )
+
+  d.open()
+
+  ## Portal node setup
+  let
+    portalProtocolConfig = PortalProtocolConfig.init(
+      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
+      defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
+      defaultDisableContentCache, defaultMaxConcurrentOffers
+    )
+
+    portalNodeConfig = PortalNodeConfig(
+      accumulatorFile: Opt.none(string),
+      disableStateRootValidation: true,
+      trustedBlockRoot: Opt.none(Digest),
+      portalConfig: portalProtocolConfig,
+      dataDir: string config.dataDir,
+      storageCapacity: 0,
+      contentRequestRetries: 1
+    )
+
+    node = PortalNode.new(
+      PortalNetwork.mainnet,
+      portalNodeConfig,
+      d,
+      {PortalSubnetwork.history},
+      bootstrapRecords = bootstrapRecords,
+      rng = rng,
+    )
+
+  let enrFile = config.dataDir / "nimbus_portal_node.enr"
+  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
+    fatal "Failed to write the enr file", file = enrFile
+    quit 1
+
+  ## Start the Portal node.
+  node.start()
+
+  node
+
+
+proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64): Future[void] {.async.} =
+  let historyNetwork = node.historyNetwork.value()
+  var blockNumber = startBlock
+
+  let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
+  var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
+  var count = 0
+
+  proc blockWorker(node: PortalNode): Future[void] {.async.} =
+    while true:
+      let (blockNumber, i) = await blockNumberQueue.popFirst()
+      while true:
+        let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
+          error "Failed to get block", blockNumber = blockNumber + i
+          # Note: loop will get stuck here if a block is not available
+          continue
+
+        blocks[i] = init(EthBlock, header, body)
+        count.inc()
+
+        break
+
+  var workers: seq[Future[void]] = @[]
+  for i in 0 ..< 512:
+    workers.add node.blockWorker()
+
+  while true:
+    blocks = newSeq[EthBlock](8192)
+    count = 0
+    info "Downloading 8192 blocks", startBlock = blockNumber
+    for i in 0..8191'u64:
+      await blockNumberQueue.addLast((blockNumber, i))
+
+    # Not great :/
+    while count != 8192:
+      await sleepAsync(10.milliseconds)
+    info "Adding 8192 blocks", startBlock = blockNumber
+    await blockQueue.addLast(blocks)
+
+    blockNumber += 8192
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef, blockQueue: AsyncQueue[seq[EthBlock]]) {.async: (raises: [CancelledError]).} =
   proc controlCHandler() {.noconv.} =
     when defined(windows):
       # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -278,14 +446,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
         true
 
     while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block:
-      if not loadEraBlock(blockNumber):
-        notice "No more era1 blocks to import", blockNumber
-        break
+      if not conf.usePortal:
+        if not loadEraBlock(blockNumber):
+          notice "No more era1 blocks to import", blockNumber
+          break
 
-      imported += 1
+        imported += 1
+      else:
+        let blockSeq = await blockQueue.popFirst()
+        blocks.add(blockSeq)
+        info "Loaded 8192 blocks", startBlock = blockNumber
+
+        imported += 8192
 
       if blocks.lenu64 mod conf.chunkSize == 0:
         process()
+        info "Processed chunk of blocks"
 
     if blocks.len > 0:
       process() # last chunk, if any
@@ -341,3 +517,21 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
 
   if blocks.len > 0:
     process()
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef) {.
+    raises: [CatchableError]
+.} =
+  let blockQueue = newAsyncQueue[seq[EthBlock]](4)
+
+  if conf.usePortal:
+    let portalNode = runPortalNode(conf)
+    let start = com.db.getSavedStateBlockNumber() + 1
+    asyncSpawn portalNode.getBlockLoop(blockQueue, start)
+
+  asyncSpawn importBlocks(conf, com, blockQueue)
+
+  while running:
+    try:
+      poll()
+    except CatchableError as e:
+      warn "Exception in poll()", exc = e.name, err = e.msg
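
Note, not part of the patch: a minimal sketch of the "block downloads lead and
trigger execution" structure suggested in the description above, using the same
chronos AsyncQueue primitives as the PoC. The names fetchBlockRange,
executeBlocks and syncLoop are hypothetical placeholders, not existing nimbus or
fluffy APIs.

import chronos

type EthBlock = object # stand-in for the real block type used by the import code

proc fetchBlockRange(startBlock: uint64): Future[seq[EthBlock]] {.async.} =
  # Placeholder for the Portal download step (getBlockLoop in the PoC).
  discard

proc executeBlocks(blocks: seq[EthBlock]) =
  # Placeholder for the process()/persist step of the import code.
  discard

proc syncLoop(startBlock: uint64) {.async.} =
  let queue = newAsyncQueue[seq[EthBlock]](4)

  proc downloader() {.async.} =
    # Downloads lead: each completed batch is handed over for execution.
    var blockNumber = startBlock
    while true:
      await queue.addLast(await fetchBlockRange(blockNumber))
      blockNumber += 8192

  asyncSpawn downloader()
  while true:
    # Execution follows whenever a downloaded batch is available; this step
    # could also be offloaded to another thread, as suggested above.
    executeBlocks(await queue.popFirst())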