This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Txpool 4844 upgrades Part 2 #1125

Merged · merged 31 commits on Sep 20, 2023
2 changes: 1 addition & 1 deletion go.mod
@@ -4,7 +4,7 @@ go 1.19

require (
github.com/erigontech/mdbx-go v0.27.14
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
)
4 changes: 2 additions & 2 deletions go.sum
@@ -237,8 +237,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43 h1:AXQ1vPkmuBPtVRpAehMAXzmsRmdqUpNvl93wWE6gjCU=
github.com/ledgerwatch/interfaces v0.0.0-20230909005156-bff86c603a43/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af h1:gGWTa4p8npycnK9gVBbZxMSOBvUgM80lsDU9rnFqyHU=
github.com/ledgerwatch/interfaces v0.0.0-20230912104607-5501cfd6e5af/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk=
github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
397 changes: 204 additions & 193 deletions gointerfaces/remote/kv.pb.go

Large diffs are not rendered by default.

43 changes: 15 additions & 28 deletions txpool/fetch.go
@@ -465,19 +465,25 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
if change.Direction == remote.Direction_UNWIND {
unwindTxs.Resize(uint(len(change.Txs)))
for i := range change.Txs {
unwindTxs.Txs[i] = &types2.TxSlot{}
if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
if unwindTxs.Txs[i].Type == types2.BlobTxType {
knownBlobTxn, err := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:])
if err != nil {
return err
utx := &types2.TxSlot{}
sender := make([]byte, 20)
_, err2 := parseContext.ParseTransaction(change.Txs[i], 0, utx, sender, false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
if err2 != nil {
return err2
}
if utx.Type == types2.BlobTxType {
knownBlobTxn, err2 := f.pool.GetKnownBlobTxn(tx, utx.IDHash[:])
if err2 != nil {
return err2
}
// Get the blob tx from cache; ignore altogether if it isn't there
if knownBlobTxn != nil {
unwindTxs.Txs[i] = knownBlobTxn.Tx
unwindTxs.Append(knownBlobTxn.Tx, sender, false)
}
} else {
unwindTxs.Append(utx, sender, false)
}
return err
}); err != nil && !errors.Is(err, context.Canceled) {
@@ -487,8 +493,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}
// TODO(eip-4844): If there are blob txs that need to be unwound, these will not replay properly since we only have the
// unwrapped version here (we would need to re-wrap the tx with its blobs & kzg commitments).

if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
}); err != nil && !errors.Is(err, context.Canceled) {
@@ -499,21 +504,3 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
}
}

// func (f *Fetch) requestUnknownTxs(unknownHashes types2.Hashes, sentryClient sentry.SentryClient, PeerId *types.H512) error{
// if len(unknownHashes) > 0 {
// var encodedRequest []byte
// var err error
// var messageID sentry.MessageId
// if encodedRequest, err = types2.EncodeGetPooledTransactions66(unknownHashes, uint64(1), nil); err != nil {
// return err
// }
// messageID = sentry.MessageId_GET_POOLED_TRANSACTIONS_66
// if _, err := sentryClient.SendMessageById(f.ctx, &sentry.SendMessageByIdRequest{
// Data: &sentry.OutboundMessageData{Id: messageID, Data: encodedRequest},
// PeerId: PeerId,
// }, &grpc.EmptyCallOption{}); err != nil {
// return err
// }
// }
// }
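The reworked unwind path above parses each unwound tx into a scratch slot and re-adds blob txs only if they are still in the mined-blob cache, since the pool holds only the unwrapped form and cannot replay blobs it no longer has. A minimal sketch of that decision flow, using stand-in types rather than erigon-lib's actual TxSlot/TxSlots:

```go
package main

import "fmt"

const blobTxType byte = 0x3 // EIP-4844 tx type

// Stand-ins for erigon-lib's types2.TxSlot / TxSlots.
type txSlot struct {
	Type   byte
	IDHash string
}
type txSlots struct{ txs []*txSlot }

func (s *txSlots) append(t *txSlot) { s.txs = append(s.txs, t) }

// collectUnwound mirrors the loop in handleStateChanges: non-blob txs are
// re-added as parsed; blob txs are re-added only from the mined cache,
// because only the cached copy still carries its blobs and KZG commitments.
func collectUnwound(parsed []*txSlot, minedBlobCache map[string]*txSlot) *txSlots {
	out := &txSlots{}
	for _, utx := range parsed {
		if utx.Type == blobTxType {
			if cached, ok := minedBlobCache[utx.IDHash]; ok {
				out.append(cached) // full wrapped tx from the cache
			}
			// not cached: ignore altogether, as in the PR
			continue
		}
		out.append(utx)
	}
	return out
}

func main() {
	cache := map[string]*txSlot{"0xaa": {Type: blobTxType, IDHash: "0xaa"}}
	parsed := []*txSlot{
		{Type: 0x2, IDHash: "0x01"},
		{Type: blobTxType, IDHash: "0xaa"},
		{Type: blobTxType, IDHash: "0xbb"}, // not cached -> dropped
	}
	fmt.Println(len(collectUnwound(parsed, cache).txs)) // 2
}
```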
83 changes: 57 additions & 26 deletions txpool/pool.go
@@ -221,6 +221,7 @@ type TxPool struct {
lastFinalizedBlock atomic.Uint64
started atomic.Bool
pendingBaseFee atomic.Uint64
pendingBlobFee atomic.Uint64 // For gas accounting of blob gas, which has its own fee dimension
blockGasLimit atomic.Uint64
shanghaiTime *uint64
isPostShanghai atomic.Bool
@@ -340,6 +341,9 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.queued.worst.pendingBaseFee = pendingBaseFee
}

pendingBlobFee := stateChanges.PendingBlobFeePerGas
p.setBlobFee(pendingBlobFee)

p.blockGasLimit.Store(stateChanges.BlockGasLimit)
if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil {
return err
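OnNewBlock now picks up PendingBlobFeePerGas from the state-change event, giving the pool a second pool-wide fee knob alongside the base fee. A minimal sketch of this kind of guarded atomic fee tracking; the type and method names here are illustrative, not the PR's:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// feeTracker keeps a pool-wide fee that many goroutines read on every
// admission check and one writer updates per block.
type feeTracker struct {
	pendingBlobFee atomic.Uint64
}

// onNewBlock stores the blob fee from the latest state change; a zero
// value is treated as "unset" and ignored, matching the guarded store
// in this PR's setBlobFee.
func (f *feeTracker) onNewBlock(blobFee uint64) {
	if blobFee > 0 {
		f.pendingBlobFee.Store(blobFee)
	}
}

func main() {
	var f feeTracker
	f.onNewBlock(7)
	f.onNewBlock(0)                        // ignored
	fmt.Println(f.pendingBlobFee.Load()) // 7
}
```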
@@ -362,11 +366,10 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
}
}

if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
return err
}

if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
return err
}

@@ -381,7 +384,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.pending.EnforceWorstInvariants()
p.baseFee.EnforceInvariants()
p.queued.EnforceInvariants()
promote(p.pending, p.baseFee, p.queued, pendingBaseFee, p.discardLocked, &announcements, p.logger)
promote(p.pending, p.baseFee, p.queued, pendingBaseFee, pendingBlobFee, p.discardLocked, &announcements, p.logger)
p.pending.EnforceBestInvariants()
p.promoted.Reset()
p.promoted.AppendOther(announcements)
@@ -438,7 +441,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
}

announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
if err != nil {
return err
}
@@ -581,7 +584,11 @@ func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) {
if !has {
return nil, nil
}
txn, _ := tx.GetOne(kv.PoolTransaction, hash)

txn, err := tx.GetOne(kv.PoolTransaction, hash)
if err != nil {
return nil, err
}
parseCtx := types.NewTxParseContext(p.chainID)
parseCtx.WithSender(false)
txSlot := &types.TxSlot{}
@@ -1039,7 +1046,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
}

announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
if err == nil {
for i, reason := range addReasons {
if reason != txpoolcfg.NotSet {
Expand Down Expand Up @@ -1076,7 +1083,7 @@ func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) {
return p._chainDB, p._stateCache
}
func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64,
newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), collect bool,
logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) {
@@ -1130,7 +1137,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard, logger)
}

promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements, logger)
promote(pending, baseFee, queued, pendingBaseFee, pendingBlobFee, discard, &announcements, logger)
pending.EnforceBestInvariants()

return announcements, discardReasons, nil
@@ -1211,6 +1218,12 @@ func (p *TxPool) setBaseFee(baseFee uint64) (uint64, bool) {
return p.pendingBaseFee.Load(), changed
}

func (p *TxPool) setBlobFee(blobFee uint64) {
if blobFee > 0 {
p.pendingBlobFee.Store(blobFee)
}
}

func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoolcfg.DiscardReason {
// Insert to pending pool, if pool doesn't have txn with same Nonce and bigger Tip
found := p.all.get(mt.Tx.SenderID, mt.Tx.Nonce)
@@ -1220,7 +1233,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
}
priceBump := p.cfg.PriceBump

//Blob txn threshold checks
//Blob txn threshold checks for replacing an existing txn
if mt.Tx.Type == types.BlobTxType {
priceBump = p.cfg.BlobPriceBump
blobFeeThreshold, overflow := (&uint256.Int{}).MulDivOverflow(
@@ -1269,13 +1282,12 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
p.discardLocked(found, txpoolcfg.ReplacedByHigherTip)
}

// Remove from mined cache in case this is coming from unwind txs
// and to ensure not double adding into the memory
hashStr := string(mt.Tx.IDHash[:])
if _, ok := p.minedBlobTxsByHash[hashStr]; ok {
p.deleteMinedBlobTxn(hashStr)
// Don't add a blob tx to the queued pool if its fee cap is below the current pending blob base fee
if mt.Tx.Type == types.BlobTxType && mt.Tx.BlobFeeCap.LtUint64(p.pendingBlobFee.Load()) {
return txpoolcfg.FeeTooLow
}

hashStr := string(mt.Tx.IDHash[:])
p.byHash[hashStr] = mt

if replaced := p.all.replaceOrInsert(mt); replaced != nil {
@@ -1289,6 +1301,8 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
}
// All transactions are first added to the queued pool and then immediately promoted from there if required
p.queued.Add(mt, p.logger)
// Remove from mined cache as we are now "resurrecting" it to a sub-pool
p.deleteMinedBlobTxn(hashStr)
return txpoolcfg.NotSet
}
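Together, the blob-specific checks in addLocked enforce a steeper replacement bump (BlobPriceBump) and an absolute admission floor at the pending blob fee. A self-contained sketch of both checks with github.com/holiman/uint256; the 100% bump value is an assumption for illustration, not the pool's configured default:

```go
package main

import (
	"fmt"

	"github.com/holiman/uint256"
)

const blobPriceBump uint64 = 100 // assumption: 100% bump, i.e. replacement must pay 2x

// replaceable reports whether newFeeCap may replace oldFeeCap under the
// bump rule: newFeeCap >= oldFeeCap * (100 + bump) / 100.
func replaceable(newFeeCap, oldFeeCap *uint256.Int, bump uint64) bool {
	threshold, overflow := new(uint256.Int).MulDivOverflow(
		oldFeeCap, uint256.NewInt(100+bump), uint256.NewInt(100))
	if overflow {
		return false
	}
	return newFeeCap.Cmp(threshold) >= 0
}

func main() {
	old := uint256.NewInt(10)
	fmt.Println(replaceable(uint256.NewInt(20), old, blobPriceBump)) // true: 20 >= 10*2
	fmt.Println(replaceable(uint256.NewInt(15), old, blobPriceBump)) // false

	// Admission floor: a blob tx whose fee cap is under the pending blob
	// fee is rejected outright (FeeTooLow in the PR).
	pendingBlobFee := uint64(12)
	fmt.Println(uint256.NewInt(11).LtUint64(pendingBlobFee)) // true -> reject
}
```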

@@ -1304,6 +1318,7 @@ func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) {

// Cache recently mined blobs in anticipation of reorg, delete finalized ones
func (p *TxPool) processMinedFinalizedBlobs(coreTx kv.Tx, minedTxs []*types.TxSlot, finalizedBlock uint64) error {
p.lastFinalizedBlock.Store(finalizedBlock)
// Remove blobs in the finalized block and older, loop through all entries
for l := len(p.minedBlobTxsByBlock); l > 0 && finalizedBlock > 0; l-- {
// delete individual hashes
@@ -1327,8 +1342,6 @@ func (p *TxPool) processMinedFinalizedBlobs(coreTx kv.Tx, minedTxs []*types.TxSl
p.minedBlobTxsByHash[string(txn.IDHash[:])] = mt
}
}

p.lastFinalizedBlock.Store(finalizedBlock)
return nil
}
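A sketch of the mined-blob cache pattern used here: index recently mined blob txs by block and by hash so an unwind can resurrect them, and evict everything at or below the finalized block. The types and method names below are stand-ins, not the pool's actual fields:

```go
package main

import "fmt"

type minedCache struct {
	byBlock map[uint64][]string // block num -> hashes of blob txs mined in it
	byHash  map[string]struct{}
}

func (c *minedCache) add(block uint64, hash string) {
	c.byBlock[block] = append(c.byBlock[block], hash)
	c.byHash[hash] = struct{}{}
}

// onFinalized drops all entries for blocks <= finalized: those blob txs
// can never be unwound, so there is no reason to keep their payloads.
func (c *minedCache) onFinalized(finalized uint64) {
	for blk, hashes := range c.byBlock {
		if blk <= finalized {
			for _, h := range hashes {
				delete(c.byHash, h)
			}
			delete(c.byBlock, blk)
		}
	}
}

func main() {
	c := &minedCache{byBlock: map[uint64][]string{}, byHash: map[string]struct{}{}}
	c.add(100, "0xaa")
	c.add(101, "0xbb")
	c.onFinalized(100)
	_, stillCached := c.byHash["0xbb"]
	fmt.Println(stillCached) // true; only block 100's entries were evicted
}
```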

@@ -1530,10 +1543,10 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint

// promote reasserts invariants of the subpool and returns the list of transactions that ended up
// being promoted to the pending or basefee pool, for re-broadcasting
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements,
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, pendingBlobFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements,
logger log.Logger) {
// Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0); worst = pending.Worst() {
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = pending.Worst() {
if worst.subPool >= BaseFeePoolBits {
tx := pending.PopWorst()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
@@ -1546,7 +1559,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}

// Promote best transactions from base fee pool to pending pool while they qualify
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() {
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = baseFee.Best() {
tx := baseFee.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx, logger)
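With two fee dimensions, pending-pool eligibility now means covering both the pending base fee and, for blob txs, the pending blob fee. A compact sketch of the predicate behind both loops above; the constant and function names are illustrative:

```go
package main

import (
	"fmt"

	"github.com/holiman/uint256"
)

const blobTxType byte = 0x3 // EIP-4844 tx type

// qualifiesPending mirrors the condition in the demote/promote loops: a tx
// stays in (or enters) pending only if its fee cap covers the pending base
// fee and, for blob txs, its blob fee cap also covers the pending blob fee.
func qualifiesPending(minFeeCap, blobFeeCap *uint256.Int, txType byte, pendingBaseFee, pendingBlobFee uint64) bool {
	if minFeeCap.LtUint64(pendingBaseFee) {
		return false
	}
	if txType == blobTxType && blobFeeCap.LtUint64(pendingBlobFee) {
		return false
	}
	return true
}

func main() {
	fmt.Println(qualifiesPending(uint256.NewInt(10), uint256.NewInt(1), blobTxType, 8, 2)) // false: blob fee cap 1 < 2
	fmt.Println(qualifiesPending(uint256.NewInt(10), uint256.NewInt(3), blobTxType, 8, 2)) // true
}
```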
@@ -1846,6 +1859,10 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
return err
}
binary.BigEndian.PutUint64(encID, p.pendingBlobFee.Load())
if err := tx.Put(kv.PoolInfo, PoolPendingBlobFeeKey, encID); err != nil {
return err
}
if err := PutLastSeenBlock(tx, p.lastSeenBlock.Load(), encID); err != nil {
return err
}
@@ -1938,16 +1955,27 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
pendingBaseFee = binary.BigEndian.Uint64(v)
}
}
var pendingBlobFee uint64 = 1 // MIN_BLOB_GAS_PRICE, per EIP-4844
{
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBlobFeeKey)
if err != nil {
return err
}
if len(v) > 0 {
pendingBlobFee = binary.BigEndian.Uint64(v)
}
}

err = p.senders.registerNewSenders(&txs, p.logger)
if err != nil {
return err
}
if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs,
pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil {
pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil {
return err
}
p.pendingBaseFee.Store(pendingBaseFee)

p.pendingBlobFee.Store(pendingBlobFee)
return nil
}
func LastSeenBlock(tx kv.Getter) (uint64, error) {
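flushLocked and fromDB give the blob fee the same persistence treatment as the base fee: an 8-byte big-endian value under PoolPendingBlobFeeKey, defaulting to 1 (EIP-4844's minimum blob gas price) when absent. A round-trip sketch with a plain map standing in for the kv store:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

var poolPendingBlobFeeKey = "pending_blob_fee"

func save(store map[string][]byte, fee uint64) {
	enc := make([]byte, 8)
	binary.BigEndian.PutUint64(enc, fee)
	store[poolPendingBlobFeeKey] = enc
}

// load falls back to 1 (EIP-4844's MIN_BLOB_GAS_PRICE) when the key was
// never written, matching the default in fromDB above.
func load(store map[string][]byte) uint64 {
	v, ok := store[poolPendingBlobFeeKey]
	if !ok || len(v) == 0 {
		return 1
	}
	return binary.BigEndian.Uint64(v)
}

func main() {
	store := map[string][]byte{}
	fmt.Println(load(store)) // 1 (default)
	save(store, 42)
	fmt.Println(load(store)) // 42
}
```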
@@ -2068,6 +2096,7 @@ func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender
var PoolChainConfigKey = []byte("chain_config")
var PoolLastSeenBlockKey = []byte("last_seen_block")
var PoolPendingBaseFeeKey = []byte("pending_base_fee")
var PoolPendingBlobFeeKey = []byte("pending_blob_fee")

// recentlyConnectedPeers does buffer IDs of recently connected good peers
// then sync of pooled Transaction can happen to all of then at once
@@ -2461,10 +2490,12 @@ type BestQueue struct {
pendingBaseFee uint64
}

// Returns true if the txn is better than the parameter txn
// it first compares the subpool markers of the two meta txns, then it compares
// depending on the pool (p, b, q) it compares the effective tip (p), nonceDistance (p,q)
// minFeeCap (b), and cumulative balance distance (p, q) for pending pool
// Returns true if the txn "mt" is better than the parameter txn "than"
// it first compares the subpool markers of the two meta txns, then,
// (since they have the same subpool marker, and thus same pool)
// depending on the pool - pending (P), basefee (B), queued (Q) -
// it compares the effective tip (for P), nonceDistance (for both P,Q)
// minFeeCap (for B), and cumulative balance distance (for P, Q)
func (mt *metaTx) better(than *metaTx, pendingBaseFee uint256.Int) bool {
subPool := mt.subPool
thanSubPool := than.subPool
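A sketch of the ordering the comment above describes: the packed subpool marker is compared first, and pool-specific tiebreaks apply only between txns with identical markers. The flag names and bit layout below are illustrative, loosely modeled on the pool's qualification bits:

```go
package main

import "fmt"

// Illustrative qualification bits, highest significance first; packing them
// into one integer lets a plain numeric comparison order txns by how
// "qualified" they are.
const (
	noNonceGaps       = 0b10000
	enoughBalance     = 0b01000
	notTooMuchGas     = 0b00100
	enoughFeeCapBlock = 0b00010
	isLocal           = 0b00001
)

func better(mtSubPool, thanSubPool int, tiebreak func() bool) bool {
	if mtSubPool != thanSubPool {
		return mtSubPool > thanSubPool // marker comparison decides first
	}
	return tiebreak() // same pool: effective tip, nonce distance, etc.
}

func main() {
	a := noNonceGaps | enoughBalance | notTooMuchGas | enoughFeeCapBlock
	b := noNonceGaps | enoughBalance
	fmt.Println(better(a, b, func() bool { return false })) // true: a is more qualified
}
```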