[Feature]: Improve trie prefetch (#952)
* trie prefetcher for From/To address in advance

We found that trie prefetch may not be fast enough, especially the prefetch of
the big outer state trie.
Instead of doing trie prefetch only when a transaction is finalized, we can do trie prefetch
in advance: try to prefetch the trie nodes of the From/To accounts, since their root hashes
are the most likely to change.
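
A condensed sketch of this idea: collect every sender and recipient address in the block and hand them to the prefetcher. collectTxAccounts is an illustrative name only; the actual change adds StateDB.TriePrefetchInAdvance, shown in the diff below, and the sketch assumes the upstream go-ethereum common and core/types packages.

package sketch

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
)

// collectTxAccounts gathers the From/To addresses of a block's transactions,
// deduplicated, as byte slices ready to be scheduled on the trie prefetcher.
func collectTxAccounts(block *types.Block, signer types.Signer) [][]byte {
    accounts := make(map[common.Address]struct{}, block.Transactions().Len()*2)
    for _, tx := range block.Transactions() {
        from, err := types.Sender(signer, tx)
        if err != nil {
            return nil // invalid block, skip prefetch
        }
        accounts[from] = struct{}{}
        if to := tx.To(); to != nil {
            accounts[*to] = struct{}{}
        }
    }
    keys := make([][]byte, 0, len(accounts))
    for addr := range accounts {
        keys = append(keys, common.CopyBytes(addr[:])) // copy needed, addr is reused across iterations
    }
    return keys
}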

* Parallel TriePrefetch for large trie update.

Currently, we create a subfetcher for each account address to do trie prefetch. If an address
has a very large state change, trie prefetch may not be fast enough, e.g. when a contract modifies
lots of KV pairs or the root hashes of a large number of accounts change within one block.

With this commit, child subfetchers are created to do trie prefetch in parallel when
the parent subfetcher's workload exceeds a threshold.
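
A simplified sketch of the resulting split, using the threshold/capacity constants this commit introduces. splitForChildren is illustrative only; the real logic lives in triePrefetcher.mainLoop and subfetcher.scheduleParallel in the diff below.

package sketch

const (
    parallelTriePrefetchThreshold = 10 // spawn children once pending work exceeds this
    parallelTriePrefetchCapacity  = 20 // max keys fed to one child per round
)

// splitForChildren cuts a key list into capacity-sized batches, one batch per
// child subfetcher, but only when the pending workload is above the threshold;
// otherwise the parent keeps everything for itself.
func splitForChildren(pending int, keys [][]byte) [][][]byte {
    if pending <= parallelTriePrefetchThreshold {
        return nil // small workload, no parallel prefetch needed
    }
    var batches [][][]byte
    for start := 0; start < len(keys); start += parallelTriePrefetchCapacity {
        end := start + parallelTriePrefetchCapacity
        if end > len(keys) {
            end = len(keys)
        }
        batches = append(batches, keys[start:end])
    }
    return batches
}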

* some improvements to the parallel trie prefetch implementation

1. childrenLock is removed, since it is not necessary.
   The APIs of triePrefetcher are not thread safe and should be used sequentially.
   A prefetch will be interrupted by trie() or close(), so we only need to mark it as
   interrupted and check that before calling scheduleParallel, to avoid concurrent access to paraChildren.
2. rename subfetcher.children to subfetcher.paraChildren
3. use subfetcher.pendingSize to replace totalSize & processedIndex (see the sketch after this list)
4. randomly select the start child to avoid always feeding the first one
5. increase the threshold and capacity to avoid creating too many child routines
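
A minimal sketch of the single pending-work counter from item 3, using Go's idiom for atomically decrementing an unsigned counter. counterFetcher is an illustrative stand-in for the real subfetcher.

package sketch

import "sync/atomic"

// counterFetcher tracks its outstanding workload in one atomic counter,
// replacing the earlier totalSize/processedIndex pair.
type counterFetcher struct {
    pendingSize uint32
}

// schedule records len(keys) newly queued keys.
func (f *counterFetcher) schedule(keys [][]byte) {
    atomic.AddUint32(&f.pendingSize, uint32(len(keys)))
}

// done marks one key as processed; adding ^uint32(0) decrements the unsigned
// counter by one, as documented for sync/atomic.AddUint32.
func (f *counterFetcher) done() {
    atomic.AddUint32(&f.pendingSize, ^uint32(0))
}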

* fix review comments

** refine the nil checks
** create a separate routine for the From/To prefetch, to avoid blocking the critical path

* remove the interrupt member

* do not create a signer for each transaction

* some changes to triePrefetcher

** remove the abortLoop and move the subfetcher abort operation into mainLoop,
   since we want subfetcher creation & scheduling & abort to happen within one loop to
   avoid locks for concurrent access.

** do not wait for the subfetcher's term signal in abort()
   This speeds up close by shutting subfetchers down concurrently:
   we send the stop signal to all subfetchers in a burst and wait for their term signals later (see the sketch below).
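
A minimal sketch of that burst close, assuming each fetcher's loop closes its term channel when it exits. stopper is an illustrative stand-in for subfetcher.

package sketch

// stopper mimics a subfetcher's stop/term channel pair.
type stopper struct {
    stop chan struct{} // closed to ask the loop to stop
    term chan struct{} // closed by the loop once it has stopped
}

func newStopper() *stopper {
    s := &stopper{stop: make(chan struct{}), term: make(chan struct{})}
    go s.loop()
    return s
}

func (s *stopper) loop() {
    defer close(s.term) // signal termination on exit
    <-s.stop            // placeholder for the real prefetch loop
}

// abort only fires the stop signal; it does not wait for term.
func (s *stopper) abort() {
    select {
    case <-s.stop:
    default:
        close(s.stop)
    }
}

// closeAll stops every fetcher in a burst, then waits for all of them.
func closeAll(fetchers []*stopper) {
    for _, f := range fetchers {
        f.abort()
    }
    for _, f := range fetchers {
        <-f.term
    }
}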

* some code improvements for subfetcher.scheduleParallel

* fix a UT crash when s.prefetcher == nil

* update parallel trie prefetcher configuration

tested with different combinations of parallelTriePrefetchThreshold & parallelTriePrefetchCapacity,
the most efficient configuration found is:
  parallelTriePrefetchThreshold = 10
  parallelTriePrefetchCapacity  = 20

* fix review comments: code refinement
setunapo authored and brilliant-lx committed Aug 1, 2022
1 parent 51bfeca commit df3e1be
Showing 4 changed files with 130 additions and 33 deletions.
10 changes: 9 additions & 1 deletion core/blockchain.go
@@ -1889,10 +1889,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
interruptCh := make(chan struct{})
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
// do Prefetch in a separate goroutine to avoid blocking the critical path

// 1.do state prefetch for snapshot cache
throwaway := statedb.Copy()
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)

// 2.do trie prefetch for the MPT trie node cache
// it is for the big state trie; prefetch based on each transaction's From/To address.
// the trie prefetcher is thread safe now, so it is ok to prefetch in a separate routine
go statedb.TriePrefetchInAdvance(block, signer)
}

//Process block using the parent state as reference point
substart := time.Now()
if bc.pipeCommit {
30 changes: 30 additions & 0 deletions core/state/statedb.go
@@ -236,6 +236,36 @@ func (s *StateDB) StopPrefetcher() {
}
}

func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) {
s.prefetcherLock.Lock()
prefetcher := s.prefetcher // s.prefetcher could be reset to nil
s.prefetcherLock.Unlock()

if prefetcher == nil {
return
}
accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1)
for _, tx := range block.Transactions() {
from, err := types.Sender(signer, tx)
if err != nil {
// invalid block, skip prefetch
return
}
accounts[from] = struct{}{}
if tx.To() != nil {
accounts[*tx.To()] = struct{}{}
}
}
addressesToPrefetch := make([][]byte, 0, len(accounts))
for addr := range accounts {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}

if len(addressesToPrefetch) > 0 {
prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
}
}

// Mark that the block is processed by diff layer
func (s *StateDB) SetExpectedStateRoot(root common.Hash) {
s.expectedRoot = root
118 changes: 88 additions & 30 deletions core/state/trie_prefetcher.go
Expand Up @@ -26,8 +26,10 @@ import (
)

const (
abortChanSize = 64
concurrentChanSize = 10
abortChanSize = 64
concurrentChanSize = 10
parallelTriePrefetchThreshold = 10
parallelTriePrefetchCapacity = 20
)

var (
@@ -52,15 +54,13 @@ type triePrefetcher struct {
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

abortChan chan *subfetcher // to abort a single subfetcher and its children
closed int32
closeMainChan chan struct{} // it is to inform the mainLoop
closeMainDoneChan chan struct{}
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return

abortChan chan *subfetcher
closeAbortChan chan struct{} // it is used to inform abortLoop

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
@@ -76,11 +76,10 @@ type triePrefetcher struct {
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeAbortChan: make(chan struct{}),
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),

closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
@@ -96,11 +95,13 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
go p.abortLoop()
go p.mainLoop()
return p
}

// the subfetcher's lifecycle will only be updated in this loop,
// including: subfetcher creation & abort, child subfetcher creation & abort.
// since the mainLoop handles all the requests, each message handler should be lightweight
func (p *triePrefetcher) mainLoop() {
for {
select {
@@ -112,12 +113,36 @@ func (p *triePrefetcher) mainLoop() {
p.fetchers[pMsg.root] = fetcher
p.fetchersMutex.Unlock()
}
fetcher.schedule(pMsg.keys)
select {
case <-fetcher.stop:
default:
fetcher.schedule(pMsg.keys)
// no need to run parallel trie prefetch if threshold is not reached.
if atomic.LoadUint32(&fetcher.pendingSize) > parallelTriePrefetchThreshold {
fetcher.scheduleParallel(pMsg.keys)
}
}

case fetcher := <-p.abortChan:
fetcher.abort()
for _, child := range fetcher.paraChildren {
child.abort()
}

case <-p.closeMainChan:
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
fetcher.abort() // safe to do multiple times
for _, child := range fetcher.paraChildren {
child.abort()
}
}
// make sure all subfetchers and child subfetchers are stopped
for _, fetcher := range p.fetchers {
<-fetcher.term
for _, child := range fetcher.paraChildren {
<-child.term
}

if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
@@ -143,7 +168,6 @@ func (p *triePrefetcher) mainLoop() {
}
}
}
close(p.closeAbortChan)
close(p.closeMainDoneChan)
p.fetchersMutex.Lock()
p.fetchers = nil
@@ -161,17 +185,6 @@ func (p *triePrefetcher) mainLoop() {
}
}

func (p *triePrefetcher) abortLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeAbortChan:
return
}
}
}

// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
@@ -208,7 +221,8 @@ func (p *triePrefetcher) copy() *triePrefetcher {

select {
case <-p.closeMainChan:
// for closed trie prefetcher, the fetches should not be nil
// for closed trie prefetcher, fetchers is empty
// but the fetches should not be nil, since fetches is used to check if it is a copied inactive one.
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
@@ -255,6 +269,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
return p.db.CopyTrie(trie)
}

// use a lock instead of a channel request to the mainLoop to get the fetcher, for performance reasons.
p.fetchersMutex.RLock()
fetcher := p.fetchers[root]
p.fetchersMutex.RUnlock()
@@ -266,8 +281,8 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
select {
case <-p.closeAbortChan:
case p.abortChan <- fetcher: // safe to do multiple times
case <-p.closeMainChan:
case p.abortChan <- fetcher: // safe to abort a fetcher multiple times
}

trie := fetcher.peek()
@@ -323,6 +338,9 @@ type subfetcher struct {
used [][]byte // Tracks the entries used in the end

accountHash common.Hash

pendingSize uint32
paraChildren []*subfetcher // Parallel trie prefetchers for addresses with massive changes
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
@@ -348,12 +366,51 @@ func (sf *subfetcher) schedule(keys [][]byte) {
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
sf.lock.Unlock()

// Notify the prefetcher, it's fine if it's already terminated
select {
case sf.wake <- struct{}{}:
default:
}
atomic.AddUint32(&sf.pendingSize, uint32(len(keys)))
}

func (sf *subfetcher) scheduleParallel(keys [][]byte) {
var keyIndex uint32 = 0
childrenNum := len(sf.paraChildren)
if childrenNum > 0 {
// Feed the children first, if they are hungry.
// A child can handle keys up to a capacity of parallelTriePrefetchCapacity.
childIndex := len(keys) % childrenNum // randomly select the start child to avoid always feeding the first one
for i := 0; i < childrenNum; i++ {
child := sf.paraChildren[childIndex]
childIndex = (childIndex + 1) % childrenNum
if atomic.LoadUint32(&child.pendingSize) >= parallelTriePrefetchCapacity {
// the child is already full, skip it
continue
}
feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize)
if keyIndex+feedNum >= uint32(len(keys)) {
// the newly arrived keys are all consumed by the children.
child.schedule(keys[keyIndex:])
return
}
child.schedule(keys[keyIndex : keyIndex+feedNum])
keyIndex += feedNum
}
}
// Children did not consume all the keys; create new subfetchers to handle the remaining keys.
keysLeft := keys[keyIndex:]
keysLeftSize := len(keysLeft)
for i := 0; i*parallelTriePrefetchCapacity < keysLeftSize; i++ {
child := newSubfetcher(sf.db, sf.root, sf.accountHash)
sf.paraChildren = append(sf.paraChildren, child)
endIndex := (i + 1) * parallelTriePrefetchCapacity
if endIndex >= keysLeftSize {
child.schedule(keysLeft[i*parallelTriePrefetchCapacity:])
return
}
child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex])
}
}

// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
Expand Down Expand Up @@ -382,7 +439,7 @@ func (sf *subfetcher) abort() {
default:
close(sf.stop)
}
<-sf.term
// no need to wait for <-sf.term here; sf.term will be checked later
}

// loop waits for new tasks to be scheduled and keeps loading them until it runs
@@ -450,6 +507,7 @@ func (sf *subfetcher) loop() {
sf.trie.TryGet(task)
sf.seen[string(task)] = struct{}{}
}
atomic.AddUint32(&sf.pendingSize, ^uint32(0)) // decrease
}
}

5 changes: 3 additions & 2 deletions core/state_processor.go
@@ -403,9 +403,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
posa, isPoSA := p.engine.(consensus.PoSA)
commonTxs := make([]*types.Transaction, 0, txNum)

// initilise bloom processors
// initialise bloom processors
bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
statedb.MarkFullProcessed()
signer := types.MakeSigner(p.config, header.Number)

// usually there are two txs, one for the validator set contract, another for the system reward contract.
systemTxs := make([]*types.Transaction, 0, 2)
@@ -421,7 +422,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
}

msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee)
msg, err := tx.AsMessage(signer, header.BaseFee)
if err != nil {
bloomProcessors.Close()
return statedb, nil, nil, 0, err
