Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]: Trie prefetch on state pretch #996

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1892,13 +1892,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
// do Prefetch in a separate goroutine to avoid blocking the critical path

// 1.do state prefetch for snapshot cache
throwaway := statedb.Copy()
throwaway := statedb.CopyDoPrefetch()
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)

// 2.do trie prefetch for MPT trie node cache
// it is for the big state trie tree, prefetch based on transaction's From/To address.
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
go statedb.TriePrefetchInAdvance(block, signer)
go throwaway.TriePrefetchInAdvance(block, signer)
}

//Process block using the parent state as reference point
Expand Down
27 changes: 19 additions & 8 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,9 @@ func (s *StateDB) StopPrefetcher() {
}

func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) {
s.prefetcherLock.Lock()
prefetcher := s.prefetcher // s.prefetcher could be resetted to nil
s.prefetcherLock.Unlock()

// s is a temporary throw away StateDB, s.prefetcher won't be resetted to nil
// so no need to add lock for s.prefetcher
prefetcher := s.prefetcher
if prefetcher == nil {
return
}
Expand Down Expand Up @@ -818,6 +817,17 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common
// Copy creates a deep, independent copy of the state.
// Snapshots of the copied state cannot be applied to the copy.
func (s *StateDB) Copy() *StateDB {
return s.copyInternal(false)
}

// It is mainly for state prefetcher to do trie prefetch right now.
func (s *StateDB) CopyDoPrefetch() *StateDB {
return s.copyInternal(true)
}

// If doPrefetch is true, it tries to reuse the prefetcher, the copied StateDB will do active trie prefetch.
// otherwise, just do inactive copy trie prefetcher.
func (s *StateDB) copyInternal(doPrefetch bool) *StateDB {
// Copy all the basic fields, initialize the memory ones
state := &StateDB{
db: s.db,
Expand Down Expand Up @@ -884,10 +894,11 @@ func (s *StateDB) Copy() *StateDB {
state.accessList = s.accessList.Copy()
}

// If there's a prefetcher running, make an inactive copy of it that can
// only access data but does not actively preload (since the user will not
// know that they need to explicitly terminate an active copy).
if s.prefetcher != nil {
state.prefetcher = s.prefetcher
if s.prefetcher != nil && !doPrefetch {
// If there's a prefetcher running, make an inactive copy of it that can
// only access data but does not actively preload (since the user will not
// know that they need to explicitly terminate an active copy).
state.prefetcher = s.prefetcher.copy()
}
if s.snaps != nil {
Expand Down
12 changes: 2 additions & 10 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,7 @@ func (p *triePrefetcher) mainLoop() {
p.fetchersMutex.Lock()
p.fetchers = nil
p.fetchersMutex.Unlock()

// drain all the channels before quit the loop
for {
select {
case <-p.prefetchChan:
default:
return
}
}
return
}
}
}
Expand Down Expand Up @@ -362,6 +354,7 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) {
atomic.AddUint32(&sf.pendingSize, uint32(len(keys)))
forcodedancing marked this conversation as resolved.
Show resolved Hide resolved
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
Expand All @@ -371,7 +364,6 @@ func (sf *subfetcher) schedule(keys [][]byte) {
case sf.wake <- struct{}{}:
default:
}
atomic.AddUint32(&sf.pendingSize, uint32(len(keys)))
}

func (sf *subfetcher) scheduleParallel(keys [][]byte) {
Expand Down
9 changes: 6 additions & 3 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
// No need to execute the first batch, since the main processor will do it.
for i := 0; i < prefetchThread; i++ {
go func() {
newStatedb := statedb.Copy()
newStatedb := statedb.CopyDoPrefetch()
newStatedb.EnableWriteOnSharedStorage()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
Expand Down Expand Up @@ -100,7 +100,7 @@ func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce,
for i := 0; i < prefetchThread; i++ {
go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) {
idx := 0
newStatedb := statedb.Copy()
newStatedb := statedb.CopyDoPrefetch()
newStatedb.EnableWriteOnSharedStorage()
gaspool := new(GasPool).AddGas(gasLimit)
blockContext := NewEVMBlockContext(header, p.bc, nil)
Expand Down Expand Up @@ -153,5 +153,8 @@ func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool
// Update the evm with the new transaction context.
evm.Reset(NewEVMTxContext(msg), statedb)
// Add addresses to access list if applicable
ApplyMessage(evm, msg, gaspool)
if _, err := ApplyMessage(evm, msg, gaspool); err == nil {
statedb.Finalise(true)
forcodedancing marked this conversation as resolved.
Show resolved Hide resolved
}

}
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
txsPrefetch := txs.Copy()
tx := txsPrefetch.Peek()
txCurr := &tx
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr)

LOOP:
for {
Expand Down