From 0eb577e88e0d4115e714761abe30811a43db38e5 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Mon, 10 Jun 2024 15:43:54 -0500 Subject: [PATCH] server: Cache advertised txns. This adds a cache to house transactions that have recently been advertised to other peers. It makes use of the new container/lru module to handle automatic expiration of entries and maximum entry limiting. The rationale for this change is that it is considered misbehavior to advertise a transaction and then claim it is not found when the corresponding request arrives. Maintaining a separate cache of advertised transactions for a short period of time after they were advertised significantly increases the probability they are available to serve when a request for the advertisement arrives independent of the current status of the unconfirmed transaction mempool. --- server.go | 144 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 130 insertions(+), 14 deletions(-) diff --git a/server.go b/server.go index 9a871cbb1..96b045364 100644 --- a/server.go +++ b/server.go @@ -32,6 +32,7 @@ import ( "github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/connmgr/v3" "github.com/decred/dcrd/container/apbf" + "github.com/decred/dcrd/container/lru" "github.com/decred/dcrd/crypto/rand" "github.com/decred/dcrd/database/v3" "github.com/decred/dcrd/dcrutil/v4" @@ -149,6 +150,20 @@ const ( // These values result in about 183 KiB memory usage including overhead. maxRecentlyConfirmedTxns = 23000 recentlyConfirmedTxnsFPRate = 0.000001 + + // These fields are used when caching recently advertised transactions. + // + // maxRecentlyAdvertisedTxns specifies the maximum number to cache and is + // set to target tracking the maximum number transactions of the minimum + // realistic size (~206 bytes) in approximately two blocks on the main + // network plus an additional 20%. + // + // recentlyAdvertisedTxnsTTL is the time to keep recently advertised + // transactions in the cache before they are expired. + // + // These values result in about 640 KiB memory usage including overhead. + maxRecentlyAdvertisedTxns = 4500 + recentlyAdvertisedTxnsTTL = 45 * time.Second ) var ( @@ -580,6 +595,45 @@ type server struct { // recentlyConfirmedTxns tracks transactions that have been confirmed in the // most recent blocks. recentlyConfirmedTxns *apbf.Filter + + // recentlyAdvertisedTxns caches transactions that have recently been + // advertised to other peers. The cache handles automatic expiration and + // maximum entry limiting. + // + // It is considered misbehavior to advertise a transaction and then claim it + // is not found when the corresponding request arrives. Further, since the + // mempool only contains the unconfirmed transactions as of the current best + // chain tip, a transaction might be advertised when it is first added to + // the mempool and then removed from the mempool prior to it being requested + // in the case new blocks are connected in between the advertisement and + // request. + // + // Thus, maintaining a separate cache of advertised transactions increases + // the probability they are available to serve regardless of whether or not + // they are still in the mempool when a request for the advertisement + // arrives. + // + // Note that it might be tempting to keep track of the number of times a tx + // has been advertised and requested so it can be removed from the cache as + // soon as there are no longer any potential outstanding requests, however, + // that is intentionally not done because it is exceedingly rare for + // advertisements to result in a request from all peers, so the extra + // overhead is not warranted. + recentlyAdvertisedTxns *lru.Map[chainhash.Hash, *dcrutil.Tx] + + // The following fields are used to periodically log the total number + // evicted recently advertised transactions. They are only accessed from + // a single long-running goroutine, so they are not protected for concurrent + // access. + // + // totalAdvertisedTxnsEvicted is the total number of advertised transactions + // that have been evicted from the cache since the previous report. + // + // lastAdvertisedTxnsEvictedLogged is the last time the total number of + // advertised transactions that have been evicted from the cache was + // reported. + totalAdvertisedTxnsEvicted uint64 + lastAdvertisedTxnsEvictedLogged time.Time } // serverPeer extends the peer to maintain state shared by the server. @@ -678,19 +732,29 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, var dataMsg wire.Message switch iv.Type { case wire.InvTypeTx: - // Attempt to fetch the requested transaction from the pool. A call - // could be made to check for existence first, but simply trying to - // fetch a missing transaction results in the same behavior. Do not - // allow peers to request transactions already in a block but are - // unconfirmed, as they may be expensive. Restrict that to the - // authenticated RPC only. + // Attempt to fetch the requested transaction. Try the pool of + // recently advertised transactions first and then fall back to the + // mempool. + // + // Note that this does not allow peers to request transactions + // already in a block over p2p unless they still happen to be in the + // pool of advertised transactions, as that would require all nodes + // to maintain a full transaction index which can be expensive. + // That ability is restricted to authenticated RPC only and requires + // the aforementioned full transaction index. txHash := &iv.Hash - tx, err := sp.server.txMemPool.FetchTransaction(txHash) - if err != nil { - peerLog.Debugf("Unable to fetch tx %v from the "+ - "transaction pool for %v: %v", txHash, - sp, err) - break + tx, ok := sp.server.recentlyAdvertisedTxns.Get(*txHash) + if !ok { + // Note that a call could be made to check for existence first, + // but simply trying to fetch a missing transaction results in + // the same behavior. + var err error + tx, err = sp.server.txMemPool.FetchTransaction(txHash) + if err != nil { + peerLog.Debugf("Unable to fetch tx %v from transaction "+ + "pool for peer %s: %v", txHash, sp, err) + break + } } dataMsg = tx.MsgTx() @@ -698,8 +762,8 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, blockHash := &iv.Hash block, err := sp.server.chain.BlockByHash(blockHash) if err != nil { - peerLog.Debugf("Unable to fetch block hash %v "+ - "for %v: %v", blockHash, sp, err) + peerLog.Debugf("Unable to fetch block hash %v for peer %s: %v", + blockHash, sp, err) break } dataMsg = block.MsgBlock() @@ -1965,6 +2029,33 @@ func (s *server) TransactionConfirmed(tx *dcrutil.Tx) { } } +// maybeLogRecentlyAdvertisedNumEvicted periodically logs the total number of +// evicted advertised transactions. +// +// It is NOT safe for concurrent access. +func (s *server) maybeLogRecentlyAdvertisedNumEvicted() { + // Do not log anything when there haven't been any evicted transactions. + totalEvicted := s.totalAdvertisedTxnsEvicted + if totalEvicted == 0 { + return + } + + // Only log a maximum of once per day. + const logInterval = 24 * time.Hour + sinceLastLogged := time.Since(s.lastAdvertisedTxnsEvictedLogged) + if sinceLastLogged < logInterval { + return + } + + srvrLog.Debugf("Evicted %d advertised %s in the last %v (%d remaining, "+ + "%.2f%% hit ratio)", totalEvicted, pickNoun(totalEvicted, "tx", "txns"), + sinceLastLogged.Truncate(time.Second), s.recentlyAdvertisedTxns.Len(), + s.recentlyAdvertisedTxns.HitRatio()) + + s.totalAdvertisedTxnsEvicted = 0 + s.lastAdvertisedTxnsEvictedLogged = time.Now() +} + // handleRelayInvMsg deals with relaying inventory to peers that are not already // known to have it. It is invoked from the peerHandler goroutine. func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { @@ -2014,6 +2105,28 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { if sp.disableRelayTx.Load() { return } + + // Track advertised transactions for a period of time in order to + // increase the probability they are available to serve regardless + // of whether or not they are still in the mempool when a request + // for the advertisement arrives. Note that it is still possible + // for the advertised transaction to not be available in the case it + // is later removed from the advertised transactions pool due to + // expiration and/or exceeding the maximum limits prior to the + // request for it arriving. However, not only is that case + // extremely rare in practice, the transaction is also likely still + // in the mempool in that case and will be served from there given + // peers generally do not request transactions that have been + // recently confirmed. + tx, ok := msg.data.(*dcrutil.Tx) + if !ok { + peerLog.Warnf("Underlying data for inventory vector is not a " + + " transaction") + return + } + numEvicted := s.recentlyAdvertisedTxns.Put(iv.Hash, tx) + s.totalAdvertisedTxnsEvicted += uint64(numEvicted) + s.maybeLogRecentlyAdvertisedNumEvicted() } if iv.Type == wire.InvTypeMix { @@ -3703,6 +3816,9 @@ func newServer(ctx context.Context, profiler *profileServer, recentlyConfirmedTxnsFPRate), indexSubscriber: indexers.NewIndexSubscriber(ctx), quit: make(chan struct{}), + recentlyAdvertisedTxns: lru.NewMapWithDefaultTTL[chainhash.Hash, + *dcrutil.Tx](maxRecentlyAdvertisedTxns, recentlyAdvertisedTxnsTTL), + lastAdvertisedTxnsEvictedLogged: time.Now(), } // Convert the minimum known work to a uint256 when it exists. Ideally, the