Skip to content

Commit

Permalink
feat: async reactor receiving
Browse files Browse the repository at this point in the history
  • Loading branch information
Woosang Son committed Nov 9, 2020
1 parent 6f8f870 commit 91e1df7
Show file tree
Hide file tree
Showing 31 changed files with 402 additions and 108 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
- [consensus] [\#126](https://github.com/line/tendermint/pull/126) Add some metrics measuring duration of each consensus steps

### IMPROVEMENTS:
- [p2p] [\#135](https://github.com/line/tendermint/pull/135) Add async mode for reactors

### BUG FIXES:
7 changes: 5 additions & 2 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type BlockchainReactor struct {

// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
fastSync bool) *BlockchainReactor {
fastSync bool, async bool, recvBufSize int) *BlockchainReactor {

if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
Expand All @@ -98,7 +98,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, async, recvBufSize)
return bcR
}

Expand All @@ -111,6 +111,9 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
// call BaseReactor's OnStart()
bcR.BaseReactor.OnStart()

err := bcR.pool.Start()
if err != nil {
return err
Expand Down
40 changes: 26 additions & 14 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func newBlockchainReactor(
logger log.Logger,
genDoc *types.GenesisDoc,
privVals []types.PrivValidator,
maxBlockHeight int64) BlockchainReactorPair {
maxBlockHeight int64,
async bool,
recvBufSize int) BlockchainReactorPair {
if len(privVals) != 1 {
panic("only support one validator")
}
Expand Down Expand Up @@ -120,7 +122,7 @@ func newBlockchainReactor(
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}

bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, async, recvBufSize)
bcReactor.SetLogger(logger.With("module", "blockchain"))

return BlockchainReactorPair{bcReactor, proxyApp}
Expand All @@ -135,10 +137,12 @@ func TestNoBlockResponse(t *testing.T) {

reactorPairs := make([]BlockchainReactorPair, 2)

reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)

p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s

Expand Down Expand Up @@ -193,20 +197,26 @@ func TestBadBlockStopsPeer(t *testing.T) {

maxBlockHeight := int64(148)

otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
defer func() {
otherChain.reactor.Stop()
otherChain.app.Stop()
}()

reactorPairs := make([]BlockchainReactorPair, 4)

reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)

switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)

switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s

Expand All @@ -233,10 +243,12 @@ func TestBadBlockStopsPeer(t *testing.T) {
//mark reactorPairs[3] is an invalid peer
reactorPairs[3].reactor.store = otherChain.reactor.store

lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs = append(reactorPairs, lastReactorPair)

switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s

Expand Down
7 changes: 5 additions & 2 deletions blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type BlockchainReactor struct {

// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
fastSync bool) *BlockchainReactor {
fastSync bool, async bool, recvBufSize int) *BlockchainReactor {

if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
Expand All @@ -103,7 +103,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
}
fsm := NewFSM(startHeight, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, async, recvBufSize)
//bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)

return bcR
Expand Down Expand Up @@ -144,6 +144,9 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
func (bcR *BlockchainReactor) OnStart() error {
bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
if bcR.fastSync {
// call BaseReactor's OnStart()
bcR.BaseReactor.OnStart()

go bcR.poolRoutine()
}
return nil
Expand Down
14 changes: 9 additions & 5 deletions blockchain/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func newBlockchainReactor(
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}

bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
bcReactor.SetLogger(logger.With("module", "blockchain"))

return bcReactor
Expand All @@ -153,7 +154,8 @@ func newBlockchainReactorPair(
maxBlockHeight int64) BlockchainReactorPair {

consensusReactor := &consensusReactorTest{}
consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor)
consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor,
config.P2P.RecvAsync, config.P2P.ConsensusRecvBufSize)

return BlockchainReactorPair{
newBlockchainReactor(t, logger, genDoc, privVals, maxBlockHeight),
Expand Down Expand Up @@ -185,7 +187,7 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
reactorPairs[0] = newBlockchainReactorPair(t, logger, genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactorPair(t, logger, genDoc, privVals, 0)

p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
moduleName := fmt.Sprintf("blockchain-%v", i)
Expand Down Expand Up @@ -265,7 +267,8 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
reactorPairs[i] = newBlockchainReactorPair(t, logger[i], genDoc, privVals, height)
}

switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch {
switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
reactorPairs[i].conR.mtx.Lock()
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
Expand Down Expand Up @@ -307,7 +310,8 @@ outerFor:
lastReactorPair := newBlockchainReactorPair(t, lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)

switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR)
s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR)
moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1)
Expand Down
16 changes: 16 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,16 @@ type P2PConfig struct { //nolint: maligned
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`

// Reactor async receive
RecvAsync bool `mapstructure:"recv_async"`

// Size of receive buffer used in async receiving
PexRecvBufSize int `mapstructure:"pex_recv_buf_size"`
EvidenceRecvBufSize int `mapstructure:"evidence_recv_buf_size"`
MempoolRecvBufSize int `mapstructure:"mempool_recv_buf_size"`
ConsensusRecvBufSize int `mapstructure:"consensus_recv_buf_size"`
BlockchainRecvBufSize int `mapstructure:"blockchain_recv_buf_size"`

// Testing params.
// Force dial to fail
TestDialFail bool `mapstructure:"test_dial_fail"`
Expand Down Expand Up @@ -578,6 +588,12 @@ func DefaultP2PConfig() *P2PConfig {
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
RecvAsync: true,
PexRecvBufSize: 1000,
EvidenceRecvBufSize: 1000,
MempoolRecvBufSize: 1000,
ConsensusRecvBufSize: 1000,
BlockchainRecvBufSize: 1000,
TestDialFail: false,
TestFuzz: false,
TestFuzzConfig: DefaultFuzzConnConfig(),
Expand Down
10 changes: 10 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ allow_duplicate_ip = {{ .P2P.AllowDuplicateIP }}
handshake_timeout = "{{ .P2P.HandshakeTimeout }}"
dial_timeout = "{{ .P2P.DialTimeout }}"
# Sync/async of reactor's receive function
recv_async = {{ .P2P.RecvAsync }}
# Size of channel buffer of reactor
pex_recv_buf_size = {{ .P2P.PexRecvBufSize }}
mempool_recv_buf_size = {{ .P2P.MempoolRecvBufSize }}
evidence_recv_buf_size = {{ .P2P.EvidenceRecvBufSize }}
consensus_recv_buf_size = {{ .P2P.ConsensusRecvBufSize }}
blockchain_recv_buf_size = {{ .P2P.BlockchainRecvBufSize }}
##### mempool configuration options #####
[mempool]
Expand Down
14 changes: 11 additions & 3 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/stretchr/testify/require"
config2 "github.com/tendermint/tendermint/config"

"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p"
Expand Down Expand Up @@ -45,7 +46,7 @@ func TestByzantine(t *testing.T) {
config.P2P,
i,
"foo", "1.0.0",
func(i int, sw *p2p.Switch) *p2p.Switch {
func(i int, sw *p2p.Switch, config *config2.P2PConfig) *p2p.Switch {
return sw
})
switches[i].SetLogger(p2pLogger.With("validator", i))
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestByzantine(t *testing.T) {
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)

conR := NewReactor(css[i], true) // so we don't start the consensus states
conR := NewReactor(css[i], true, true, 1000) // so we don't start the consensus states
conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus)

Expand All @@ -99,7 +100,7 @@ func TestByzantine(t *testing.T) {
}
}()

p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch, config *config2.P2PConfig) *p2p.Switch {
// ignore new switch s, we already made ours
switches[i].AddReactor("CONSENSUS", reactors[i])
return switches[i]
Expand Down Expand Up @@ -294,3 +295,10 @@ func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
func (br *ByzantineReactor) RecvRoutine() {
br.reactor.RecvRoutine()
}

func (br *ByzantineReactor) GetRecvChan() chan *p2p.BufferedMsg {
return br.reactor.GetRecvChan()
}
7 changes: 5 additions & 2 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ type ReactorOption func(*Reactor)

// NewReactor returns a new Reactor with the given
// consensusState.
func NewReactor(consensusState *State, fastSync bool, options ...ReactorOption) *Reactor {
func NewReactor(consensusState *State, fastSync bool, async bool, recvBufSize int, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
fastSync: fastSync,
metrics: NopMetrics(),
}
conR.updateFastSyncingMetric()
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, async, recvBufSize)

for _, option := range options {
option(conR)
Expand All @@ -72,6 +72,9 @@ func NewReactor(consensusState *State, fastSync bool, options ...ReactorOption)
func (conR *Reactor) OnStart() error {
conR.Logger.Info("Reactor ", "fastSync", conR.FastSync())

// call BaseReactor's OnStart()
conR.BaseReactor.OnStart()

// start routine that computes peer statistics for evaluating peer quality
go conR.peerStatsRoutine()

Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
for i := 0; i < n; i++ {
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
if err != nil { t.Fatal(err)}*/
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
reactors[i] = NewReactor(css[i], true, true, 1000) // so we dont start the consensus states
reactors[i].SetLogger(css[i].Logger)

// eventBus is already started with the cs
Expand All @@ -63,7 +63,7 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
}
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
Expand Down
10 changes: 10 additions & 0 deletions docs/tendermint-core/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ allow_duplicate_ip = false
handshake_timeout = "20s"
dial_timeout = "3s"
# Sync/async of reactor's receive function
recv_async = true
# Size of channel buffer of reactor
pex_recv_buf_size = 1000
mempool_recv_buf_size = 1000
evidence_recv_buf_size = 1000
consensus_recv_buf_size = 1000
blockchain_recv_buf_size = 1000
##### mempool configuration options #####
[mempool]
Expand Down
4 changes: 2 additions & 2 deletions evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type Reactor struct {
}

// NewReactor returns a new Reactor with the given config and evpool.
func NewReactor(evpool *Pool) *Reactor {
func NewReactor(evpool *Pool, async bool, recvBufSize int) *Reactor {
evR := &Reactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR, async, recvBufSize)
return evR
}

Expand Down
4 changes: 2 additions & 2 deletions evidence/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func makeAndConnectReactors(config *cfg.Config, stateDBs []dbm.DB) []*Reactor {

evidenceDB := dbm.NewMemDB()
pool := NewPool(stateDBs[i], evidenceDB)
reactors[i] = NewReactor(pool)
reactors[i] = NewReactor(pool, config.P2P.RecvAsync, config.P2P.EvidenceRecvBufSize)
reactors[i].SetLogger(logger.With("validator", i))
}

p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("EVIDENCE", reactors[i])
return s

Expand Down
Loading

0 comments on commit 91e1df7

Please sign in to comment.