From 8fa25d7233abc2ac547e2bf00da5c45f5b66805c Mon Sep 17 00:00:00 2001 From: setunapo Date: Mon, 20 Jun 2022 14:22:07 +0800 Subject: [PATCH 01/11] trie prefetcher for From/To address in advance We found that trie prefetch may not be fast enough, especially trie prefetch of the outer big state trie tree. Instead of deferring trie prefetch until a transaction is finalized, we can do trie prefetch in advance. Try to prefetch the trie nodes of the From/To accounts, since their root hashes are most likely to be changed. --- core/state/statedb.go | 23 +++++++++++++++++++++++ core/state_processor.go | 5 ++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 43e99e3f14..83631af8ee 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -236,6 +236,29 @@ func (s *StateDB) StopPrefetcher() { } } +func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) { + accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1) + for _, tx := range block.Transactions() { + from, err := types.Sender(signer, tx) + if err != nil { + // invalid block, skip prefetch + return + } + accounts[from] = struct{}{} + if tx.To() != nil { + accounts[*tx.To()] = struct{}{} + } + } + addressesToPrefetch := make([][]byte, 0, len(accounts)) + for addr := range accounts { + addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure + } + + if s.prefetcher != nil && len(addressesToPrefetch) > 0 { + s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) + } +} + // Mark that the block is processed by diff layer func (s *StateDB) SetExpectedStateRoot(root common.Hash) { s.expectedRoot = root diff --git a/core/state_processor.go b/core/state_processor.go index 97025fb9eb..4aaecffd32 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -403,10 +403,13 @@ func (p *StateProcessor) Process(block *types.Block, statedb 
*state.StateDB, cfg posa, isPoSA := p.engine.(consensus.PoSA) commonTxs := make([]*types.Transaction, 0, txNum) - // initilise bloom processors + // initialise bloom processors bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) statedb.MarkFullProcessed() + // do trie prefetch for the big state trie tree in advance based transaction's From/To address. + statedb.TriePrefetchInAdvance(block, signer) + // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) From 70f7d9e223e005d820b4149035647b7afd768e32 Mon Sep 17 00:00:00 2001 From: setunapo Date: Tue, 21 Jun 2022 09:14:02 +0800 Subject: [PATCH 02/11] Parallel TriePrefetch for large trie update. Currently, we create a subfetcher for each account address to do trie prefetch. If the address has a very large state change, trie prefetch may not be fast enough, e.g. when a contract modifies lots of KV pairs or the root hashes of a large number of accounts are changed in a block. With this commit, child subfetchers are created to do trie prefetch in parallel if the parent subfetcher's workload exceeds the threshold. --- core/state/trie_prefetcher.go | 73 ++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 6 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index e42628dc67..06ab00678b 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -26,8 +26,10 @@ import ( ) const ( - abortChanSize = 64 - concurrentChanSize = 10 + abortChanSize = 64 + concurrentChanSize = 10 + parallelTriePrefetchThreshold = 50 + parallelTriePrefetchCapacity = 100 ) var ( @@ -113,6 +115,10 @@ func (p *triePrefetcher) mainLoop() { p.fetchersMutex.Unlock() } fetcher.schedule(pMsg.keys) + // no need to run parallel trie prefetch if threshold is not reached. 
+ if fetcher.pendingSize() > parallelTriePrefetchThreshold { + fetcher.scheduleParallel(pMsg.keys) + } case <-p.closeMainChan: for _, fetcher := range p.fetchers { @@ -166,6 +172,14 @@ func (p *triePrefetcher) abortLoop() { select { case fetcher := <-p.abortChan: fetcher.abort() + // stop fetcher's parallel children + fetcher.childrenLock.Lock() + children := fetcher.children + fetcher.children = nil + fetcher.childrenLock.Unlock() + for _, child := range children { + child.abort() + } case <-p.closeAbortChan: return } @@ -310,8 +324,10 @@ type subfetcher struct { root common.Hash // Root hash of the trie to prefetch trie Trie // Trie being populated with nodes - tasks [][]byte // Items queued up for retrieval - lock sync.Mutex // Lock protecting the task queue + tasks [][]byte // Items queued up for retrieval + lock sync.Mutex // Lock protecting the task queue + totalSize uint32 + processedIndex uint32 wake chan struct{} // Wake channel if a new task is scheduled stop chan struct{} // Channel to interrupt processing @@ -322,7 +338,9 @@ type subfetcher struct { dups int // Number of duplicate preload tasks used [][]byte // Tracks the entries used in the end - accountHash common.Hash + accountHash common.Hash + children []*subfetcher + childrenLock sync.Mutex } // newSubfetcher creates a goroutine to prefetch state items belonging to a @@ -342,18 +360,60 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf return sf } +func (sf *subfetcher) pendingSize() uint32 { + return sf.totalSize - atomic.LoadUint32(&sf.processedIndex) +} + // schedule adds a batch of trie keys to the queue to prefetch. func (sf *subfetcher) schedule(keys [][]byte) { // Append the tasks to the current queue sf.lock.Lock() sf.tasks = append(sf.tasks, keys...) 
sf.lock.Unlock() - // Notify the prefetcher, it's fine if it's already terminated select { case sf.wake <- struct{}{}: default: } + sf.totalSize += uint32(len(keys)) +} + +func (sf *subfetcher) scheduleParallel(keys [][]byte) { + // To feed the children first, if they are hungry. + // A child can handle keys with capacity of parallelTriePrefetchCapacity. + var curKeyIndex uint32 = 0 + for _, child := range sf.children { + feedNum := parallelTriePrefetchCapacity - child.pendingSize() + if feedNum == 0 { // the child is full, can't process more tasks + continue + } + if curKeyIndex+feedNum > uint32(len(keys)) { + feedNum = uint32(len(keys)) - curKeyIndex + } + child.schedule(keys[curKeyIndex : curKeyIndex+feedNum]) + curKeyIndex += feedNum + if curKeyIndex == uint32(len(keys)) { + return // the new arrived keys were all consumed by children. + } + } + // Children did not comsume all the keys, to create new subfetch to handle left keys. + keysLeft := keys[curKeyIndex:] + + // the pending tasks exceed the threshold and have not been consumed up by its children + dispatchSize := len(keysLeft) + children := []*subfetcher{} + for i := 0; i*parallelTriePrefetchCapacity < dispatchSize; i++ { + child := newSubfetcher(sf.db, sf.root, sf.accountHash) + endIndex := (i + 1) * parallelTriePrefetchCapacity + if endIndex > dispatchSize { + endIndex = dispatchSize + } + child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex]) + children = append(children, child) + } + sf.childrenLock.Lock() + sf.children = append(sf.children, children...) 
+ sf.childrenLock.Unlock() } // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it @@ -450,6 +510,7 @@ func (sf *subfetcher) loop() { sf.trie.TryGet(task) sf.seen[string(task)] = struct{}{} } + atomic.AddUint32(&sf.processedIndex, 1) } } From c12564bfc19b4cce50e74444fb2e72f172d9dc90 Mon Sep 17 00:00:00 2001 From: setunapo Date: Mon, 27 Jun 2022 13:40:33 +0800 Subject: [PATCH 03/11] some improvements of parallel trie prefetch implementation 1.childrenLock is removed, since it is not necessary. APIs of triePrefetcher are not thread safe, they should be used sequentially. A prefetch will be interrupted by trie() or close(), so we only need to mark it as interrupted and check before calling scheduleParallel to avoid concurrent access to paraChildren 2.rename subfetcher.children to subfetcher.paraChildren 3.use subfetcher.pendingSize to replace totalSize & processedIndex 4.randomly select the start child to avoid always feeding the first one 5.increase threshold and capacity to avoid creating too many child routines --- core/state/trie_prefetcher.go | 86 ++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 06ab00678b..1060e33654 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -28,8 +28,8 @@ import ( const ( abortChanSize = 64 concurrentChanSize = 10 - parallelTriePrefetchThreshold = 50 - parallelTriePrefetchCapacity = 100 + parallelTriePrefetchThreshold = 100 + parallelTriePrefetchCapacity = 200 ) var ( @@ -115,13 +115,19 @@ func (p *triePrefetcher) mainLoop() { p.fetchersMutex.Unlock() } fetcher.schedule(pMsg.keys) + + // fecther could be interrupted by call to trie() or close() + if fetcher.interrupted { + continue + } // no need to run parallel trie prefetch if threshold is not reached. 
- if fetcher.pendingSize() > parallelTriePrefetchThreshold { + if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold { fetcher.scheduleParallel(pMsg.keys) } case <-p.closeMainChan: for _, fetcher := range p.fetchers { + fetcher.interrupted = true p.abortChan <- fetcher // safe to do multiple times <-fetcher.term if metrics.EnabledExpensive { @@ -173,13 +179,10 @@ func (p *triePrefetcher) abortLoop() { case fetcher := <-p.abortChan: fetcher.abort() // stop fetcher's parallel children - fetcher.childrenLock.Lock() - children := fetcher.children - fetcher.children = nil - fetcher.childrenLock.Unlock() - for _, child := range children { + for _, child := range fetcher.paraChildren { child.abort() } + fetcher.paraChildren = nil case <-p.closeAbortChan: return } @@ -276,6 +279,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { p.deliveryMissMeter.Mark(1) return nil } + fetcher.interrupted = true // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. 
@@ -324,10 +328,8 @@ type subfetcher struct { root common.Hash // Root hash of the trie to prefetch trie Trie // Trie being populated with nodes - tasks [][]byte // Items queued up for retrieval - lock sync.Mutex // Lock protecting the task queue - totalSize uint32 - processedIndex uint32 + tasks [][]byte // Items queued up for retrieval + lock sync.Mutex // Lock protecting the task queue wake chan struct{} // Wake channel if a new task is scheduled stop chan struct{} // Channel to interrupt processing @@ -338,9 +340,11 @@ type subfetcher struct { dups int // Number of duplicate preload tasks used [][]byte // Tracks the entries used in the end - accountHash common.Hash - children []*subfetcher - childrenLock sync.Mutex + accountHash common.Hash + + interrupted bool + pendingSize uint32 + paraChildren []*subfetcher // Parallel trie prefetch for address of massive change } // newSubfetcher creates a goroutine to prefetch state items belonging to a @@ -360,10 +364,6 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf return sf } -func (sf *subfetcher) pendingSize() uint32 { - return sf.totalSize - atomic.LoadUint32(&sf.processedIndex) -} - // schedule adds a batch of trie keys to the queue to prefetch. func (sf *subfetcher) schedule(keys [][]byte) { // Append the tasks to the current queue @@ -375,33 +375,38 @@ func (sf *subfetcher) schedule(keys [][]byte) { case sf.wake <- struct{}{}: default: } - sf.totalSize += uint32(len(keys)) + atomic.AddUint32(&sf.pendingSize, uint32(len(keys))) } func (sf *subfetcher) scheduleParallel(keys [][]byte) { - // To feed the children first, if they are hungry. - // A child can handle keys with capacity of parallelTriePrefetchCapacity. 
- var curKeyIndex uint32 = 0 - for _, child := range sf.children { - feedNum := parallelTriePrefetchCapacity - child.pendingSize() - if feedNum == 0 { // the child is full, can't process more tasks - continue - } - if curKeyIndex+feedNum > uint32(len(keys)) { - feedNum = uint32(len(keys)) - curKeyIndex - } - child.schedule(keys[curKeyIndex : curKeyIndex+feedNum]) - curKeyIndex += feedNum - if curKeyIndex == uint32(len(keys)) { - return // the new arrived keys were all consumed by children. + var keyIndex uint32 = 0 + childrenNum := len(sf.paraChildren) + if childrenNum > 0 { + // To feed the children first, if they are hungry. + // A child can handle keys with capacity of parallelTriePrefetchCapacity. + startIndex := len(keys) % childrenNum // randomly select the start child to avoid always feed the first one + for i := 0; i < childrenNum; i++ { + child := sf.paraChildren[startIndex] + startIndex = (startIndex + 1) % childrenNum + feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize) + if feedNum == 0 { // the child is full, can't process more tasks + continue + } + if keyIndex+feedNum > uint32(len(keys)) { + feedNum = uint32(len(keys)) - keyIndex + } + child.schedule(keys[keyIndex : keyIndex+feedNum]) + keyIndex += feedNum + if keyIndex == uint32(len(keys)) { + return // the new arrived keys were all consumed by children. + } } } // Children did not comsume all the keys, to create new subfetch to handle left keys. 
- keysLeft := keys[curKeyIndex:] + keysLeft := keys[keyIndex:] // the pending tasks exceed the threshold and have not been consumed up by its children dispatchSize := len(keysLeft) - children := []*subfetcher{} for i := 0; i*parallelTriePrefetchCapacity < dispatchSize; i++ { child := newSubfetcher(sf.db, sf.root, sf.accountHash) endIndex := (i + 1) * parallelTriePrefetchCapacity @@ -409,11 +414,8 @@ func (sf *subfetcher) scheduleParallel(keys [][]byte) { endIndex = dispatchSize } child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex]) - children = append(children, child) + sf.paraChildren = append(sf.paraChildren, child) } - sf.childrenLock.Lock() - sf.children = append(sf.children, children...) - sf.childrenLock.Unlock() } // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it @@ -510,7 +512,7 @@ func (sf *subfetcher) loop() { sf.trie.TryGet(task) sf.seen[string(task)] = struct{}{} } - atomic.AddUint32(&sf.processedIndex, 1) + atomic.AddUint32(&sf.pendingSize, ^uint32(0)) // decrease } } From c21562dbae641d02989da86a59b597a14af8a4ee Mon Sep 17 00:00:00 2001 From: setunapo Date: Tue, 28 Jun 2022 10:29:09 +0800 Subject: [PATCH 04/11] fix review comments ** nil check refine ** create a separate routine for From/To prefetch, avoid blocking the critical path --- core/state/statedb.go | 5 ++++- core/state_processor.go | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 83631af8ee..9cd33f4f87 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -237,6 +237,9 @@ func (s *StateDB) StopPrefetcher() { } func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) { + if s.prefetcher == nil { + return + } accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1) for _, tx := range block.Transactions() { from, err := types.Sender(signer, tx) @@ -254,7 +257,7 @@ func (s *StateDB) TriePrefetchInAdvance(block 
*types.Block, signer types.Signer) addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure } - if s.prefetcher != nil && len(addressesToPrefetch) > 0 { + if len(addressesToPrefetch) > 0 { s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) } } diff --git a/core/state_processor.go b/core/state_processor.go index 4aaecffd32..29e9b3f10d 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -408,7 +408,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg statedb.MarkFullProcessed() // do trie prefetch for the big state trie tree in advance based transaction's From/To address. - statedb.TriePrefetchInAdvance(block, signer) + go func() { + // trie prefetcher is thread safe now, ok now to prefetch in a separate routine + statedb.TriePrefetchInAdvance(block, signer) + }() // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) From 1f292f4932705a85d7ee47aaf83d9d80acd5cf10 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 7 Jul 2022 17:50:42 +0800 Subject: [PATCH 05/11] remove the interrupt member --- core/state/trie_prefetcher.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 1060e33654..3bc0fea4eb 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -116,18 +116,17 @@ func (p *triePrefetcher) mainLoop() { } fetcher.schedule(pMsg.keys) - // fecther could be interrupted by call to trie() or close() - if fetcher.interrupted { - continue - } - // no need to run parallel trie prefetch if threshold is not reached. - if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold { - fetcher.scheduleParallel(pMsg.keys) + select { + case <-fetcher.term: + default: + // no need to run parallel trie prefetch if threshold is not reached. 
+ if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold { + fetcher.scheduleParallel(pMsg.keys) + } } case <-p.closeMainChan: for _, fetcher := range p.fetchers { - fetcher.interrupted = true p.abortChan <- fetcher // safe to do multiple times <-fetcher.term if metrics.EnabledExpensive { @@ -279,7 +278,6 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { p.deliveryMissMeter.Mark(1) return nil } - fetcher.interrupted = true // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. @@ -342,7 +340,6 @@ type subfetcher struct { accountHash common.Hash - interrupted bool pendingSize uint32 paraChildren []*subfetcher // Parallel trie prefetch for address of massive change } From ced98db12193353807d23074685b4d8b1e63c914 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 7 Jul 2022 18:13:31 +0800 Subject: [PATCH 06/11] not create a signer for each transaction --- core/state_processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/state_processor.go b/core/state_processor.go index 29e9b3f10d..18682c4ad9 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -406,7 +406,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // initialise bloom processors bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) statedb.MarkFullProcessed() - + signer := types.MakeSigner(p.config, header.Number) // do trie prefetch for the big state trie tree in advance based transaction's From/To address. 
go func() { // trie prefetcher is thread safe now, ok now to prefetch in a separate routine @@ -427,7 +427,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg } } - msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee) + msg, err := tx.AsMessage(signer, header.BaseFee) if err != nil { bloomProcessors.Close() return statedb, nil, nil, 0, err From 95abbf8caaa98bc7f24960bef0f302e89941fb42 Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 8 Jul 2022 11:01:28 +0800 Subject: [PATCH 07/11] some changes to triePrefetcher ** remove the abortLoop, move the subfetcher abort operation into mainLoop since we want to make subfetcher's create & schedule & abort within a loop to avoid concurrent access locks. ** do not wait for the subfetcher's term signal in abort() it could speed up the close by closing subfetchers concurrently. we send the stop signal to all subfetchers in a burst and wait for their term signals later. --- core/state/trie_prefetcher.go | 67 +++++++++++++++++------------------ 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 3bc0fea4eb..86cdb15d33 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -54,15 +54,13 @@ type triePrefetcher struct { fetches map[common.Hash]Trie // Partially or fully fetcher tries fetchers map[common.Hash]*subfetcher // Subfetchers for each trie + abortChan chan *subfetcher // to abort a single subfetcher and its children closed int32 closeMainChan chan struct{} // it is to inform the mainLoop closeMainDoneChan chan struct{} fetchersMutex sync.RWMutex prefetchChan chan *prefetchMsg // no need to wait for return - abortChan chan *subfetcher - closeAbortChan chan struct{} // it is used to inform abortLoop - deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter @@ -78,11 +76,10 @@ type triePrefetcher struct { func newTriePrefetcher(db Database, root 
common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map - abortChan: make(chan *subfetcher, abortChanSize), - closeAbortChan: make(chan struct{}), + db: db, + root: root, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + abortChan: make(chan *subfetcher, abortChanSize), closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), @@ -98,11 +95,13 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } - go p.abortLoop() go p.mainLoop() return p } +// the subfetcher's lifecycle will only be updated in this loop, +// include: subfetcher's creation & abort, child subfetcher's creation & abort. +// since the mainLoop will handle all the requests, each message handle should be lightweight func (p *triePrefetcher) mainLoop() { for { select { @@ -114,21 +113,36 @@ func (p *triePrefetcher) mainLoop() { p.fetchers[pMsg.root] = fetcher p.fetchersMutex.Unlock() } - fetcher.schedule(pMsg.keys) - select { - case <-fetcher.term: + case <-fetcher.stop: default: + fetcher.schedule(pMsg.keys) // no need to run parallel trie prefetch if threshold is not reached. 
if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold { fetcher.scheduleParallel(pMsg.keys) } } + case fetcher := <-p.abortChan: + fetcher.abort() + for _, child := range fetcher.paraChildren { + child.abort() + } + case <-p.closeMainChan: for _, fetcher := range p.fetchers { - p.abortChan <- fetcher // safe to do multiple times + fetcher.abort() // safe to do multiple times + for _, child := range fetcher.paraChildren { + child.abort() + } + } + // make sure all subfetchers and child subfetchers are stopped + for _, fetcher := range p.fetchers { <-fetcher.term + for _, child := range fetcher.paraChildren { + <-child.term + } + if metrics.EnabledExpensive { if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) @@ -154,7 +168,6 @@ func (p *triePrefetcher) mainLoop() { } } } - close(p.closeAbortChan) close(p.closeMainDoneChan) p.fetchersMutex.Lock() p.fetchers = nil @@ -172,22 +185,6 @@ func (p *triePrefetcher) mainLoop() { } } -func (p *triePrefetcher) abortLoop() { - for { - select { - case fetcher := <-p.abortChan: - fetcher.abort() - // stop fetcher's parallel children - for _, child := range fetcher.paraChildren { - child.abort() - } - fetcher.paraChildren = nil - case <-p.closeAbortChan: - return - } - } -} - // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { @@ -224,7 +221,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { select { case <-p.closeMainChan: - // for closed trie prefetcher, the fetches should not be nil + // for closed trie prefetcher, fetchers is empty + // but the fetches should not be nil, since fetches is used to check if it is a copied inactive one. 
fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, @@ -271,6 +269,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } + // use lock instead of request to mainLoop by chan to get the fetcher for performance concern. p.fetchersMutex.RLock() fetcher := p.fetchers[root] p.fetchersMutex.RUnlock() @@ -282,8 +281,8 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. select { - case <-p.closeAbortChan: - case p.abortChan <- fetcher: // safe to do multiple times + case <-p.closeMainChan: + case p.abortChan <- fetcher: // safe to abort a fecther multiple times } trie := fetcher.peek() @@ -441,7 +440,7 @@ func (sf *subfetcher) abort() { default: close(sf.stop) } - <-sf.term + // no need to wait <-sf.term here, will check sf.term later } // loop waits for new tasks to be scheduled and keeps loading them until it runs From 065c0b8a7001ed7df89abe81c1737334b08fd32e Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 8 Jul 2022 15:47:02 +0800 Subject: [PATCH 08/11] some coding improve for subfetcher.scheduleParallel --- core/state/trie_prefetcher.go | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 86cdb15d33..667eac141d 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -380,37 +380,36 @@ func (sf *subfetcher) scheduleParallel(keys [][]byte) { if childrenNum > 0 { // To feed the children first, if they are hungry. // A child can handle keys with capacity of parallelTriePrefetchCapacity. 
- startIndex := len(keys) % childrenNum // randomly select the start child to avoid always feed the first one + childIndex := len(keys) % childrenNum // randomly select the start child to avoid always feed the first one for i := 0; i < childrenNum; i++ { - child := sf.paraChildren[startIndex] - startIndex = (startIndex + 1) % childrenNum - feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize) - if feedNum == 0 { // the child is full, can't process more tasks + child := sf.paraChildren[childIndex] + childIndex = (childIndex + 1) % childrenNum + if atomic.LoadUint32(&child.pendingSize) >= parallelTriePrefetchCapacity { + // the child is already full, skip it continue } - if keyIndex+feedNum > uint32(len(keys)) { - feedNum = uint32(len(keys)) - keyIndex + feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize) + if keyIndex+feedNum >= uint32(len(keys)) { + // the new arrived keys are all consumed by children. + child.schedule(keys[keyIndex:]) + return } child.schedule(keys[keyIndex : keyIndex+feedNum]) keyIndex += feedNum - if keyIndex == uint32(len(keys)) { - return // the new arrived keys were all consumed by children. - } } } // Children did not comsume all the keys, to create new subfetch to handle left keys. 
keysLeft := keys[keyIndex:] - - // the pending tasks exceed the threshold and have not been consumed up by its children - dispatchSize := len(keysLeft) - for i := 0; i*parallelTriePrefetchCapacity < dispatchSize; i++ { + keysLeftSize := len(keysLeft) + for i := 0; i*parallelTriePrefetchCapacity < keysLeftSize; i++ { child := newSubfetcher(sf.db, sf.root, sf.accountHash) + sf.paraChildren = append(sf.paraChildren, child) endIndex := (i + 1) * parallelTriePrefetchCapacity - if endIndex > dispatchSize { - endIndex = dispatchSize + if endIndex >= keysLeftSize { + child.schedule(keysLeft[i*parallelTriePrefetchCapacity:]) + return } child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex]) - sf.paraChildren = append(sf.paraChildren, child) } } From 93cfb3d8902df7df938b18becb81b64c9517ab11 Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 8 Jul 2022 17:34:12 +0800 Subject: [PATCH 09/11] fix a UT crash of s.prefetcher == nil --- core/state/statedb.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 9cd33f4f87..0fa2e1faa5 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -237,7 +237,11 @@ func (s *StateDB) StopPrefetcher() { } func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) { - if s.prefetcher == nil { + s.prefetcherLock.Lock() + prefetcher := s.prefetcher // s.prefetcher could be resetted to nil + s.prefetcherLock.Unlock() + + if prefetcher == nil { return } accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1) @@ -258,7 +262,7 @@ func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) } if len(addressesToPrefetch) > 0 { - s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) + prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) } } From 1c2d37831db80f248ea3bdbe8d6d5e26d4daacfc Mon Sep 17 00:00:00 2001 From: setunapo Date: Tue, 12 Jul 2022 18:34:07 +0800 
Subject: [PATCH 10/11] update parallel trie prefetcher configuration tested with different combination of parallelTriePrefetchThreshold & parallelTriePrefetchCapacity, found the most efficient configure could be: parallelTriePrefetchThreshold = 10 parallelTriePrefetchCapacity = 20 --- core/state/trie_prefetcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 667eac141d..3ab097ab43 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -28,8 +28,8 @@ import ( const ( abortChanSize = 64 concurrentChanSize = 10 - parallelTriePrefetchThreshold = 100 - parallelTriePrefetchCapacity = 200 + parallelTriePrefetchThreshold = 10 + parallelTriePrefetchCapacity = 20 ) var ( From 84d281f1545d41ac84d3e4643f7d41923b282f27 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 14 Jul 2022 16:14:42 +0800 Subject: [PATCH 11/11] fix review comments: code refine --- core/blockchain.go | 10 +++++++++- core/state/trie_prefetcher.go | 2 +- core/state_processor.go | 5 ----- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 89059792f0..2a509f9194 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1889,10 +1889,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) interruptCh := make(chan struct{}) // For diff sync, it may fallback to full sync, so we still do prefetch if len(block.Transactions()) >= prefetchTxNumber { - throwaway := statedb.Copy() // do Prefetch in a separate goroutine to avoid blocking the critical path + + // 1.do state prefetch for snapshot cache + throwaway := statedb.Copy() go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh) + + // 2.do trie prefetch for MPT trie node cache + // it is for the big state trie tree, prefetch based on transaction's From/To address. 
+ // trie prefetcher is thread safe now, ok to prefetch in a separate routine + go statedb.TriePrefetchInAdvance(block, signer) } + //Process block using the parent state as reference point substart := time.Now() if bc.pipeCommit { diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 3ab097ab43..9004f489ee 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -398,7 +398,7 @@ func (sf *subfetcher) scheduleParallel(keys [][]byte) { keyIndex += feedNum } } - // Children did not comsume all the keys, to create new subfetch to handle left keys. + // Children did not consume all the keys, to create new subfetch to handle left keys. keysLeft := keys[keyIndex:] keysLeftSize := len(keysLeft) for i := 0; i*parallelTriePrefetchCapacity < keysLeftSize; i++ { diff --git a/core/state_processor.go b/core/state_processor.go index 18682c4ad9..b42938adf9 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -407,11 +407,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) statedb.MarkFullProcessed() signer := types.MakeSigner(p.config, header.Number) - // do trie prefetch for the big state trie tree in advance based transaction's From/To address. - go func() { - // trie prefetcher is thread safe now, ok now to prefetch in a separate routine - statedb.TriePrefetchInAdvance(block, signer) - }() // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2)