Skip to content

Commit

Permalink
add semaphore to control push/delta concurrency
Browse files Browse the repository at this point in the history
This avoids stream storms that can clog the transient scope when we have a largish number of peers.
  • Loading branch information
vyzo committed Jan 21, 2022
1 parent da67311 commit c488a1c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
8 changes: 8 additions & 0 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const LibP2PVersion = "ipfs/0.1.0"

const ServiceName = "libp2p.identify"

const maxPushConcurrency = 32

// StreamReadTimeout is the read timeout on all incoming Identify family streams.
var StreamReadTimeout = 60 * time.Second

Expand Down Expand Up @@ -129,6 +131,10 @@ type idService struct {

addPeerHandlerCh chan addPeerHandlerReq
rmPeerHandlerCh chan rmPeerHandlerReq

// pushSemaphore limits the push/delta concurrency to avoid storms
// that clog the transient scope.
pushSemaphore chan struct{}
}

// NewIDService constructs a new *idService and activates it by
Expand All @@ -154,6 +160,8 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {

addPeerHandlerCh: make(chan addPeerHandlerReq),
rmPeerHandlerCh: make(chan rmPeerHandlerReq),

pushSemaphore: make(chan struct{}, maxPushConcurrency),
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

Expand Down
10 changes: 10 additions & 0 deletions p2p/protocol/identify/peer_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func (ph *peerHandler) sendDelta(ctx context.Context) error {
return nil
}

ph.ids.pushSemaphore <- struct{}{}
defer func() {
<-ph.ids.pushSemaphore
}()

// extract a delta message, updating the last state.
mes := ph.nextDelta()
if mes == nil || (len(mes.AddedProtocols) == 0 && len(mes.RmProtocols) == 0) {
Expand All @@ -140,6 +145,11 @@ func (ph *peerHandler) sendDelta(ctx context.Context) error {
}

func (ph *peerHandler) sendPush(ctx context.Context) error {
ph.ids.pushSemaphore <- struct{}{}
defer func() {
<-ph.ids.pushSemaphore
}()

dp, err := ph.openStream(ctx, []string{IDPush})
if err == errProtocolNotSupported {
log.Debugw("not sending push as peer does not support protocol", "peer", ph.pid)
Expand Down

0 comments on commit c488a1c

Please sign in to comment.