diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 9c4634f2c..d9e0a2e83 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -63,7 +63,7 @@ type BlockchainReactor struct { // NewBlockchainReactor returns new reactor instance. func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, - fastSync bool) *BlockchainReactor { + fastSync bool, async bool, recvBufSize int) *BlockchainReactor { if state.LastBlockHeight != store.Height() { panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, @@ -90,7 +90,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st requestsCh: requestsCh, errorsCh: errorsCh, } - bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) + bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, async, recvBufSize) return bcR } @@ -103,7 +103,13 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) { // OnStart implements service.Service. func (bcR *BlockchainReactor) OnStart() error { if bcR.fastSync { - err := bcR.pool.Start() + // call BaseReactor's OnStart() + err := bcR.BaseReactor.OnStart() + if err != nil { + return err + } + + err = bcR.pool.Start() if err != nil { return err } diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index 8aa4d52f4..6aeed8dcc 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -55,7 +55,9 @@ func newBlockchainReactor( logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, - maxBlockHeight int64) BlockchainReactorPair { + maxBlockHeight int64, + async bool, + recvBufSize int) BlockchainReactorPair { if len(privVals) != 1 { panic("only support one validator") } @@ -125,7 +127,7 @@ func newBlockchainReactor( blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, async, recvBufSize) bcReactor.SetLogger(logger.With("module", "blockchain")) return BlockchainReactorPair{bcReactor, proxyApp} @@ -140,10 +142,12 @@ func TestNoBlockResponse(t *testing.T) { reactorPairs := make([]BlockchainReactorPair, 2) - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) + reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) - p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { + p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) return s @@ -202,7 +206,8 @@ func TestBadBlockStopsPeer(t *testing.T) { // Other chain needs a different validator set otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30) - otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight) + otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) defer func() { err := otherChain.reactor.Stop() @@ -213,12 +218,17 @@ func TestBadBlockStopsPeer(t *testing.T) { reactorPairs := make([]BlockchainReactorPair, 4) - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - - switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { + reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) + reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) + reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) + reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) + + switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch, + config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) return s @@ -254,10 +264,12 @@ func TestBadBlockStopsPeer(t *testing.T) { // race, but can't be easily avoided. reactorPairs[3].reactor.store = otherChain.reactor.store - lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) reactorPairs = append(reactorPairs, lastReactorPair) - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { + switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch, + config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) return s diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index 84c47ed5f..9068ba4a3 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -71,7 +71,7 @@ type BlockchainReactor struct { // NewBlockchainReactor returns new reactor instance. func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, - fastSync bool) *BlockchainReactor { + fastSync bool, async bool, recvBufSize int) *BlockchainReactor { if state.LastBlockHeight != store.Height() { panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, @@ -99,7 +99,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st } fsm := NewFSM(startHeight, bcR) bcR.fsm = fsm - bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) + bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, async, recvBufSize) // bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) return bcR @@ -140,6 +140,12 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) { func (bcR *BlockchainReactor) OnStart() error { bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) if bcR.fastSync { + // call BaseReactor's OnStart() + err := bcR.BaseReactor.OnStart() + if err != nil { + return err + } + go bcR.poolRoutine() } return nil diff --git a/blockchain/v1/reactor_test.go b/blockchain/v1/reactor_test.go index d3d9d817e..5c859c5e5 100644 --- a/blockchain/v1/reactor_test.go +++ b/blockchain/v1/reactor_test.go @@ -146,7 +146,8 @@ func newBlockchainReactor( blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) bcReactor.SetLogger(logger.With("module", "blockchain")) return bcReactor @@ -160,7 +161,8 @@ func newBlockchainReactorPair( maxBlockHeight int64) BlockchainReactorPair { consensusReactor := &consensusReactorTest{} - consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor) + consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor, + config.P2P.RecvAsync, config.P2P.ConsensusRecvBufSize) return BlockchainReactorPair{ newBlockchainReactor(t, logger, genDoc, privVals, maxBlockHeight), @@ -193,7 +195,7 @@ func TestFastSyncNoBlockResponse(t *testing.T) { reactorPairs[0] = newBlockchainReactorPair(t, logger, genDoc, privVals, maxBlockHeight) reactorPairs[1] = newBlockchainReactorPair(t, logger, genDoc, privVals, 0) - p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { + p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) s.AddReactor("CONSENSUS", reactorPairs[i].conR) moduleName := fmt.Sprintf("blockchain-%v", i) @@ -273,7 +275,8 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { reactorPairs[i] = newBlockchainReactorPair(t, logger[i], genDoc, privVals, height) } - switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch { + switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch, + config *cfg.P2PConfig) *p2p.Switch { reactorPairs[i].conR.mtx.Lock() s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) s.AddReactor("CONSENSUS", reactorPairs[i].conR) @@ -315,7 +318,8 @@ outerFor: lastReactorPair := newBlockchainReactorPair(t, lastLogger, genDoc, privVals, 0) reactorPairs = append(reactorPairs, lastReactorPair) - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { + switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch, + config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR) s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR) moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1) diff --git a/config/config.go b/config/config.go index a47c1bcf8..a08e26cef 100644 --- a/config/config.go +++ b/config/config.go @@ -584,6 +584,16 @@ type P2PConfig struct { //nolint: maligned HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` DialTimeout time.Duration `mapstructure:"dial_timeout"` + // Reactor async receive + RecvAsync bool `mapstructure:"recv_async"` + + // Size of receive buffer used in async receiving + PexRecvBufSize int `mapstructure:"pex_recv_buf_size"` + EvidenceRecvBufSize int `mapstructure:"evidence_recv_buf_size"` + MempoolRecvBufSize int `mapstructure:"mempool_recv_buf_size"` + ConsensusRecvBufSize int `mapstructure:"consensus_recv_buf_size"` + BlockchainRecvBufSize int `mapstructure:"blockchain_recv_buf_size"` + // Testing params. // Force dial to fail TestDialFail bool `mapstructure:"test_dial_fail"` @@ -612,6 +622,12 @@ func DefaultP2PConfig() *P2PConfig { AllowDuplicateIP: false, HandshakeTimeout: 20 * time.Second, DialTimeout: 3 * time.Second, + RecvAsync: true, + PexRecvBufSize: 1000, + EvidenceRecvBufSize: 1000, + MempoolRecvBufSize: 1000, + ConsensusRecvBufSize: 1000, + BlockchainRecvBufSize: 1000, TestDialFail: false, TestFuzz: false, TestFuzzConfig: DefaultFuzzConnConfig(), diff --git a/config/toml.go b/config/toml.go index fe797e95b..78ed6a80e 100644 --- a/config/toml.go +++ b/config/toml.go @@ -332,6 +332,18 @@ allow_duplicate_ip = {{ .P2P.AllowDuplicateIP }} handshake_timeout = "{{ .P2P.HandshakeTimeout }}" dial_timeout = "{{ .P2P.DialTimeout }}" +# Sync/async of reactor's receive function +recv_async = {{ .P2P.RecvAsync }} + +# Size of channel buffer of reactor +pex_recv_buf_size = {{ .P2P.PexRecvBufSize }} +mempool_recv_buf_size = {{ .P2P.MempoolRecvBufSize }} +evidence_recv_buf_size = {{ .P2P.EvidenceRecvBufSize }} +consensus_recv_buf_size = {{ .P2P.ConsensusRecvBufSize }} +blockchain_recv_buf_size = {{ .P2P.BlockchainRecvBufSize }} + +##### mempool configuration options ##### + ####################################################### ### Mempool Configuration Option ### ####################################################### diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 4639d34bd..769941960 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + config2 "github.com/line/ostracon/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -101,7 +102,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { blocksSubs := make([]types.Subscription, 0) eventBuses := make([]*types.EventBus, nValidators) for i := 0; i < nValidators; i++ { - reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states + reactors[i] = NewReactor(css[i], true, true, 1000) // so we dont start the consensus states reactors[i].SetLogger(css[i].Logger) // eventBus is already started with the cs @@ -118,7 +119,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } } // make connected switches and start all reactors - p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch { + p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch, config *config2.P2PConfig) *p2p.Switch { s.AddReactor("CONSENSUS", reactors[i]) s.SetLogger(reactors[i].conS.Logger.With("module", "p2p")) return s @@ -297,7 +298,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { config.P2P, i, "foo", "1.0.0", - func(i int, sw *p2p.Switch) *p2p.Switch { + func(i int, sw *p2p.Switch, config *config2.P2PConfig) *p2p.Switch { return sw }) switches[i].SetLogger(p2pLogger.With("validator", i)) @@ -331,7 +332,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) - conR := NewReactor(css[i], true) // so we don't start the consensus states + conR := NewReactor(css[i], true, true, 1000) // so we don't start the consensus states conR.SetLogger(logger.With("validator", i)) conR.SetEventBus(eventBus) @@ -359,7 +360,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { } }() - p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { + p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch, config *config2.P2PConfig) *p2p.Switch { // ignore new switch s, we already made ours switches[i].AddReactor("CONSENSUS", reactors[i]) return switches[i] @@ -552,3 +553,10 @@ func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { br.reactor.Receive(chID, peer, msgBytes) } func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } +func (br *ByzantineReactor) RecvRoutine() { + br.reactor.RecvRoutine() +} + +func (br *ByzantineReactor) GetRecvChan() chan *p2p.BufferedMsg { + return br.reactor.GetRecvChan() +} diff --git a/consensus/reactor.go b/consensus/reactor.go index 5d75c09d4..554fb300e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -54,13 +54,13 @@ type ReactorOption func(*Reactor) // NewReactor returns a new Reactor with the given // consensusState. -func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor { +func NewReactor(consensusState *State, fastSync bool, async bool, recvBufSize int, options ...ReactorOption) *Reactor { conR := &Reactor{ conS: consensusState, - waitSync: waitSync, + waitSync: fastSync, Metrics: NopMetrics(), } - conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) + conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, async, recvBufSize) for _, option := range options { option(conR) @@ -74,6 +74,12 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) func (conR *Reactor) OnStart() error { conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync()) + // call BaseReactor's OnStart() + err := conR.BaseReactor.OnStart() + if err != nil { + return err + } + // start routine that computes peer statistics for evaluating peer quality go conR.peerStatsRoutine() diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 845e20e99..208bc1e23 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -54,7 +54,7 @@ func startConsensusNet(t *testing.T, css []*State, n int) ( for i := 0; i < n; i++ { /*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info") if err != nil { t.Fatal(err)}*/ - reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states + reactors[i] = NewReactor(css[i], true, true, 1000) // so we dont start the consensus states reactors[i].SetLogger(css[i].Logger) // eventBus is already started with the cs @@ -73,7 +73,7 @@ func startConsensusNet(t *testing.T, css []*State, n int) ( } } // make connected switches and start all reactors - p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { + p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("CONSENSUS", reactors[i]) s.SetLogger(reactors[i].conS.Logger.With("module", "p2p")) return s diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index d13574884..8c12ab805 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -258,9 +258,17 @@ allow_duplicate_ip = false handshake_timeout = "20s" dial_timeout = "3s" -####################################################### -### Mempool Configurattion Option ### -####################################################### +# Sync/async of reactor's receive function +recv_async = true + +# Size of channel buffer of reactor +pex_recv_buf_size = 1000 +mempool_recv_buf_size = 1000 +evidence_recv_buf_size = 1000 +consensus_recv_buf_size = 1000 +blockchain_recv_buf_size = 1000 + +##### mempool configuration options ##### [mempool] recheck = true diff --git a/evidence/reactor.go b/evidence/reactor.go index 6c97bb1ae..d4f1ce941 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -33,11 +33,11 @@ type Reactor struct { } // NewReactor returns a new Reactor with the given config and evpool. -func NewReactor(evpool *Pool) *Reactor { +func NewReactor(evpool *Pool, async bool, recvBufSize int) *Reactor { evR := &Reactor{ evpool: evpool, } - evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR) + evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR, async, recvBufSize) return evR } diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 89e3f6a69..5bd73e12b 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -188,6 +188,8 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { } func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) { + config := cfg.TestConfig() + evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) evidenceDB := memdb.NewDB() blockStore := &mocks.BlockStore{} @@ -215,7 +217,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) { p.On("ID").Return("ABC") p.On("String").Return("mock") - r := evidence.NewReactor(pool) + r := evidence.NewReactor(pool, config.P2P.RecvAsync, config.P2P.EvidenceRecvBufSize) r.SetLogger(log.TestingLogger()) r.AddPeer(p) @@ -256,11 +258,11 @@ func makeAndConnectReactorsAndPools(config *cfg.Config, stateStores []sm.Store) panic(err) } pools[i] = pool - reactors[i] = evidence.NewReactor(pool) + reactors[i] = evidence.NewReactor(pool, config.P2P.RecvAsync, config.P2P.EvidenceRecvBufSize) reactors[i].SetLogger(logger.With("validator", i)) } - p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { + p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("EVIDENCE", reactors[i]) return s diff --git a/mempool/reactor.go b/mempool/reactor.go index 761f3a53a..b93cb588c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -101,13 +101,13 @@ func newMempoolIDs() *mempoolIDs { } // NewReactor returns a new Reactor with the given config and mempool. -func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor { +func NewReactor(config *cfg.MempoolConfig, async bool, recvBufSize int, mempool *CListMempool) *Reactor { memR := &Reactor{ config: config, mempool: mempool, ids: newMempoolIDs(), } - memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) + memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR, async, recvBufSize) return memR } @@ -125,6 +125,12 @@ func (memR *Reactor) SetLogger(l log.Logger) { // OnStart implements p2p.BaseReactor. func (memR *Reactor) OnStart() error { + // call BaseReactor's OnStart() + err := memR.BaseReactor.OnStart() + if err != nil { + return nil + } + if !memR.config.Broadcast { memR.Logger.Info("Tx broadcasting is disabled") } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 5ba7bfcda..c96f71bda 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -307,11 +307,12 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { mempool, cleanup := newMempoolWithApp(cc) defer cleanup() - reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states + // so we dont start the consensus states + reactors[i] = NewReactor(config.Mempool, config.P2P.RecvAsync, config.P2P.MempoolRecvBufSize, mempool) reactors[i].SetLogger(logger.With("validator", i)) } - p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { + p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch { s.AddReactor("MEMPOOL", reactors[i]) return s diff --git a/node/node.go b/node/node.go index a4c49493b..a074d54b0 100644 --- a/node/node.go +++ b/node/node.go @@ -325,7 +325,7 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, mempl.WithPreCheck(sm.TxPreCheck(state)), ) mempoolLogger := logger.With("module", "mempool") - mempoolReactor := mempl.NewReactor(config.Mempool, mempool) + mempoolReactor := mempl.NewReactor(config.Mempool, config.P2P.RecvAsync, config.P2P.MempoolRecvBufSize, mempool) mempoolReactor.SetLogger(mempoolLogger) if config.Consensus.WaitForTxs() { @@ -346,7 +346,7 @@ func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider, if err != nil { return nil, nil, err } - evidenceReactor := evidence.NewReactor(evidencePool) + evidenceReactor := evidence.NewReactor(evidencePool, config.P2P.RecvAsync, config.P2P.EvidenceRecvBufSize) evidenceReactor.SetLogger(evidenceLogger) return evidenceReactor, evidencePool, nil } @@ -360,9 +360,11 @@ func createBlockchainReactor(config *cfg.Config, switch config.FastSync.Version { case "v0": - bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) case "v1": - bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) case "v2": bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) default: @@ -398,7 +400,8 @@ func createConsensusReactor(config *cfg.Config, if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) + consensusReactor := cs.NewReactor(consensusState, waitSync, config.P2P.RecvAsync, config.P2P.ConsensusRecvBufSize, + cs.ReactorMetrics(csMetrics)) consensusReactor.SetLogger(consensusLogger) // services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor @@ -540,6 +543,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, // TODO persistent peers ? so we can have their DNS addrs saved pexReactor := pex.NewReactor(addrBook, + config.P2P.RecvAsync, &pex.ReactorConfig{ Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), SeedMode: config.P2P.SeedMode, @@ -550,6 +554,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, // https://github.com/tendermint/tendermint/issues/3523 SeedDisconnectWaitPeriod: 28 * time.Hour, PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, + RecvBufSize: config.P2P.PexRecvBufSize, }) pexReactor.SetLogger(logger.With("module", "pex")) sw.AddReactor("PEX", pexReactor) diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index f3721e52b..9ffc530b5 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -45,6 +45,12 @@ type Reactor interface { // // CONTRACT: msgBytes are not nil. Receive(chID byte, peer Peer, msgBytes []byte) + + // receive async version + GetRecvChan() chan *BufferedMsg + + // receive routine per reactor + RecvRoutine() } //-------------------------------------- @@ -52,13 +58,20 @@ type Reactor interface { type BaseReactor struct { service.BaseService // Provides Start, Stop, .Quit Switch *Switch + recvMsgBuf chan *BufferedMsg + impl Reactor } -func NewBaseReactor(name string, impl Reactor) *BaseReactor { - return &BaseReactor{ +func NewBaseReactor(name string, impl Reactor, async bool, recvBufSize int) *BaseReactor { + baseReactor := &BaseReactor{ BaseService: *service.NewBaseService(nil, name, impl), Switch: nil, + impl: impl, + } + if async { + baseReactor.recvMsgBuf = make(chan *BufferedMsg, recvBufSize) } + return baseReactor } func (br *BaseReactor) SetSwitch(sw *Switch) { @@ -69,3 +82,29 @@ func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } + +func (br *BaseReactor) OnStart() error { + if br.recvMsgBuf != nil { + // if it is async mode it starts RecvRoutine() + go br.RecvRoutine() + } + return nil +} + +func (br *BaseReactor) RecvRoutine() { + for { + select { + case msg := <-br.recvMsgBuf: + br.impl.Receive(msg.ChID, msg.Peer, msg.Msg) + case <-br.Quit(): + return + } + } +} + +func (br *BaseReactor) GetRecvChan() chan *BufferedMsg { + if br.recvMsgBuf == nil { + panic("It's not async reactor, but GetRecvChan() is called ") + } + return br.recvMsgBuf +} diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 15e9dc1be..9213c8c45 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -45,6 +45,7 @@ const ( defaultSendTimeout = 10 * time.Second defaultPingInterval = 60 * time.Second defaultPongTimeout = 45 * time.Second + defaultRecvAsync = true ) type receiveCbFunc func(chID byte, msgBytes []byte) @@ -133,6 +134,9 @@ type MConnConfig struct { // Maximum wait time for pongs PongTimeout time.Duration `mapstructure:"pong_timeout"` + + // Action method of reactor's receive function + RecvAsync bool `mapstructure:"recv_async"` } // DefaultMConnConfig returns the default config. @@ -144,6 +148,7 @@ func DefaultMConnConfig() MConnConfig { FlushThrottle: defaultFlushThrottle, PingInterval: defaultPingInterval, PongTimeout: defaultPongTimeout, + RecvAsync: defaultRecvAsync, } } diff --git a/p2p/metrics.go b/p2p/metrics.go index 675dd9c7c..5eeaf37b4 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -25,6 +25,10 @@ type Metrics struct { PeerPendingSendBytes metrics.Gauge // Number of transactions submitted by each peer. NumTxs metrics.Gauge + // Number of abandoned peer messages + NumAbandonedPeerMsgs metrics.Counter + // Number of pooled peer messages + NumPooledPeerMsgs metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -66,6 +70,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "num_txs", Help: "Number of transactions submitted by each peer.", }, append(labels, "peer_id")).With(labelsAndValues...), + NumAbandonedPeerMsgs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "num_abandoned_peer_msgs", + Help: "Number of peer messages abandoned because of full channel", + }, append(labels, "peer_id", "chID")).With(labelsAndValues...), + NumPooledPeerMsgs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "num_pooled_peer_msgs", + Help: "Number of peer messages pooled currently", + }, append(labels, "peer_id", "chID")).With(labelsAndValues...), } } @@ -77,5 +93,7 @@ func NopMetrics() *Metrics { PeerSendBytesTotal: discard.NewCounter(), PeerPendingSendBytes: discard.NewGauge(), NumTxs: discard.NewGauge(), + NumAbandonedPeerMsgs: discard.NewCounter(), + NumPooledPeerMsgs: discard.NewGauge(), } } diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index 8d1b5ee68..cb80a924a 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -12,7 +12,7 @@ type Reactor struct { func NewReactor() *Reactor { r := &Reactor{} - r.BaseReactor = *p2p.NewBaseReactor("Mock-PEX", r) + r.BaseReactor = *p2p.NewBaseReactor("Mock-PEX", r, true, 1000) r.SetLogger(log.TestingLogger()) return r } diff --git a/p2p/peer.go b/p2p/peer.go index ae772eb38..454520b6b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -41,6 +41,12 @@ type Peer interface { Get(string) interface{} } +type BufferedMsg struct { + ChID byte + Peer Peer + Msg []byte +} + //---------------------------------------------------------- // peerConn contains the raw connection and its config. @@ -387,7 +393,25 @@ func createMConnection( "chID", fmt.Sprintf("%#x", chID), } p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) - reactor.Receive(chID, p, msgBytes) + if config.RecvAsync { + ch := reactor.GetRecvChan() + p.metrics.NumPooledPeerMsgs.With(labels...).Set(float64(len(ch))) + // we must use copied msgBytes + // because msgBytes is on socket receive buffer yet so reactor can read it concurrently + copied := make([]byte, len(msgBytes)) + copy(copied, msgBytes) + select { + case ch <- &BufferedMsg{ + ChID: chID, + Peer: p, + Msg: copied}: + default: + // if the channel is full, we abandon this message + p.metrics.NumAbandonedPeerMsgs.With(labels...).Add(1) + } + } else { + reactor.Receive(chID, p, msgBytes) + } } onError := func(r interface{}) { diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 7e973a614..5f6cb5473 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -81,7 +81,7 @@ func createOutboundPeerAndPerformHandshake( chDescs := []*tmconn.ChannelDescriptor{ {ID: testCh, Priority: 1}, } - reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} + reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true, 1000, true)} pk := ed25519.GenPrivKey() pc, err := testOutboundPeerConn(addr, config, false, pk) if err != nil { diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index ac88f5f19..4008e6dc5 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -122,6 +122,9 @@ type ReactorConfig struct { // Seeds is a list of addresses reactor may use // if it can't connect to peers in the addrbook. Seeds []string + + // Receive channel buffer size + RecvBufSize int } type _attemptsToDial struct { @@ -130,7 +133,7 @@ type _attemptsToDial struct { } // NewReactor creates new PEX reactor. -func NewReactor(b AddrBook, config *ReactorConfig) *Reactor { +func NewReactor(b AddrBook, async bool, config *ReactorConfig) *Reactor { r := &Reactor{ book: b, config: config, @@ -139,13 +142,19 @@ func NewReactor(b AddrBook, config *ReactorConfig) *Reactor { lastReceivedRequests: cmap.NewCMap(), crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo), } - r.BaseReactor = *p2p.NewBaseReactor("PEX", r) + r.BaseReactor = *p2p.NewBaseReactor("PEX", r, async, config.RecvBufSize) return r } // OnStart implements BaseService func (r *Reactor) OnStart() error { - err := r.book.Start() + // call BaseReactor's OnStart() + err := r.BaseReactor.OnStart() + if err != nil { + return err + } + + err = r.book.Start() if err != nil && err != service.ErrAlreadyStarted { return err } @@ -166,6 +175,7 @@ func (r *Reactor) OnStart() error { } else { go r.ensurePeersRoutine() } + return nil } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 37b7f6ffe..b029f3d77 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -82,14 +82,15 @@ func TestPEXReactorRunning(t *testing.T) { // create switches for i := 0; i < N; i++ { - switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { + switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch, + config *config.P2PConfig) *p2p.Switch { books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false) books[i].SetLogger(logger.With("pex", i)) sw.SetAddrBook(books[i]) sw.SetLogger(logger.With("pex", i)) - r := NewReactor(books[i], &ReactorConfig{}) + r := NewReactor(books[i], config.RecvAsync, &ReactorConfig{}) r.SetLogger(logger.With("pex", i)) r.SetEnsurePeersPeriod(250 * time.Millisecond) sw.AddReactor("pex", r) @@ -418,19 +419,20 @@ func TestPEXReactorSeedModeFlushStop(t *testing.T) { // create switches for i := 0; i < N; i++ { - switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { + switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch, + config *config.P2PConfig) *p2p.Switch { books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false) books[i].SetLogger(logger.With("pex", i)) sw.SetAddrBook(books[i]) sw.SetLogger(logger.With("pex", i)) - config := &ReactorConfig{} + pexConfig := &ReactorConfig{} if i == 0 { // first one is a seed node - config = &ReactorConfig{SeedMode: true} + pexConfig = &ReactorConfig{SeedMode: true} } - r := NewReactor(books[i], config) + r := NewReactor(books[i], config.RecvAsync, pexConfig) r.SetLogger(logger.With("pex", i)) r.SetEnsurePeersPeriod(250 * time.Millisecond) sw.AddReactor("pex", r) @@ -577,13 +579,13 @@ func assertPeersWithTimeout( } // Creates a peer with the provided config -func testCreatePeerWithConfig(dir string, id int, config *ReactorConfig) *p2p.Switch { +func testCreatePeerWithConfig(dir string, id int, pexConfig *ReactorConfig) *p2p.Switch { peer := p2p.MakeSwitch( cfg, id, "127.0.0.1", "123.123.123", - func(i int, sw *p2p.Switch) *p2p.Switch { + func(i int, sw *p2p.Switch, config *config.P2PConfig) *p2p.Switch { book := NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", id)), false) book.SetLogger(log.TestingLogger()) sw.SetAddrBook(book) @@ -592,7 +594,8 @@ func testCreatePeerWithConfig(dir string, id int, config *ReactorConfig) *p2p.Sw r := NewReactor( book, - config, + config.RecvAsync, + pexConfig, ) r.SetLogger(log.TestingLogger()) sw.AddReactor("pex", r) @@ -615,7 +618,7 @@ func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress) id, "127.0.0.1", "123.123.123", - func(i int, sw *p2p.Switch) *p2p.Switch { + func(i int, sw *p2p.Switch, config *config.P2PConfig) *p2p.Switch { book := NewAddrBook(filepath.Join(dir, "addrbookSeed.json"), false) book.SetLogger(log.TestingLogger()) for j := 0; j < len(knownAddrs); j++ { @@ -626,7 +629,7 @@ func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress) sw.SetLogger(log.TestingLogger()) - r := NewReactor(book, &ReactorConfig{}) + r := NewReactor(book, config.RecvAsync, &ReactorConfig{}) r.SetLogger(log.TestingLogger()) sw.AddReactor("pex", r) return sw @@ -653,7 +656,7 @@ func createReactor(conf *ReactorConfig) (r *Reactor, book AddrBook) { book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true) book.SetLogger(log.TestingLogger()) - r = NewReactor(book, conf) + r = NewReactor(book, cfg.RecvAsync, conf) r.SetLogger(log.TestingLogger()) return } @@ -667,7 +670,8 @@ func teardownReactor(book AddrBook) { } func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch { - sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", + func(i int, sw *p2p.Switch, config *config.P2PConfig) *p2p.Switch { return sw }) sw.SetLogger(log.TestingLogger()) for _, r := range reactors { sw.AddReactor(r.String(), r) diff --git a/p2p/switch.go b/p2p/switch.go index 5bc3bbe7c..57b7fcf5d 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -37,6 +37,7 @@ func MConnConfig(cfg *config.P2PConfig) conn.MConnConfig { mConfig.SendRate = cfg.SendRate mConfig.RecvRate = cfg.RecvRate mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize + mConfig.RecvAsync = cfg.RecvAsync return mConfig } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index cba3beb8f..27b8115d0 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -51,13 +51,13 @@ type TestReactor struct { msgsReceived map[byte][]PeerMessage } -func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor { +func NewTestReactor(channels []*conn.ChannelDescriptor, async bool, recvBufSize int, logMessages bool) *TestReactor { tr := &TestReactor{ channels: channels, logMessages: logMessages, msgsReceived: make(map[byte][]PeerMessage), } - tr.BaseReactor = *NewBaseReactor("TestReactor", tr) + tr.BaseReactor = *NewBaseReactor("TestReactor", tr, async, recvBufSize) tr.SetLogger(log.TestingLogger()) return tr } @@ -90,13 +90,13 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { // convenience method for creating two switches connected to each other. // XXX: note this uses net.Pipe and not a proper TCP conn -func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) { +func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch, *config.P2PConfig) *Switch) (*Switch, *Switch) { // Create two switches that will be interconnected. switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches) return switches[0], switches[1] } -func initSwitchFunc(i int, sw *Switch) *Switch { +func initSwitchFunc(i int, sw *Switch, config *config.P2PConfig) *Switch { sw.SetAddrBook(&AddrBookMock{ Addrs: make(map[string]struct{}), OurAddrs: make(map[string]struct{})}) @@ -105,11 +105,11 @@ func initSwitchFunc(i int, sw *Switch) *Switch { sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, {ID: byte(0x01), Priority: 10}, - }, true)) + }, config.RecvAsync, 1000, true)) sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x02), Priority: 10}, {ID: byte(0x03), Priority: 10}, - }, true)) + }, config.RecvAsync, 1000, true)) return sw } @@ -415,13 +415,13 @@ func TestSwitchStopPeerForError(t *testing.T) { p2pMetrics := PrometheusMetrics(namespace) // make two connected switches - sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch { + sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch, config *config.P2PConfig) *Switch { // set metrics on sw1 if i == 0 { opt := WithMetrics(p2pMetrics) opt(sw) } - return initSwitchFunc(i, sw) + return initSwitchFunc(i, sw, config) }) assert.Equal(t, len(sw1.Peers().List()), 1) @@ -747,13 +747,13 @@ func (r *mockReactor) InitCalledBeforeRemoveFinished() bool { func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) { // make reactor reactor := &mockReactor{} - reactor.BaseReactor = NewBaseReactor("mockReactor", reactor) + reactor.BaseReactor = NewBaseReactor("mockReactor", reactor, true, 1000) // make switch - sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch) *Switch { + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch, config *config.P2PConfig) *Switch { sw.AddReactor("mock", reactor) return sw - }) + }) // mock reactor uses 1000 chan buffer err := sw.Start() require.NoError(t, err) t.Cleanup(func() { @@ -789,16 +789,16 @@ func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) { } func BenchmarkSwitchBroadcast(b *testing.B) { - s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { + s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch, config *config.P2PConfig) *Switch { // Make bar reactors of bar channels each sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, {ID: byte(0x01), Priority: 10}, - }, false)) + }, config.RecvAsync, 1000, false)) sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x02), Priority: 10}, {ID: byte(0x03), Priority: 10}, - }, false)) + }, config.RecvAsync, 1000, false)) return sw }) @@ -836,3 +836,125 @@ func BenchmarkSwitchBroadcast(b *testing.B) { b.Logf("success: %v, failure: %v", numSuccess, numFailure) } + +type NormalReactor struct { + BaseReactor + channels []*conn.ChannelDescriptor + msgChan chan []byte +} + +func NewNormalReactor(channels []*conn.ChannelDescriptor, async bool, recvBufSize int) *NormalReactor { + nr := &NormalReactor{ + channels: channels, + } + nr.BaseReactor = *NewBaseReactor("NormalReactor", nr, async, recvBufSize) + nr.msgChan = make(chan []byte) + nr.SetLogger(log.TestingLogger()) + return nr +} + +func (nr *NormalReactor) GetChannels() []*conn.ChannelDescriptor { + return nr.channels +} + +func (nr *NormalReactor) AddPeer(peer Peer) {} + +func (nr *NormalReactor) RemovePeer(peer Peer, reason interface{}) {} + +func (nr *NormalReactor) Receive(chID byte, peer Peer, msgBytes []byte) { + nr.msgChan <- msgBytes +} + +type BlockedReactor struct { + BaseReactor + channels []*conn.ChannelDescriptor + waitChan chan int +} + +func NewBlockedReactor(channels []*conn.ChannelDescriptor, async bool, recvBufSize int) *BlockedReactor { + br := &BlockedReactor{ + channels: channels, + } + br.BaseReactor = *NewBaseReactor("BlockedReactor", br, async, recvBufSize) + br.waitChan = make(chan int, 1) + br.SetLogger(log.TestingLogger()) + return br +} + +func (br *BlockedReactor) GetChannels() []*conn.ChannelDescriptor { + return br.channels +} + +func (br *BlockedReactor) AddPeer(peer Peer) {} + +func (br *BlockedReactor) RemovePeer(peer Peer, reason interface{}) {} + +func (br *BlockedReactor) Receive(chID byte, peer Peer, msgBytes []byte) { + <-br.waitChan +} + +const ( + reactorNameNormal = "normal" + reactorNameBlocked = "blocked" +) + +func TestSyncReactor(t *testing.T) { + cfg.RecvAsync = false + s1, s2 := MakeSwitchPair(t, getInitSwitchFunc(0)) + defer s1.Stop() //nolint:errcheck + defer s2.Stop() //nolint:errcheck + + normalReactor := s2.Reactor(reactorNameNormal).(*NormalReactor) + blockedReactor := s2.Reactor(reactorNameBlocked).(*BlockedReactor) + s1.Broadcast(0x01, []byte{1}) // the message for blocked reactor is first + time.Sleep(time.Millisecond * 200) // to make order among messages + s1.Broadcast(0x00, []byte{0}) // and then second message is for normal reactor + + select { + case <-normalReactor.msgChan: + assert.Fail(t, "blocked reactor is not blocked") + case <-time.After(time.Second * 1): + assert.True(t, true, "blocked reactor is blocked: OK") + } + + blockedReactor.waitChan <- 1 // release blocked reactor + msg := <-normalReactor.msgChan + assert.True(t, bytes.Equal(msg, []byte{0})) +} + +func TestAsyncReactor(t *testing.T) { + cfg.RecvAsync = true + s1, s2 := MakeSwitchPair(t, getInitSwitchFunc(1)) + defer s1.Stop() //nolint:errcheck + defer s2.Stop() //nolint:errcheck + + normalReactor := s2.Reactor(reactorNameNormal).(*NormalReactor) + s1.Broadcast(0x01, []byte{1}) // the message for blocked reactor is first + time.Sleep(time.Millisecond * 200) // to make order among messages + s1.Broadcast(0x00, []byte{0}) // and then second message is for normal reactor + + select { + case msg := <-normalReactor.msgChan: + assert.True(t, bytes.Equal(msg, []byte{0})) + case <-time.After(time.Second * 1): + assert.Fail(t, "blocked reactor is blocked") + } +} + +func getInitSwitchFunc(bufSize int) func(int, *Switch, *config.P2PConfig) *Switch { + return func(i int, sw *Switch, config *config.P2PConfig) *Switch { + sw.SetAddrBook(&AddrBookMock{ + Addrs: make(map[string]struct{}), + OurAddrs: make(map[string]struct{})}) + + // Make two reactors of two channels each + sw.AddReactor(reactorNameNormal, NewNormalReactor([]*conn.ChannelDescriptor{ + {ID: byte(0x00), Priority: 10}, + }, config.RecvAsync, bufSize)) + sw.AddReactor(reactorNameBlocked, NewBlockedReactor([]*conn.ChannelDescriptor{ + {ID: byte(0x01), Priority: 10}, + }, config.RecvAsync, bufSize)) + + return sw + } +} diff --git a/p2p/test_util.go b/p2p/test_util.go index c709dfbb1..e190f234e 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -78,7 +78,7 @@ const TestHost = "localhost" // NOTE: panics if any switch fails to start. func MakeConnectedSwitches(cfg *config.P2PConfig, n int, - initSwitch func(int, *Switch) *Switch, + initSwitch func(int, *Switch, *config.P2PConfig) *Switch, connect func([]*Switch, int, int), ) []*Switch { switches := make([]*Switch, n) @@ -177,7 +177,7 @@ func MakeSwitch( cfg *config.P2PConfig, i int, network, version string, - initSwitch func(int, *Switch) *Switch, + initSwitch func(int, *Switch, *config.P2PConfig) *Switch, opts ...SwitchOption, ) *Switch { @@ -199,7 +199,7 @@ func MakeSwitch( } // TODO: let the config be passed in? - sw := initSwitch(i, NewSwitch(cfg, t, opts...)) + sw := initSwitch(i, NewSwitch(cfg, t, opts...), cfg) // receive buffer size is all 1000 in test sw.SetLogger(log.TestingLogger().With("switch", i)) sw.SetNodeKey(&nodeKey) diff --git a/privval/signer_listener_endpoint_test.go b/privval/signer_listener_endpoint_test.go index dc3249511..9499d71ea 100644 --- a/privval/signer_listener_endpoint_test.go +++ b/privval/signer_listener_endpoint_test.go @@ -18,8 +18,8 @@ import ( var ( testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second - testTimeoutReadWrite = 100 * time.Millisecond - testTimeoutReadWrite2o3 = 60 * time.Millisecond // 2/3 of the other one + testTimeoutReadWrite = 1000 * time.Millisecond // increase timeout for slow test env + testTimeoutReadWrite2o3 = 60 * time.Millisecond // 2/3 of the other one ) type dialerTestCase struct { diff --git a/rpc/core/net_test.go b/rpc/core/net_test.go index 56863a11f..d64ed94fd 100644 --- a/rpc/core/net_test.go +++ b/rpc/core/net_test.go @@ -14,7 +14,7 @@ import ( func TestUnsafeDialSeeds(t *testing.T) { sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", - func(n int, sw *p2p.Switch) *p2p.Switch { return sw }) + func(n int, sw *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch { return sw }) err := sw.Start() require.NoError(t, err) t.Cleanup(func() { @@ -48,7 +48,7 @@ func TestUnsafeDialSeeds(t *testing.T) { func TestUnsafeDialPeers(t *testing.T) { sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", - func(n int, sw *p2p.Switch) *p2p.Switch { return sw }) + func(n int, sw *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch { return sw }) sw.SetAddrBook(&p2p.AddrBookMock{ Addrs: make(map[string]struct{}), OurAddrs: make(map[string]struct{}), diff --git a/statesync/reactor.go b/statesync/reactor.go index 4da332469..3c82248c1 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -44,7 +44,7 @@ func NewReactor(conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDi conn: conn, connQuery: connQuery, } - r.BaseReactor = *p2p.NewBaseReactor("StateSync", r) + r.BaseReactor = *p2p.NewBaseReactor("StateSync", r, false, 0) return r } diff --git a/test/fuzz/p2p/pex/reactor_receive.go b/test/fuzz/p2p/pex/reactor_receive.go index 6b2178559..040a2392d 100644 --- a/test/fuzz/p2p/pex/reactor_receive.go +++ b/test/fuzz/p2p/pex/reactor_receive.go @@ -18,8 +18,10 @@ var ( ) func init() { + cfg := config.DefaultP2PConfig() + addrB := pex.NewAddrBook("./testdata/addrbook1", false) - pexR := pex.NewReactor(addrB, &pex.ReactorConfig{SeedMode: false}) + pexR := pex.NewReactor(addrB, cfg.RecvAsync, &pex.ReactorConfig{SeedMode: false, RecvBufSize: cfg.PexRecvBufSize}) if pexR == nil { panic("NewReactor returned nil") } @@ -33,9 +35,10 @@ func Fuzz(data []byte) int { // MakeSwitch uses log.TestingLogger which can't be executed in init() cfg := config.DefaultP2PConfig() cfg.PexReactor = true - sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { - return sw - }) + sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", + func(i int, sw *p2p.Switch, cfg *config.P2PConfig) *p2p.Switch { + return sw + }) pexR.SetSwitch(sw) pexR.Receive(pex.PexChannel, peer, data) diff --git a/test/maverick/consensus/reactor.go b/test/maverick/consensus/reactor.go index e47845ed5..43c5a2bd9 100644 --- a/test/maverick/consensus/reactor.go +++ b/test/maverick/consensus/reactor.go @@ -55,13 +55,13 @@ type ReactorOption func(*Reactor) // NewReactor returns a new Reactor with the given // consensusState. -func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor { +func NewReactor(consensusState *State, waitSync bool, async bool, recvBufSize int, options ...ReactorOption) *Reactor { conR := &Reactor{ conS: consensusState, waitSync: waitSync, Metrics: tmcon.NopMetrics(), } - conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) + conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, async, recvBufSize) for _, option := range options { option(conR) diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index a6f73ed65..9316fea7d 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -372,7 +372,7 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, mempl.WithPreCheck(sm.TxPreCheck(state)), ) mempoolLogger := logger.With("module", "mempool") - mempoolReactor := mempl.NewReactor(config.Mempool, mempool) + mempoolReactor := mempl.NewReactor(config.Mempool, config.P2P.RecvAsync, config.P2P.MempoolRecvBufSize, mempool) mempoolReactor.SetLogger(mempoolLogger) if config.Consensus.WaitForTxs() { @@ -393,7 +393,7 @@ func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider, if err != nil { return nil, nil, err } - evidenceReactor := evidence.NewReactor(evidencePool) + evidenceReactor := evidence.NewReactor(evidencePool, config.P2P.RecvAsync, config.P2P.EvidenceRecvBufSize) evidenceReactor.SetLogger(evidenceLogger) return evidenceReactor, evidencePool, nil } @@ -407,9 +407,11 @@ func createBlockchainReactor(config *cfg.Config, switch config.FastSync.Version { case "v0": - bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) case "v1": - bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, + config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize) case "v2": bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) default: @@ -447,7 +449,8 @@ func createConsensusReactor(config *cfg.Config, if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) + consensusReactor := cs.NewReactor(consensusState, waitSync, config.P2P.RecvAsync, + config.P2P.ConsensusRecvBufSize, cs.ReactorMetrics(csMetrics)) consensusReactor.SetLogger(consensusLogger) // services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor @@ -589,6 +592,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, // TODO persistent peers ? so we can have their DNS addrs saved pexReactor := pex.NewReactor(addrBook, + config.P2P.RecvAsync, &pex.ReactorConfig{ Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), SeedMode: config.P2P.SeedMode, @@ -599,6 +603,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, // https://github.com/tendermint/tendermint/issues/3523 SeedDisconnectWaitPeriod: 28 * time.Hour, PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, + RecvBufSize: config.P2P.PexRecvBufSize, }) pexReactor.SetLogger(logger.With("module", "pex")) sw.AddReactor("PEX", pexReactor)