Skip to content

Commit

Permalink
swarm/handler: replace inject_* methods (#3085)
Browse files Browse the repository at this point in the history
Previously, we had one callback for each kind of message that a `ConnectionHandler` would receive from either its `NetworkBehaviour` or the connection itself.

With this patch, we combine these functions, resulting in two callbacks:

- `on_behaviour_event`
- `on_connection_event`

Resolves #3080.
  • Loading branch information
jxs authored Nov 17, 2022
1 parent 6d49bf4 commit 7803524
Show file tree
Hide file tree
Showing 30 changed files with 1,671 additions and 1,047 deletions.
4 changes: 4 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
- Replace `Behaviour`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods.
See [PR 3011].

- Replace `direct::Handler` and `relayed::Handler`'s `ConnectionHandler` implemention `inject_*`
methods with the new `on_*` methods. See [PR 3085].

[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011

# 0.7.0
Expand Down
21 changes: 20 additions & 1 deletion protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use libp2p_core::connection::ConnectionId;
use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p_swarm::handler::ConnectionEvent;
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
Expand Down Expand Up @@ -75,7 +76,7 @@ impl ConnectionHandler for Handler {
) {
}

fn inject_event(&mut self, _: Self::InEvent) {}
fn on_behaviour_event(&mut self, _: Self::InEvent) {}

fn inject_dial_upgrade_error(
&mut self,
Expand Down Expand Up @@ -111,4 +112,22 @@ impl ConnectionHandler for Handler {
}
Poll::Pending
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(_)
| ConnectionEvent::FullyNegotiatedOutbound(_)
| ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::AddressChange(_) => {}
}
}
}
192 changes: 116 additions & 76 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ use libp2p_core::either::{EitherError, EitherOutput};
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError};
use libp2p_core::ConnectedPoint;
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend};
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
SubstreamProtocol,
};
use std::collections::VecDeque;
use std::fmt;
Expand Down Expand Up @@ -159,39 +162,15 @@ impl Handler {
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)),
}
}
}

impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
EitherError<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>;
type InboundProtocol = upgrade::EitherUpgrade<protocol::inbound::Upgrade, DeniedUpgrade>;
type OutboundProtocol = protocol::outbound::Upgrade;
type OutboundOpenInfo = u8; // Number of upgrade attempts.
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
match self.endpoint {
ConnectedPoint::Dialer { .. } => {
SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ())
}
ConnectedPoint::Listener { .. } => {
// By the protocol specification the listening side of a relayed connection
// initiates the _direct connection upgrade_. In other words the listening side of
// the relayed connection opens a substream to the dialing side. (Connection roles
// and substream roles are reversed.) The listening side on a relayed connection
// never expects incoming substreams, hence the denied upgrade below.
SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ())
}
}
}

fn inject_fully_negotiated_inbound(
fn on_fully_negotiated_inbound(
&mut self,
output: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo,
FullyNegotiatedInbound {
protocol: output, ..
}: FullyNegotiatedInbound<
<Self as ConnectionHandler>::InboundProtocol,
<Self as ConnectionHandler>::InboundOpenInfo,
>,
) {
match output {
EitherOutput::First(inbound_connect) => {
Expand All @@ -211,12 +190,15 @@ impl ConnectionHandler for Handler {
}
}

fn inject_fully_negotiated_outbound(
fn on_fully_negotiated_outbound(
&mut self,
protocol::outbound::Connect { obs_addrs }: <Self::OutboundProtocol as upgrade::OutboundUpgrade<
NegotiatedSubstream,
>>::Output,
attempt: Self::OutboundOpenInfo,
FullyNegotiatedOutbound {
protocol: protocol::outbound::Connect { obs_addrs },
info: attempt,
}: FullyNegotiatedOutbound<
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
>,
) {
assert!(
self.endpoint.is_listener(),
Expand All @@ -230,42 +212,12 @@ impl ConnectionHandler for Handler {
));
}

fn inject_event(&mut self, event: Self::InEvent) {
match event {
Command::Connect { obs_addrs, attempt } => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
protocol::outbound::Upgrade::new(obs_addrs),
attempt,
),
});
}
Command::AcceptInboundConnect {
inbound_connect,
obs_addrs,
} => {
if self
.inbound_connect
.replace(inbound_connect.accept(obs_addrs).boxed())
.is_some()
{
log::warn!(
"New inbound connect stream while still upgrading previous one. \
Replacing previous with new.",
);
}
}
Command::UpgradeFinishedDontKeepAlive => {
self.keep_alive = KeepAlive::No;
}
}
}

fn inject_listen_upgrade_error(
fn on_listen_upgrade_error(
&mut self,
_: Self::InboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
ListenUpgradeError { error, .. }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
Expand Down Expand Up @@ -308,10 +260,12 @@ impl ConnectionHandler for Handler {
}
}

fn inject_dial_upgrade_error(
fn on_dial_upgrade_error(
&mut self,
_open_info: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
DialUpgradeError { error, .. }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
self.keep_alive = KeepAlive::No;

Expand Down Expand Up @@ -342,6 +296,66 @@ impl ConnectionHandler for Handler {
}
}
}
}

impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
EitherError<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>;
type InboundProtocol = upgrade::EitherUpgrade<protocol::inbound::Upgrade, DeniedUpgrade>;
type OutboundProtocol = protocol::outbound::Upgrade;
type OutboundOpenInfo = u8; // Number of upgrade attempts.
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
match self.endpoint {
ConnectedPoint::Dialer { .. } => {
SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ())
}
ConnectedPoint::Listener { .. } => {
// By the protocol specification the listening side of a relayed connection
// initiates the _direct connection upgrade_. In other words the listening side of
// the relayed connection opens a substream to the dialing side. (Connection roles
// and substream roles are reversed.) The listening side on a relayed connection
// never expects incoming substreams, hence the denied upgrade below.
SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ())
}
}
}

fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
Command::Connect { obs_addrs, attempt } => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
protocol::outbound::Upgrade::new(obs_addrs),
attempt,
),
});
}
Command::AcceptInboundConnect {
inbound_connect,
obs_addrs,
} => {
if self
.inbound_connect
.replace(inbound_connect.accept(obs_addrs).boxed())
.is_some()
{
log::warn!(
"New inbound connect stream while still upgrading previous one. \
Replacing previous with new.",
);
}
}
Command::UpgradeFinishedDontKeepAlive => {
self.keep_alive = KeepAlive::No;
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
Expand Down Expand Up @@ -387,4 +401,30 @@ impl ConnectionHandler for Handler {

Poll::Pending
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
self.on_fully_negotiated_inbound(fully_negotiated_inbound)
}
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error)
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) => {}
}
}
}
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
- Replace `Gossipsub`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods.
See [PR 3011].

- Replace `GossipsubHandler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods.
See [PR 3085].

[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
[PR 3070]: https://github.com/libp2p/rust-libp2p/pull/3070
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011

Expand Down
Loading

0 comments on commit 7803524

Please sign in to comment.