Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/handler: replace inject_* methods #3085

Merged
merged 18 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
- Replace `Behaviour`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods.
See [PR 3011].

- Replace `direct::Handler` and `relayed::Handler`'s `ConnectionHandler` implemention `inject_*`
methods with the new `on_*` methods. See [PR 3085].

[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011

# 0.7.0
Expand Down
21 changes: 20 additions & 1 deletion protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use libp2p_core::connection::ConnectionId;
use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p_swarm::handler::ConnectionEvent;
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
Expand Down Expand Up @@ -75,7 +76,7 @@ impl ConnectionHandler for Handler {
) {
}

fn inject_event(&mut self, _: Self::InEvent) {}
fn on_behaviour_event(&mut self, _: Self::InEvent) {}

fn inject_dial_upgrade_error(
&mut self,
Expand Down Expand Up @@ -111,4 +112,22 @@ impl ConnectionHandler for Handler {
}
Poll::Pending
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(_)
| ConnectionEvent::FullyNegotiatedOutbound(_)
| ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::AddressChange(_) => {}
}
}
}
192 changes: 116 additions & 76 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ use libp2p_core::either::{EitherError, EitherOutput};
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError};
use libp2p_core::ConnectedPoint;
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend};
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
SubstreamProtocol,
};
use std::collections::VecDeque;
use std::fmt;
Expand Down Expand Up @@ -159,39 +162,15 @@ impl Handler {
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)),
}
}
}

impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
EitherError<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>;
type InboundProtocol = upgrade::EitherUpgrade<protocol::inbound::Upgrade, DeniedUpgrade>;
type OutboundProtocol = protocol::outbound::Upgrade;
type OutboundOpenInfo = u8; // Number of upgrade attempts.
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
match self.endpoint {
ConnectedPoint::Dialer { .. } => {
SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ())
}
ConnectedPoint::Listener { .. } => {
// By the protocol specification the listening side of a relayed connection
// initiates the _direct connection upgrade_. In other words the listening side of
// the relayed connection opens a substream to the dialing side. (Connection roles
// and substream roles are reversed.) The listening side on a relayed connection
// never expects incoming substreams, hence the denied upgrade below.
SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ())
}
}
}

fn inject_fully_negotiated_inbound(
fn on_fully_negotiated_inbound(
&mut self,
output: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo,
FullyNegotiatedInbound {
protocol: output, ..
}: FullyNegotiatedInbound<
<Self as ConnectionHandler>::InboundProtocol,
<Self as ConnectionHandler>::InboundOpenInfo,
>,
) {
match output {
EitherOutput::First(inbound_connect) => {
Expand All @@ -211,12 +190,15 @@ impl ConnectionHandler for Handler {
}
}

fn inject_fully_negotiated_outbound(
fn on_fully_negotiated_outbound(
&mut self,
protocol::outbound::Connect { obs_addrs }: <Self::OutboundProtocol as upgrade::OutboundUpgrade<
NegotiatedSubstream,
>>::Output,
attempt: Self::OutboundOpenInfo,
FullyNegotiatedOutbound {
protocol: protocol::outbound::Connect { obs_addrs },
info: attempt,
}: FullyNegotiatedOutbound<
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
>,
) {
assert!(
self.endpoint.is_listener(),
Expand All @@ -230,42 +212,12 @@ impl ConnectionHandler for Handler {
));
}

fn inject_event(&mut self, event: Self::InEvent) {
match event {
Command::Connect { obs_addrs, attempt } => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
protocol::outbound::Upgrade::new(obs_addrs),
attempt,
),
});
}
Command::AcceptInboundConnect {
inbound_connect,
obs_addrs,
} => {
if self
.inbound_connect
.replace(inbound_connect.accept(obs_addrs).boxed())
.is_some()
{
log::warn!(
"New inbound connect stream while still upgrading previous one. \
Replacing previous with new.",
);
}
}
Command::UpgradeFinishedDontKeepAlive => {
self.keep_alive = KeepAlive::No;
}
}
}

fn inject_listen_upgrade_error(
fn on_listen_upgrade_error(
&mut self,
_: Self::InboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
ListenUpgradeError { error, .. }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
Expand Down Expand Up @@ -308,10 +260,12 @@ impl ConnectionHandler for Handler {
}
}

fn inject_dial_upgrade_error(
fn on_dial_upgrade_error(
&mut self,
_open_info: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
DialUpgradeError { error, .. }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
self.keep_alive = KeepAlive::No;

Expand Down Expand Up @@ -342,6 +296,66 @@ impl ConnectionHandler for Handler {
}
}
}
}

impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
EitherError<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>;
type InboundProtocol = upgrade::EitherUpgrade<protocol::inbound::Upgrade, DeniedUpgrade>;
type OutboundProtocol = protocol::outbound::Upgrade;
type OutboundOpenInfo = u8; // Number of upgrade attempts.
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
match self.endpoint {
ConnectedPoint::Dialer { .. } => {
SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ())
}
ConnectedPoint::Listener { .. } => {
// By the protocol specification the listening side of a relayed connection
// initiates the _direct connection upgrade_. In other words the listening side of
// the relayed connection opens a substream to the dialing side. (Connection roles
// and substream roles are reversed.) The listening side on a relayed connection
// never expects incoming substreams, hence the denied upgrade below.
SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ())
}
}
}

fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
Command::Connect { obs_addrs, attempt } => {
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
protocol::outbound::Upgrade::new(obs_addrs),
attempt,
),
});
}
Command::AcceptInboundConnect {
inbound_connect,
obs_addrs,
} => {
if self
.inbound_connect
.replace(inbound_connect.accept(obs_addrs).boxed())
.is_some()
{
log::warn!(
"New inbound connect stream while still upgrading previous one. \
Replacing previous with new.",
);
}
}
Command::UpgradeFinishedDontKeepAlive => {
self.keep_alive = KeepAlive::No;
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
Expand Down Expand Up @@ -387,4 +401,30 @@ impl ConnectionHandler for Handler {

Poll::Pending
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
self.on_fully_negotiated_inbound(fully_negotiated_inbound)
}
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error)
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) => {}
}
}
}
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
- Replace `Gossipsub`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods.
See [PR 3011].

- Replace `GossipsubHandler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods.
See [PR 3085].

[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
[PR 3070]: https://github.com/libp2p/rust-libp2p/pull/3070
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011

Expand Down
Loading