Quick and dirty PoC for syncing from Portal history network
This PR is not intended to get merged. It is a very quick and
dirty implementation, meant to test the Portal network and the
Fluffy code, and to verify how long block downloads take in
comparison with executing them.
It somewhat abuses the current import-from-era code to do this.

I think (?) that in an improved version the block downloads
should probably lead the implementation and trigger execution
(it is somewhat the reverse right now, which makes sense for era
files). Perhaps that way the execution could even be offloaded
to another thread, as sketched below.
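
A minimal sketch of that idea (not part of this PR), assuming a
hypothetical executeBatch entry point for the execution side; only
Nim's built-in Channel and createThread are used as-is:

import eth/common # EthBlock

# Sketch: the downloader leads and hands completed batches to a dedicated
# execution thread over a channel (compile with --threads:on).
var batchChan: Channel[seq[EthBlock]] # global so the worker thread can reach it

proc executionWorker() {.thread.} =
  while true:
    let batch = batchChan.recv() # blocks until the downloader sends a batch
    if batch.len == 0: # empty batch used as a shutdown signal
      break
    executeBatch(batch) # hypothetical, stands in for e.g. persistBlocks

batchChan.open()
var worker: Thread[void]
createThread(worker, executionWorker)
# The async download loop would then do `batchChan.send(batch)` per
# completed batch, and `batchChan.send(@[])` on shutdown.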

It is also coded without using the JSON-RPC API, as I found that
easier for a quick version. But the getBlock call could be changed
to use the JSON-RPC alternative, roughly as sketched below.
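
For reference, a rough sketch of that JSON-RPC variant, assuming
nim-json-rpc's HTTP client and a locally running Fluffy node with its
JSON-RPC API enabled on port 8545 (exact call/return types vary across
nim-json-rpc versions):

import std/json, chronos, json_rpc/clients/httpclient

proc fetchBlockViaRpc() {.async.} =
  let client = newRpcHttpClient()
  await client.connect("http://127.0.0.1:8545") # assumed local Fluffy endpoint
  # eth_getBlockByNumber with full transaction bodies, block 0x1000 as example.
  let blk = await client.call("eth_getBlockByNumber", %[%"0x1000", %true])
  echo blk

waitFor fetchBlockViaRpc()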
kdeme committed Dec 4, 2024
1 parent 4c37682 commit 4dd1896
Showing 3 changed files with 210 additions and 10 deletions.
6 changes: 3 additions & 3 deletions fluffy/network/history/history_network.nim
@@ -142,7 +142,7 @@ proc getVerifiedBlockHeader*(
for i in 0 ..< (1 + n.contentRequestRetries):
let
headerContent = (await n.portalProtocol.contentLookup(contentKey, contentId)).valueOr:
warn "Failed fetching block header with proof from the network"
debug "Failed fetching block header with proof from the network"
return Opt.none(Header)

header = validateCanonicalHeaderBytes(headerContent.content, id, n.accumulator).valueOr:
@@ -217,15 +217,15 @@ proc getBlock*(
# also the original type into the network.
let
header = (await n.getVerifiedBlockHeader(id)).valueOr:
warn "Failed to get header when getting block", id
debug "Failed to get header when getting block", id
return Opt.none(Block)
hash =
when id is Hash32:
id
else:
header.rlpHash()
body = (await n.getBlockBody(hash, header)).valueOr:
warn "Failed to get body when getting block", hash
debug "Failed to get body when getting block", hash
return Opt.none(Block)

Opt.some((header, body))
6 changes: 6 additions & 0 deletions nimbus/config.nim
@@ -536,6 +536,12 @@ type
defaultValue: false
name: "debug-store-slot-hashes".}: bool

usePortal* {.
hidden
desc: "Use portal network instead of era files"
defaultValue: false
name: "debug-use-portal".}: bool

of `import-rlp`:
blocksFile* {.
argument
208 changes: 201 additions & 7 deletions nimbus/nimbus_import.nim
@@ -13,15 +13,25 @@ import
chronicles,
metrics,
chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
stew/io2,
beacon_chain/era_db,
beacon_chain/networking/network_metadata,
./config,
./common/common,
./core/chain,
./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version

declareGauge nec_import_block_number, "Latest imported block number"

@@ -33,7 +43,165 @@ declareCounter nec_imported_gas, "Gas processed during import"

var running {.volatile.} = true

-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc runPortalNode(config: NimbusConf): PortalNode {.
+    raises: [CatchableError]
+.} =
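  ## Set up and start a Portal node on the history network: discv5 transport,
  ## Portal protocol stack and ENR handling. Returns the started node.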
let rng = newRng()

## Network configuration
let
bindIp = config.listenAddress
udpPort = Port(config.udpPort)
# TODO: allow for no TCP port mapping!
(extIp, _, extUdpPort) =
try:
setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
except CatchableError as exc:
raiseAssert exc.msg
# raise exc # TODO: Ideally we don't have the Exception here
except Exception as exc:
raiseAssert exc.msg
(netkey, newNetKey) =
# if config.netKey.isSome():
# (config.netKey.get(), true)
# else:
getPersistentNetKey(rng[], config.dataDir / "netkey")

enrFilePath = config.dataDir / "nimbus_portal_node.enr"
previousEnr =
if not newNetKey:
getPersistentEnr(enrFilePath)
else:
Opt.none(enr.Record)

var bootstrapRecords: seq[Record]
# loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
# bootstrapRecords.add(config.bootstrapNodes)

# case config.network
# of PortalNetwork.none:
# discard # don't connect to any network bootstrap nodes
# of PortalNetwork.mainnet:
# for enrURI in mainnetBootstrapNodes:
# let res = enr.Record.fromURI(enrURI)
# if res.isOk():
# bootstrapRecords.add(res.value)
# of PortalNetwork.angelfood:
# for enrURI in angelfoodBootstrapNodes:
# let res = enr.Record.fromURI(enrURI)
# if res.isOk():
# bootstrapRecords.add(res.value)

# Only mainnet
for enrURI in mainnetBootstrapNodes:
let res = enr.Record.fromURI(enrURI)
if res.isOk():
bootstrapRecords.add(res.value)

## Discovery v5 protocol setup
let
discoveryConfig =
DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
d = newProtocol(
netkey,
extIp,
Opt.none(Port),
extUdpPort,
# Note: The addition of default clientInfo to the ENR is a temporary
# measure to easily identify & debug the clients used in the testnet.
# Might make this into a, default off, cli option.
localEnrFields = {"c": enrClientInfoShort},
bootstrapRecords = bootstrapRecords,
previousRecord = previousEnr,
bindIp = bindIp,
bindPort = udpPort,
enrAutoUpdate = true,
config = discoveryConfig,
rng = rng,
)

d.open()

## Portal node setup
let
portalProtocolConfig = PortalProtocolConfig.init(
DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
defaultDisableContentCache, defaultMaxConcurrentOffers
)

portalNodeConfig = PortalNodeConfig(
accumulatorFile: Opt.none(string),
disableStateRootValidation: true,
trustedBlockRoot: Opt.none(Digest),
portalConfig: portalProtocolConfig,
dataDir: string config.dataDir,
storageCapacity: 0,
contentRequestRetries: 1
)

node = PortalNode.new(
PortalNetwork.mainnet,
portalNodeConfig,
d,
{PortalSubnetwork.history},
bootstrapRecords = bootstrapRecords,
rng = rng,
)

let enrFile = config.dataDir / "nimbus_portal_node.enr"
if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
fatal "Failed to write the enr file", file = enrFile
quit 1

## Start the Portal node.
node.start()

node


proc getBlockLoop(
    node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64
): Future[void] {.async.} =
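  ## Fetch blocks in batches of 8192 (one era1 file's worth) from the Portal
  ## history network, starting at `startBlock`, and push each completed batch
  ## to `blockQueue`.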
let historyNetwork = node.historyNetwork.value()
var blockNumber = startBlock

let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
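  # Each work item is (batch start block, offset within the batch); the
  # workers below pull one item at a time and fetch that single block.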
var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
var count = 0

proc blockWorker(node: PortalNode): Future[void] {.async.} =
while true:
let (blockNumber, i) = await blockNumberQueue.popFirst()
while true:
let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
error "Failed to get block", blockNumber = blockNumber + i
# Note: loop will get stuck here if a block is not available
continue

blocks[i] = init(EthBlock, header, body)
count.inc()

break

var workers: seq[Future[void]] = @[]
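  # Spawn 512 workers so up to 512 block lookups can run concurrently.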
for i in 0 ..< 512:
workers.add node.blockWorker()

while true:
blocks = newSeq[EthBlock](8192)
count = 0
info "Downloading 8192 blocks", startBlock = blockNumber
for i in 0..8191'u64:
await blockNumberQueue.addLast((blockNumber, i))

    # Not great :/ (busy-wait until the full batch of 8192 blocks has arrived)
while count != 8192:
await sleepAsync(10.milliseconds)
info "Adding 8192 blocks", startBlock = blockNumber
await blockQueue.addLast(blocks)

blockNumber += 8192

proc importBlocks*(
    conf: NimbusConf, com: CommonRef, blockQueue: AsyncQueue[seq[EthBlock]]
) {.async: (raises: [CancelledError]).} =
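  ## Import blocks into the database, either from era1 files or, when
  ## --debug-use-portal is set, from batches arriving on `blockQueue`.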
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -278,14 +446,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
true

while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block:
-    if not loadEraBlock(blockNumber):
-      notice "No more era1 blocks to import", blockNumber
-      break
+    if not conf.usePortal:
+      if not loadEraBlock(blockNumber):
+        notice "No more era1 blocks to import", blockNumber
+        break

-    imported += 1
+      imported += 1
+    else:
+      let blockSeq = await blockQueue.popFirst()
+      blocks.add(blockSeq)
+      info "Loaded 8192 blocks", startBlock = blockNumber
+
+      imported += 8192

if blocks.lenu64 mod conf.chunkSize == 0:
process()
info "Processed chunk of blocks"

if blocks.len > 0:
process() # last chunk, if any
@@ -341,3 +517,21 @@

if blocks.len > 0:
process()

proc importBlocks*(conf: NimbusConf, com: CommonRef) {.
    raises: [CatchableError]
.} =
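  # Let the downloader run at most 4 batches (4 * 8192 blocks) ahead of
  # block execution.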
let blockQueue = newAsyncQueue[seq[EthBlock]](4)

if conf.usePortal:
let portalNode = runPortalNode(conf)
let start = com.db.getSavedStateBlockNumber() + 1
asyncSpawn portalNode.getBlockLoop(blockQueue, start)

asyncSpawn importBlocks(conf, com, blockQueue)

while running:
try:
poll()
except CatchableError as e:
warn "Exception in poll()", exc = e.name, err = e.msg
