Skip to content

Commit

Permalink
server: Cache advertised txns.
Browse files Browse the repository at this point in the history
This adds a cache to house transactions that have recently been
advertised to other peers.  It makes use of the new container/lru module
to handle automatic expiration of entries and maximum entry limiting.

The rationale for this change is that it is considered misbehavior to
advertise a transaction and then claim it is not found when the
corresponding request arrives.  Maintaining a separate cache of
advertised transactions for a short period of time after they were
advertised significantly increases the probability they are available to
serve when a request for the advertisement arrives independent of the
current status of the unconfirmed transaction mempool.
  • Loading branch information
davecgh committed Jun 14, 2024
1 parent 1c2bd15 commit 0498261
Showing 1 changed file with 130 additions and 14 deletions.
144 changes: 130 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/decred/dcrd/chaincfg/v3"
"github.com/decred/dcrd/connmgr/v3"
"github.com/decred/dcrd/container/apbf"
"github.com/decred/dcrd/container/lru"
"github.com/decred/dcrd/crypto/rand"
"github.com/decred/dcrd/database/v3"
"github.com/decred/dcrd/dcrutil/v4"
Expand Down Expand Up @@ -149,6 +150,20 @@ const (
// These values result in about 183 KiB memory usage including overhead.
maxRecentlyConfirmedTxns = 23000
recentlyConfirmedTxnsFPRate = 0.000001

// These fields are used when caching recently advertised transactions.
//
// maxRecentlyAdvertisedTxns specifies the maximum number to cache and is
// set to target tracking the maximum number transactions of the minimum
// realistic size (~206 bytes) in approximately two blocks on the main
// network plus an additional 20%.
//
// recentlyAdvertisedTxnsTTL is the time to keep recently advertised
// transactions in the cache before they are expired.
//
// These values result in about 640 KiB memory usage including overhead.
maxRecentlyAdvertisedTxns = 4500
recentlyAdvertisedTxnsTTL = 45 * time.Second
)

