Skip to content

Commit

Permalink
Offload signature checking to taskpools
Browse files Browse the repository at this point in the history
In block processing, depending on the complexity of a transaction and
hotness of caches etc, signature checking can actually make up the
majority of time needed to process a transaction (60% observed in some
randomly sampled block ranges).

Fortunately, this is a task that trivially can be offloaded to a task
pool similar to how nimbus-eth2 does it.

This PR introduces taskpools in the most simple way possible, by
performing signature checking concurrently with other TX processing,
assigning a taskpool task per TX effectively.

With this little trick, we're in gigagas land 🎉 on my laptop!

```
INF 2024-12-10 21:05:35.170+01:00 Imported blocks
blockNumber=3874817 b... mgps=1222.707 ...
```

Tests don't use the taskpool for now because it needs manual cleanup and
we don't have a good mechanism in place. Future PR:s should address this
by creating a common shutdown sequence that also closes and cleans up
other resources like the DB.
  • Loading branch information
arnetheduck committed Dec 11, 2024
1 parent ac59b18 commit a6583df
Show file tree
Hide file tree
Showing 33 changed files with 142 additions and 51 deletions.
1 change: 1 addition & 0 deletions hive_integration/nodocker/consensus/consensus_sim.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ proc processChainData(cd: ChainData): TestStatus =
let
networkId = NetworkId(cd.params.config.chainId)
com = CommonRef.new(newCoreDbRef DefaultDbMemory,
Taskpool.new(),
networkId,
cd.params
)
Expand Down
1 change: 1 addition & 0 deletions hive_integration/nodocker/engine/engine_env.nim
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const
proc makeCom*(conf: NimbusConf): CommonRef =
CommonRef.new(
newCoreDbRef DefaultDbMemory,
Taskpool.new(),
conf.networkId,
conf.networkParams
)
Expand Down
2 changes: 1 addition & 1 deletion hive_integration/nodocker/engine/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ proc processBlock(
if header.parentBeaconBlockRoot.isSome:
? vmState.processBeaconBlockRoot(header.parentBeaconBlockRoot.get)

? processTransactions(vmState, header, blk.transactions)
? processTransactions(vmState, header, blk.transactions, taskpool = com.taskpool)

if com.isShanghaiOrLater(header.timestamp):
for withdrawal in blk.withdrawals.get:
Expand Down
3 changes: 2 additions & 1 deletion hive_integration/nodocker/graphql/graphql_sim.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ proc main() =
ethCtx = newEthContext()
ethNode = setupEthNode(conf, ethCtx, eth)
com = CommonRef.new(newCoreDbRef DefaultDbMemory,
conf.networkId,
Taskpool.new(),
conf.networkId,
conf.networkParams
)

Expand Down
2 changes: 1 addition & 1 deletion hive_integration/nodocker/pyspec/test_env.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ proc setupELClient*(conf: ChainConfig, node: JsonNode): TestEnv =
let
memDB = newCoreDbRef DefaultDbMemory
genesisHeader = node.genesisHeader
com = CommonRef.new(memDB, conf)
com = CommonRef.new(memDB, Taskpool.new(), conf)
stateDB = LedgerRef.init(memDB)
chain = newForkedChain(com, genesisHeader)

Expand Down
5 changes: 3 additions & 2 deletions hive_integration/nodocker/rpc/test_env.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ proc manageAccounts(ctx: EthContext, conf: NimbusConf) =
proc setupRpcServer(ctx: EthContext, com: CommonRef,
ethNode: EthereumNode, txPool: TxPoolRef,
conf: NimbusConf, chain: ForkedChainRef): RpcServer =
let
let
rpcServer = newRpcHttpServer([initTAddress(conf.httpAddress, conf.httpPort)])
serverApi = newServerAPI(chain, txPool)


setupCommonRpc(ethNode, conf, rpcServer)
setupServerAPI(serverApi, rpcServer, ctx)

Expand Down Expand Up @@ -80,6 +80,7 @@ proc setupEnv*(): TestEnv =
ethCtx = newEthContext()
ethNode = setupEthNode(conf, ethCtx, eth)
com = CommonRef.new(newCoreDbRef DefaultDbMemory,
Taskpool.new(),
conf.networkId,
conf.networkParams
)
Expand Down
15 changes: 13 additions & 2 deletions nimbus/common/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import
../db/[core_db, ledger, storage_types],
../utils/[utils, ec_recover],
".."/[constants, errors, version],
"."/[chain_config, evmforks, genesis, hardforks]
"."/[chain_config, evmforks, genesis, hardforks],
taskpools

export
chain_config,
Expand All @@ -25,7 +26,8 @@ export
evmforks,
hardforks,
genesis,
utils
utils,
taskpools

type
SyncProgress = object
Expand Down Expand Up @@ -97,6 +99,9 @@ type
extraData: string
## Value of extraData field when building block

taskpool*: Taskpool
## Shared task pool for offloading computation to other threads

# ------------------------------------------------------------------------------
# Forward declarations
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -165,6 +170,7 @@ proc initializeDb(com: CommonRef) =

proc init(com : CommonRef,
db : CoreDbRef,
taskpool : Taskpool,
networkId : NetworkId,
config : ChainConfig,
genesis : Genesis,
Expand All @@ -181,6 +187,7 @@ proc init(com : CommonRef,
com.pruneHistory= pruneHistory
com.pos = CasperRef.new
com.extraData = ShortClientId
com.taskpool = taskpool

# com.forkIdCalculator and com.genesisHash are set
# by setForkId
Expand Down Expand Up @@ -223,6 +230,7 @@ proc isBlockAfterTtd(com: CommonRef, header: Header): bool =
proc new*(
_: type CommonRef;
db: CoreDbRef;
taskpool: Taskpool;
networkId: NetworkId = MainNet;
params = networkParams(MainNet);
pruneHistory = false;
Expand All @@ -233,6 +241,7 @@ proc new*(
new(result)
result.init(
db,
taskpool,
networkId,
params.config,
params.genesis,
Expand All @@ -241,6 +250,7 @@ proc new*(
proc new*(
_: type CommonRef;
db: CoreDbRef;
taskpool: Taskpool;
config: ChainConfig;
networkId: NetworkId = MainNet;
pruneHistory = false;
Expand All @@ -251,6 +261,7 @@ proc new*(
new(result)
result.init(
db,
taskpool,
networkId,
config,
nil,
Expand Down
6 changes: 6 additions & 0 deletions nimbus/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ type
defaultValueDesc: $ClientId
name: "agent-string" .}: string

numThreads* {.
separator: "\pPERFORMANCE OPTIONS",
defaultValue: 0,
desc: "Number of worker threads (\"0\" = use as many threads as there are CPU cores available)"
name: "num-threads" .}: int

beaconChunkSize* {.
hidden
desc: "Number of blocks per database transaction for beacon sync"
Expand Down
1 change: 1 addition & 0 deletions nimbus/core/chain/persist_blocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ proc persistBlocksImpl(
skipValidation,
skipReceipts = skipValidation and NoPersistReceipts in flags,
skipUncles = NoPersistUncles in flags,
taskpool = c.com.taskpool,
)

let blockHash = header.blockHash()
Expand Down
81 changes: 65 additions & 16 deletions nimbus/core/executor/process_block.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.

{.push raises: [], gcsafe.}

import
../../common/common,
../../utils/utils,
Expand All @@ -21,11 +23,50 @@ import
./calculate_reward,
./executor_helpers,
./process_transaction,
eth/common/transaction_utils,
eth/common/[keys, transaction_utils],
chronicles,
results
results,
taskpools

template withSender(txs: openArray[Transaction], body: untyped) =
# Execute transactions offloading the signature checking to the task pool if
# it's available
if taskpool == nil:
for txIndex {.inject.}, tx {.inject.} in txs:
let sender {.inject.} = tx.recoverSender().valueOr(default(Address))
body
else:
type Entry = (Signature, Hash32, Flowvar[Address])

proc recoverTask(e: ptr Entry): Address {.nimcall.} =
let pk = recover(e[][0], SkMessage(e[][1].data))
if pk.isOk():
pk[].to(Address)
else:
default(Address)

var entries = newSeq[Entry](txs.len)

# Prepare signature recovery tasks for each transaction - for simplicity,
# we use `default(Address)` to signal sig check failure
for i, e in entries.mpairs():
e[0] = txs[i].signature().valueOr(default(Signature))
e[1] = txs[i].rlpHashForSigning(txs[i].isEip155)
let a = addr e
# Spawning the task here allows it to start early, while we still haven't
# hashed subsequent txs
e[2] = taskpool.spawn recoverTask(a)

{.push raises: [].}
for txIndex {.inject.}, e in entries.mpairs():
template tx(): untyped =
txs[txIndex]

# Sync blocks until the sender is available from the task pool - as soon
# as we have it, we can process this transaction while the senders of the
# other transactions are being computed
let sender {.inject.} = sync(e[2])

body

# Factored this out of procBlkPreamble so that it can be used directly for
# stateless execution of specific transactions.
Expand All @@ -34,15 +75,17 @@ proc processTransactions*(
header: Header,
transactions: seq[Transaction],
skipReceipts = false,
collectLogs = false
collectLogs = false,
taskpool: Taskpool = nil,
): Result[void, string] =
vmState.receipts.setLen(if skipReceipts: 0 else: transactions.len)
vmState.cumulativeGasUsed = 0
vmState.allLogs = @[]

for txIndex, tx in transactions:
let sender = tx.recoverSender().valueOr:
withSender(transactions):
if sender == default(Address):
return err("Could not get sender for tx with index " & $(txIndex))

let rc = vmState.processTransaction(tx, sender, header)
if rc.isErr:
return err("Error processing tx with index " & $(txIndex) & ":" & rc.error)
Expand All @@ -60,7 +103,10 @@ proc processTransactions*(
ok()

proc procBlkPreamble(
vmState: BaseVMState, blk: Block, skipValidation, skipReceipts, skipUncles: bool
vmState: BaseVMState,
blk: Block,
skipValidation, skipReceipts, skipUncles: bool,
taskpool: Taskpool,
): Result[void, string] =
template header(): Header =
blk.header
Expand Down Expand Up @@ -97,7 +143,9 @@ proc procBlkPreamble(
return err("Transactions missing from body")

let collectLogs = header.requestsHash.isSome and not skipValidation
?processTransactions(vmState, header, blk.transactions, skipReceipts, collectLogs)
?processTransactions(
vmState, header, blk.transactions, skipReceipts, collectLogs, taskpool
)
elif blk.transactions.len > 0:
return err("Transactions in block with empty txRoot")

Expand Down Expand Up @@ -150,7 +198,8 @@ proc procBlkEpilogue(
# large ranges of blocks, implicitly limiting its size using the gas limit
db.persist(
clearEmptyAccount = vmState.com.isSpuriousOrLater(header.number),
clearCache = true)
clearCache = true,
)

var
withdrawalReqs: seq[byte]
Expand All @@ -173,17 +222,15 @@ proc procBlkEpilogue(
expected = header.stateRoot,
actual = stateRoot,
arrivedFrom = vmState.parent.stateRoot
return err("stateRoot mismatch, expect: " &
$header.stateRoot & ", got: " & $stateRoot)
return
err("stateRoot mismatch, expect: " & $header.stateRoot & ", got: " & $stateRoot)

if not skipReceipts:
let bloom = createBloom(vmState.receipts)

if header.logsBloom != bloom:
debug "wrong logsBloom in block",
blockNumber = header.number,
actual = bloom,
expected = header.logsBloom
blockNumber = header.number, actual = bloom, expected = header.logsBloom
return err("bloom mismatch")

let receiptsRoot = calcReceiptsRoot(vmState.receipts)
Expand All @@ -199,7 +246,8 @@ proc procBlkEpilogue(

if header.requestsHash.isSome:
let
depositReqs = ?parseDepositLogs(vmState.allLogs, vmState.com.depositContractAddress)
depositReqs =
?parseDepositLogs(vmState.allLogs, vmState.com.depositContractAddress)
requestsHash = calcRequestsHash(depositReqs, withdrawalReqs, consolidationReqs)

if header.requestsHash.get != requestsHash:
Expand All @@ -223,9 +271,10 @@ proc processBlock*(
skipValidation: bool = false,
skipReceipts: bool = false,
skipUncles: bool = false,
taskpool: Taskpool = nil,
): Result[void, string] =
## Generalised function to processes `blk` for any network.
?vmState.procBlkPreamble(blk, skipValidation, skipReceipts, skipUncles)
?vmState.procBlkPreamble(blk, skipValidation, skipReceipts, skipUncles, taskpool)

# EIP-3675: no reward for miner in POA/POS
if not vmState.com.proofOfStake(blk.header):
Expand Down
18 changes: 17 additions & 1 deletion nimbus/nimbus_execution_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import
../nimbus/compile_info

import
std/[os, strutils, net],
std/[os, osproc, strutils, net],
chronicles,
eth/net/nat,
metrics,
Expand Down Expand Up @@ -217,8 +217,24 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
preventLoadingDataDirForTheWrongNetwork(coreDB, conf)
setupMetrics(nimbus, conf)

let taskpool =
try:
if conf.numThreads < 0:
fatal "The number of threads --num-threads cannot be negative."
quit 1
elif conf.numThreads == 0:
Taskpool.new(numThreads = min(countProcessors(), 16))
else:
Taskpool.new(numThreads = conf.numThreads)
except CatchableError as e:
fatal "Cannot start taskpool", err = e.msg
quit 1

info "Threadpool started", numThreads = taskpool.numThreads

let com = CommonRef.new(
db = coreDB,
taskpool = taskpool,
pruneHistory = (conf.chainDbMode == AriPrune),
networkId = conf.networkId,
params = conf.networkParams)
Expand Down
2 changes: 1 addition & 1 deletion tests/macro_assembler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ proc initVMEnv*(network: string): BaseVMState =
cdb = DefaultDbMemory.newCoreDbRef()
com = CommonRef.new(
cdb,
conf,
nil, conf,
conf.chainId.NetworkId)
parent = Header(stateRoot: EMPTY_ROOT_HASH)
parentHash = rlpHash(parent)
Expand Down
2 changes: 1 addition & 1 deletion tests/persistBlockTestGen.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ proc main() {.used.} =
var conf = makeConfig()
let db = newCoreDbRef(
DefaultDbPersistent, string conf.dataDir, DbOptions.init())
let com = CommonRef.new(db)
let com = CommonRef.new(db, nil)

com.dumpTest(97)
com.dumpTest(98) # no uncles and no tx
Expand Down
2 changes: 1 addition & 1 deletion tests/test_blockchain_json.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ proc executeCase(node: JsonNode): bool =
memDB = newCoreDbRef DefaultDbMemory
stateDB = LedgerRef.init(memDB)
config = getChainConfig(env.network)
com = CommonRef.new(memDB, config)
com = CommonRef.new(memDB, nil, config)

setupStateDB(env.pre, stateDB)
stateDB.persist()
Expand Down
1 change: 1 addition & 0 deletions tests/test_coredb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ proc initRunnerDB(

result = CommonRef.new(
db = coreDB,
taskpool = nil,
networkId = networkId,
params = params,
pruneHistory = pruneHistory)
Expand Down
Loading

0 comments on commit a6583df

Please sign in to comment.