From 5b00017bc1de047c9b8d7ce3eabdf597ecb2c4a5 Mon Sep 17 00:00:00 2001 From: ucwong Date: Wed, 27 Sep 2023 04:19:30 +0800 Subject: [PATCH] allow underpriced transactions in after timeout --- core/types/transaction.go | 13 +++++++++++++ ctxc/fetcher/tx_fetcher.go | 38 ++++++++++++++++++++++++-------------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/core/types/transaction.go b/core/types/transaction.go index cae325f013..735f8ba0ec 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -194,6 +194,19 @@ func (tx *Transaction) To() *common.Address { return copyAddressPtr(tx.data.Recipient) } +// SetTime sets the decoding time of a transaction. This is used by tests to set +// arbitrary times and by persistent transaction pools when loading old txs from +// disk. +func (tx *Transaction) SetTime(t time.Time) { + tx.time = t +} + +// Time returns the time when the transaction was first seen on the network. It +// is a heuristic to prefer mining older txs vs new all other things equal. +func (tx *Transaction) Time() time.Time { + return tx.time +} + // Hash hashes the RLP encoding of tx. // It uniquely identifies the transaction. func (tx *Transaction) Hash() common.Hash { diff --git a/ctxc/fetcher/tx_fetcher.go b/ctxc/fetcher/tx_fetcher.go index 3832d98fc5..a5338501fa 100644 --- a/ctxc/fetcher/tx_fetcher.go +++ b/ctxc/fetcher/tx_fetcher.go @@ -25,12 +25,12 @@ import ( "time" "github.com/CortexFoundation/CortexTheseus/common" + "github.com/CortexFoundation/CortexTheseus/common/lru" "github.com/CortexFoundation/CortexTheseus/common/mclock" "github.com/CortexFoundation/CortexTheseus/core/txpool" "github.com/CortexFoundation/CortexTheseus/core/types" "github.com/CortexFoundation/CortexTheseus/log" "github.com/CortexFoundation/CortexTheseus/metrics" - mapset "github.com/deckarep/golang-set/v2" ) const ( @@ -53,6 +53,9 @@ const ( // re-request them. maxTxUnderpricedSetSize = 32768 + // maxTxUnderpricedTimeout is the max time a transaction should be stuck in the underpriced set. + maxTxUnderpricedTimeout = int64(5 * time.Minute) + // txArriveTimeout is the time allowance before an announced transaction is // explicitly requested. txArriveTimeout = 500 * time.Millisecond @@ -148,7 +151,7 @@ type TxFetcher struct { drop chan *txDrop quit chan struct{} - underpriced mapset.Set[common.Hash] // Transactions discarded as too cheap (don't re-fetch) + underpriced *lru.Cache[common.Hash, int64] // Transactions discarded as too cheap (don't re-fetch) // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. @@ -202,7 +205,7 @@ func NewTxFetcherForTests( fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), alternates: make(map[common.Hash]map[string]struct{}), - underpriced: mapset.NewSet[common.Hash](), + underpriced: lru.NewCache[common.Hash, int64](maxTxUnderpricedSetSize), hasTx: hasTx, addTxs: addTxs, fetchTxs: fetchTxs, @@ -223,15 +226,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { // still valuable to check here because it runs concurrent to the internal // loop, so anything caught here is time saved internally. var ( - unknowns = make([]common.Hash, 0, len(hashes)) - duplicate, underpriced int64 + unknowns = make([]common.Hash, 0, len(hashes)) + duplicate int64 + underpriced int64 ) for _, hash := range hashes { switch { case f.hasTx(hash): duplicate++ - case f.underpriced.Contains(hash): + case f.isKnownUnderpriced(hash): underpriced++ default: @@ -245,10 +249,9 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { if len(unknowns) == 0 { return nil } - announce := &txAnnounce{ - origin: peer, - hashes: unknowns, - } + + announce := &txAnnounce{origin: peer, hashes: unknowns} + select { case f.notify <- announce: return nil @@ -257,6 +260,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { } } +// isKnownUnderpriced reports whether a transaction hash was recently found to be underpriced. +func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool { + prevTime, ok := f.underpriced.Peek(hash) + if ok && prevTime+maxTxUnderpricedTimeout < time.Now().Unix() { + f.underpriced.Remove(hash) + return false + } + return ok +} + // Enqueue imports a batch of received transaction into the transaction pool // and the fetcher. This method may be called by both transaction broadcasts and // direct request replies. The differentiation is important so the fetcher can @@ -299,10 +312,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) // Avoid re-request this transaction when we receive another // announcement. if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) { - for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { - f.underpriced.Pop() - } - f.underpriced.Add(batch[j].Hash()) + f.underpriced.Add(batch[j].Hash(), batch[j].Time().Unix()) } // Track a few interesting failure types switch {