From e2b1dd24cf8047111eb62950e77754476c540c16 Mon Sep 17 00:00:00 2001 From: georgehao Date: Tue, 20 Aug 2024 15:39:28 +0800 Subject: [PATCH] feat(metrics): calculate the real pending tx (#983) * calculate the real pending tx * update * move realPendingTx to miner * update * calculate the real pending tx by statsWithMinBaseFee * update * fix lint * address comments * add metrics to StatsWithMinBaseFee * change read_lock to write_lock --- core/txpool/legacypool/legacypool.go | 49 +++++++++++++++++++++------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 58dbbd91f01c..85ca577d9324 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -95,14 +95,17 @@ var ( // that this number is pretty low, since txpool reorgs happen very frequently. dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015)) - pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) - queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) - localGauge = metrics.NewRegisteredGauge("txpool/local", nil) - slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) + pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) + realPendingGauge = metrics.NewRegisteredGauge("txpool/real_pending", nil) + queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) + realQueuedGauge = metrics.NewRegisteredGauge("txpool/real_queued", nil) + localGauge = metrics.NewRegisteredGauge("txpool/local", nil) + slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil) - txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil) + txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil) + statsWithMinBaseFeeTimer = metrics.NewRegisteredTimer("txpool/stats_min_base_fee", nil) ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -229,11 +232,13 @@ type LegacyPool struct { queueTxEventCh chan *types.Transaction reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - reorgPauseCh chan bool // requests to pause scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop initDoneCh chan struct{} // is closed once the pool is initialized (for tests) changesSinceReorg int // A counter for how many drops we've performed in-between reorg. + + reorgPauseCh chan bool // requests to pause scheduleReorgLoop + realTxActivityShutdownCh chan struct{} } type txpoolResetRequest struct { @@ -261,8 +266,10 @@ func New(config Config, chain BlockChain) *LegacyPool { queueTxEventCh: make(chan *types.Transaction), reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), - reorgPauseCh: make(chan bool), initDoneCh: make(chan struct{}), + + reorgPauseCh: make(chan bool), + realTxActivityShutdownCh: make(chan struct{}), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -332,6 +339,21 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool return nil } +func (pool *LegacyPool) periodicallyCalculateRealTxActivity() { + defer pool.wg.Done() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + pool.StatsWithMinBaseFee(pool.chain.CurrentBlock().BaseFee) + case <-pool.realTxActivityShutdownCh: + log.Info("Real tx activity calculation stopped") + return + } + } +} + // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -488,10 +510,12 @@ func (pool *LegacyPool) stats() (int, int) { // StatsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions greater equal minBaseFee. func (pool *LegacyPool) StatsWithMinBaseFee(minBaseFee *big.Int) (int, int) { - pool.mu.RLock() - defer pool.mu.RUnlock() - - return pool.statsWithMinBaseFee(minBaseFee) + statsStart := time.Now() + pool.mu.Lock() + pendingTxs, queuedTxs := pool.statsWithMinBaseFee(minBaseFee) + pool.mu.Unlock() + statsWithMinBaseFeeTimer.UpdateSince(statsStart) + return pendingTxs, queuedTxs } // statsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the @@ -506,6 +530,7 @@ func (pool *LegacyPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) { pending++ } } + realPendingGauge.Update(int64(pending)) queued := 0 for _, list := range pool.queue { @@ -516,6 +541,8 @@ func (pool *LegacyPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) { queued++ } } + realQueuedGauge.Update(int64(queued)) + return pending, queued }