From 2679498c9aa1b93f1d92f1104ec35e9e6bee6f54 Mon Sep 17 00:00:00 2001 From: lasaro Date: Fri, 24 Nov 2023 07:42:54 -0300 Subject: [PATCH] Do not block indefinitely on the semaphore (#1654) * Do not block indefinitely on the semaphore * Cancel the context, irrespective of the flow followed * Makes the code more readable * Improving comment * make linter happy * Updating comments to match * Commenting out `select` and leaving it as TODO for when Contexts are more widely used * Cleaned up comments --- config/config.go | 9 +++++---- config/toml.go | 9 +++++---- mempool/reactor.go | 41 ++++++++++++++++++++++------------------ test/e2e/pkg/manifest.go | 2 +- 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/config/config.go b/config/config.go index 1bd3426188..1c71885c01 100644 --- a/config/config.go +++ b/config/config.go @@ -892,12 +892,13 @@ type MempoolConfig struct { // XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 MaxBatchBytes int `mapstructure:"max_batch_bytes"` // Experimental parameters to limit gossiping txs to up to the specified number of peers. - // We use two independent upper values for persistent peers and for non-persistent peers. + // We use two independent upper values for persistent and non-persistent peers. // Unconditional peers are not affected by this feature. // If we are connected to more than the specified number of persistent peers, only send txs to - // the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those - // persistent peers disconnects, activate another persistent peer. Similarly for non-persistent - // peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers. + // ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those + // persistent peers disconnects, activate another persistent peer. + // Similarly for non-persistent peers, with an upper limit of + // ExperimentalMaxGossipConnectionsToNonPersistentPeers. 
// If set to 0, the feature is disabled for the corresponding group of peers, that is, the // number of active connections to that group of peers is not bounded. // For non-persistent peers, if enabled, a value of 10 is recommended based on experimental diff --git a/config/toml.go b/config/toml.go index 13b72e5a2e..d696978b18 100644 --- a/config/toml.go +++ b/config/toml.go @@ -436,12 +436,13 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }} max_batch_bytes = {{ .Mempool.MaxBatchBytes }} # Experimental parameters to limit gossiping txs to up to the specified number of peers. -# We use two independent upper values for persistent peers and for non-persistent peers. +# We use two independent upper values for persistent and non-persistent peers. # Unconditional peers are not affected by this feature. # If we are connected to more than the specified number of persistent peers, only send txs to -# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those -# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent -# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers. +# experimental_max_gossip_connections_to_persistent_peers of them. If one of those +# persistent peers disconnects, activate another persistent peer. +# Similarly for non-persistent peers, with an upper limit of +# experimental_max_gossip_connections_to_non_persistent_peers. # If set to 0, the feature is disabled for the corresponding group of peers, that is, the # number of active connections to that group of peers is not bounded. # For non-persistent peers, if enabled, a value of 10 is recommended based on experimental diff --git a/mempool/reactor.go b/mempool/reactor.go index 2fa3ca2219..5a950492c3 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -107,32 +107,37 @@ func (memR *Reactor) AddPeer(peer p2p.Peer) { go func() { // Always forward transactions to unconditional peers. 
if !memR.Switch.IsPeerUnconditional(peer.ID()) { + // Depending on the type of peer, we choose a semaphore to limit the gossiping peers. + var peerSemaphore *semaphore.Weighted if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 { - // Block sending transactions to peer until one of the connections become - // available in the semaphore. - if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil { - memR.Logger.Error("Failed to acquire semaphore: %v", err) - return - } - // Release semaphore to allow other peer to start sending transactions. - defer memR.activePersistentPeersSemaphore.Release(1) - defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) + peerSemaphore = memR.activePersistentPeersSemaphore + } else if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 { + peerSemaphore = memR.activeNonPersistentPeersSemaphore } - if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 { - // Block sending transactions to peer until one of the connections become - // available in the semaphore. - if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil { - memR.Logger.Error("Failed to acquire semaphore: %v", err) - return + if peerSemaphore != nil { + for peer.IsRunning() { + // Block on the semaphore until a slot is available to start gossiping with this peer. + // Do not block indefinitely, in case the peer is disconnected before gossiping starts. + ctxTimeout, cancel := context.WithTimeout(context.TODO(), 30*time.Second) + // Block sending transactions to peer until one of the connections becomes + // available in the semaphore. + err := peerSemaphore.Acquire(ctxTimeout, 1) + cancel() + + if err != nil { + continue + } + + // Release semaphore to allow another peer to start sending transactions. 
+ defer peerSemaphore.Release(1) + break } - // Release semaphore to allow other peer to start sending transactions. - defer memR.activeNonPersistentPeersSemaphore.Release(1) - defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) } } memR.mempool.metrics.ActiveOutboundConnections.Add(1) + defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) memR.broadcastTxRoutine(peer) }() } diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 3c8bad1daa..36a2075306 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -102,7 +102,7 @@ type Manifest struct { // Upper bound of sleep duration then gossipping votes and block parts PeerGossipIntraloopSleepDuration time.Duration `toml:"peer_gossip_intraloop_sleep_duration"` - // Maximum number of peers to which the node gossip transactions + // Maximum number of peers to which the node gossips transactions ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"` ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"`