Skip to content

Commit

Permalink
Merge 5b00017 into 0f90d82
Browse files Browse the repository at this point in the history
  • Loading branch information
ucwong authored Sep 28, 2023
2 parents 0f90d82 + 5b00017 commit 44d33c2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
13 changes: 13 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,19 @@ func (tx *Transaction) To() *common.Address {
return copyAddressPtr(tx.data.Recipient)
}

// SetTime sets the decoding time of a transaction. This is used by tests to set
// arbitrary times and by persistent transaction pools when loading old txs from
// disk.
func (tx *Transaction) SetTime(t time.Time) {
tx.time = t
}

// Time returns the time when the transaction was first seen on the network. It
// is a heuristic to prefer mining older txs vs new all other things equal.
func (tx *Transaction) Time() time.Time {
return tx.time
}

// Hash hashes the RLP encoding of tx.
// It uniquely identifies the transaction.
func (tx *Transaction) Hash() common.Hash {
Expand Down
38 changes: 24 additions & 14 deletions ctxc/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"time"

"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/common/lru"
"github.com/CortexFoundation/CortexTheseus/common/mclock"
"github.com/CortexFoundation/CortexTheseus/core/txpool"
"github.com/CortexFoundation/CortexTheseus/core/types"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/CortexTheseus/metrics"
mapset "github.com/deckarep/golang-set/v2"
)

const (
Expand All @@ -53,6 +53,9 @@ const (
// re-request them.
maxTxUnderpricedSetSize = 32768

// maxTxUnderpricedTimeout is the max time a transaction should be stuck in the underpriced set.
maxTxUnderpricedTimeout = int64(5 * time.Minute)

// txArriveTimeout is the time allowance before an announced transaction is
// explicitly requested.
txArriveTimeout = 500 * time.Millisecond
Expand Down Expand Up @@ -148,7 +151,7 @@ type TxFetcher struct {
drop chan *txDrop
quit chan struct{}

underpriced mapset.Set[common.Hash] // Transactions discarded as too cheap (don't re-fetch)
underpriced *lru.Cache[common.Hash, int64] // Transactions discarded as too cheap (don't re-fetch)

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
Expand Down Expand Up @@ -202,7 +205,7 @@ func NewTxFetcherForTests(
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet[common.Hash](),
underpriced: lru.NewCache[common.Hash, int64](maxTxUnderpricedSetSize),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
Expand All @@ -223,15 +226,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// still valuable to check here because it runs concurrent to the internal
// loop, so anything caught here is time saved internally.
var (
unknowns = make([]common.Hash, 0, len(hashes))
duplicate, underpriced int64
unknowns = make([]common.Hash, 0, len(hashes))
duplicate int64
underpriced int64
)
for _, hash := range hashes {
switch {
case f.hasTx(hash):
duplicate++

case f.underpriced.Contains(hash):
case f.isKnownUnderpriced(hash):
underpriced++

default:
Expand All @@ -245,10 +249,9 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
if len(unknowns) == 0 {
return nil
}
announce := &txAnnounce{
origin: peer,
hashes: unknowns,
}

announce := &txAnnounce{origin: peer, hashes: unknowns}

select {
case f.notify <- announce:
return nil
Expand All @@ -257,6 +260,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
}
}

// isKnownUnderpriced reports whether a transaction hash was recently found to be underpriced.
func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool {
prevTime, ok := f.underpriced.Peek(hash)
if ok && prevTime+maxTxUnderpricedTimeout < time.Now().Unix() {
f.underpriced.Remove(hash)
return false
}
return ok
}

// Enqueue imports a batch of received transaction into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
Expand Down Expand Up @@ -299,10 +312,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
f.underpriced.Add(batch[j].Hash())
f.underpriced.Add(batch[j].Hash(), batch[j].Time().Unix())
}
// Track a few interesting failure types
switch {
Expand Down

0 comments on commit 44d33c2

Please sign in to comment.