Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of recently rejected transactions. #257

Merged
merged 2 commits into from
Jun 1, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 109 additions & 9 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ package main
import (
"bytes"
"container/list"
crand "crypto/rand"
"encoding/gob"
"errors"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -44,6 +46,18 @@ const (
// maxResendLimit is the maximum number of times a node can resend a
// block or transaction before it is dropped.
maxResendLimit = 3

// maxRejectedTxns is the maximum number of rejected transactions
// shas to store in memory.
maxRejectedTxns = 1000

// maxRequestedBlocks is the maximum number of requested block
// shas to store in memory.
maxRequestedBlocks = wire.MaxInvPerMsg

// maxRequestedTxns is the maximum number of requested transactions
// shas to store in memory.
maxRequestedTxns = wire.MaxInvPerMsg
)

// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
Expand Down Expand Up @@ -561,6 +575,7 @@ type blockManager struct {
started int32
shutdown int32
blockChain *blockchain.BlockChain
rejectedTxns map[chainhash.Hash]struct{}
requestedTxns map[chainhash.Hash]struct{}
requestedEverTxns map[chainhash.Hash]uint8
requestedBlocks map[chainhash.Hash]struct{}
Expand Down Expand Up @@ -713,6 +728,11 @@ func (b *blockManager) startSync(peers *list.List) {

// Start syncing from the best peer if one was selected.
if bestPeer != nil {
// Clear the requestedBlocks if the sync peer changes, otherwise
// we may ignore blocks we need that the last sync peer failed
// to send.
b.requestedBlocks = make(map[chainhash.Hash]struct{})

locator, err := b.blockChain.LatestBlockLocator()
if err != nil {
bmgrLog.Errorf("Failed to get block locator for the "+
Expand Down Expand Up @@ -912,6 +932,16 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// spec to proliferate. While this is not ideal, there is no check here
// to disconnect peers for sending unsolicited transactions to provide
// interoperability.
txHash := tmsg.tx.Sha()

// Ignore transactions that we have already rejected. Do not
// send a reject message here because if the transaction was already
// rejected, the transaction was unsolicited.
if _, exists := b.rejectedTxns[*txHash]; exists {
bmgrLog.Debugf("Ignoring unsolicited previously rejected "+
"transaction %v from %s", txHash, tmsg.peer)
return
}

// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
Expand All @@ -923,11 +953,20 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// already knows about it and as such we shouldn't have any more
// instances of trying to fetch it, or we failed to insert and thus
// we'll retry next time we get an inv.
txHash := tmsg.tx.Sha()
delete(tmsg.peer.requestedTxns, *txHash)
delete(b.requestedTxns, *txHash)

if err != nil {
// Do not request this transaction again until a new block
// has been processed.
b.rejectedTxns[*txHash] = struct{}{}
lerr := b.limitMap(b.rejectedTxns, maxRejectedTxns)
if lerr != nil {
bmgrLog.Warnf("Failed to limit the number of "+
"rejected transactions: %v", lerr)
delete(b.rejectedTxns, *txHash)
}

// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
// so log it as such. Otherwise, something really did go wrong,
Expand Down Expand Up @@ -1363,20 +1402,19 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
b.server.txMemPool.PruneExpiredTx(b.chainState.newestHeight)
}

b.updateChainState(newestSha,
newestHeight,
lotteryData.finalState,
lotteryData.poolSize,
nextStakeDiff,
lotteryData.ntfnData.Tickets,
missedTickets,
curBlockHeader)
b.updateChainState(newestSha, newestHeight,
lotteryData.finalState, lotteryData.poolSize,
nextStakeDiff, lotteryData.ntfnData.Tickets,
missedTickets, curBlockHeader)

// Update this peer's latest block height, for future
// potential sync node candidancy.
heightUpdate = int32(newestHeight)
blkShaUpdate = newestSha

// Clear the rejected transactions.
b.rejectedTxns = make(map[chainhash.Hash]struct{})

// Allow any clients performing long polling via the
// getblocktemplate RPC to be notified when the new block causes
// their old block template to become stale.
Expand Down Expand Up @@ -1702,6 +1740,14 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
continue
}
if !haveInv {
if iv.Type == wire.InvTypeTx {
// Skip the transaction if it has already been
// rejected.
if _, exists := b.rejectedTxns[iv.Hash]; exists {
continue
}
}

// Add it to the request queue.
imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv)
continue
Expand Down Expand Up @@ -1765,6 +1811,16 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
if _, exists := b.requestedBlocks[iv.Hash]; !exists {
b.requestedBlocks[iv.Hash] = struct{}{}
b.requestedEverBlocks[iv.Hash] = 0
err := b.limitMap(b.requestedBlocks,
maxRequestedBlocks)
if err != nil {
bmgrLog.Warnf("Failed to limit the "+
"number of requested "+
"blocks: %v", err)
delete(b.requestedBlocks, iv.Hash)
continue
}

imsg.peer.requestedBlocks[iv.Hash] = struct{}{}
gdmsg.AddInvVect(iv)
numRequested++
Expand All @@ -1776,6 +1832,15 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
if _, exists := b.requestedTxns[iv.Hash]; !exists {
b.requestedTxns[iv.Hash] = struct{}{}
b.requestedEverTxns[iv.Hash] = 0
err := b.limitMap(b.requestedTxns,
maxRequestedTxns)
if err != nil {
bmgrLog.Warnf("Failed to limit the "+
"number of requested "+
"transactions: %v", err)
delete(b.requestedTxns, iv.Hash)
continue
}
imsg.peer.requestedTxns[iv.Hash] = struct{}{}
gdmsg.AddInvVect(iv)
numRequested++
Expand All @@ -1792,6 +1857,40 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
}
}

// limitMap is a helper function for maps that require a maximum limit by
// evicting a random rejected transaction if adding a new value would cause it
// to overflow the maximum allowed.
func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) error {
if len(m)+1 > limit {
// Generate a cryptographically random hash.
randHashBytes := make([]byte, chainhash.HashSize)
_, err := crand.Read(randHashBytes)
if err != nil {
return err
}
randHashNum := new(big.Int).SetBytes(randHashBytes)

// Try to find the first entry that is greater than the random
// hash. Use the first entry (which is already pseudorandom due
// to Go's range statement over maps) as a fallback if none of
// the hashes in the map are larger than the random hash.
var foundHash *chainhash.Hash
for txHash := range m {
if foundHash == nil {
foundHash = &txHash
}
txHashNum := blockchain.ShaHashToBig(&txHash)
if txHashNum.Cmp(randHashNum) > 0 {
foundHash = &txHash
break
}
}
delete(m, *foundHash)
}

return nil
}

// blockHandler is the main handler for the block manager. It must be run
// as a goroutine. It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) messages are handled by a
Expand Down Expand Up @@ -2907,6 +3006,7 @@ func newBlockManager(s *server) (*blockManager, error) {

bm := blockManager{
server: s,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedEverTxns: make(map[chainhash.Hash]uint8),
requestedBlocks: make(map[chainhash.Hash]struct{}),
Expand Down