Skip to content

Commit

Permalink
fix: some builder issues (bnb-chain#22)
Browse files Browse the repository at this point in the history
* fix: allow fast node to rewind after abnormal shutdown (bnb-chain#2401)

(cherry picked from commit fb435eb)

* fix: bundlepool concurrent read and write and commit blob tx issue

* feat: set MaxBundleAliveBlock as bundle's default ddl

---------

Co-authored-by: buddho <galaxystroller@gmail.com>
Co-authored-by: irrun <irunert@gmail.com>
  • Loading branch information
3 people authored Apr 25, 2024
1 parent c242b27 commit b5e2af8
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 53 deletions.
21 changes: 17 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
mapset "github.com/deckarep/golang-set/v2"
exlru "github.com/hashicorp/golang-lru"
"golang.org/x/crypto/sha3"
"golang.org/x/exp/slices"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
Expand All @@ -56,7 +57,6 @@ import (
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -412,7 +412,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Make sure the state associated with the block is available, or log out
// if there is no available state, waiting for state sync.
head := bc.CurrentBlock()
if !bc.NoTries() && !bc.HasState(head.Root) {
if !bc.HasState(head.Root) {
if head.Number.Uint64() == 0 {
// The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the initial state sync is not finished
Expand All @@ -428,7 +428,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if bc.triedb.Scheme() == rawdb.PathScheme {
if bc.triedb.Scheme() == rawdb.PathScheme && !bc.NoTries() {
recoverable, _ := bc.triedb.Recoverable(diskRoot)
if !bc.HasState(diskRoot) && !recoverable {
diskRoot = bc.triedb.Head()
Expand Down Expand Up @@ -991,7 +991,7 @@ func (bc *BlockChain) rewindPathHead(head *types.Header, root common.Hash) (*typ
// then block number zero is returned, indicating that snapshot recovery is disabled
// and the whole snapshot should be auto-generated in case of head mismatch.
func (bc *BlockChain) rewindHead(head *types.Header, root common.Hash) (*types.Header, uint64) {
if bc.triedb.Scheme() == rawdb.PathScheme {
if bc.triedb.Scheme() == rawdb.PathScheme && !bc.NoTries() {
return bc.rewindPathHead(head, root)
}
return bc.rewindHashHead(head, root)
Expand Down Expand Up @@ -1028,6 +1028,19 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// block. Note, depth equality is permitted to allow using SetHead as a
// chain reparation mechanism without deleting any data!
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.Number.Uint64() {
// load bc.snaps for the judge `HasState`
if bc.NoTries() {
if bc.cacheConfig.SnapshotLimit > 0 {
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, header.Root, int(bc.cacheConfig.TriesInMemory), bc.NoTries())
}
defer func() { bc.snaps = nil }()
}

var newHeadBlock *types.Header
newHeadBlock, rootNumber = bc.rewindHead(header, root)
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
Expand Down
5 changes: 3 additions & 2 deletions core/txpool/bundlepool/bundlepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,16 @@ func (p *BundlePool) AddBundle(bundle *types.Bundle) error {
}
bundle.Price = price

p.mu.Lock()
defer p.mu.Unlock()

hash := bundle.Hash()
if _, ok := p.bundles[hash]; ok {
return ErrBundleAlreadyExist
}
for p.slots+numSlots(bundle) > p.config.GlobalSlots {
p.drop()
}
p.mu.Lock()
defer p.mu.Unlock()
p.bundles[hash] = bundle
heap.Push(&p.bundleHeap, bundle)
p.slots += numSlots(bundle)
Expand Down
7 changes: 7 additions & 0 deletions core/types/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)

const (
// MaxBundleAliveBlock is the max alive block for bundle
MaxBundleAliveBlock = 100
// MaxBundleAliveTime is the max alive time for bundle
MaxBundleAliveTime = 5 * 60 // second
)

// SendBundleArgs represents the arguments for a call.
type SendBundleArgs struct {
Txs []hexutil.Bytes `json:"txs"`
Expand Down
24 changes: 10 additions & 14 deletions internal/ethapi/api_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

const (
// MaxBundleAliveBlock is the max alive block for bundle
MaxBundleAliveBlock = 100
// MaxBundleAliveTime is the max alive time for bundle
MaxBundleAliveTime = 5 * 60 // second
MaxOracleBlocks = 21
DropBlocks = 3

InvalidBundleParamError = -38000
)
const InvalidBundleParamError = -38000

// PrivateTxBundleAPI offers an API for accepting bundled transactions
type PrivateTxBundleAPI struct {
Expand All @@ -44,11 +35,11 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args types.SendBund
currentHeader := s.b.CurrentHeader()

if args.MaxBlockNumber == 0 && (args.MaxTimestamp == nil || *args.MaxTimestamp == 0) {
maxTimeStamp := currentHeader.Time + MaxBundleAliveTime
maxTimeStamp := currentHeader.Time + types.MaxBundleAliveTime
args.MaxTimestamp = &maxTimeStamp
}

if args.MaxBlockNumber != 0 && args.MaxBlockNumber > currentHeader.Number.Uint64()+MaxBundleAliveBlock {
if args.MaxBlockNumber != 0 && args.MaxBlockNumber > currentHeader.Number.Uint64()+types.MaxBundleAliveBlock {
return common.Hash{}, newBundleError(errors.New("the maxBlockNumber should not be lager than currentBlockNum + 100"))
}

Expand All @@ -62,8 +53,8 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args types.SendBund
return common.Hash{}, newBundleError(errors.New("the maxTimestamp should not be less than currentBlockTimestamp"))
}

if (args.MaxTimestamp != nil && *args.MaxTimestamp > currentHeader.Time+MaxBundleAliveTime) ||
(args.MinTimestamp != nil && *args.MinTimestamp > currentHeader.Time+MaxBundleAliveTime) {
if (args.MaxTimestamp != nil && *args.MaxTimestamp > currentHeader.Time+types.MaxBundleAliveTime) ||
(args.MinTimestamp != nil && *args.MinTimestamp > currentHeader.Time+types.MaxBundleAliveTime) {
return common.Hash{}, newBundleError(errors.New("the minTimestamp/maxTimestamp should not be later than currentBlockTimestamp + 5 minutes"))
}

Expand Down Expand Up @@ -95,6 +86,11 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args types.SendBund
RevertingTxHashes: args.RevertingTxHashes,
}

// If the maxBlockNumber and maxTimestamp are not set, set max ddl of bundle as types.MaxBundleAliveBlock
if bundle.MaxBlockNumber == 0 && bundle.MaxTimestamp == 0 {
bundle.MaxBlockNumber = currentHeader.Number.Uint64() + types.MaxBundleAliveBlock
}

err := s.b.SendBundle(ctx, bundle)
if err != nil {
return common.Hash{}, err
Expand Down
13 changes: 7 additions & 6 deletions miner/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"container/heap"
"math/big"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
)

// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
Expand Down Expand Up @@ -205,11 +206,11 @@ func (t *transactionsByPriceAndNonce) Forward(tx *types.Transaction) {
}
return
}
//check whether target tx exists in t.heads
// check whether target tx exists in t.heads
for _, head := range t.heads {
if head.tx != nil && head.tx.Resolve() != nil {
if tx == head.tx.Tx {
//shift t to the position one after tx
// shift t to the position one after tx
txTmp := t.PeekWithUnwrap()
for txTmp != tx {
t.Shift()
Expand All @@ -220,13 +221,13 @@ func (t *transactionsByPriceAndNonce) Forward(tx *types.Transaction) {
}
}
}
//get the sender address of tx
// get the sender address of tx
acc, _ := types.Sender(t.signer, tx)
//check whether target tx exists in t.txs
// check whether target tx exists in t.txs
if txs, ok := t.txs[acc]; ok {
for _, txLazyTmp := range txs {
if txLazyTmp != nil && txLazyTmp.Resolve() != nil {
//found the same pointer in t.txs as tx and then shift t to the position one after tx
// found the same pointer in t.txs as tx and then shift t to the position one after tx
if tx == txLazyTmp.Tx {
txTmp := t.PeekWithUnwrap()
for txTmp != tx {
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction,
return nil, err
}
sc.TxIndex = uint64(len(env.txs))
env.txs = append(env.txs, tx.WithoutBlobTxSidecar())
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
env.sidecars = append(env.sidecars, sc)
env.blobs += len(sc.Blobs)
Expand Down
56 changes: 30 additions & 26 deletions miner/worker_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,41 @@ var (
// fillTransactions retrieves the pending bundles and transactions from the txpool and fills them
// into the given sealing block. The selection and ordering strategy can be extended in the future.
func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environment, stopTimer *time.Timer) error {
env.state.StopPrefetcher() // no need to prefetch txs for a builder

var (
localPlainTxs map[common.Address][]*txpool.LazyTransaction
remotePlainTxs map[common.Address][]*txpool.LazyTransaction
localBlobTxs map[common.Address][]*txpool.LazyTransaction
remoteBlobTxs map[common.Address][]*txpool.LazyTransaction
bundles []*types.Bundle
)

// commit bundles
{
bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time)

// if no bundles, not necessary to fill transactions
if len(bundles) == 0 {
return errors.New("no bundles in bundle pool")
}

txs, bundle, err := w.generateOrderedBundles(env, bundles)
if err != nil {
log.Error("fail to generate ordered bundles", "err", err)
return err
}

if err = w.commitBundles(env, txs, interruptCh, stopTimer); err != nil {
log.Error("fail to commit bundles", "err", err)
return err
}

env.profit.Add(env.profit, bundle.EthSentToSystem)
log.Info("fill bundles", "bundles_count", len(bundles))
}

// commit normal transactions
{
w.mu.RLock()
tip := w.tip
Expand Down Expand Up @@ -71,34 +99,9 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ
localBlobTxs[account] = txs
}
}

bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time)

log.Info("fill bundles and transactions", "bundles_count", len(bundles), "tx_count", len(localPlainTxs)+len(remotePlainTxs))

// if no bundles, not necessary to fill transactions
if len(bundles) == 0 {
return errors.New("no bundles in bundle pool")
}
log.Info("fill transactions", "plain_txs_count", len(localPlainTxs)+len(remotePlainTxs), "blob_txs_count", len(localBlobTxs)+len(remoteBlobTxs))
}

{
txs, bundle, err := w.generateOrderedBundles(env, bundles)
if err != nil {
log.Error("fail to generate ordered bundles", "err", err)
return err
}

if err = w.commitBundles(env, txs, interruptCh, stopTimer); err != nil {
log.Error("fail to commit bundles", "err", err)
return err
}

env.profit.Add(env.profit, bundle.EthSentToSystem)
}

env.state.StopPrefetcher() // no need to prefetch txs for a builder

// Fill the block with all available pending transactions.
// we will abort when:
// 1.new block was imported
Expand All @@ -122,6 +125,7 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ
return err
}
}
log.Info("fill bundles and transactions done", "total_txs_count", len(env.txs))
return nil
}

Expand Down

0 comments on commit b5e2af8

Please sign in to comment.