Skip to content

Commit

Permalink
mvstates: opt async dep generation; (bnb-chain#38)
Browse files Browse the repository at this point in the history
mvstates: opt resolve dep logic;

Co-authored-by: galaio <galaio@users.noreply.github.com>
  • Loading branch information
galaio and galaio committed Aug 16, 2024
1 parent 5407300 commit 5cde1ce
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 39 deletions.
8 changes: 4 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1996,7 +1996,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}

if bc.enableTxDAG && !bc.parallelExecution {
vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

if bc.enableTxDAG && !bc.vmConfig.EnableParallelExec {
// compare input TxDAG when it enable in consensus
dag, err := statedb.ResolveTxDAG(len(block.Transactions()), []common.Address{block.Coinbase(), params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
if err == nil {
Expand All @@ -2017,9 +2020,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
}

vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

// Update the metrics touched during block processing and validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
Expand Down
90 changes: 55 additions & 35 deletions core/types/mvstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"strings"
"sync"

"github.com/ethereum/go-ethereum/metrics"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/holiman/uint256"
Expand All @@ -33,7 +31,7 @@ const (
)

const (
asyncDepGenChanSize = 100
asyncDepGenChanSize = 10000
)

func AccountStateKey(account common.Address, state AccountState) RWKey {
Expand Down Expand Up @@ -111,8 +109,8 @@ type RWSet struct {
func NewRWSet(ver StateVersion) *RWSet {
return &RWSet{
ver: ver,
readSet: make(map[RWKey]*RWItem),
writeSet: make(map[RWKey]*RWItem),
readSet: make(map[RWKey]*RWItem, 64),
writeSet: make(map[RWKey]*RWItem, 32),
}
}

Expand Down Expand Up @@ -242,7 +240,7 @@ type PendingWrites struct {

func NewPendingWrites() *PendingWrites {
return &PendingWrites{
list: make([]*RWItem, 0),
list: make([]*RWItem, 0, 8),
}
}

Expand Down Expand Up @@ -309,8 +307,9 @@ type MVStates struct {
depsCache map[int][]uint64

// async dep analysis
depsGenChan chan int
stopChan chan struct{}
depsGenChan chan int
stopChan chan struct{}
asyncRunning bool

// execution stat infos
stats map[int]*ExeStat
Expand All @@ -328,16 +327,20 @@ func NewMVStates(txCount int) *MVStates {
}

func (s *MVStates) EnableAsyncDepGen() *MVStates {
s.lock.Lock()
defer s.lock.Unlock()
s.depsGenChan = make(chan int, asyncDepGenChanSize)
s.stopChan = make(chan struct{}, 1)
s.stopChan = make(chan struct{})
s.asyncRunning = true
go s.asyncDepGenLoop()
return s
}

func (s *MVStates) stopAsyncDepGen() {
if s.stopChan != nil {
s.stopChan <- struct{}{}
close(s.stopChan)
}
s.asyncRunning = false
}

func (s *MVStates) asyncDepGenLoop() {
Expand Down Expand Up @@ -403,12 +406,12 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error {
s.stats[index] = stat
}

if metrics.EnabledExpensive {
for k := range rwSet.writeSet {
// this action is only for testing, it runs when enable expensive metrics.
checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet)
}
}
//if metrics.EnabledExpensive {
// for k := range rwSet.writeSet {
// // this action is only for testing, it runs when enable expensive metrics.
// checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet)
// }
//}
s.rwSets[index] = rwSet
return nil
}
Expand All @@ -417,14 +420,15 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error {
func (s *MVStates) Finalise(index int) error {
log.Debug("Finalise", "total", len(s.rwSets), "index", index)
s.lock.Lock()
defer s.lock.Unlock()

rwSet := s.rwSets[index]
if rwSet == nil {
s.lock.Unlock()
return fmt.Errorf("finalise a non-exist RWSet, index: %d", index)
}

if index != s.nextFinaliseIndex {
s.lock.Unlock()
return fmt.Errorf("finalise in wrong order, next: %d, input: %d", s.nextFinaliseIndex, index)
}

Expand All @@ -436,34 +440,50 @@ func (s *MVStates) Finalise(index int) error {
s.pendingWriteSet[k].Append(v)
}
s.nextFinaliseIndex++
// async resolve dependency
if s.depsGenChan != nil {
go func() {
s.depsGenChan <- index
}()
s.lock.Unlock()
// async resolve dependency, but non-block action
if s.asyncRunning && s.depsGenChan != nil {
s.depsGenChan <- index
}
return nil
}

func (s *MVStates) resolveDepsCacheByWrites(index int, rwSet *RWSet) {
// analysis dep, if the previous transaction is not executed/validated, re-analysis is required
s.depMapCache[index] = NewTxDeps(0)
s.depMapCache[index] = NewTxDeps(8)
if rwSet.excludedTx {
return
}
seen := make(map[int]struct{})
for key := range rwSet.readSet {
// check self destruct
if key.IsAccountSelf() {
key = AccountStateKey(key.Addr(), AccountSuicide)
}
writes := s.pendingWriteSet[key]
if writes == nil {
continue
seen := make(map[int]struct{}, 8)
// check tx dependency, only check key, skip version
if len(s.pendingWriteSet) > len(rwSet.readSet) {
for key := range rwSet.readSet {
// check self destruct
if key.IsAccountSelf() {
key = AccountStateKey(key.Addr(), AccountSuicide)
}
writes := s.pendingWriteSet[key]
if writes == nil {
continue
}
items := writes.FindPrevWrites(index)
for _, item := range items {
seen[item.TxIndex()] = struct{}{}
}
}
items := writes.FindPrevWrites(index)
for _, item := range items {
seen[item.TxIndex()] = struct{}{}
} else {
for k, w := range s.pendingWriteSet {
// check suicide, add read address flag, it only for check suicide quickly, and cannot for other scenarios.
if k.IsAccountSuicide() {
k = k.ToAccountSelf()
}
if _, ok := rwSet.readSet[k]; !ok {
continue
}
items := w.FindPrevWrites(index)
for _, item := range items {
seen[item.TxIndex()] = struct{}{}
}
}
}
for prev := 0; prev < index; prev++ {
Expand Down

0 comments on commit 5cde1ce

Please sign in to comment.