From c488a1c2777df5b7641c67c1682b20a4f1b80f6f Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 21 Jan 2022 09:45:03 +0200 Subject: [PATCH] add semaphore to control push/delta concurrency This avoids stream storms that can clog the transient scope when we have a largish number of peers. --- p2p/protocol/identify/id.go | 8 ++++++++ p2p/protocol/identify/peer_loop.go | 10 ++++++++++ 2 files changed, 18 insertions(+) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 4c792ccf70..70ff28b638 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -43,6 +43,8 @@ const LibP2PVersion = "ipfs/0.1.0" const ServiceName = "libp2p.identify" +const maxPushConcurrency = 32 + // StreamReadTimeout is the read timeout on all incoming Identify family streams. var StreamReadTimeout = 60 * time.Second @@ -129,6 +131,10 @@ type idService struct { addPeerHandlerCh chan addPeerHandlerReq rmPeerHandlerCh chan rmPeerHandlerReq + + // pushSemaphore limits the push/delta concurrency to avoid storms + // that clog the transient scope. + pushSemaphore chan struct{} } // NewIDService constructs a new *idService and activates it by @@ -154,6 +160,8 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) { addPeerHandlerCh: make(chan addPeerHandlerReq), rmPeerHandlerCh: make(chan rmPeerHandlerReq), + + pushSemaphore: make(chan struct{}, maxPushConcurrency), } s.ctx, s.ctxCancel = context.WithCancel(context.Background()) diff --git a/p2p/protocol/identify/peer_loop.go b/p2p/protocol/identify/peer_loop.go index ad62910f4a..c2dc4581fb 100644 --- a/p2p/protocol/identify/peer_loop.go +++ b/p2p/protocol/identify/peer_loop.go @@ -115,6 +115,11 @@ func (ph *peerHandler) sendDelta(ctx context.Context) error { return nil } + ph.ids.pushSemaphore <- struct{}{} + defer func() { + <-ph.ids.pushSemaphore + }() + // extract a delta message, updating the last state. mes := ph.nextDelta() if mes == nil || (len(mes.AddedProtocols) == 0 && len(mes.RmProtocols) == 0) { @@ -140,6 +145,11 @@ func (ph *peerHandler) sendDelta(ctx context.Context) error { } func (ph *peerHandler) sendPush(ctx context.Context) error { + ph.ids.pushSemaphore <- struct{}{} + defer func() { + <-ph.ids.pushSemaphore + }() + dp, err := ph.openStream(ctx, []string{IDPush}) if err == errProtocolNotSupported { log.Debugw("not sending push as peer does not support protocol", "peer", ph.pid)