Skip to content

Commit

Permalink
Improve the state prefetch dispatch policy
Browse files Browse the repository at this point in the history
** replace atomic read by channel close to interrupt state prefetch
** try to do state prefetch when the prefetch thread is idle, no need to divide into 3 sub-arrays
   it will make the prefetchers workload balance
  • Loading branch information
setunapo committed Jun 20, 2022
1 parent 28f8090 commit c224b6f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 29 deletions.
11 changes: 6 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2114,13 +2114,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
var followupInterrupt uint32
interruptChan := make(chan struct{})
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
// do Prefetch in a separate goroutine to avoid blocking the critical path
go func() {
bc.prefetcher.Prefetch(block, throwaway, bc.vmConfig, interruptChan)
}()
}
//Process block using the parent state as reference point
substart := time.Now()
Expand All @@ -2129,7 +2130,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
}
statedb.SetExpectedStateRoot(block.Root())
statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
atomic.StoreUint32(&followupInterrupt, 1)
close(interruptChan) // state prefetch can be stopped
activeState = statedb
if err != nil {
bc.reportBlock(block, receipts, err)
Expand Down
46 changes: 23 additions & 23 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package core

import (
"sync/atomic"

"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -50,43 +48,45 @@ func NewStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and snapshot clean state.
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptChan chan struct{}) {
var (
header = block.Header()
signer = types.MakeSigner(p.config, header.Number)
)
transactions := block.Transactions()
sortTransactions := make([][]*types.Transaction, prefetchThread)
for i := 0; i < prefetchThread; i++ {
sortTransactions[i] = make([]*types.Transaction, 0, len(transactions)/prefetchThread)
}
for idx := range transactions {
threadIdx := idx % prefetchThread
sortTransactions[threadIdx] = append(sortTransactions[threadIdx], transactions[idx])
}
txChan := make(chan int, prefetchThread)
// No need to execute the first batch, since the main processor will do it.
for i := 0; i < prefetchThread; i++ {
go func(idx int) {
go func() {
newStatedb := statedb.Copy()
newStatedb.EnableWriteOnSharedStorage()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for i, tx := range sortTransactions[idx] {
// If block precaching was interrupted, abort
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
for {
select {
case txIndex := <-txChan:
tx := transactions[txIndex]
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessageNoNonceCheck(signer)
if err != nil {
return // Also invalid block, bail out
}
newStatedb.Prepare(tx.Hash(), header.Hash(), txIndex)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)

case <-interruptChan:
// If block precaching was interrupted, abort
return
}
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessageNoNonceCheck(signer)
if err != nil {
return // Also invalid block, bail out
}
newStatedb.Prepare(tx.Hash(), header.Hash(), i)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
}
}(i)
}()
}

// we are in a separate goroutine, won't block the critical path of insertChain
for i := 0; i < len(transactions); i++ {
txChan <- i
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Prefetcher interface {
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptChan chan struct{})
// PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage.
PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
}
Expand Down

0 comments on commit c224b6f

Please sign in to comment.