diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index c8667deae3..116302001d 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -25,6 +25,8 @@ import ( "github.com/ethereum/go-ethereum/metrics" ) +const abortChanSize = 64 + var ( // triePrefetchMetricsPrefix is the prefix under which to publis the metrics. triePrefetchMetricsPrefix = "trie/prefetch/" @@ -41,6 +43,9 @@ type triePrefetcher struct { fetches map[common.Hash]Trie // Partially or fully fetcher tries fetchers map[common.Hash]*subfetcher // Subfetchers for each trie + abortChan chan *subfetcher + closeChan chan struct{} + deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter @@ -56,9 +61,11 @@ type triePrefetcher struct { func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + db: db, + root: root, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + abortChan: make(chan *subfetcher, abortChanSize), + closeChan: make(chan struct{}), deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), @@ -70,14 +77,34 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } + go p.abortLoop() return p } +func (p *triePrefetcher) abortLoop() { + for { + select { + case fetcher := <-p.abortChan: + fetcher.abort() + case <-p.closeChan: + // drain fetcher channel + for { + select { + case fetcher := <-p.abortChan: + fetcher.abort() + default: + return + } + } + } + } +} + // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { for _, fetcher := range p.fetchers { - fetcher.abort() // safe to do multiple times + p.abortChan <- fetcher // safe to do multiple times if metrics.Enabled { if fetcher.root == p.root { @@ -101,6 +128,7 @@ func (p *triePrefetcher) close() { } } } + close(p.closeChan) // Clear out all fetchers (will crash on a second call, deliberate) p.fetchers = nil } @@ -174,7 +202,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { } // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. - fetcher.abort() // safe to do multiple times + p.abortChan <- fetcher // safe to do multiple times trie := fetcher.peek() if trie == nil {