Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#… #142

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/v0.38.6/features/3472-p2p-has-channel-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[p2p]` `HasChannel(chID)` method added to the `Peer` interface, used by
reactors to check whether a peer implements/supports a given channel.
([#3472](https://github.com/cometbft/cometbft/issues/3472))
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ It also includes a few other bug fixes and performance improvements.
* [#109](https://github.com/osmosis-labs/cometbft/pull/109) perf(p2p,mempool): Make mempool reactor receive not block. (Fixed by either #3209, #3230)
* [#105](https://github.com/osmosis-labs/cometbft/pull/105) perf(p2p)!: Remove PeerSendBytesTotal metric #3184
* [#95](https://github.com/osmosis-labs/cometbft/pull/95) perf(types) Make a new method `GetByAddressMut` for `ValSet`, which does not copy the returned validator. (#3129)
* [#128](https://github.com/osmosis-labs/cometbft/pull/128) feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#3510)
* [#126]() Remove p2p allocations for wrapping outbound packets
* [#125]() Fix marshalling and concurrency overhead within broadcast routines
* perf(p2p): Only update send monitor once per batch packet msg send (#3382)
* [#124]() Secret connection read buffer

## v0.38.10

Expand Down
8 changes: 8 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,10 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState {

func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(DataChannel) {
logger.Info("Peer does not implement DataChannel.")
return
}
rng := cmtrand.NewStdlibRand()

OUTER_LOOP:
Expand Down Expand Up @@ -729,6 +733,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(VoteChannel) {
logger.Info("Peer does not implement VoteChannel.")
return
}
rng := cmtrand.NewStdlibRand()

// Simple hack to throttle logs upon sleep.
Expand Down
4 changes: 3 additions & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {

// AddPeer implements Reactor.
func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
if peer.HasChannel(EvidenceChannel) {
go evR.broadcastEvidenceRoutine(peer)
}
}

// Receive implements Reactor.
Expand Down
1 change: 1 addition & 0 deletions evidence/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == evidence.EvidenceChannel
})).Return(false)
p.On("HasChannel", evidence.EvidenceChannel).Maybe().Return(true)
quitChan := make(<-chan struct{})
p.On("Quit").Return(quitChan)
ps := peerState{2}
Expand Down
2 changes: 1 addition & 1 deletion mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
if memR.config.Broadcast && peer.HasChannel(MempoolChannel) {
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
Expand Down
1 change: 1 addition & 0 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewPeer(ip net.IP) *Peer {
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true }
func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true }
func (mp *Peer) HasChannel(_ byte) bool { return true }
func (mp *Peer) Send(_ p2p.Envelope) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
Expand Down
18 changes: 18 additions & 0 deletions p2p/mocks/peer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Peer interface {
Status() cmtconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket

HasChannel(chID byte) bool // Does the peer implement this channel?
Send(Envelope) bool
TrySend(Envelope) bool
TrySendMarshalled(MarshalledEnvelope) bool
Expand Down Expand Up @@ -117,7 +118,7 @@ type peer struct {

// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
// cached to avoid copying nodeInfo in HasChannel
nodeInfo NodeInfo
channels []byte

Expand Down Expand Up @@ -289,7 +290,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
} else if !p.HasChannel(chID) {
return false
}
res := sendFunc(chID, msgBytes)
Expand All @@ -309,9 +310,8 @@ func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}

// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
// HasChannel returns whether the peer reported implementing this channel.
func (p *peer) HasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
Expand Down
1 change: 1 addition & 0 deletions p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type mockPeer struct {
}

func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) HasChannel(byte) bool { return true }
func (mp *mockPeer) TrySend(Envelope) bool { return true }
func (mp *mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true }
func (mp *mockPeer) Send(Envelope) bool { return true }
Expand Down
2 changes: 1 addition & 1 deletion p2p/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Envelope struct {
ChannelID byte
}

// MarshalledEnvelope contains a proto message, its marshalled message, with sender routing info.
// MarshalledEnvelope contains a proto message, its marshaled message, with sender routing info.
type MarshalledEnvelope struct {
Envelope
MarshalledMessage []byte
Expand Down
26 changes: 16 additions & 10 deletions spec/p2p/reactor-api/p2p-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,16 @@ From this point, reactors can use the methods of the new `Peer` instance.
The table below summarizes the interaction of the standard reactors with
connected peers, with the `Peer` methods used by them:

| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|--------------------------------------------|-----------|------------|------------|---------|-----------|-------|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |
| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|----------------------------|-----------|------------|------------|---------|----------|-----|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `HasChannel(byte) bool` | x | | | x | x | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |

The above list is not exhaustive as it does not include all the `Peer` methods
invoked by the PEX reactor, a special component that should be considered part
Expand Down Expand Up @@ -269,8 +270,10 @@ Finally, a `Peer` instance allows a reactor to send messages to companion
reactors running at that peer.
This is ultimately the goal of the switch when it provides `Peer` instances to
the registered reactors.
There are two methods for sending messages:
There are two methods for sending messages, and one auxiliary method to check
whether the peer supports a given channel:

func (p Peer) HasChannel(chID byte) bool
func (p Peer) Send(e Envelope) bool
func (p Peer) TrySend(e Envelope) bool

Expand All @@ -279,6 +282,9 @@ set as follows:

- `ChannelID`: the channel the message should be sent through, which defines
the reactor that will process the message;
- The auxiliary `HasChannel()` method allows testing whether the remote peer
implements a channel; if it does not, both message-sending methods will
immediately return `false`, as sending always fails.
- `Src`: this field represents the source of an incoming message, which is
irrelevant for outgoing messages;
- `Message`: the actual message's payload, which is marshalled using protocol buffers.
Expand Down
Loading