diff --git a/core/blockchain.go b/core/blockchain.go index 2a509f9194..60bee85762 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1892,13 +1892,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) // do Prefetch in a separate goroutine to avoid blocking the critical path // 1.do state prefetch for snapshot cache - throwaway := statedb.Copy() + throwaway := statedb.CopyDoPrefetch() go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh) // 2.do trie prefetch for MPT trie node cache // it is for the big state trie tree, prefetch based on transaction's From/To address. // trie prefetcher is thread safe now, ok to prefetch in a separate routine - go statedb.TriePrefetchInAdvance(block, signer) + go throwaway.TriePrefetchInAdvance(block, signer) } //Process block using the parent state as reference point diff --git a/core/state/statedb.go b/core/state/statedb.go index 0fa2e1faa5..a482d15abb 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -237,10 +237,9 @@ func (s *StateDB) StopPrefetcher() { } func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) { - s.prefetcherLock.Lock() - prefetcher := s.prefetcher // s.prefetcher could be resetted to nil - s.prefetcherLock.Unlock() - + // s is a temporary throw away StateDB, s.prefetcher won't be resetted to nil + // so no need to add lock for s.prefetcher + prefetcher := s.prefetcher if prefetcher == nil { return } @@ -818,6 +817,17 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common // Copy creates a deep, independent copy of the state. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB { + return s.copyInternal(false) +} + +// It is mainly for state prefetcher to do trie prefetch right now. +func (s *StateDB) CopyDoPrefetch() *StateDB { + return s.copyInternal(true) +} + +// If doPrefetch is true, it tries to reuse the prefetcher, the copied StateDB will do active trie prefetch. +// otherwise, just do inactive copy trie prefetcher. +func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { // Copy all the basic fields, initialize the memory ones state := &StateDB{ db: s.db, @@ -884,10 +894,11 @@ func (s *StateDB) Copy() *StateDB { state.accessList = s.accessList.Copy() } - // If there's a prefetcher running, make an inactive copy of it that can - // only access data but does not actively preload (since the user will not - // know that they need to explicitly terminate an active copy). - if s.prefetcher != nil { + state.prefetcher = s.prefetcher + if s.prefetcher != nil && !doPrefetch { + // If there's a prefetcher running, make an inactive copy of it that can + // only access data but does not actively preload (since the user will not + // know that they need to explicitly terminate an active copy). state.prefetcher = s.prefetcher.copy() } if s.snaps != nil { diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 9004f489ee..dd22f9d084 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -172,15 +172,7 @@ func (p *triePrefetcher) mainLoop() { p.fetchersMutex.Lock() p.fetchers = nil p.fetchersMutex.Unlock() - - // drain all the channels before quit the loop - for { - select { - case <-p.prefetchChan: - default: - return - } - } + return } } } @@ -362,6 +354,7 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf // schedule adds a batch of trie keys to the queue to prefetch. func (sf *subfetcher) schedule(keys [][]byte) { + atomic.AddUint32(&sf.pendingSize, uint32(len(keys))) // Append the tasks to the current queue sf.lock.Lock() sf.tasks = append(sf.tasks, keys...) @@ -371,7 +364,6 @@ func (sf *subfetcher) schedule(keys [][]byte) { case sf.wake <- struct{}{}: default: } - atomic.AddUint32(&sf.pendingSize, uint32(len(keys))) } func (sf *subfetcher) scheduleParallel(keys [][]byte) { diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index d39f3e4e25..4dffe22341 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -58,7 +58,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c // No need to execute the first batch, since the main processor will do it. for i := 0; i < prefetchThread; i++ { go func() { - newStatedb := statedb.Copy() + newStatedb := statedb.CopyDoPrefetch() newStatedb.EnableWriteOnSharedStorage() gaspool := new(GasPool).AddGas(block.GasLimit()) blockContext := NewEVMBlockContext(header, p.bc, nil) @@ -100,7 +100,7 @@ func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, for i := 0; i < prefetchThread; i++ { go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) { idx := 0 - newStatedb := statedb.Copy() + newStatedb := statedb.CopyDoPrefetch() newStatedb.EnableWriteOnSharedStorage() gaspool := new(GasPool).AddGas(gasLimit) blockContext := NewEVMBlockContext(header, p.bc, nil) @@ -153,5 +153,8 @@ func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool // Update the evm with the new transaction context. evm.Reset(NewEVMTxContext(msg), statedb) // Add addresses to access list if applicable - ApplyMessage(evm, msg, gaspool) + if _, err := ApplyMessage(evm, msg, gaspool); err == nil { + statedb.Finalise(true) + } + } diff --git a/miner/worker.go b/miner/worker.go index 42056df4e9..3f643bf677 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -899,7 +899,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP txsPrefetch := txs.Copy() tx := txsPrefetch.Peek() txCurr := &tx - w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr) + w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr) LOOP: for {