Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: handle notfound #1603

Merged
merged 1 commit into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ type headersMsg struct {
peer *peerpkg.Peer
}

// notFoundMsg packages a bitcoin notfound message and the peer it came from
// together so the block handler has access to that information.
type notFoundMsg struct {
notFound *wire.MsgNotFound
peer *peerpkg.Peer
}

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
peer *peerpkg.Peer
Expand Down Expand Up @@ -1012,6 +1019,32 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
}
}

// handleNotFoundMsg handles notfound messages from all peers.
func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
peer := nfmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received notfound message from unknown peer %s", peer)
return
}
for _, inv := range nfmsg.notFound.InvList {
// verify the hash was actually announced by the peer
// before deleting from the global requested maps.
switch inv.Type {
case wire.InvTypeBlock:
if _, exists := state.requestedBlocks[inv.Hash]; exists {
delete(state.requestedBlocks, inv.Hash)
delete(sm.requestedBlocks, inv.Hash)
}
case wire.InvTypeTx:
if _, exists := state.requestedTxns[inv.Hash]; exists {
delete(state.requestedTxns, inv.Hash)
delete(sm.requestedTxns, inv.Hash)
}
}
}
}

// haveInventory returns whether or not the inventory represented by the passed
// inventory vector is known. This includes checking all of the various places
// inventory can be when it is in different states such as blocks that are part
Expand Down Expand Up @@ -1293,6 +1326,9 @@ out:
case *headersMsg:
sm.handleHeadersMsg(msg)

case *notFoundMsg:
sm.handleNotFoundMsg(msg)

case *donePeerMsg:
sm.handleDonePeerMsg(msg.peer)

Expand Down Expand Up @@ -1490,6 +1526,18 @@ func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer
sm.msgChan <- &headersMsg{headers: headers, peer: peer}
}

// QueueNotFound adds the passed notfound message and peer to the block handling
// queue.
func (sm *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
// No channel handling here because peers do not need to block on
// reject messages.
if atomic.LoadInt32(&sm.shutdown) != 0 {
return
}

sm.msgChan <- &notFoundMsg{notFound: notFound, peer: peer}
}

// DonePeer informs the blockmanager that a peer has disconnected.
func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
// Ignore if we are shutting down.
Expand Down
57 changes: 51 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,14 @@ func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
// threshold, a warning is logged including the reason provided. Further, if
// the score is above the ban threshold, the peer will be banned and
// disconnected.
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool {
// No warning is logged and no score is calculated if banning is disabled.
if cfg.DisableBanning {
return
return false
}
if sp.isWhitelisted {
peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
return
return false
}

warnThreshold := cfg.BanThreshold >> 1
Expand All @@ -383,7 +383,7 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
"it was not increased this time", sp, reason, score)
}
return
return false
}
score := sp.banScore.Increase(persistent, transient)
if score > warnThreshold {
Expand All @@ -394,8 +394,10 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
sp)
sp.server.BanPeer(sp)
sp.Disconnect()
return true
}
}
return false
}

// hasServices returns whether or not the provided advertised service flags have
Expand Down Expand Up @@ -498,7 +500,9 @@ func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
// The ban score accumulates and passes the ban threshold if a burst of
// mempool messages comes from a peer. The score decays each minute to
// half of its value.
sp.addBanScore(0, 33, "mempool")
if sp.addBanScore(0, 33, "mempool") {
return
}

// Generate inventory message with the available transactions in the
// transaction memory pool. Limit it to the max allowed inventory
Expand Down Expand Up @@ -638,7 +642,9 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
// bursts of small requests are not penalized as that would potentially ban
// peers performing IBD.
// This incremental score decays each minute to half of its value.
sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") {
return
}

// We wait on this wait channel periodically to prevent queuing
// far more data than we can send in a reasonable time, wasting memory.
Expand Down Expand Up @@ -1304,6 +1310,44 @@ func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message,
sp.server.AddBytesSent(uint64(bytesWritten))
}

// OnNotFound is invoked when a peer sends a notfound message.
func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) {
if !sp.Connected() {
return
}

var numBlocks, numTxns uint32
for _, inv := range msg.InvList {
switch inv.Type {
case wire.InvTypeBlock:
numBlocks++
case wire.InvTypeTx:
numTxns++
default:
peerLog.Debugf("Invalid inv type '%d' in notfound message from %s",
inv.Type, sp)
sp.Disconnect()
return
}
}
if numBlocks > 0 {
blockStr := pickNoun(uint64(numBlocks), "block", "blocks")
reason := fmt.Sprintf("%d %v not found", numBlocks, blockStr)
if sp.addBanScore(20*numBlocks, 0, reason) {
return
}
}
if numTxns > 0 {
txStr := pickNoun(uint64(numTxns), "transaction", "transactions")
reason := fmt.Sprintf("%d %v not found", numBlocks, txStr)
if sp.addBanScore(0, 10*numTxns, reason) {
return
}
}

sp.server.syncManager.QueueNotFound(msg, p)
}

// randomUint16Number returns a random uint16 in a specified input range. Note
// that the range is in zeroth ordering; if you pass it 1800, you will get
// values from 0 to 1800.
Expand Down Expand Up @@ -1998,6 +2042,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnAddr: sp.OnAddr,
OnRead: sp.OnRead,
OnWrite: sp.OnWrite,
OnNotFound: sp.OnNotFound,

// Note: The reference client currently bans peers that send alerts
// not signed with its key. We could verify against their key, but
Expand Down