Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Send drop events from blobpool. #177

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.Addres
p.Close()
return err
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropUnexecutable,
})
}
}
// Sort the indexed transactions by nonce and delete anything gapped, create
Expand Down Expand Up @@ -566,6 +571,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropUnexecutable,
})
}
return
}
Expand Down Expand Up @@ -597,6 +607,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropLowNonce,
})
}
p.index[addr] = txs
}
Expand Down Expand Up @@ -646,6 +661,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
txs = append(txs[:i], txs[i+1:]...)
p.index[addr] = txs

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropReplaced,
})

i--
continue
}
Expand All @@ -671,6 +691,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropUnexecutable,
})
}
p.index[addr] = txs
break
Expand Down Expand Up @@ -717,6 +742,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropAccountCap,
})
}
}
// Sanity check that no account can have more queued transactions than the
Expand Down Expand Up @@ -749,6 +779,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropAccountCap,
})
}
}
// Included cheap transactions might have left the remaining ones better from
Expand Down Expand Up @@ -1075,6 +1110,11 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete dropped transaction", "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropGasPriceUpdated,
})
}
break
}
Expand Down Expand Up @@ -1343,6 +1383,12 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
delete(p.lookup, prev.hash)
p.lookup[meta.hash] = meta.id
p.stored += uint64(meta.size) - uint64(prev.size)

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobTxMetaToTransaction(prev)},
Reason: txpool.DropReplaced,
Replacement: tx,
})
} else {
// Transaction extends previously scheduled ones
p.index[from] = append(p.index[from], meta)
Expand Down Expand Up @@ -1464,6 +1510,11 @@ func (p *BlobPool) drop() {
if err := p.store.Delete(drop.id); err != nil {
log.Error("Failed to drop evicted transaction", "id", drop.id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobTxMetaToTransaction(&blobTxMeta{id: drop.id})},
Reason: txpool.DropTruncating,
})
}

// Pending retrieves all currently processable transactions, grouped by origin
Expand Down Expand Up @@ -1712,3 +1763,18 @@ func (pool *BlobPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.S
func (pool *BlobPool) SubscribeRejectedTxEvent(ch chan<- core.RejectedTxEvent) event.Subscription {
return pool.rejectTxFeed.Subscribe(ch)
}

func blobIDToTransaction(id uint64) *types.Transaction {
return blobTxMetaToTransaction(&blobTxMeta{id: id})
}

func blobTxMetaToTransaction(meta *blobTxMeta) *types.Transaction {
return types.NewTx(&types.BlobTx{
Gas: meta.execGas,
BlobFeeCap: meta.blobFeeCap,
Nonce: meta.nonce,
GasFeeCap: meta.execFeeCap,
GasTipCap: meta.execTipCap,
BlobHashes: meta.blobHashes,
})
}
14 changes: 14 additions & 0 deletions core/txpool/drop_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package txpool

const (
DropUnderpriced = "underpriced-txs"
DropLowNonce = "low-nonce-txs"
DropUnpayable = "unpayable-txs"

DropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
DropReplaced = "replaced-txs"
DropUnexecutable = "unexecutable-txs"
DropTruncating = "truncating-txs"
DropOld = "old-txs"
DropGasPriceUpdated = "updated-gas-price"
)
44 changes: 15 additions & 29 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (pool *LegacyPool) loop() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: list,
Reason: dropOld,
Reason: txpool.DropOld,
})
queuedEvictionMeter.Mark(int64(len(list)))
}
Expand Down Expand Up @@ -468,7 +468,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) {
pool.priced.Removed(len(drop))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drop,
Reason: dropGasPriceUpdated,
Reason: txpool.DropGasPriceUpdated,
})
}
log.Info("Legacy pool tip threshold updated", "tip", newTip)
Expand Down Expand Up @@ -794,7 +794,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.changesSinceReorg += dropped
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drop,
Reason: dropUnderpriced,
Reason: txpool.DropUnderpriced,
})
}
}
Expand All @@ -814,7 +814,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pendingReplaceMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
Reason: txpool.DropReplaced,
Replacement: tx,
})
}
Expand Down Expand Up @@ -894,7 +894,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
queuedReplaceMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
Reason: txpool.DropReplaced,
})
} else {
// Nothing was replaced, bump the queued counter
Expand Down Expand Up @@ -954,7 +954,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
pendingReplaceMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
Reason: txpool.DropReplaced,
})
} else {
// Nothing was replaced, bump the pending counter
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
pendingGauge.Dec(int64(1 + len(invalids)))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: invalids,
Reason: dropUnexecutable,
Reason: txpool.DropUnexecutable,
})
return 1 + len(invalids)
}
Expand Down Expand Up @@ -1509,7 +1509,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
log.Trace("Removed old queued transactions", "count", len(forwards))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: forwards,
Reason: dropLowNonce,
Reason: txpool.DropLowNonce,
})
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
Expand All @@ -1521,7 +1521,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
queuedNofundsMeter.Mark(int64(len(drops)))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drops,
Reason: dropUnpayable,
Reason: txpool.DropUnpayable,
})

// Gather all executable transactions and promote them
Expand All @@ -1547,7 +1547,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
queuedRateLimitMeter.Mark(int64(len(caps)))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: caps,
Reason: dropAccountCap,
Reason: txpool.DropAccountCap,
})
}
// Mark all the items dropped as removed
Expand Down Expand Up @@ -1618,7 +1618,7 @@ func (pool *LegacyPool) truncatePending() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: caps,
Reason: dropAccountCap,
Reason: txpool.DropAccountCap,
})
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
Expand Down Expand Up @@ -1693,7 +1693,7 @@ func (pool *LegacyPool) truncateQueue() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: txs,
Reason: dropTruncating,
Reason: txpool.DropTruncating,
})
drop -= size
queuedRateLimitMeter.Mark(int64(size))
Expand All @@ -1707,7 +1707,7 @@ func (pool *LegacyPool) truncateQueue() {
queuedRateLimitMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{txs[i]},
Reason: dropTruncating,
Reason: txpool.DropTruncating,
})
}
}
Expand Down Expand Up @@ -1735,7 +1735,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: olds,
Reason: dropLowNonce,
Reason: txpool.DropLowNonce,
})
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
Expand All @@ -1746,7 +1746,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drops,
Reason: dropUnpayable,
Reason: txpool.DropUnpayable,
})
pool.priced.Removed(len(olds) + len(drops))
pendingNofundsMeter.Mark(int64(len(drops)))
Expand Down Expand Up @@ -2043,17 +2043,3 @@ func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
}


const (
dropUnderpriced = "underpriced-txs"
dropLowNonce = "low-nonce-txs"
dropUnpayable = "unpayable-txs"

dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
dropReplaced = "replaced-txs"
dropUnexecutable = "unexecutable-txs"
dropTruncating = "truncating-txs"
dropOld = "old-txs"
dropGasPriceUpdated = "updated-gas-price"
)
Loading