From df3e1be9d3bfad3b5f75aa2cbf74007d839689c6 Mon Sep 17 00:00:00 2001
From: setunapo <98502954+setunapo@users.noreply.github.com>
Date: Fri, 15 Jul 2022 19:17:08 +0800
Subject: [PATCH] [Feature]: Improve trie prefetch (#952)

* trie prefetcher for From/To address in advance

  We found that trie prefetch might not be fast enough, especially prefetch of the outer big state trie. Instead of waiting to do trie prefetch until a transaction is finalized, we can do trie prefetch in advance: try to prefetch the trie nodes of the From/To accounts, since their root hashes are the most likely to be changed.

* Parallel TriePrefetch for large trie update.

  Currently, we create a subfetcher for each account address to do trie prefetch. If the address has a very large state change, trie prefetch might not be fast enough, e.g. a contract modifies lots of KV pairs or a large number of accounts' root hashes change in a block. With this commit, children subfetchers are created to do trie prefetch in parallel if the parent subfetcher's workload exceeds the threshold.

* some improvements of the parallel trie prefetch implementation

  1.childrenLock is removed, since it is not necessary: the APIs of triePrefetcher are not thread safe and should be used sequentially. A prefetch will be interrupted by trie() or close(), so we only need to mark it as interrupted and check that before calling scheduleParallel, to avoid concurrent access to paraChildren
  2.rename subfetcher.children to subfetcher.paraChildren
  3.use subfetcher.pendingSize to replace totalSize & processedIndex
  4.randomly select the start child to avoid always feeding the first one
  5.increase threshold and capacity to avoid creating too many child routines

* fix review comments

  ** refine the nil check
  ** create a separate routine for From/To prefetch, to avoid blocking the critical path

* remove the interrupt member

* do not create a signer for each transaction

* some changes to triePrefetcher

  ** remove the abortLoop and move the subfetcher abort operation into mainLoop, since we want to keep subfetcher creation & schedule & abort within one loop to avoid locks for concurrent access.
  ** do not wait for the subfetcher's term signal in abort(); it speeds up close by aborting subfetchers concurrently: we send the stop signal to all subfetchers in a burst and wait for their term signals later.
* some coding improvements for subfetcher.scheduleParallel

* fix a UT crash when s.prefetcher == nil

* update parallel trie prefetcher configuration

  Tested with different combinations of parallelTriePrefetchThreshold & parallelTriePrefetchCapacity, and found the most efficient configuration to be: parallelTriePrefetchThreshold = 10, parallelTriePrefetchCapacity = 20

* fix review comments: code refinement

--- core/blockchain.go | 10 ++- core/state/statedb.go | 30 +++++++++ core/state/trie_prefetcher.go | 118 +++++++++++++++++++++++++--------- core/state_processor.go | 5 +- 4 files changed, 130 insertions(+), 33 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 89059792f0..2a509f9194 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1889,10 +1889,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) interruptCh := make(chan struct{}) // For diff sync, it may fallback to full sync, so we still do prefetch if len(block.Transactions()) >= prefetchTxNumber { - throwaway := statedb.Copy() // do Prefetch in a separate goroutine to avoid blocking the critical path + + // 1.do state prefetch for snapshot cache + throwaway := statedb.Copy() go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh) + + // 2.do trie prefetch for MPT trie node cache + // it is for the big state trie, prefetch based on transactions' From/To addresses. + // trie prefetcher is thread safe now, ok to prefetch in a separate routine + go statedb.TriePrefetchInAdvance(block, signer) } + //Process block using the parent state as reference point substart := time.Now() if bc.pipeCommit { diff --git a/core/state/statedb.go b/core/state/statedb.go index 0fefa5d81c..ad2638722a 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -236,6 +236,36 @@ func (s *StateDB) StopPrefetcher() { } } +func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) { + s.prefetcherLock.Lock() + prefetcher := s.prefetcher // s.prefetcher could be reset to nil + s.prefetcherLock.Unlock() + + if prefetcher == nil { + return + } + accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1) + for _, tx := range block.Transactions() { + from, err := types.Sender(signer, tx) + if err != nil { + // invalid block, skip prefetch + return + } + accounts[from] = struct{}{} + if tx.To() != nil { + accounts[*tx.To()] = struct{}{} + } + } + addressesToPrefetch := make([][]byte, 0, len(accounts)) + for addr := range accounts { + addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure + } + + if len(addressesToPrefetch) > 0 { + prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) + } +} + // Mark that the block is processed by diff layer func (s *StateDB) SetExpectedStateRoot(root common.Hash) { s.expectedRoot = root diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index e42628dc67..9004f489ee 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -26,8 +26,10 @@ import ( ) const ( - abortChanSize = 64 - concurrentChanSize = 10 + abortChanSize = 64 + concurrentChanSize = 10 + parallelTriePrefetchThreshold = 10 + parallelTriePrefetchCapacity = 20 ) var ( @@ -52,15 +54,13 @@ type triePrefetcher struct { fetches map[common.Hash]Trie // Partially or fully fetcher tries fetchers map[common.Hash]*subfetcher // Subfetchers for each trie + abortChan chan *subfetcher // to abort a single subfetcher and its children closed int32 closeMainChan chan
struct{} // it is to inform the mainLoop closeMainDoneChan chan struct{} fetchersMutex sync.RWMutex prefetchChan chan *prefetchMsg // no need to wait for return - abortChan chan *subfetcher - closeAbortChan chan struct{} // it is used to inform abortLoop - deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter @@ -76,11 +76,10 @@ type triePrefetcher struct { func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map - abortChan: make(chan *subfetcher, abortChanSize), - closeAbortChan: make(chan struct{}), + db: db, + root: root, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + abortChan: make(chan *subfetcher, abortChanSize), closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), @@ -96,11 +95,13 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } - go p.abortLoop() go p.mainLoop() return p } +// the subfetcher's lifecycle will only be updated in this loop, +// including: subfetcher creation & abort, child subfetcher creation & abort. +// since the mainLoop will handle all the requests, the handling of each message should be lightweight func (p *triePrefetcher) mainLoop() { for { select { @@ -112,12 +113,36 @@ func (p *triePrefetcher) mainLoop() { p.fetchers[pMsg.root] = fetcher p.fetchersMutex.Unlock() } - fetcher.schedule(pMsg.keys) + select { + case <-fetcher.stop: + default: + fetcher.schedule(pMsg.keys) + // no need to run parallel trie prefetch if threshold is not reached. + if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold { + fetcher.scheduleParallel(pMsg.keys) + } + } + + case fetcher := <-p.abortChan: + fetcher.abort() + for _, child := range fetcher.paraChildren { + child.abort() + } case <-p.closeMainChan: for _, fetcher := range p.fetchers { - p.abortChan <- fetcher // safe to do multiple times + fetcher.abort() // safe to do multiple times + for _, child := range fetcher.paraChildren { + child.abort() + } + } + // make sure all subfetchers and child subfetchers are stopped + for _, fetcher := range p.fetchers { <-fetcher.term + for _, child := range fetcher.paraChildren { + <-child.term + } + if metrics.EnabledExpensive { if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) @@ -143,7 +168,6 @@ func (p *triePrefetcher) mainLoop() { } } } - close(p.closeAbortChan) close(p.closeMainDoneChan) p.fetchersMutex.Lock() p.fetchers = nil @@ -161,17 +185,6 @@ func (p *triePrefetcher) mainLoop() { } } -func (p *triePrefetcher) abortLoop() { - for { - select { - case fetcher := <-p.abortChan: - fetcher.abort() - case <-p.closeAbortChan: - return - } - } -} - // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { @@ -208,7 +221,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { select { case <-p.closeMainChan: - // for closed trie prefetcher, the fetches should not be nil + // for a closed trie prefetcher, fetchers is empty, + // but fetches should not be nil, since fetches is used to check whether it is an inactive copy.
fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, @@ -255,6 +269,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } + // use a lock instead of a channel request to mainLoop to get the fetcher, for performance reasons. p.fetchersMutex.RLock() fetcher := p.fetchers[root] p.fetchersMutex.RUnlock() @@ -266,8 +281,8 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. select { - case <-p.closeAbortChan: - case p.abortChan <- fetcher: // safe to do multiple times + case <-p.closeMainChan: + case p.abortChan <- fetcher: // safe to abort a fetcher multiple times } trie := fetcher.peek() @@ -323,6 +338,9 @@ type subfetcher struct { used [][]byte // Tracks the entries used in the end accountHash common.Hash + + pendingSize uint32 + paraChildren []*subfetcher // Parallel trie prefetch for an address with massive changes } // newSubfetcher creates a goroutine to prefetch state items belonging to a @@ -348,12 +366,51 @@ func (sf *subfetcher) schedule(keys [][]byte) { sf.lock.Lock() sf.tasks = append(sf.tasks, keys...) sf.lock.Unlock() - // Notify the prefetcher, it's fine if it's already terminated select { case sf.wake <- struct{}{}: default: } + atomic.AddUint32(&sf.pendingSize, uint32(len(keys))) +} + +func (sf *subfetcher) scheduleParallel(keys [][]byte) { + var keyIndex uint32 = 0 + childrenNum := len(sf.paraChildren) + if childrenNum > 0 { + // Feed the children first, if they are hungry. + // A child can handle up to parallelTriePrefetchCapacity pending keys. + childIndex := len(keys) % childrenNum // randomly select the start child to avoid always feeding the first one + for i := 0; i < childrenNum; i++ { + child := sf.paraChildren[childIndex] + childIndex = (childIndex + 1) % childrenNum + if atomic.LoadUint32(&child.pendingSize) >= parallelTriePrefetchCapacity { + // the child is already full, skip it + continue + } + feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize) + if keyIndex+feedNum >= uint32(len(keys)) { + // the newly arrived keys are all consumed by the children. + child.schedule(keys[keyIndex:]) + return + } + child.schedule(keys[keyIndex : keyIndex+feedNum]) + keyIndex += feedNum + } + } + // Children did not consume all the keys; create new subfetchers to handle the remaining keys.
+ keysLeft := keys[keyIndex:] + keysLeftSize := len(keysLeft) + for i := 0; i*parallelTriePrefetchCapacity < keysLeftSize; i++ { + child := newSubfetcher(sf.db, sf.root, sf.accountHash) + sf.paraChildren = append(sf.paraChildren, child) + endIndex := (i + 1) * parallelTriePrefetchCapacity + if endIndex >= keysLeftSize { + child.schedule(keysLeft[i*parallelTriePrefetchCapacity:]) + return + } + child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex]) + } } // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it @@ -382,7 +439,7 @@ func (sf *subfetcher) abort() { default: close(sf.stop) } - <-sf.term + // no need to wait <-sf.term here, will check sf.term later } // loop waits for new tasks to be scheduled and keeps loading them until it runs @@ -450,6 +507,7 @@ func (sf *subfetcher) loop() { sf.trie.TryGet(task) sf.seen[string(task)] = struct{}{} } + atomic.AddUint32(&sf.pendingSize, ^uint32(0)) // decrease } } diff --git a/core/state_processor.go b/core/state_processor.go index 97025fb9eb..b42938adf9 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -403,9 +403,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg posa, isPoSA := p.engine.(consensus.PoSA) commonTxs := make([]*types.Transaction, 0, txNum) - // initilise bloom processors + // initialise bloom processors bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) statedb.MarkFullProcessed() + signer := types.MakeSigner(p.config, header.Number) // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) @@ -421,7 +422,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg } } - msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee) + msg, err := tx.AsMessage(signer, header.BaseFee) if err != nil { bloomProcessors.Close() return statedb, nil, nil, 0, err
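
For reference, below is a minimal, self-contained Go sketch of the key-distribution policy that scheduleParallel implements, using the threshold and capacity values chosen in this patch. The child type and the distribute helper are illustrative stand-ins, not the real subfetcher types from core/state.

package main

import "fmt"

// Values chosen in this patch after testing different combinations.
const (
	parallelTriePrefetchThreshold = 10 // parent workload above this triggers parallel prefetch
	parallelTriePrefetchCapacity  = 20 // target number of pending keys fed to one child
)

// child is an illustrative stand-in for a child subfetcher; it only tracks how
// many keys are pending on it.
type child struct {
	pending int
}

// distribute mimics the policy of scheduleParallel: feed the existing children
// first (starting at a rotating index so the first child is not always fed),
// then create new children for the leftover keys, at most
// parallelTriePrefetchCapacity keys per new child.
func distribute(children []*child, keys []string) []*child {
	keyIndex := 0
	if n := len(children); n > 0 {
		childIndex := len(keys) % n // rotate the starting child
		for i := 0; i < n; i++ {
			c := children[childIndex]
			childIndex = (childIndex + 1) % n
			if c.pending >= parallelTriePrefetchCapacity {
				continue // this child is already full, skip it
			}
			feed := parallelTriePrefetchCapacity - c.pending
			if keyIndex+feed >= len(keys) {
				c.pending += len(keys) - keyIndex // the rest fits into this child
				return children
			}
			c.pending += feed
			keyIndex += feed
		}
	}
	// The existing children did not consume everything: create new children.
	left := len(keys) - keyIndex
	for left > 0 {
		batch := left
		if batch > parallelTriePrefetchCapacity {
			batch = parallelTriePrefetchCapacity
		}
		children = append(children, &child{pending: batch})
		left -= batch
	}
	return children
}

func main() {
	keys := make([]string, 55) // pretend 55 storage keys arrived for one account
	children := distribute(nil, keys)
	for i, c := range children {
		fmt.Printf("child %d: %d pending keys\n", i, c.pending)
	}
}

Running it with 55 pending keys and no existing children yields three children holding 20, 20 and 15 keys, which is the kind of split the parent falls back to once its own pendingSize passes parallelTriePrefetchThreshold.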
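The commit message also describes moving subfetcher creation and abort into mainLoop so no lock is needed, and letting abort() only deliver the stop signal so that many subfetchers can be aborted in a burst and their term signals waited on afterwards. The sketch below illustrates that single-owner-loop pattern with a hypothetical worker type; the names and channels are assumptions for illustration, not the actual triePrefetcher code.

package main

import "fmt"

// worker is a hypothetical stand-in for a subfetcher: stop is closed by the
// owner to ask it to quit, term is closed by the worker once it has fully exited.
type worker struct {
	stop chan struct{}
	term chan struct{}
}

func newWorker() *worker {
	w := &worker{stop: make(chan struct{}), term: make(chan struct{})}
	go func() {
		defer close(w.term) // announce termination
		<-w.stop            // pretend to prefetch until told to stop
	}()
	return w
}

// abort only delivers the stop signal; it deliberately does not wait on term,
// so the caller can abort many workers in a burst and wait for them afterwards.
func (w *worker) abort() {
	select {
	case <-w.stop: // already stopped, closing twice would panic
	default:
		close(w.stop)
	}
}

func main() {
	create := make(chan struct{})  // request to create one worker
	closeCh := make(chan struct{}) // request to shut everything down
	done := make(chan struct{})

	// The main loop is the single owner of the workers slice: creation and
	// abort both happen inside this goroutine, so no lock is required.
	go func() {
		var workers []*worker
		for {
			select {
			case <-create:
				workers = append(workers, newWorker())
			case <-closeCh:
				for _, w := range workers {
					w.abort() // burst: signal everyone first
				}
				for _, w := range workers {
					<-w.term // then wait for each to terminate
				}
				close(done)
				return
			}
		}
	}()

	for i := 0; i < 3; i++ {
		create <- struct{}{}
	}
	close(closeCh)
	<-done
	fmt.Println("all workers terminated")
}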