diff --git a/core/blockchain.go b/core/blockchain.go index 89059792f0..2a509f9194 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1889,10 +1889,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) interruptCh := make(chan struct{}) // For diff sync, it may fallback to full sync, so we still do prefetch if len(block.Transactions()) >= prefetchTxNumber { - throwaway := statedb.Copy() // do Prefetch in a separate goroutine to avoid blocking the critical path + + // 1. do state prefetch for snapshot cache + throwaway := statedb.Copy() go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh) + + // 2. do trie prefetch for MPT trie node cache + // it is for the big state trie tree, prefetch based on transaction's From/To address. + // trie prefetcher is thread safe now, ok to prefetch in a separate goroutine + go statedb.TriePrefetchInAdvance(block, signer) } + //Process block using the parent state as reference point substart := time.Now() if bc.pipeCommit { diff --git a/core/state/statedb.go b/core/state/statedb.go index 43e99e3f14..0fa2e1faa5 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -236,6 +236,36 @@ func (s *StateDB) StopPrefetcher() { } } +func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) { + s.prefetcherLock.Lock() + prefetcher := s.prefetcher // s.prefetcher could be reset to nil + s.prefetcherLock.Unlock() + + if prefetcher == nil { + return + } + accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1) + for _, tx := range block.Transactions() { + from, err := types.Sender(signer, tx) + if err != nil { + // invalid block, skip prefetch + return + } + accounts[from] = struct{}{} + if tx.To() != nil { + accounts[*tx.To()] = struct{}{} + } + } + addressesToPrefetch := make([][]byte, 0, len(accounts)) + for addr := range accounts { + addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for 
closure + } + + if len(addressesToPrefetch) > 0 { + prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) + } +} + // Mark that the block is processed by diff layer func (s *StateDB) SetExpectedStateRoot(root common.Hash) { s.expectedRoot = root diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index e42628dc67..9004f489ee 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -26,8 +26,10 @@ import ( ) const ( - abortChanSize = 64 - concurrentChanSize = 10 + abortChanSize = 64 + concurrentChanSize = 10 + parallelTriePrefetchThreshold = 10 // a subfetcher with more pending keys than this also fans keys out to child subfetchers + parallelTriePrefetchCapacity = 20 // max number of pending keys fed to a single child subfetcher ) var ( @@ -52,15 +54,13 @@ type triePrefetcher struct { fetches map[common.Hash]Trie // Partially or fully fetcher tries fetchers map[common.Hash]*subfetcher // Subfetchers for each trie + abortChan chan *subfetcher // to abort a single subfetcher and its children closed int32 closeMainChan chan struct{} // it is to inform the mainLoop closeMainDoneChan chan struct{} fetchersMutex sync.RWMutex prefetchChan chan *prefetchMsg // no need to wait for return - abortChan chan *subfetcher - closeAbortChan chan struct{} // it is used to inform abortLoop - deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter @@ -76,11 +76,10 @@ type triePrefetcher struct { func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map - abortChan: make(chan *subfetcher, abortChanSize), - closeAbortChan: make(chan struct{}), + db: db, + root: root, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + abortChan: make(chan *subfetcher, abortChanSize), closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), @@ -96,11 +95,13 @@ func newTriePrefetcher(db 
Database, root common.Hash, namespace string) *triePre storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } - go p.abortLoop() go p.mainLoop() return p } +// mainLoop is the only place where a subfetcher's lifecycle is updated, +// including: subfetcher's creation & abort, child subfetcher's creation & abort. +// since the mainLoop handles all the requests, each message handler should be lightweight func (p *triePrefetcher) mainLoop() { for { select { @@ -112,12 +113,36 @@ func (p *triePrefetcher) mainLoop() { p.fetchers[pMsg.root] = fetcher p.fetchersMutex.Unlock() } - fetcher.schedule(pMsg.keys) + select { + case <-fetcher.stop: // the fetcher was already aborted, ignore the request + default: + fetcher.schedule(pMsg.keys) + // no need to run parallel trie prefetch if the threshold is not reached. + if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold { + fetcher.scheduleParallel(pMsg.keys) + } + } + + case fetcher := <-p.abortChan: + fetcher.abort() + for _, child := range fetcher.paraChildren { + child.abort() + } case <-p.closeMainChan: for _, fetcher := range p.fetchers { - p.abortChan <- fetcher // safe to do multiple times + fetcher.abort() // safe to do multiple times + for _, child := range fetcher.paraChildren { + child.abort() + } + } + // make sure all subfetchers and child subfetchers are stopped + for _, fetcher := range p.fetchers { <-fetcher.term + for _, child := range fetcher.paraChildren { + <-child.term + } + if metrics.EnabledExpensive { if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) @@ -143,7 +168,6 @@ func (p *triePrefetcher) mainLoop() { } } } - close(p.closeAbortChan) close(p.closeMainDoneChan) p.fetchersMutex.Lock() p.fetchers = nil @@ -161,17 +185,6 @@ func (p *triePrefetcher) mainLoop() { } } -func (p *triePrefetcher) abortLoop() { - for { - select { - case fetcher := <-p.abortChan: - fetcher.abort() - case <-p.closeAbortChan: - return - } - } -} - 
// close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { @@ -208,7 +221,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { select { case <-p.closeMainChan: - // for closed trie prefetcher, the fetches should not be nil + // for closed trie prefetcher, fetchers is empty + // but the fetches should not be nil, since fetches is used to check if it is a copied inactive one. fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, @@ -255,6 +269,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } + // use lock instead of request to mainLoop by chan to get the fetcher for performance concern. p.fetchersMutex.RLock() fetcher := p.fetchers[root] p.fetchersMutex.RUnlock() @@ -266,8 +281,8 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. select { - case <-p.closeAbortChan: - case p.abortChan <- fetcher: // safe to do multiple times + case <-p.closeMainChan: + case p.abortChan <- fetcher: // safe to abort a fecther multiple times } trie := fetcher.peek() @@ -323,6 +338,9 @@ type subfetcher struct { used [][]byte // Tracks the entries used in the end accountHash common.Hash + + pendingSize uint32 + paraChildren []*subfetcher // Parallel trie prefetch for address of massive change } // newSubfetcher creates a goroutine to prefetch state items belonging to a @@ -348,12 +366,51 @@ func (sf *subfetcher) schedule(keys [][]byte) { sf.lock.Lock() sf.tasks = append(sf.tasks, keys...) 
sf.lock.Unlock() - // Notify the prefetcher, it's fine if it's already terminated select { case sf.wake <- struct{}{}: default: } + atomic.AddUint32(&sf.pendingSize, uint32(len(keys))) +} + +func (sf *subfetcher) scheduleParallel(keys [][]byte) { + var keyIndex uint32 = 0 + childrenNum := len(sf.paraChildren) + if childrenNum > 0 { + // To feed the children first, if they are hungry. + // A child can handle keys with capacity of parallelTriePrefetchCapacity. + childIndex := len(keys) % childrenNum // randomly select the start child to avoid always feed the first one + for i := 0; i < childrenNum; i++ { + child := sf.paraChildren[childIndex] + childIndex = (childIndex + 1) % childrenNum + if atomic.LoadUint32(&child.pendingSize) >= parallelTriePrefetchCapacity { + // the child is already full, skip it + continue + } + feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize) + if keyIndex+feedNum >= uint32(len(keys)) { + // the new arrived keys are all consumed by children. + child.schedule(keys[keyIndex:]) + return + } + child.schedule(keys[keyIndex : keyIndex+feedNum]) + keyIndex += feedNum + } + } + // Children did not consume all the keys, to create new subfetch to handle left keys. 
+ keysLeft := keys[keyIndex:] + keysLeftSize := len(keysLeft) + for i := 0; i*parallelTriePrefetchCapacity < keysLeftSize; i++ { + child := newSubfetcher(sf.db, sf.root, sf.accountHash) + sf.paraChildren = append(sf.paraChildren, child) + endIndex := (i + 1) * parallelTriePrefetchCapacity + if endIndex >= keysLeftSize { + child.schedule(keysLeft[i*parallelTriePrefetchCapacity:]) + return + } + child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex]) + } } // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it @@ -382,7 +439,7 @@ func (sf *subfetcher) abort() { default: close(sf.stop) } - <-sf.term + // no need to wait <-sf.term here, will check sf.term later } // loop waits for new tasks to be scheduled and keeps loading them until it runs @@ -450,6 +507,7 @@ func (sf *subfetcher) loop() { sf.trie.TryGet(task) sf.seen[string(task)] = struct{}{} } + atomic.AddUint32(&sf.pendingSize, ^uint32(0)) // decrease } } diff --git a/core/state_processor.go b/core/state_processor.go index 97025fb9eb..b42938adf9 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -403,9 +403,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg posa, isPoSA := p.engine.(consensus.PoSA) commonTxs := make([]*types.Transaction, 0, txNum) - // initilise bloom processors + // initialise bloom processors bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) statedb.MarkFullProcessed() + signer := types.MakeSigner(p.config, header.Number) // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) @@ -421,7 +422,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg } } - msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee) + msg, err := tx.AsMessage(signer, header.BaseFee) if err != nil { bloomProcessors.Close() return statedb, nil, nil, 0, err