From b1880958b06a26337ac01776c00258f50c62c967 Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 17 Jun 2022 15:14:23 +0800 Subject: [PATCH] Improve the state prefetch dispatch policy ** replace atomic read by channel close to interrupt state prefetch ** try to do state prefetch when the prefetch thread is idle, no need to divide into 3 sub-arrays it will make the prefetchers workload balance --- core/blockchain.go | 11 +++++----- core/state_prefetcher.go | 46 ++++++++++++++++++++-------------------- core/types.go | 2 +- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index eeaf1b7e0a..9cd7ccce61 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2114,13 +2114,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") - var followupInterrupt uint32 + interruptChan := make(chan struct{}) // For diff sync, it may fallback to full sync, so we still do prefetch if len(block.Transactions()) >= prefetchTxNumber { throwaway := statedb.Copy() - go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { - bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) - }(time.Now(), block, throwaway, &followupInterrupt) + // do Prefetch in a separate goroutine to avoid blocking the critical path + go func() { + bc.prefetcher.Prefetch(block, throwaway, bc.vmConfig, interruptChan) + }() } //Process block using the parent state as reference point substart := time.Now() @@ -2129,7 +2130,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er } statedb.SetExpectedStateRoot(block.Root()) statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) - atomic.StoreUint32(&followupInterrupt, 1) + close(interruptChan) // state prefetch can be stopped activeState = statedb if err != nil { bc.reportBlock(block, receipts, err) diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 4a731d8dfe..2e6c3bc970 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -17,8 +17,6 @@ package core import ( - "sync/atomic" - "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -50,43 +48,45 @@ func NewStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and snapshot clean state. -func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) { +func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptChan chan struct{}) { var ( header = block.Header() signer = types.MakeSigner(p.config, header.Number) ) transactions := block.Transactions() - sortTransactions := make([][]*types.Transaction, prefetchThread) - for i := 0; i < prefetchThread; i++ { - sortTransactions[i] = make([]*types.Transaction, 0, len(transactions)/prefetchThread) - } - for idx := range transactions { - threadIdx := idx % prefetchThread - sortTransactions[threadIdx] = append(sortTransactions[threadIdx], transactions[idx]) - } + txChan := make(chan int, prefetchThread) // No need to execute the first batch, since the main processor will do it. for i := 0; i < prefetchThread; i++ { - go func(idx int) { + go func() { newStatedb := statedb.Copy() newStatedb.EnableWriteOnSharedStorage() gaspool := new(GasPool).AddGas(block.GasLimit()) blockContext := NewEVMBlockContext(header, p.bc, nil) evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) // Iterate over and process the individual transactions - for i, tx := range sortTransactions[idx] { - // If block precaching was interrupted, abort - if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { + for { + select { + case txIndex := <-txChan: + tx := transactions[txIndex] + // Convert the transaction into an executable message and pre-cache its sender + msg, err := tx.AsMessageNoNonceCheck(signer) + if err != nil { + return // Also invalid block, bail out + } + newStatedb.Prepare(tx.Hash(), header.Hash(), txIndex) + precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) + + case <-interruptChan: + // If block precaching was interrupted, abort return } - // Convert the transaction into an executable message and pre-cache its sender - msg, err := tx.AsMessageNoNonceCheck(signer) - if err != nil { - return // Also invalid block, bail out - } - newStatedb.Prepare(tx.Hash(), header.Hash(), i) - precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) } - }(i) + }() + } + + // it should be in a separate goroutine, to avoid blocking the critical path. + for i := 0; i < len(transactions); i++ { + txChan <- i } } diff --git a/core/types.go b/core/types.go index c9061233e6..54b16865f8 100644 --- a/core/types.go +++ b/core/types.go @@ -39,7 +39,7 @@ type Prefetcher interface { // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. - Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) + Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptChan chan struct{}) // PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage. PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) }