Skip to content

Commit

Permalink
txdag: opt TxDAG rwset collecting & generating; (bnb-chain#19)
Browse files Browse the repository at this point in the history
* txdag: opt some logic;

txdag: opt rw set collect logic;

* pevm: opt logs;

* txdag: opt txdag encoding, reduce rlp size;

---------

Co-authored-by: galaio <galaio@users.noreply.github.com>
  • Loading branch information
2 people authored and sunny2022da committed Aug 13, 2024
1 parent 7630e2a commit ef5ef54
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 96 deletions.
5 changes: 4 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2824,6 +2824,7 @@ func (bc *BlockChain) TxDAGEnabled() bool {
}

func (bc *BlockChain) SetupTxDAGGeneration(output string) {
log.Info("node enable TxDAG feature", "output", output)
bc.enableTxDAG = true
if len(output) == 0 {
return
Expand All @@ -2834,10 +2835,11 @@ func (bc *BlockChain) SetupTxDAGGeneration(output string) {
if err != nil {
log.Error("read TxDAG err", "err", err)
}
log.Info("load TxDAG from file", "output", output, "count", len(bc.txDAGMapping))

// write handler
go func() {
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
if err != nil {
log.Error("OpenFile when open the txDAG output file", "file", output)
return
Expand Down Expand Up @@ -2877,6 +2879,7 @@ func writeTxDAGToFile(writeHandle *os.File, item TxDAGOutputItem) error {
return err
}

// TODO(galaio): support load with segments, every segment 100000 blocks?
func readTxDAGMappingFromFile(output string) (map[uint64]types.TxDAG, error) {
file, err := os.Open(output)
if err != nil {
Expand Down
16 changes: 15 additions & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4727,18 +4727,32 @@ func TestTxDAGFile_ReadWrite(t *testing.T) {
1: makeEmptyPlainTxDAG(1),
2: makeEmptyPlainTxDAG(2),
}
writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
require.NoError(t, err)
for num, dag := range except {
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
}
writeFile.Close()

except2 := map[uint64]types.TxDAG{
3: types.NewEmptyTxDAG(),
4: makeEmptyPlainTxDAG(4),
}
writeFile, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
require.NoError(t, err)
for num, dag := range except2 {
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
}
writeFile.Close()

actual, err := readTxDAGMappingFromFile(path)
require.NoError(t, err)
for num, dag := range except {
require.Equal(t, dag, actual[num])
}
for num, dag := range except2 {
require.Equal(t, dag, actual[num])
}
}

func makeEmptyPlainTxDAG(cnt int) *types.PlainTxDAG {
Expand Down
7 changes: 4 additions & 3 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (p *ParallelStateProcessor) doStaticDispatchV2(txReqs []*ParallelTxRequest,
}
// resolve isolate execution paths from TxDAG, it indicates the tx dispatch
paths := types.MergeTxDAGExecutionPaths(txDAG)
log.Info("doStaticDispatchV2 merge parallel execution paths", "slots", len(p.slotState), "paths", len(paths))
log.Debug("doStaticDispatchV2 merge parallel execution paths", "slots", len(p.slotState), "paths", len(paths))

for _, path := range paths {
slotIndex := p.mostHungrySlot()
Expand Down Expand Up @@ -866,12 +866,13 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
p.doCleanUp()

// len(commonTxs) could be 0, such as: https://bscscan.com/block/14580486
if len(commonTxs) > 0 {
if len(commonTxs) > 0 && p.debugConflictRedoNum > 0 {
log.Info("ProcessParallel tx all done", "block", header.Number, "usedGas", *usedGas,
"txNum", txNum,
"len(commonTxs)", len(commonTxs),
"conflictNum", p.debugConflictRedoNum,
"redoRate(%)", 100*(p.debugConflictRedoNum)/len(commonTxs))
"redoRate(%)", 100*(p.debugConflictRedoNum)/len(commonTxs),
"txDAG", txDAG)
}

// Fail if Shanghai not enabled and len(withdrawals) is non-zero.
Expand Down
15 changes: 13 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2510,7 +2510,7 @@ func (s *StateDB) removeStateObjectsDestruct(addr common.Address) {
delete(s.stateObjectsDestruct, addr)
}

func (s *StateDB) ResolveTxDAG(gasFeeReceivers []common.Address) (types.TxDAG, map[int]*types.ExeStat) {
func (s *StateDB) ResolveTxDAG(txCnt int, gasFeeReceivers []common.Address) (types.TxDAG, error) {
if s.isParallel && s.parallel.isSlotDB {
return nil, nil
}
Expand All @@ -2523,7 +2523,18 @@ func (s *StateDB) ResolveTxDAG(gasFeeReceivers []common.Address) (types.TxDAG, m
}(time.Now())
}

return s.mvStates.ResolveTxDAG(gasFeeReceivers), s.mvStates.Stats()
return s.mvStates.ResolveTxDAG(txCnt, gasFeeReceivers)
}

func (s *StateDB) ResolveStats() map[int]*types.ExeStat {
if s.isParallel && s.parallel.isSlotDB {
return nil
}
if s.mvStates == nil {
return nil
}

return s.mvStates.Stats()
}

func (s *StateDB) MVStates() *types.MVStates {
Expand Down
26 changes: 15 additions & 11 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,22 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg

if p.bc.enableTxDAG {
// compare input TxDAG when it enable in consensus
dag, extraStats := statedb.ResolveTxDAG([]common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
// TODO(galaio): check TxDAG correctness?
log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag)
if metrics.EnabledExpensive {
types.EvaluateTxDAGPerformance(dag, extraStats)
}
// try to write txDAG into file
if p.bc.txDAGWriteCh != nil && dag != nil {
p.bc.txDAGWriteCh <- TxDAGOutputItem{
blockNumber: block.NumberU64(),
txDAG: dag,
dag, err := statedb.ResolveTxDAG(len(block.Transactions()), []common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
if err == nil {
// TODO(galaio): check TxDAG correctness?
log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag)
if metrics.EnabledExpensive {
types.EvaluateTxDAGPerformance(dag, statedb.ResolveStats())
}
// try to write txDAG into file
if p.bc.txDAGWriteCh != nil && dag != nil {
p.bc.txDAGWriteCh <- TxDAGOutputItem{
blockNumber: block.NumberU64(),
txDAG: dag,
}
}
} else {
log.Error("ResolveTxDAG err", "block", block.NumberU64(), "tx", len(block.Transactions()), "err", err)
}
}
return receipts, allLogs, *usedGas, nil
Expand Down
20 changes: 16 additions & 4 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/log"
"math"
"math/big"
"time"
Expand Down Expand Up @@ -426,13 +427,23 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
if st.msg.IsSystemTx && !st.evm.ChainConfig().IsRegolith(st.evm.Context.Time) {
gasUsed = 0
}
// just record error tx here
if ferr := st.state.FinaliseRWSet(); ferr != nil {
log.Error("finalise error deposit tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex())
}
result = &ExecutionResult{
UsedGas: gasUsed,
Err: fmt.Errorf("failed deposit: %w", err),
ReturnData: nil,
}
err = nil
}
if err != nil {
// just record error tx here
if ferr := st.state.FinaliseRWSet(); ferr != nil {
log.Error("finalise error tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex())
}
}
return result, err
}

Expand Down Expand Up @@ -507,6 +518,11 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) {
}
DebugInnerExecutionDuration += time.Since(start)

// stop record rw set in here, skip gas fee distribution
if ferr := st.state.FinaliseRWSet(); ferr != nil {
log.Error("finalise tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex())
}

// if deposit: skip refunds, skip tipping coinbase
// Regolith changes this behaviour to report the actual gasUsed instead of always reporting all gas used.
if st.msg.IsDepositTx && !rules.IsOptimismRegolith {
Expand All @@ -522,10 +538,6 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) {
ReturnData: ret,
}, nil
}
// stop record rw set in here, skip gas fee distribution
if err := st.state.FinaliseRWSet(); err != nil {
return nil, err
}

// Note for deposit tx there is no ETH refunded for unused gas, but that's taken care of by the fact that gasPrice
// is always 0 for deposit tx. So calling refundGas will ensure the gasUsed accounting is correct without actually
Expand Down
77 changes: 51 additions & 26 deletions core/types/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ const (
PlainTxDAGType
)

var (
TxDAGRelation0 uint8 = 0
TxDAGRelation1 uint8 = 1
)

type TxDAG interface {
// Type return TxDAG type
Type() byte
Expand Down Expand Up @@ -93,8 +98,8 @@ func (d *EmptyTxDAG) DelayGasDistribution() bool {

func (d *EmptyTxDAG) TxDep(int) TxDep {
return TxDep{
Relation: 1,
TxIndexes: nil,
Relation: &TxDAGRelation1,
}
}

Expand Down Expand Up @@ -156,9 +161,12 @@ func NewPlainTxDAG(txLen int) *PlainTxDAG {

func (d *PlainTxDAG) String() string {
builder := strings.Builder{}
exePaths := travelTxDAGExecutionPaths(d)
for _, path := range exePaths {
builder.WriteString(fmt.Sprintf("%v\n", path))
for _, txDep := range d.TxDeps {
if txDep.Relation == nil || txDep.RelationEqual(TxDAGRelation0) {
builder.WriteString(fmt.Sprintf("%v\n", txDep.TxIndexes))
continue
}
builder.WriteString(fmt.Sprintf("%d: %v\n", *txDep.Relation, txDep.TxIndexes))
}
return builder.String()
}
Expand All @@ -174,11 +182,12 @@ func (d *PlainTxDAG) Size() int {
// MergeTxDAGExecutionPaths will merge duplicate tx path for scheduling parallel.
// Any tx cannot exist in >= 2 paths.
func MergeTxDAGExecutionPaths(d TxDAG) [][]uint64 {
mergeMap := make(map[uint64][]uint64, d.TxCount())
txMap := make(map[uint64]uint64, d.TxCount())
for i := d.TxCount() - 1; i >= 0; i-- {
nd := convert2PlainTxDAGWithRelation0(d)
mergeMap := make(map[uint64][]uint64, nd.TxCount())
txMap := make(map[uint64]uint64, nd.TxCount())
for i := nd.TxCount() - 1; i >= 0; i-- {
index, merge := uint64(i), uint64(i)
deps := d.TxDep(i).TxIndexes
deps := nd.TxDep(i).TxIndexes
if oldIdx, exist := findTxPathIndex(deps, index, txMap); exist {
merge = oldIdx
}
Expand All @@ -196,7 +205,7 @@ func MergeTxDAGExecutionPaths(d TxDAG) [][]uint64 {
mergeMap[t] = append(mergeMap[t], f)
}
mergePaths := make([][]uint64, 0, len(mergeMap))
for i := 0; i < d.TxCount(); i++ {
for i := 0; i < nd.TxCount(); i++ {
path, ok := mergeMap[uint64(i)]
if !ok {
continue
Expand Down Expand Up @@ -224,37 +233,46 @@ func findTxPathIndex(path []uint64, cur uint64, txMap map[uint64]uint64) (uint64

// travelTxDAGExecutionPaths will print all tx execution path
func travelTxDAGExecutionPaths(d TxDAG) [][]uint64 {
txCount := d.TxCount()
deps := make([]TxDep, txCount)
for i := 0; i < txCount; i++ {
nd := convert2PlainTxDAGWithRelation0(d)

exePaths := make([][]uint64, 0)
// travel tx deps with BFS
for i := uint64(0); i < uint64(nd.TxCount()); i++ {
exePaths = append(exePaths, travelTxDAGTargetPath(nd.TxDeps, i))
}
return exePaths
}

func convert2PlainTxDAGWithRelation0(d TxDAG) *PlainTxDAG {
if d.TxCount() == 0 {
return NewPlainTxDAG(0)
}
nd := NewPlainTxDAG(d.TxCount())
for i := 0; i < d.TxCount(); i++ {
dep := d.TxDep(i)
if dep.Relation == 0 {
deps[i] = dep
if dep.RelationEqual(TxDAGRelation0) {
nd.SetTxDep(i, dep)
continue
}
np := TxDep{}
// recover to relation 0
for j := 0; j < i; j++ {
if !dep.Exist(j) {
deps[i].AppendDep(j)
if !dep.Exist(j) && j != i {
np.AppendDep(j)
}
}
nd.SetTxDep(i, np)
}

exePaths := make([][]uint64, 0)
// travel tx deps with BFS
for i := uint64(0); i < uint64(txCount); i++ {
exePaths = append(exePaths, travelTxDAGTargetPath(deps, i))
}
return exePaths
return nd
}

// TxDep store the current tx dependency relation with other txs
type TxDep struct {
TxIndexes []uint64
// It describes the Relation with below txs
// 0: this tx depends on below txs
// 0: this tx depends on below txs, it can be ignored and not be encoded in rlp encoder.
// 1: this transaction does not depend on below txs, all other previous txs depend on
Relation uint8
TxIndexes []uint64
Relation *uint8 `rlp:"optional"`
}

func (d *TxDep) AppendDep(i int) {
Expand Down Expand Up @@ -282,6 +300,13 @@ func (d *TxDep) Last() int {
return int(d.TxIndexes[len(d.TxIndexes)-1])
}

func (d *TxDep) RelationEqual(rel uint8) bool {
if d.Relation == nil {
return TxDAGRelation0 == rel
}
return *d.Relation == rel
}

var (
longestTimeTimer = metrics.NewRegisteredTimer("dag/longesttime", nil)
longestGasTimer = metrics.NewRegisteredTimer("dag/longestgas", nil)
Expand Down
Loading

0 comments on commit ef5ef54

Please sign in to comment.