var (
Expand Down Expand Up @@ -580,6 +595,45 @@ type server struct {
// recentlyConfirmedTxns tracks transactions that have been confirmed in the
// most recent blocks.
recentlyConfirmedTxns *apbf.Filter

// recentlyAdvertisedTxns caches transactions that have recently been
// advertised to other peers. The cache handles automatic expiration and
// maximum entry limiting.
//
// It is considered misbehavior to advertise a transaction and then claim it
// is not found when the corresponding request arrives. Further, since the
// mempool only contains the unconfirmed transactions as of the current best
// chain tip, a transaction might be advertised when it is first added to
// the mempool and then removed from the mempool prior to it being requested
// in the case new blocks are connected in between the advertisement and
// request.
//
// Thus, maintaining a separate cache of advertised transactions increases
// the probability they are available to serve regardless of whether or not
// they are still in the mempool when a request for the advertisement
// arrives.
//
// Note that it might be tempting to keep track of the number of times a tx
// has been advertised and requested so it can be removed from the cache as
// soon as there are no longer any potential outstanding requests, however,
// that is intentionally not done because it is exceedingly rare for
// advertisements to result in a request from all peers, so the extra
// overhead is not warranted.
recentlyAdvertisedTxns *lru.Map[chainhash.Hash, *dcrutil.Tx]

// The following fields are used to periodically log the total number
// evicted recently advertised transactions. They are only accessed from
// a single long-running goroutine, so they are not protected for concurrent
// access.
//
// totalAdvertisedTxnsEvicted is the total number of advertised transactions
// that have been evicted from the cache since the previous report.
//
// lastAdvertisedTxnsEvictedLogged is the last time the total number of
// advertised transactions that have been evicted from the cache was
// reported.
totalAdvertisedTxnsEvicted uint64
lastAdvertisedTxnsEvictedLogged time.Time
}

// serverPeer extends the peer to maintain state shared by the server.
Expand Down Expand Up @@ -678,28 +732,38 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
var dataMsg wire.Message
switch iv.Type {
case wire.InvTypeTx:
// Attempt to fetch the requested transaction from the pool. A call
// could be made to check for existence first, but simply trying to
// fetch a missing transaction results in the same behavior. Do not
// allow peers to request transactions already in a block but are
// unconfirmed, as they may be expensive. Restrict that to the
// authenticated RPC only.
// Attempt to fetch the requested transaction. Try the pool of
// recently advertised transactions first and then fall back to the
// mempool.
//
// Note that this does not allow peers to request transactions
// already in a block over p2p unless they still happen to be in the
// pool of advertised transactions, as that would require all nodes
// to maintain a full transaction index which can be expensive.
// That ability is restricted to authenticated RPC only and requires
// the aforementioned full transaction index.
txHash := &iv.Hash
tx, err := sp.server.txMemPool.FetchTransaction(txHash)
if err != nil {
peerLog.Debugf("Unable to fetch tx %v from the "+
"transaction pool for %v: %v", txHash,
sp, err)
break
tx, ok := sp.server.recentlyAdvertisedTxns.Get(*txHash)
if !ok {
// Note that a call could be made to check for existence first,
// but simply trying to fetch a missing transaction results in
// the same behavior.
var err error
tx, err = sp.server.txMemPool.FetchTransaction(txHash)
if err != nil {
peerLog.Debugf("Unable to fetch tx %v from transaction "+
"pool for peer %s: %v", txHash, sp, err)
break
}
}
dataMsg = tx.MsgTx()

case wire.InvTypeBlock:
blockHash := &iv.Hash
block, err := sp.server.chain.BlockByHash(blockHash)
if err != nil {
peerLog.Debugf("Unable to fetch block hash %v "+
"for %v: %v", blockHash, sp, err)
peerLog.Debugf("Unable to fetch block hash %v for peer %s: %v",
blockHash, sp, err)
break
}
dataMsg = block.MsgBlock()
Expand Down Expand Up @@ -1965,6 +2029,33 @@ func (s *server) TransactionConfirmed(tx *dcrutil.Tx) {
}
}

// maybeLogRecentlyAdvertisedNumEvicted periodically logs the total number of
// evicted advertised transactions.
//
// It is NOT safe for concurrent access.
func (s *server) maybeLogRecentlyAdvertisedNumEvicted() {
// Do not log anything when there haven't been any evicted transactions.
totalEvicted := s.totalAdvertisedTxnsEvicted
if totalEvicted == 0 {
return
}

// Only log a maximum of once per day.
const logInterval = 24 * time.Hour
sinceLastLogged := time.Since(s.lastAdvertisedTxnsEvictedLogged)
if sinceLastLogged < logInterval {
return
}

srvrLog.Debugf("Evicted %d advertised %s in the last %v (%d remaining, "+
"%.2f%% hit ratio)", totalEvicted, pickNoun(totalEvicted, "tx", "txns"),
sinceLastLogged.Truncate(time.Second), s.recentlyAdvertisedTxns.Len(),
s.recentlyAdvertisedTxns.HitRatio())

s.totalAdvertisedTxnsEvicted = 0
s.lastAdvertisedTxnsEvictedLogged = time.Now()
}

// handleRelayInvMsg deals with relaying inventory to peers that are not already
// known to have it. It is invoked from the peerHandler goroutine.
func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
Expand Down Expand Up @@ -2014,6 +2105,28 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
if sp.disableRelayTx.Load() {
return
}

// Track advertised transactions for a period of time in order to
// increase the probability they are available to serve regardless
// of whether or not they are still in the mempool when a request
// for the advertisement arrives. Note that it is still possible
// for the advertised transaction to not be available in the case it
// is later removed from the advertised transactions pool due to
// expiration and/or exceeding the maximum limits prior to the
// request for it arriving. However, not only is that case
// extremely rare in practice, the transaction is also likely still
// in the mempool in that case and will be served from there given
// peers generally do not request transactions that have been
// recently confirmed.
tx, ok := msg.data.(*dcrutil.Tx)
if !ok {
peerLog.Warnf("Underlying data for inventory vector is not a " +
" transaction")
return
}
numEvicted := s.recentlyAdvertisedTxns.Put(iv.Hash, tx)
s.totalAdvertisedTxnsEvicted += uint64(numEvicted)
s.maybeLogRecentlyAdvertisedNumEvicted()
}

if iv.Type == wire.InvTypeMix {
Expand Down Expand Up @@ -3703,6 +3816,9 @@ func newServer(ctx context.Context, profiler *profileServer,
recentlyConfirmedTxnsFPRate),
indexSubscriber: indexers.NewIndexSubscriber(ctx),
quit: make(chan struct{}),
recentlyAdvertisedTxns: lru.NewMapWithDefaultTTL[chainhash.Hash,
*dcrutil.Tx](maxRecentlyAdvertisedTxns, recentlyAdvertisedTxnsTTL),
lastAdvertisedTxnsEvictedLogged: time.Now(),
}

// Convert the minimum known work to a uint256 when it exists. Ideally, the
Expand Down

0 comments on commit 0498261

Please sign in to comment.