[R4R][Feature]: Improve trie prefetch #952

Merged 11 commits on Jul 15, 2022
30 changes: 30 additions & 0 deletions core/state/statedb.go
@@ -236,6 +236,36 @@ func (s *StateDB) StopPrefetcher() {
}
}

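// TriePrefetchInAdvance prefetches the state trie nodes of all accounts touched by the
// block's transactions (each sender and each non-nil recipient), so that they are already
// warm in the trie prefetcher before transaction execution begins.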
func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) {
s.prefetcherLock.Lock()
prefetcher := s.prefetcher // s.prefetcher could be reset to nil
s.prefetcherLock.Unlock()

if prefetcher == nil {
return
}
accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1)
for _, tx := range block.Transactions() {
from, err := types.Sender(signer, tx)
if err != nil {
// invalid block, skip prefetch
return
}
accounts[from] = struct{}{}
if tx.To() != nil {
accounts[*tx.To()] = struct{}{}
}
}
addressesToPrefetch := make([][]byte, 0, len(accounts))
for addr := range accounts {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}

if len(addressesToPrefetch) > 0 {
prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
}
}

// Mark that the block is processed by diff layer
func (s *StateDB) SetExpectedStateRoot(root common.Hash) {
s.expectedRoot = root
118 changes: 88 additions & 30 deletions core/state/trie_prefetcher.go
@@ -26,8 +26,10 @@ import (
)

const (
abortChanSize = 64
concurrentChanSize = 10
abortChanSize = 64
concurrentChanSize = 10
parallelTriePrefetchThreshold = 10 // pending keys above which parallel trie prefetch kicks in
parallelTriePrefetchCapacity = 20 // maximum pending keys per parallel child subfetcher
)

var (
@@ -52,15 +54,13 @@ type triePrefetcher struct {
fetches map[common.Hash]Trie // Partially or fully fetched tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

abortChan chan *subfetcher // to abort a single subfetcher and its children
closed int32
closeMainChan chan struct{} // it is used to inform the mainLoop
closeMainDoneChan chan struct{}
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return

abortChan chan *subfetcher
closeAbortChan chan struct{} // it is used to inform abortLoop

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
@@ -76,11 +76,10 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeAbortChan: make(chan struct{}),
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),

closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
@@ -96,11 +95,13 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
go p.abortLoop()
go p.mainLoop()
return p
}

// The subfetcher's lifecycle is only updated within this loop,
// including: subfetcher creation & abort, and child subfetcher creation & abort.
// Since mainLoop handles all the requests, each message handler should be lightweight.
func (p *triePrefetcher) mainLoop() {
for {
select {
@@ -112,12 +113,36 @@ func (p *triePrefetcher) mainLoop() {
p.fetchers[pMsg.root] = fetcher
p.fetchersMutex.Unlock()
}
fetcher.schedule(pMsg.keys)
select {
case <-fetcher.stop:
default:
fetcher.schedule(pMsg.keys)
// no need to run parallel trie prefetch if threshold is not reached.
if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold {
fetcher.scheduleParallel(pMsg.keys)
}
}

case fetcher := <-p.abortChan:
fetcher.abort()
for _, child := range fetcher.paraChildren {
child.abort()
}

case <-p.closeMainChan:
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
fetcher.abort() // safe to do multiple times
for _, child := range fetcher.paraChildren {
child.abort()
}
}
// make sure all subfetchers and child subfetchers are stopped
for _, fetcher := range p.fetchers {
<-fetcher.term
for _, child := range fetcher.paraChildren {
<-child.term
}

if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
Expand All @@ -143,7 +168,6 @@ func (p *triePrefetcher) mainLoop() {
}
}
}
close(p.closeAbortChan)
close(p.closeMainDoneChan)
p.fetchersMutex.Lock()
p.fetchers = nil
@@ -161,17 +185,6 @@ func (p *triePrefetcher) mainLoop() {
}
}

func (p *triePrefetcher) abortLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeAbortChan:
return
}
}
}

// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
@@ -208,7 +221,8 @@ func (p *triePrefetcher) copy() *triePrefetcher {

select {
case <-p.closeMainChan:
// for closed trie prefetcher, the fetches should not be nil
// for a closed trie prefetcher, fetchers is empty,
// but fetches should not be nil, since fetches is used to check whether this is a copied, inactive prefetcher.
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
@@ -255,6 +269,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
return p.db.CopyTrie(trie)
}

// use a lock instead of a channel request to mainLoop to get the fetcher, for performance reasons.
p.fetchersMutex.RLock()
fetcher := p.fetchers[root]
p.fetchersMutex.RUnlock()
@@ -266,8 +281,8 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
select {
case <-p.closeAbortChan:
case p.abortChan <- fetcher: // safe to do multiple times
case <-p.closeMainChan:
case p.abortChan <- fetcher: // safe to abort a fetcher multiple times
}

trie := fetcher.peek()
@@ -323,6 +338,9 @@ type subfetcher struct {
used [][]byte // Tracks the entries used in the end

accountHash common.Hash

pendingSize uint32 // number of keys scheduled but not yet prefetched
paraChildren []*subfetcher // parallel child subfetchers for addresses with massive state changes
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
@@ -348,12 +366,51 @@ func (sf *subfetcher) schedule(keys [][]byte) {
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
sf.lock.Unlock()

// Notify the prefetcher, it's fine if it's already terminated
select {
case sf.wake <- struct{}{}:
default:
}
atomic.AddUint32(&sf.pendingSize, uint32(len(keys)))
}

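// scheduleParallel distributes keys across the parallel child subfetchers: existing children
// are topped up to parallelTriePrefetchCapacity pending keys first, and new children are
// spawned to handle whatever keys remain.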
func (sf *subfetcher) scheduleParallel(keys [][]byte) {
var keyIndex uint32 = 0
childrenNum := len(sf.paraChildren)
if childrenNum > 0 {
// Feed the children first, if they are hungry.
// Each child can handle up to parallelTriePrefetchCapacity keys.
childIndex := len(keys) % childrenNum // pseudo-randomly pick the starting child, to avoid always feeding the first one
for i := 0; i < childrenNum; i++ {
child := sf.paraChildren[childIndex]
childIndex = (childIndex + 1) % childrenNum
if atomic.LoadUint32(&child.pendingSize) >= parallelTriePrefetchCapacity {
// the child is already full, skip it
continue
}
feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize)
if keyIndex+feedNum >= uint32(len(keys)) {
// the newly arrived keys are all consumed by the children.
child.schedule(keys[keyIndex:])
return
}
child.schedule(keys[keyIndex : keyIndex+feedNum])
keyIndex += feedNum
}
}
// The children did not consume all the keys; create new child subfetchers to handle the remaining keys.
keysLeft := keys[keyIndex:]
keysLeftSize := len(keysLeft)
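// For example, with parallelTriePrefetchCapacity = 20 and 50 leftover keys, the loop below
// creates three new children handling keys [0:20), [20:40) and [40:50) respectively.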
for i := 0; i*parallelTriePrefetchCapacity < keysLeftSize; i++ {
child := newSubfetcher(sf.db, sf.root, sf.accountHash)
sf.paraChildren = append(sf.paraChildren, child)
endIndex := (i + 1) * parallelTriePrefetchCapacity
if endIndex >= keysLeftSize {
child.schedule(keysLeft[i*parallelTriePrefetchCapacity:])
return
}
child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex])
}
}

// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
@@ -382,7 +439,7 @@ func (sf *subfetcher) abort() {
default:
close(sf.stop)
}
<-sf.term
// no need to wait on <-sf.term here; sf.term will be checked later
}

// loop waits for new tasks to be scheduled and keeps loading them until it runs
@@ -450,6 +507,7 @@ func (sf *subfetcher) loop() {
sf.trie.TryGet(task)
sf.seen[string(task)] = struct{}{}
}
atomic.AddUint32(&sf.pendingSize, ^uint32(0)) // decrement pendingSize by 1 (adding ^uint32(0) is equivalent to subtracting 1)
}
}

10 changes: 8 additions & 2 deletions core/state_processor.go
@@ -403,9 +403,15 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
posa, isPoSA := p.engine.(consensus.PoSA)
commonTxs := make([]*types.Transaction, 0, txNum)

// initilise bloom processors
// initialise bloom processors
bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
statedb.MarkFullProcessed()
signer := types.MakeSigner(p.config, header.Number)
// do trie prefetch for the big state trie in advance, based on the transactions' From/To addresses.
go func() {
// the trie prefetcher is thread safe now, so it is OK to prefetch in a separate goroutine
statedb.TriePrefetchInAdvance(block, signer)
}()

// there are usually two system txs: one for the validator set contract, another for the system reward contract.
systemTxs := make([]*types.Transaction, 0, 2)
@@ -421,7 +427,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
}

msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee)
msg, err := tx.AsMessage(signer, header.BaseFee)
if err != nil {
bloomProcessors.Close()
return statedb, nil, nil, 0, err