Skip to content

Commit

Permalink
some improvemnts of parallel trie prefetch implementation
Browse files Browse the repository at this point in the history
1.childrenLock is removed, since it is not necessary
  APIs of triePrefetcher is not thread safe, they should be used sequentially.
  A prefetch will be interrupted by trie() or clos(), so we only need mark it as
  interrupted and check before call scheduleParallel to avoid the concurrent access to paraChildren

2.rename subfetcher.children to subfetcher.paraChildren

3.use subfetcher.pendingSize to replace totalSize & processedIndex
  • Loading branch information
setunapo committed Jun 27, 2022
1 parent b37dc30 commit 56689a1
Showing 1 changed file with 32 additions and 40 deletions.
72 changes: 32 additions & 40 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,28 +91,21 @@ func (p *triePrefetcher) abortLoop() {
case fetcher := <-p.abortChan:
fetcher.abort()
// stop fetcher's parallel children
fetcher.childrenLock.Lock()
children := fetcher.children
fetcher.children = nil
fetcher.childrenLock.Unlock()
for _, child := range children {
for _, child := range fetcher.paraChildren {
child.abort()
}
fetcher.paraChildren = nil
case <-p.closeChan:
// drain fetcher channel
for {
select {
case fetcher := <-p.abortChan:

fetcher.abort()
// stop fetcher's parallel children
fetcher.childrenLock.Lock()
children := fetcher.children
fetcher.children = nil
fetcher.childrenLock.Unlock()
for _, child := range children {
for _, child := range fetcher.paraChildren {
child.abort()
}
fetcher.paraChildren = nil
default:
return
}
Expand All @@ -125,6 +118,7 @@ func (p *triePrefetcher) abortLoop() {
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
for _, fetcher := range p.fetchers {
fetcher.interrupted = true
p.abortChan <- fetcher // safe to do multiple times
<-fetcher.term
if metrics.EnabledExpensive {
Expand Down Expand Up @@ -201,11 +195,16 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
p.fetchers[root] = fetcher
}
fetcher.schedule(keys)

// fecther could be interrupted by call to trie() or close()
if fetcher.interrupted {
return
}
// no need to run parallel trie prefetch if threshold is not reached.
if fetcher.pendingSize() <= parallelTriePrefetchThreshold {
if atomic.LoadUint32(&fetcher.pendingSize) <= parallelTriePrefetchThreshold {
return
}

// ok to do trie prefetch in parallel mode
fetcher.scheduleParallel(keys)
}

Expand All @@ -229,6 +228,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
}
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
fetcher.interrupted = true
p.abortChan <- fetcher // safe to do multiple times

trie := fetcher.peek()
Expand Down Expand Up @@ -256,10 +256,8 @@ type subfetcher struct {
root common.Hash // Root hash of the trie to prefetch
trie Trie // Trie being populated with nodes

tasks [][]byte // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
totalSize uint32
processedIndex uint32
tasks [][]byte // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue

wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
Expand All @@ -270,9 +268,11 @@ type subfetcher struct {
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end

accountHash common.Hash
children []*subfetcher
childrenLock sync.Mutex
accountHash common.Hash

interrupted bool
pendingSize uint32
paraChildren []*subfetcher // Parallel trie prefetch for address of massive change
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
Expand All @@ -292,10 +292,6 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf
return sf
}

func (sf *subfetcher) pendingSize() uint32 {
return sf.totalSize - atomic.LoadUint32(&sf.processedIndex)
}

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) {
// Append the tasks to the current queue
Expand All @@ -307,45 +303,41 @@ func (sf *subfetcher) schedule(keys [][]byte) {
case sf.wake <- struct{}{}:
default:
}
sf.totalSize += uint32(len(keys))
atomic.AddUint32(&sf.pendingSize, uint32(len(keys)))
}

func (sf *subfetcher) scheduleParallel(keys [][]byte) {
// To feed the children first, if they are hungry.
// A child can handle keys with capacity of parallelTriePrefetchCapacity.
var curKeyIndex uint32 = 0
for _, child := range sf.children {
feedNum := parallelTriePrefetchCapacity - child.pendingSize()
var keyIndex uint32 = 0
for _, child := range sf.paraChildren {
feedNum := parallelTriePrefetchCapacity - atomic.LoadUint32(&child.pendingSize)
if feedNum == 0 { // the child is full, can't process more tasks
continue
}
if curKeyIndex+feedNum > uint32(len(keys)) {
feedNum = uint32(len(keys)) - curKeyIndex
if keyIndex+feedNum > uint32(len(keys)) {
feedNum = uint32(len(keys)) - keyIndex
}
child.schedule(keys[curKeyIndex : curKeyIndex+feedNum])
curKeyIndex += feedNum
if curKeyIndex == uint32(len(keys)) {
child.schedule(keys[keyIndex : keyIndex+feedNum])
keyIndex += feedNum
if keyIndex == uint32(len(keys)) {
return // the new arrived keys were all consumed by children.
}
}
// Children did not comsume all the keys, to create new subfetch to handle left keys.
keysLeft := keys[curKeyIndex:]
keysLeft := keys[keyIndex:]

// the pending tasks exceed the threshold and have not been consumed up by its children
dispatchSize := len(keysLeft)
children := []*subfetcher{}
for i := 0; i*parallelTriePrefetchCapacity < dispatchSize; i++ {
child := newSubfetcher(sf.db, sf.root, sf.accountHash)
endIndex := (i + 1) * parallelTriePrefetchCapacity
if endIndex > dispatchSize {
endIndex = dispatchSize
}
child.schedule(keysLeft[i*parallelTriePrefetchCapacity : endIndex])
children = append(children, child)
sf.paraChildren = append(sf.paraChildren, child)
}
sf.childrenLock.Lock()
sf.children = append(sf.children, children...)
sf.childrenLock.Unlock()
}

// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
Expand Down Expand Up @@ -442,7 +434,7 @@ func (sf *subfetcher) loop() {
sf.trie.TryGet(task)
sf.seen[taskid] = struct{}{}
}
atomic.AddUint32(&sf.processedIndex, 1)
atomic.AddUint32(&sf.pendingSize, ^uint32(0)) // decrease
}
}

Expand Down

0 comments on commit 56689a1

Please sign in to comment.