diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 4f850ed5ce2..c41ead17289 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -9,6 +9,10 @@ - Replace `Behaviour`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. See [PR 3011]. +- Replace `direct::Handler` and `relayed::Handler`'s `ConnectionHandler` implemention `inject_*` + methods with the new `on_*` methods. See [PR 3085]. + +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 # 0.7.0 diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs index 7e0237d75c4..f0fdf5930ac 100644 --- a/protocols/dcutr/src/handler/direct.rs +++ b/protocols/dcutr/src/handler/direct.rs @@ -22,6 +22,7 @@ use libp2p_core::connection::ConnectionId; use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p_swarm::handler::ConnectionEvent; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, NegotiatedSubstream, SubstreamProtocol, @@ -75,7 +76,7 @@ impl ConnectionHandler for Handler { ) { } - fn inject_event(&mut self, _: Self::InEvent) {} + fn on_behaviour_event(&mut self, _: Self::InEvent) {} fn inject_dial_upgrade_error( &mut self, @@ -111,4 +112,22 @@ impl ConnectionHandler for Handler { } Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(_) + | ConnectionEvent::FullyNegotiatedOutbound(_) + | ConnectionEvent::DialUpgradeError(_) + | ConnectionEvent::ListenUpgradeError(_) + | ConnectionEvent::AddressChange(_) => {} + } + } } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index e172b8f6993..301f2ee3d82 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -27,10 +27,13 @@ use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError}; use libp2p_core::ConnectedPoint; -use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend}; +use libp2p_swarm::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, +}; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, + SubstreamProtocol, }; use std::collections::VecDeque; use std::fmt; @@ -159,39 +162,15 @@ impl Handler { keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)), } } -} - -impl ConnectionHandler for Handler { - type InEvent = Command; - type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; - type InboundProtocol = upgrade::EitherUpgrade; - type OutboundProtocol = protocol::outbound::Upgrade; - type OutboundOpenInfo = u8; // Number of upgrade attempts. - type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { - match self.endpoint { - ConnectedPoint::Dialer { .. } => { - SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ()) - } - ConnectedPoint::Listener { .. } => { - // By the protocol specification the listening side of a relayed connection - // initiates the _direct connection upgrade_. In other words the listening side of - // the relayed connection opens a substream to the dialing side. (Connection roles - // and substream roles are reversed.) The listening side on a relayed connection - // never expects incoming substreams, hence the denied upgrade below. - SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ()) - } - } - } - - fn inject_fully_negotiated_inbound( + fn on_fully_negotiated_inbound( &mut self, - output: >::Output, - _: Self::InboundOpenInfo, + FullyNegotiatedInbound { + protocol: output, .. + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { match output { EitherOutput::First(inbound_connect) => { @@ -211,12 +190,15 @@ impl ConnectionHandler for Handler { } } - fn inject_fully_negotiated_outbound( + fn on_fully_negotiated_outbound( &mut self, - protocol::outbound::Connect { obs_addrs }: >::Output, - attempt: Self::OutboundOpenInfo, + FullyNegotiatedOutbound { + protocol: protocol::outbound::Connect { obs_addrs }, + info: attempt, + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, ) { assert!( self.endpoint.is_listener(), @@ -230,42 +212,12 @@ impl ConnectionHandler for Handler { )); } - fn inject_event(&mut self, event: Self::InEvent) { - match event { - Command::Connect { obs_addrs, attempt } => { - self.queued_events - .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - protocol::outbound::Upgrade::new(obs_addrs), - attempt, - ), - }); - } - Command::AcceptInboundConnect { - inbound_connect, - obs_addrs, - } => { - if self - .inbound_connect - .replace(inbound_connect.accept(obs_addrs).boxed()) - .is_some() - { - log::warn!( - "New inbound connect stream while still upgrading previous one. \ - Replacing previous with new.", - ); - } - } - Command::UpgradeFinishedDontKeepAlive => { - self.keep_alive = KeepAlive::No; - } - } - } - - fn inject_listen_upgrade_error( + fn on_listen_upgrade_error( &mut self, - _: Self::InboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + ListenUpgradeError { error, .. }: ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, ) { match error { ConnectionHandlerUpgrErr::Timeout => { @@ -308,10 +260,12 @@ impl ConnectionHandler for Handler { } } - fn inject_dial_upgrade_error( + fn on_dial_upgrade_error( &mut self, - _open_info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + DialUpgradeError { error, .. }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, ) { self.keep_alive = KeepAlive::No; @@ -342,6 +296,66 @@ impl ConnectionHandler for Handler { } } } +} + +impl ConnectionHandler for Handler { + type InEvent = Command; + type OutEvent = Event; + type Error = ConnectionHandlerUpgrErr< + EitherError, + >; + type InboundProtocol = upgrade::EitherUpgrade; + type OutboundProtocol = protocol::outbound::Upgrade; + type OutboundOpenInfo = u8; // Number of upgrade attempts. + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + match self.endpoint { + ConnectedPoint::Dialer { .. } => { + SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ()) + } + ConnectedPoint::Listener { .. } => { + // By the protocol specification the listening side of a relayed connection + // initiates the _direct connection upgrade_. In other words the listening side of + // the relayed connection opens a substream to the dialing side. (Connection roles + // and substream roles are reversed.) The listening side on a relayed connection + // never expects incoming substreams, hence the denied upgrade below. + SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ()) + } + } + } + + fn on_behaviour_event(&mut self, event: Self::InEvent) { + match event { + Command::Connect { obs_addrs, attempt } => { + self.queued_events + .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + protocol::outbound::Upgrade::new(obs_addrs), + attempt, + ), + }); + } + Command::AcceptInboundConnect { + inbound_connect, + obs_addrs, + } => { + if self + .inbound_connect + .replace(inbound_connect.accept(obs_addrs).boxed()) + .is_some() + { + log::warn!( + "New inbound connect stream while still upgrading previous one. \ + Replacing previous with new.", + ); + } + } + Command::UpgradeFinishedDontKeepAlive => { + self.keep_alive = KeepAlive::No; + } + } + } fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive @@ -387,4 +401,30 @@ impl ConnectionHandler for Handler { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::AddressChange(_) => {} + } + } } diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 0cdc6e99cfd..c1caa980788 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -11,6 +11,10 @@ - Replace `Gossipsub`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. See [PR 3011]. +- Replace `GossipsubHandler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods. + See [PR 3085]. + +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3070]: https://github.com/libp2p/rust-libp2p/pull/3070 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 9ea2e6ea49f..68bcf912975 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -25,9 +25,10 @@ use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; use instant::Instant; -use libp2p_core::upgrade::{InboundUpgrade, NegotiationError, OutboundUpgrade, UpgradeError}; +use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; @@ -180,25 +181,13 @@ impl GossipsubHandler { in_mesh: false, } } -} - -impl ConnectionHandler for GossipsubHandler { - type InEvent = GossipsubHandlerIn; - type OutEvent = HandlerEvent; - type Error = GossipsubHandlerError; - type InboundOpenInfo = (); - type InboundProtocol = ProtocolConfig; - type OutboundOpenInfo = crate::rpc_proto::Rpc; - type OutboundProtocol = ProtocolConfig; - - fn listen_protocol(&self) -> SubstreamProtocol { - self.listen_protocol.clone() - } - fn inject_fully_negotiated_inbound( + fn on_fully_negotiated_inbound( &mut self, - protocol: >::Output, - _info: Self::InboundOpenInfo, + FullyNegotiatedInbound { protocol, .. }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { let (substream, peer_kind) = protocol; @@ -219,10 +208,15 @@ impl ConnectionHandler for GossipsubHandler { self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); } - fn inject_fully_negotiated_outbound( + fn on_fully_negotiated_outbound( &mut self, - protocol: >::Output, - message: Self::OutboundOpenInfo, + FullyNegotiatedOutbound { + protocol, + info: message, + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, ) { let (substream, peer_kind) = protocol; @@ -249,8 +243,22 @@ impl ConnectionHandler for GossipsubHandler { self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); } } +} - fn inject_event(&mut self, message: GossipsubHandlerIn) { +impl ConnectionHandler for GossipsubHandler { + type InEvent = GossipsubHandlerIn; + type OutEvent = HandlerEvent; + type Error = GossipsubHandlerError; + type InboundOpenInfo = (); + type InboundProtocol = ProtocolConfig; + type OutboundOpenInfo = crate::rpc_proto::Rpc; + type OutboundProtocol = ProtocolConfig; + + fn listen_protocol(&self) -> SubstreamProtocol { + self.listen_protocol.clone() + } + + fn on_behaviour_event(&mut self, message: GossipsubHandlerIn) { if !self.protocol_unsupported { match message { GossipsubHandlerIn::Message(m) => self.send_queue.push(m), @@ -268,18 +276,6 @@ impl ConnectionHandler for GossipsubHandler { } } - fn inject_dial_upgrade_error( - &mut self, - _: Self::OutboundOpenInfo, - e: ConnectionHandlerUpgrErr< - >::Error, - >, - ) { - self.outbound_substream_establishing = false; - warn!("Dial upgrade error {:?}", e); - self.upgrade_errors.push_back(e); - } - fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -560,4 +556,29 @@ impl ConnectionHandler for GossipsubHandler { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => { + self.outbound_substream_establishing = false; + warn!("Dial upgrade error {:?}", e); + self.upgrade_errors.push_back(e); + } + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } + } } diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index a91ce9789f2..93093b25343 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -11,6 +11,10 @@ - Replace `Behaviour`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. See [PR 3011]. +- Replace `Handler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods. + See [PR 3085]. + +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 [PR 2995]: https://github.com/libp2p/rust-libp2p/pull/2995 diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 987b56496f5..0de54f0a006 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -25,8 +25,11 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; -use libp2p_core::upgrade::{EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade}; +use libp2p_core::upgrade::{EitherUpgrade, SelectUpgrade}; use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, +}; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol, @@ -119,25 +122,15 @@ impl Handler { interval, } } -} - -impl ConnectionHandler for Handler { - type InEvent = Push; - type OutEvent = Event; - type Error = io::Error; - type InboundProtocol = SelectUpgrade>; - type OutboundProtocol = EitherUpgrade>; - type OutboundOpenInfo = (); - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ()) - } - fn inject_fully_negotiated_inbound( + fn on_fully_negotiated_inbound( &mut self, - output: >::Output, - _: Self::InboundOpenInfo, + FullyNegotiatedInbound { + protocol: output, .. + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { match output { EitherOutput::First(substream) => self @@ -155,10 +148,14 @@ impl ConnectionHandler for Handler { } } - fn inject_fully_negotiated_outbound( + fn on_fully_negotiated_outbound( &mut self, - output: >::Output, - _: Self::OutboundOpenInfo, + FullyNegotiatedOutbound { + protocol: output, .. + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, ) { match output { EitherOutput::First(remote_info) => { @@ -174,21 +171,11 @@ impl ConnectionHandler for Handler { } } - fn inject_event(&mut self, Push(push): Self::InEvent) { - self.events - .push(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - EitherUpgrade::B(PushProtocol::outbound(push)), - (), - ), - }); - } - - fn inject_dial_upgrade_error( + fn on_dial_upgrade_error( &mut self, - _info: Self::OutboundOpenInfo, - err: ConnectionHandlerUpgrErr< - >::Error, + DialUpgradeError { error: err, .. }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, >, ) { use libp2p_core::upgrade::UpgradeError; @@ -205,6 +192,30 @@ impl ConnectionHandler for Handler { self.keep_alive = KeepAlive::No; self.trigger_next_identify.reset(self.interval); } +} + +impl ConnectionHandler for Handler { + type InEvent = Push; + type OutEvent = Event; + type Error = io::Error; + type InboundProtocol = SelectUpgrade>; + type OutboundProtocol = EitherUpgrade>; + type OutboundOpenInfo = (); + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ()) + } + + fn on_behaviour_event(&mut self, Push(push): Self::InEvent) { + self.events + .push(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + EitherUpgrade::B(PushProtocol::outbound(push)), + (), + ), + }); + } fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive @@ -246,4 +257,27 @@ impl ConnectionHandler for Handler { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } + } } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index a4b23ebd757..bc1bcbe7823 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -7,6 +7,10 @@ - Replace `Kademlia`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. See [PR 3011]. +- Replace `KademliaHandler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods. + See [PR 3085]. + +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 # 0.41.0 diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index a2efb9063fe..a43804701a2 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -26,10 +26,9 @@ use crate::record::{self, Record}; use futures::prelude::*; use futures::stream::SelectAll; use instant::Instant; -use libp2p_core::{ - either::EitherOutput, - upgrade::{self, InboundUpgrade, OutboundUpgrade}, - ConnectedPoint, PeerId, +use libp2p_core::{either::EitherOutput, upgrade, ConnectedPoint, PeerId}; +use libp2p_swarm::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, @@ -507,7 +506,7 @@ struct UniqueConnecId(u64); impl KademliaHandler where - TUserData: Unpin, + TUserData: Clone + fmt::Debug + Send + 'static + Unpin, { /// Create a [`KademliaHandler`] using the given configuration. pub fn new( @@ -528,34 +527,16 @@ where protocol_status: ProtocolStatus::Unconfirmed, } } -} -impl ConnectionHandler for KademliaHandler -where - TUserData: Clone + fmt::Debug + Send + 'static + Unpin, -{ - type InEvent = KademliaHandlerIn; - type OutEvent = KademliaHandlerEvent; - type Error = io::Error; // TODO: better error type? - type InboundProtocol = upgrade::EitherUpgrade; - type OutboundProtocol = KademliaProtocolConfig; - // Message of the request to send to the remote, and user data if we expect an answer. - type OutboundOpenInfo = (KadRequestMsg, Option); - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol { - if self.config.allow_listening { - SubstreamProtocol::new(self.config.protocol_config.clone(), ()) - .map_upgrade(upgrade::EitherUpgrade::A) - } else { - SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ()) - } - } - - fn inject_fully_negotiated_outbound( + fn on_fully_negotiated_outbound( &mut self, - protocol: >::Output, - (msg, user_data): Self::OutboundOpenInfo, + FullyNegotiatedOutbound { + protocol, + info: (msg, user_data), + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, ) { self.outbound_substreams .push(OutboundSubstreamState::PendingSend( @@ -569,10 +550,12 @@ where } } - fn inject_fully_negotiated_inbound( + fn on_fully_negotiated_inbound( &mut self, - protocol: >::Output, - (): Self::InboundOpenInfo, + FullyNegotiatedInbound { protocol, .. }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { // If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol` // is a `Void`. @@ -623,7 +606,49 @@ where }); } - fn inject_event(&mut self, message: KademliaHandlerIn) { + fn on_dial_upgrade_error( + &mut self, + DialUpgradeError { + info: (_, user_data), + error, + .. + }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ) { + // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't + // continue trying + if let Some(user_data) = user_data { + self.outbound_substreams + .push(OutboundSubstreamState::ReportError(error.into(), user_data)); + } + } +} + +impl ConnectionHandler for KademliaHandler +where + TUserData: Clone + fmt::Debug + Send + 'static + Unpin, +{ + type InEvent = KademliaHandlerIn; + type OutEvent = KademliaHandlerEvent; + type Error = io::Error; // TODO: better error type? + type InboundProtocol = upgrade::EitherUpgrade; + type OutboundProtocol = KademliaProtocolConfig; + // Message of the request to send to the remote, and user data if we expect an answer. + type OutboundOpenInfo = (KadRequestMsg, Option); + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + if self.config.allow_listening { + SubstreamProtocol::new(self.config.protocol_config.clone(), ()) + .map_upgrade(upgrade::EitherUpgrade::A) + } else { + SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ()) + } + } + + fn on_behaviour_event(&mut self, message: KademliaHandlerIn) { match message { KademliaHandlerIn::Reset(request_id) => { if let Some(state) = self @@ -717,19 +742,6 @@ where } } - fn inject_dial_upgrade_error( - &mut self, - (_, user_data): Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr, - ) { - // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't - // continue trying - if let Some(user_data) = user_data { - self.outbound_substreams - .push(OutboundSubstreamState::ReportError(error.into(), user_data)); - } - } - fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -771,6 +783,29 @@ where Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } + } } impl KademliaHandler diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index 26fc9835e4d..12f11597261 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -7,6 +7,10 @@ - Replace `Behaviour`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. See [PR 3011]. +- Replace `Handler`'s `ConnectionHandler` implemention `inject_*` methods with the new `on_*` methods. + See [PR 3085]. + +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 # 0.40.0 diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index af1ef898981..67d4b66c0a0 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -24,6 +24,9 @@ use futures::prelude::*; use futures_timer::Delay; use libp2p_core::upgrade::ReadyUpgrade; use libp2p_core::{upgrade::NegotiationError, UpgradeError}; +use libp2p_swarm::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, +}; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, NegotiatedSubstream, SubstreamProtocol, @@ -224,33 +227,14 @@ impl Handler { state: State::Active, } } -} - -impl ConnectionHandler for Handler { - type InEvent = Void; - type OutEvent = crate::Result; - type Error = Failure; - type InboundProtocol = ReadyUpgrade<&'static [u8]>; - type OutboundProtocol = ReadyUpgrade<&'static [u8]>; - type OutboundOpenInfo = (); - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol, ()> { - SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) - } - - fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream, (): ()) { - self.inbound = Some(protocol::recv_ping(stream).boxed()); - } - fn inject_fully_negotiated_outbound(&mut self, stream: NegotiatedSubstream, (): ()) { - self.timer.reset(self.config.timeout); - self.outbound = Some(OutboundState::Ping(protocol::send_ping(stream).boxed())); - } - - fn inject_event(&mut self, _: Void) {} - - fn inject_dial_upgrade_error(&mut self, _info: (), error: ConnectionHandlerUpgrErr) { + fn on_dial_upgrade_error( + &mut self, + DialUpgradeError { error, .. }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ) { self.outbound = None; // Request a new substream on the next `poll`. let error = match error { @@ -267,6 +251,22 @@ impl ConnectionHandler for Handler { self.pending_errors.push_front(error); } +} + +impl ConnectionHandler for Handler { + type InEvent = Void; + type OutEvent = crate::Result; + type Error = Failure; + type InboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundOpenInfo = (); + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol, ()> { + SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) + } + + fn on_behaviour_event(&mut self, _: Void) {} fn connection_keep_alive(&self) -> KeepAlive { if self.config.keep_alive { @@ -384,6 +384,36 @@ impl ConnectionHandler for Handler { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: stream, + .. + }) => { + self.inbound = Some(protocol::recv_ping(stream).boxed()); + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: stream, + .. + }) => { + self.timer.reset(self.config.timeout); + self.outbound = Some(OutboundState::Ping(protocol::send_ping(stream).boxed())); + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } + } } type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>; diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 27f567c81d8..0dcc650d626 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -9,6 +9,10 @@ - Replace `Client` and `Relay`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. See [PR 3011]. +- Replace `client::Handler` and `relay::Handler`'s `ConnectionHandler` implemention `inject_*` methods + with the new `on_*` methods. See [PR 3085]. + +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 # 0.13.0 diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index a346a4eaeb3..5d01cf9dbce 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -31,10 +31,13 @@ use instant::Instant; use libp2p_core::either::EitherError; use libp2p_core::multiaddr::Protocol; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; -use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; +use libp2p_swarm::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, SendWrapper, +}; use libp2p_swarm::{ dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol, + IntoConnectionHandler, KeepAlive, SubstreamProtocol, }; use log::debug; use std::collections::{HashMap, VecDeque}; @@ -152,7 +155,7 @@ impl IntoConnectionHandler for Prototype { }; if let Some(event) = self.initial_in { - handler.inject_event(event) + handler.on_behaviour_event(event) } Either::Left(handler) @@ -209,25 +212,16 @@ pub struct Handler { send_error_futs: FuturesUnordered>, } -impl ConnectionHandler for Handler { - type InEvent = In; - type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; - type InboundProtocol = inbound_stop::Upgrade; - type OutboundProtocol = outbound_hop::Upgrade; - type OutboundOpenInfo = OutboundOpenInfo; - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(inbound_stop::Upgrade {}, ()) - } - - fn inject_fully_negotiated_inbound( +impl Handler { + fn on_fully_negotiated_inbound( &mut self, - inbound_circuit: inbound_stop::Circuit, - _: Self::InboundOpenInfo, + FullyNegotiatedInbound { + protocol: inbound_circuit, + .. + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { match &mut self.reservation { Reservation::Accepted { pending_msgs, .. } @@ -280,10 +274,15 @@ impl ConnectionHandler for Handler { } } - fn inject_fully_negotiated_outbound( + fn on_fully_negotiated_outbound( &mut self, - output: >::Output, - info: Self::OutboundOpenInfo, + FullyNegotiatedOutbound { + protocol: output, + info, + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, ) { match (output, info) { // Outbound reservation @@ -340,36 +339,12 @@ impl ConnectionHandler for Handler { } } - fn inject_event(&mut self, event: Self::InEvent) { - match event { - In::Reserve { to_listener } => { - self.queued_events - .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - outbound_hop::Upgrade::Reserve, - OutboundOpenInfo::Reserve { to_listener }, - ), - }); - } - In::EstablishCircuit { - send_back, - dst_peer_id, - } => { - self.queued_events - .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - outbound_hop::Upgrade::Connect { dst_peer_id }, - OutboundOpenInfo::Connect { send_back }, - ), - }); - } - } - } - - fn inject_listen_upgrade_error( + fn on_listen_upgrade_error( &mut self, - _: Self::InboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + ListenUpgradeError { error, .. }: ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, ) { let non_fatal_error = match error { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, @@ -404,10 +379,15 @@ impl ConnectionHandler for Handler { )); } - fn inject_dial_upgrade_error( + fn on_dial_upgrade_error( &mut self, - open_info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + DialUpgradeError { + info: open_info, + error, + }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, ) { match open_info { OutboundOpenInfo::Reserve { mut to_listener } => { @@ -524,6 +504,48 @@ impl ConnectionHandler for Handler { } } } +} + +impl ConnectionHandler for Handler { + type InEvent = In; + type OutEvent = Event; + type Error = ConnectionHandlerUpgrErr< + EitherError, + >; + type InboundProtocol = inbound_stop::Upgrade; + type OutboundProtocol = outbound_hop::Upgrade; + type OutboundOpenInfo = OutboundOpenInfo; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(inbound_stop::Upgrade {}, ()) + } + + fn on_behaviour_event(&mut self, event: Self::InEvent) { + match event { + In::Reserve { to_listener } => { + self.queued_events + .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + outbound_hop::Upgrade::Reserve, + OutboundOpenInfo::Reserve { to_listener }, + ), + }); + } + In::EstablishCircuit { + send_back, + dst_peer_id, + } => { + self.queued_events + .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + outbound_hop::Upgrade::Connect { dst_peer_id }, + OutboundOpenInfo::Connect { send_back }, + ), + }); + } + } + } fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive @@ -610,6 +632,32 @@ impl ConnectionHandler for Handler { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::AddressChange(_) => {} + } + } } enum Reservation { diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 1c6987692fa..ef8b40755b2 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -33,8 +33,10 @@ use instant::Instant; use libp2p_core::connection::ConnectionId; use libp2p_core::either::EitherError; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; -use libp2p_swarm::handler::SendWrapper; -use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend}; +use libp2p_swarm::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, SendWrapper, +}; use libp2p_swarm::{ dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol, @@ -429,39 +431,15 @@ pub struct Handler { circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, } -enum ReservationRequestFuture { - Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), - Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), -} - -type Futures = FuturesUnordered>; - -impl ConnectionHandler for Handler { - type InEvent = In; - type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; - type InboundProtocol = inbound_hop::Upgrade; - type OutboundProtocol = outbound_stop::Upgrade; - type OutboundOpenInfo = OutboundOpenInfo; - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new( - inbound_hop::Upgrade { - reservation_duration: self.config.reservation_duration, - max_circuit_duration: self.config.max_circuit_duration, - max_circuit_bytes: self.config.max_circuit_bytes, - }, - (), - ) - } - - fn inject_fully_negotiated_inbound( +impl Handler { + fn on_fully_negotiated_inbound( &mut self, - request: >::Output, - _: Self::InboundOpenInfo, + FullyNegotiatedInbound { + protocol: request, .. + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { match request { inbound_hop::Req::Reserve(inbound_reservation_req) => { @@ -484,12 +462,15 @@ impl ConnectionHandler for Handler { } } - fn inject_fully_negotiated_outbound( + fn on_fully_negotiated_outbound( &mut self, - (dst_stream, dst_pending_data): >::Output, - outbound_open_info: Self::OutboundOpenInfo, + FullyNegotiatedOutbound { + protocol: (dst_stream, dst_pending_data), + info: outbound_open_info, + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, ) { let OutboundOpenInfo { circuit_id, @@ -513,104 +494,12 @@ impl ConnectionHandler for Handler { )); } - fn inject_event(&mut self, event: Self::InEvent) { - match event { - In::AcceptReservationReq { - inbound_reservation_req, - addrs, - } => { - if self - .reservation_request_future - .replace(ReservationRequestFuture::Accepting( - inbound_reservation_req.accept(addrs).boxed(), - )) - .is_some() - { - log::warn!("Dropping existing deny/accept future in favor of new one.") - } - } - In::DenyReservationReq { - inbound_reservation_req, - status, - } => { - if self - .reservation_request_future - .replace(ReservationRequestFuture::Denying( - inbound_reservation_req.deny(status).boxed(), - )) - .is_some() - { - log::warn!("Dropping existing deny/accept future in favor of new one.") - } - } - In::NegotiateOutboundConnect { - circuit_id, - inbound_circuit_req, - relay_peer_id, - src_peer_id, - src_connection_id, - } => { - self.queued_events - .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - outbound_stop::Upgrade { - relay_peer_id, - max_circuit_duration: self.config.max_circuit_duration, - max_circuit_bytes: self.config.max_circuit_bytes, - }, - OutboundOpenInfo { - circuit_id, - inbound_circuit_req, - src_peer_id, - src_connection_id, - }, - ), - }); - } - In::DenyCircuitReq { - circuit_id, - inbound_circuit_req, - status, - } => { - let dst_peer_id = inbound_circuit_req.dst(); - self.circuit_deny_futures.push( - inbound_circuit_req - .deny(status) - .map(move |result| (circuit_id, dst_peer_id, result)) - .boxed(), - ); - } - In::AcceptAndDriveCircuit { - circuit_id, - dst_peer_id, - inbound_circuit_req, - dst_handler_notifier, - dst_stream, - dst_pending_data, - } => { - self.circuit_accept_futures.push( - inbound_circuit_req - .accept() - .map_ok(move |(src_stream, src_pending_data)| CircuitParts { - circuit_id, - src_stream, - src_pending_data, - dst_peer_id, - dst_handler_notifier, - dst_stream, - dst_pending_data, - }) - .map_err(move |e| (circuit_id, dst_peer_id, e)) - .boxed(), - ); - } - } - } - - fn inject_listen_upgrade_error( + fn on_listen_upgrade_error( &mut self, - _: Self::InboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + ListenUpgradeError { error, .. }: ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, ) { let non_fatal_error = match error { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, @@ -645,10 +534,15 @@ impl ConnectionHandler for Handler { )); } - fn inject_dial_upgrade_error( + fn on_dial_upgrade_error( &mut self, - open_info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + DialUpgradeError { + info: open_info, + error, + }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, ) { let (non_fatal_error, status) = match error { ConnectionHandlerUpgrErr::Timeout => { @@ -717,6 +611,130 @@ impl ConnectionHandler for Handler { }, )); } +} + +enum ReservationRequestFuture { + Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), + Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), +} + +type Futures = FuturesUnordered>; + +impl ConnectionHandler for Handler { + type InEvent = In; + type OutEvent = Event; + type Error = ConnectionHandlerUpgrErr< + EitherError, + >; + type InboundProtocol = inbound_hop::Upgrade; + type OutboundProtocol = outbound_stop::Upgrade; + type OutboundOpenInfo = OutboundOpenInfo; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new( + inbound_hop::Upgrade { + reservation_duration: self.config.reservation_duration, + max_circuit_duration: self.config.max_circuit_duration, + max_circuit_bytes: self.config.max_circuit_bytes, + }, + (), + ) + } + + fn on_behaviour_event(&mut self, event: Self::InEvent) { + match event { + In::AcceptReservationReq { + inbound_reservation_req, + addrs, + } => { + if self + .reservation_request_future + .replace(ReservationRequestFuture::Accepting( + inbound_reservation_req.accept(addrs).boxed(), + )) + .is_some() + { + log::warn!("Dropping existing deny/accept future in favor of new one.") + } + } + In::DenyReservationReq { + inbound_reservation_req, + status, + } => { + if self + .reservation_request_future + .replace(ReservationRequestFuture::Denying( + inbound_reservation_req.deny(status).boxed(), + )) + .is_some() + { + log::warn!("Dropping existing deny/accept future in favor of new one.") + } + } + In::NegotiateOutboundConnect { + circuit_id, + inbound_circuit_req, + relay_peer_id, + src_peer_id, + src_connection_id, + } => { + self.queued_events + .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + outbound_stop::Upgrade { + relay_peer_id, + max_circuit_duration: self.config.max_circuit_duration, + max_circuit_bytes: self.config.max_circuit_bytes, + }, + OutboundOpenInfo { + circuit_id, + inbound_circuit_req, + src_peer_id, + src_connection_id, + }, + ), + }); + } + In::DenyCircuitReq { + circuit_id, + inbound_circuit_req, + status, + } => { + let dst_peer_id = inbound_circuit_req.dst(); + self.circuit_deny_futures.push( + inbound_circuit_req + .deny(status) + .map(move |result| (circuit_id, dst_peer_id, result)) + .boxed(), + ); + } + In::AcceptAndDriveCircuit { + circuit_id, + dst_peer_id, + inbound_circuit_req, + dst_handler_notifier, + dst_stream, + dst_pending_data, + } => { + self.circuit_accept_futures.push( + inbound_circuit_req + .accept() + .map_ok(move |(src_stream, src_pending_data)| CircuitParts { + circuit_id, + src_stream, + src_pending_data, + dst_peer_id, + dst_handler_notifier, + dst_stream, + dst_pending_data, + }) + .map_err(move |e| (circuit_id, dst_peer_id, e)) + .boxed(), + ); + } + } + } fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive @@ -933,6 +951,32 @@ impl ConnectionHandler for Handler { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::AddressChange(_) => {} + } + } } pub struct OutboundOpenInfo { diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 59409581c90..f6c3a6a0865 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -7,6 +7,10 @@ - Replace `RequestResponse`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. See [PR 3011]. +- Replace `RequestResponseHandler`'s `ConnectionHandler` implemention `inject_*` methods + with the new `on_*` methods. See [PR 3085]. + +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 # 0.22.0 diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index fd843b7d473..317da87dc0a 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -23,6 +23,10 @@ mod protocol; use crate::codec::RequestResponseCodec; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; +use libp2p_swarm::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, +}; pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; @@ -86,7 +90,7 @@ where impl RequestResponseHandler where - TCodec: RequestResponseCodec, + TCodec: RequestResponseCodec + Send + Clone + 'static, { pub(super) fn new( inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -108,6 +112,83 @@ where inbound_request_id, } } + + fn on_fully_negotiated_inbound( + &mut self, + FullyNegotiatedInbound { + protocol: sent, + info: request_id, + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, + ) { + if sent { + self.pending_events + .push_back(RequestResponseHandlerEvent::ResponseSent(request_id)) + } else { + self.pending_events + .push_back(RequestResponseHandlerEvent::ResponseOmission(request_id)) + } + } + + fn on_dial_upgrade_error( + &mut self, + DialUpgradeError { info, error }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ) { + match error { + ConnectionHandlerUpgrErr::Timeout => { + self.pending_events + .push_back(RequestResponseHandlerEvent::OutboundTimeout(info)); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the protocol(s) we requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the remote peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info), + ); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } + fn on_listen_upgrade_error( + &mut self, + ListenUpgradeError { info, error }: ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, + ) { + match error { + ConnectionHandlerUpgrErr::Timeout => self + .pending_events + .push_back(RequestResponseHandlerEvent::InboundTimeout(info)), + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The local peer merely doesn't support the protocol(s) requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the local peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::InboundUnsupportedProtocols(info), + ); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } } /// The events emitted by the [`RequestResponseHandler`]. @@ -236,88 +317,11 @@ where SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout) } - fn inject_fully_negotiated_inbound(&mut self, sent: bool, request_id: RequestId) { - if sent { - self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseSent(request_id)) - } else { - self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseOmission(request_id)) - } - } - - fn inject_fully_negotiated_outbound( - &mut self, - response: TCodec::Response, - request_id: RequestId, - ) { - self.pending_events - .push_back(RequestResponseHandlerEvent::Response { - request_id, - response, - }); - } - - fn inject_event(&mut self, request: Self::InEvent) { + fn on_behaviour_event(&mut self, request: Self::InEvent) { self.keep_alive = KeepAlive::Yes; self.outbound.push_back(request); } - fn inject_dial_upgrade_error( - &mut self, - info: RequestId, - error: ConnectionHandlerUpgrErr, - ) { - match error { - ConnectionHandlerUpgrErr::Timeout => { - self.pending_events - .push_back(RequestResponseHandlerEvent::OutboundTimeout(info)); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - // The remote merely doesn't support the protocol(s) we requested. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - // An event is reported to permit user code to react to the fact that - // the remote peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info), - ); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); - } - } - } - - fn inject_listen_upgrade_error( - &mut self, - info: RequestId, - error: ConnectionHandlerUpgrErr, - ) { - match error { - ConnectionHandlerUpgrErr::Timeout => self - .pending_events - .push_back(RequestResponseHandlerEvent::InboundTimeout(info)), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - // The local peer merely doesn't support the protocol(s) requested. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - // An event is reported to permit user code to react to the fact that - // the local peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::InboundUnsupportedProtocols(info), - ); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); - } - } - } - fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -387,4 +391,37 @@ where Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: response, + info: request_id, + }) => { + self.pending_events + .push_back(RequestResponseHandlerEvent::Response { + request_id, + response, + }); + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + ConnectionEvent::AddressChange(_) => {} + } + } } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 1d0c5882868..7b668422a17 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -2,6 +2,20 @@ - Update to `libp2p-core` `v0.38.0`. +- Add new `on_connection_event` method to `ConnectionHandler` that accepts a `ConnectionEvent` enum and update + `inject_*` methods to call `on_connection_event` with the respective `ConnectionEvent` variant and deprecate + `inject_*`. + To migrate, users should replace the `ConnectionHandler::inject_*` calls with a single + implementation of `ConnectionHandler::on_connection_event` treating each `ConnectionEvent` variant in + the same way its corresponding `inject_*` call was treated. + See [PR 3085]. + +- Add new `on_behaviour_event` method with the same signature as `inject_event`, make the + default implementation of `inject_event` call `on_behaviour_event` and deprecate it. + To migrate, users should replace the `ConnectionHandler::inject_event` call + with `ConnectionHandler::on_behaviour_event`. + See [PR 3085]. + - Add new `on_swarm_event` method to `NetworkBehaviour` that accepts a `FromSwarm` enum and update `inject_*` methods to call `on_swarm_event` with the respective `FromSwarm` variant and deprecate `inject_*`. @@ -80,6 +94,7 @@ - `SwarmBuilder::new` - `SwarmBuilder::executor` +[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 [PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055 [PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 4b01138a596..81255a40274 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -20,10 +20,11 @@ use crate::behaviour::{inject_from_swarm, FromSwarm}; use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, + KeepAlive, ListenUpgradeError, SubstreamProtocol, }; -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; +use crate::upgrade::SendWrapper; use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use either::Either; use libp2p_core::{ @@ -155,37 +156,19 @@ pub struct ToggleConnectionHandler { inner: Option, } -impl ConnectionHandler for ToggleConnectionHandler +impl ToggleConnectionHandler where TInner: ConnectionHandler, { - type InEvent = TInner::InEvent; - type OutEvent = TInner::OutEvent; - type Error = TInner::Error; - type InboundProtocol = - EitherUpgrade, SendWrapper>; - type OutboundProtocol = TInner::OutboundProtocol; - type OutboundOpenInfo = TInner::OutboundOpenInfo; - type InboundOpenInfo = Either; - - fn listen_protocol(&self) -> SubstreamProtocol { - if let Some(inner) = self.inner.as_ref() { - inner - .listen_protocol() - .map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))) - .map_info(Either::Left) - } else { - SubstreamProtocol::new( - EitherUpgrade::B(SendWrapper(DeniedUpgrade)), - Either::Right(()), - ) - } - } - - fn inject_fully_negotiated_inbound( + fn on_fully_negotiated_inbound( &mut self, - out: ::Output, - info: Self::InboundOpenInfo, + FullyNegotiatedInbound { + protocol: out, + info, + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { let out = match out { EitherOutput::First(out) => out, @@ -193,6 +176,7 @@ where }; if let Either::Left(info) = info { + #[allow(deprecated)] self.inner .as_mut() .expect("Can't receive an inbound substream if disabled; QED") @@ -202,45 +186,12 @@ where } } - fn inject_fully_negotiated_outbound( - &mut self, - out: ::Output, - info: Self::OutboundOpenInfo, - ) { - self.inner - .as_mut() - .expect("Can't receive an outbound substream if disabled; QED") - .inject_fully_negotiated_outbound(out, info) - } - - fn inject_event(&mut self, event: Self::InEvent) { - self.inner - .as_mut() - .expect("Can't receive events if disabled; QED") - .inject_event(event) - } - - fn inject_address_change(&mut self, addr: &Multiaddr) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_address_change(addr) - } - } - - fn inject_dial_upgrade_error( - &mut self, - info: Self::OutboundOpenInfo, - err: ConnectionHandlerUpgrErr<::Error>, - ) { - self.inner - .as_mut() - .expect("Can't receive an outbound substream if disabled; QED") - .inject_dial_upgrade_error(info, err) - } - - fn inject_listen_upgrade_error( + fn on_listen_upgrade_error( &mut self, - info: Self::InboundOpenInfo, - err: ConnectionHandlerUpgrErr<::Error>, + ListenUpgradeError { info, error: err }: ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, ) { let (inner, info) = match (self.inner.as_mut(), info) { (Some(inner), Either::Left(info)) => (inner, info), @@ -267,8 +218,45 @@ where } }; + #[allow(deprecated)] inner.inject_listen_upgrade_error(info, err) } +} + +impl ConnectionHandler for ToggleConnectionHandler +where + TInner: ConnectionHandler, +{ + type InEvent = TInner::InEvent; + type OutEvent = TInner::OutEvent; + type Error = TInner::Error; + type InboundProtocol = + EitherUpgrade, SendWrapper>; + type OutboundProtocol = TInner::OutboundProtocol; + type OutboundOpenInfo = TInner::OutboundOpenInfo; + type InboundOpenInfo = Either; + + fn listen_protocol(&self) -> SubstreamProtocol { + if let Some(inner) = self.inner.as_ref() { + inner + .listen_protocol() + .map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))) + .map_info(Either::Left) + } else { + SubstreamProtocol::new( + EitherUpgrade::B(SendWrapper(DeniedUpgrade)), + Either::Right(()), + ) + } + } + + fn on_behaviour_event(&mut self, event: Self::InEvent) { + #[allow(deprecated)] + self.inner + .as_mut() + .expect("Can't receive events if disabled; QED") + .inject_event(event) + } fn connection_keep_alive(&self) -> KeepAlive { self.inner @@ -294,6 +282,50 @@ where Poll::Pending } } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: out, + info, + }) => + { + #[allow(deprecated)] + self.inner + .as_mut() + .expect("Can't receive an outbound substream if disabled; QED") + .inject_fully_negotiated_outbound(out, info) + } + ConnectionEvent::AddressChange(address_change) => { + if let Some(inner) = self.inner.as_mut() { + #[allow(deprecated)] + inner.inject_address_change(address_change.new_address) + } + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => + { + #[allow(deprecated)] + self.inner + .as_mut() + .expect("Can't receive an outbound substream if disabled; QED") + .inject_dial_upgrade_error(info, err) + } + ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + } + } } #[cfg(test)] @@ -319,6 +351,9 @@ mod tests { fn ignore_listen_upgrade_error_when_disabled() { let mut handler = ToggleConnectionHandler:: { inner: None }; - handler.inject_listen_upgrade_error(Either::Right(()), ConnectionHandlerUpgrErr::Timeout); + handler.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + info: Either::Right(()), + error: ConnectionHandlerUpgrErr::Timeout, + })); } } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 4db3342ae39..272f78f3a6b 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -149,7 +149,8 @@ where } /// Notifies the connection handler of an event. - pub fn inject_event(&mut self, event: THandler::InEvent) { + pub fn on_behaviour_event(&mut self, event: THandler::InEvent) { + #[allow(deprecated)] self.handler.inject_event(event); } @@ -180,6 +181,7 @@ where match requested_substreams.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => continue, Poll::Ready(Some(Err(user_data))) => { + #[allow(deprecated)] handler.inject_dial_upgrade_error(user_data, ConnectionHandlerUpgrErr::Timeout); continue; } @@ -208,10 +210,12 @@ where match negotiating_out.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} Poll::Ready(Some((user_data, Ok(upgrade)))) => { + #[allow(deprecated)] handler.inject_fully_negotiated_outbound(upgrade, user_data); continue; } Poll::Ready(Some((user_data, Err(err)))) => { + #[allow(deprecated)] handler.inject_dial_upgrade_error(user_data, err); continue; } @@ -222,10 +226,12 @@ where match negotiating_in.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} Poll::Ready(Some((user_data, Ok(upgrade)))) => { + #[allow(deprecated)] handler.inject_fully_negotiated_inbound(upgrade, user_data); continue; } Poll::Ready(Some((user_data, Err(err)))) => { + #[allow(deprecated)] handler.inject_listen_upgrade_error(user_data, err); continue; } @@ -273,6 +279,7 @@ where match muxing.poll_unpin(cx)? { Poll::Pending => {} Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { + #[allow(deprecated)] handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 866049e50da..8e1129d8cae 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -193,7 +193,7 @@ pub async fn new_for_established_connection( .await { Either::Left((Some(command), _)) => match command { - Command::NotifyHandler(event) => connection.inject_event(event), + Command::NotifyHandler(event) => connection.on_behaviour_event(event), Command::Close => { command_receiver.close(); let (handler, closing_muxer) = connection.close(); diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 6c51065a9e1..4ec58581c2e 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -1,5 +1,7 @@ use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use crate::handler::{InboundUpgradeSend, OutboundUpgradeSend}; +use crate::handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, +}; use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol}; use libp2p_core::connection::ConnectionId; use libp2p_core::upgrade::DeniedUpgrade; @@ -66,41 +68,10 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { SubstreamProtocol::new(DeniedUpgrade, ()) } - fn inject_fully_negotiated_inbound( - &mut self, - protocol: ::Output, - _: Self::InboundOpenInfo, - ) { - void::unreachable(protocol) - } - - fn inject_fully_negotiated_outbound( - &mut self, - protocol: ::Output, - _: Self::OutboundOpenInfo, - ) { - void::unreachable(protocol) - } - - fn inject_event(&mut self, event: Self::InEvent) { + fn on_behaviour_event(&mut self, event: Self::InEvent) { void::unreachable(event) } - fn inject_dial_upgrade_error( - &mut self, - _: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - match error { - ConnectionHandlerUpgrErr::Timeout => unreachable!(), - ConnectionHandlerUpgrErr::Timer => unreachable!(), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => void::unreachable(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)) => { - unreachable!("Denied upgrade does not support any protocols") - } - } - } - fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No } @@ -118,4 +89,32 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { > { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => void::unreachable(protocol), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, .. + }) => void::unreachable(protocol), + ConnectionEvent::DialUpgradeError(DialUpgradeError { info: _, error }) => match error { + ConnectionHandlerUpgrErr::Timeout => unreachable!(), + ConnectionHandlerUpgrErr::Timer => unreachable!(), + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => void::unreachable(e), + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)) => { + unreachable!("Denied upgrade does not support any protocols") + } + }, + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } + } } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 5c60f2bf24a..8d34509c085 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -70,15 +70,16 @@ pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect}; /// 1. Dialing by initiating a new outbound substream. In order to do so, /// [`ConnectionHandler::poll()`] must return an [`ConnectionHandlerEvent::OutboundSubstreamRequest`], /// providing an instance of [`libp2p_core::upgrade::OutboundUpgrade`] that is used to negotiate the -/// protocol(s). Upon success, [`ConnectionHandler::inject_fully_negotiated_outbound`] -/// is called with the final output of the upgrade. +/// protocol(s). Upon success, [`ConnectionHandler::on_connection_event`] is called with +/// [`ConnectionEvent::FullyNegotiatedOutbound`] translating the final output of the upgrade. /// /// 2. Listening by accepting a new inbound substream. When a new inbound substream /// is created on a connection, [`ConnectionHandler::listen_protocol`] is called /// to obtain an instance of [`libp2p_core::upgrade::InboundUpgrade`] that is used to /// negotiate the protocol(s). Upon success, -/// [`ConnectionHandler::inject_fully_negotiated_inbound`] is called with the final -/// output of the upgrade. +/// [`ConnectionHandler::on_connection_event`] is called with [`ConnectionEvent::FullyNegotiatedInbound`] +/// translating the final output of the upgrade. +/// /// /// # Connection Keep-Alive /// @@ -122,41 +123,93 @@ pub trait ConnectionHandler: Send + 'static { /// of simultaneously open negotiated inbound substreams. In other words it is up to the /// [`ConnectionHandler`] implementation to stop a malicious remote node to open and keep alive /// an excessive amount of inbound substreams. + #[deprecated( + since = "0.41.0", + note = "Handle `ConnectionEvent::FullyNegotiatedInbound` on `ConnectionHandler::on_connection_event` instead. + The default implemention of this `inject_*` method delegates to it." + )] fn inject_fully_negotiated_inbound( &mut self, protocol: ::Output, info: Self::InboundOpenInfo, - ); + ) { + self.on_connection_event(ConnectionEvent::FullyNegotiatedInbound( + FullyNegotiatedInbound { protocol, info }, + )) + } /// Injects the output of a successful upgrade on a new outbound substream. /// /// The second argument is the information that was previously passed to /// [`ConnectionHandlerEvent::OutboundSubstreamRequest`]. + #[deprecated( + since = "0.41.0", + note = "Handle `ConnectionEvent::FullyNegotiatedOutbound` on `ConnectionHandler::on_connection_event` instead. + The default implemention of this `inject_*` method delegates to it." + )] fn inject_fully_negotiated_outbound( &mut self, protocol: ::Output, info: Self::OutboundOpenInfo, - ); + ) { + self.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( + FullyNegotiatedOutbound { protocol, info }, + )) + } /// Injects an event coming from the outside in the handler. - fn inject_event(&mut self, event: Self::InEvent); + #[deprecated( + since = "0.41.0", + note = "Implement `ConnectionHandler::on_behaviour_event` instead. The default implementation of `inject_event` delegates to it." + )] + fn inject_event(&mut self, event: Self::InEvent) { + self.on_behaviour_event(event); + } /// Notifies the handler of a change in the address of the remote. - fn inject_address_change(&mut self, _new_address: &Multiaddr) {} + #[deprecated( + since = "0.41.0", + note = "Handle `ConnectionEvent::AddressChange` on `ConnectionHandler::on_connection_event` instead. + The default implemention of this `inject_*` method delegates to it." + )] + fn inject_address_change(&mut self, new_address: &Multiaddr) { + self.on_connection_event(ConnectionEvent::AddressChange(AddressChange { + new_address, + })) + } /// Indicates to the handler that upgrading an outbound substream to the given protocol has failed. + #[deprecated( + since = "0.41.0", + note = "Handle `ConnectionEvent::DialUpgradeError` on `ConnectionHandler::on_connection_event` instead. + The default implemention of this `inject_*` method delegates to it." + )] fn inject_dial_upgrade_error( &mut self, info: Self::OutboundOpenInfo, error: ConnectionHandlerUpgrErr<::Error>, - ); + ) { + self.on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError { + info, + error, + })) + } /// Indicates to the handler that upgrading an inbound substream to the given protocol has failed. + #[deprecated( + since = "0.41.0", + note = "Handle `ConnectionEvent::ListenUpgradeError` on `ConnectionHandler::on_connection_event` instead. + The default implemention of this `inject_*` method delegates to it." + )] fn inject_listen_upgrade_error( &mut self, - _: Self::InboundOpenInfo, - _: ConnectionHandlerUpgrErr<::Error>, + info: Self::InboundOpenInfo, + error: ConnectionHandlerUpgrErr<::Error>, ) { + self.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + info, + error, + })) } /// Returns until when the connection should be kept alive. @@ -224,6 +277,76 @@ pub trait ConnectionHandler: Send + 'static { { ConnectionHandlerSelect::new(self, other) } + + /// Informs the handler about an event from the [`NetworkBehaviour`](super::NetworkBehaviour). + fn on_behaviour_event(&mut self, _event: Self::InEvent) {} + + fn on_connection_event( + &mut self, + _event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } +} + +/// Enumeration with the list of the possible stream events +/// to pass to [`on_connection_event`](ConnectionHandler::on_connection_event). +pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> { + /// Informs the handler about the output of a successful upgrade on a new inbound substream. + FullyNegotiatedInbound(FullyNegotiatedInbound), + /// Informs the handler about the output of a successful upgrade on a new outbound stream. + FullyNegotiatedOutbound(FullyNegotiatedOutbound), + /// Informs the handler about a change in the address of the remote. + AddressChange(AddressChange<'a>), + /// Informs the handler that upgrading an outbound substream to the given protocol has failed. + DialUpgradeError(DialUpgradeError), + /// Informs the handler that upgrading an inbound substream to the given protocol has failed. + ListenUpgradeError(ListenUpgradeError), +} + +/// [`ConnectionEvent`] variant that informs the handler about +/// the output of a successful upgrade on a new inbound substream. +/// +/// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the +/// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number +/// of simultaneously open negotiated inbound substreams. In other words it is up to the +/// [`ConnectionHandler`] implementation to stop a malicious remote node to open and keep alive +/// an excessive amount of inbound substreams. +pub struct FullyNegotiatedInbound { + pub protocol: IP::Output, + pub info: IOI, +} + +/// [`ConnectionEvent`] variant that informs the handler about successful upgrade on a new outbound stream. +/// +/// The `protocol` field is the information that was previously passed to +/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`]. +pub struct FullyNegotiatedOutbound { + pub protocol: OP::Output, + pub info: OOI, +} + +/// [`ConnectionEvent`] variant that informs the handler about a change in the address of the remote. +pub struct AddressChange<'a> { + pub new_address: &'a Multiaddr, +} + +/// [`ConnectionEvent`] variant that informs the handler +/// that upgrading an outbound substream to the given protocol has failed. +pub struct DialUpgradeError { + pub info: OOI, + pub error: ConnectionHandlerUpgrErr, +} + +/// [`ConnectionEvent`] variant that informs the handler +/// that upgrading an inbound substream to the given protocol has failed. +pub struct ListenUpgradeError { + pub info: IOI, + pub error: ConnectionHandlerUpgrErr, } /// Configuration of inbound or outbound substream protocol(s) diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index 53007b5a181..e6d16ed1133 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -19,14 +19,15 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, SubstreamProtocol, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, + ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, }; -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; +use crate::upgrade::SendWrapper; use either::Either; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{EitherUpgrade, UpgradeError}; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{ConnectedPoint, PeerId}; use std::task::{Context, Poll}; /// Auxiliary type to allow implementing [`IntoConnectionHandler`]. As [`IntoConnectionHandler`] is @@ -119,181 +120,16 @@ where } } - fn inject_fully_negotiated_outbound( - &mut self, - output: ::Output, - info: Self::OutboundOpenInfo, - ) { - match (self, output, info) { - (Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => { - handler.inject_fully_negotiated_outbound(output, info) - } - (Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => { - handler.inject_fully_negotiated_outbound(output, info) - } - _ => unreachable!(), - } - } - - fn inject_fully_negotiated_inbound( - &mut self, - output: ::Output, - info: Self::InboundOpenInfo, - ) { - match (self, output, info) { - (Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => { - handler.inject_fully_negotiated_inbound(output, info) - } - (Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => { - handler.inject_fully_negotiated_inbound(output, info) - } - _ => unreachable!(), - } - } - - fn inject_event(&mut self, event: Self::InEvent) { + fn on_behaviour_event(&mut self, event: Self::InEvent) { match (self, event) { + #[allow(deprecated)] (Either::Left(handler), Either::Left(event)) => handler.inject_event(event), + #[allow(deprecated)] (Either::Right(handler), Either::Right(event)) => handler.inject_event(event), _ => unreachable!(), } } - fn inject_address_change(&mut self, addr: &Multiaddr) { - match self { - Either::Left(handler) => handler.inject_address_change(addr), - Either::Right(handler) => handler.inject_address_change(addr), - } - } - - fn inject_dial_upgrade_error( - &mut self, - info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - match error { - ConnectionHandlerUpgrErr::Timer => match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); - } - (Either::Right(handler), Either::Right(info)) => { - handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); - } - _ => unreachable!(), - }, - ConnectionHandlerUpgrErr::Timeout => match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout); - } - (Either::Right(handler), Either::Right(info)) => { - handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout); - } - _ => unreachable!(), - }, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_dial_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - ); - } - (Either::Right(handler), Either::Right(info)) => { - handler.inject_dial_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - ); - } - _ => unreachable!(), - }, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { - match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_dial_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), - ); - } - _ => unreachable!(), - } - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { - match (self, info) { - (Either::Right(handler), Either::Right(info)) => { - handler.inject_dial_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), - ); - } - _ => unreachable!(), - } - } - } - } - - fn inject_listen_upgrade_error( - &mut self, - info: Self::InboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - match error { - ConnectionHandlerUpgrErr::Timer => match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); - } - (Either::Right(handler), Either::Right(info)) => { - handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); - } - _ => unreachable!(), - }, - ConnectionHandlerUpgrErr::Timeout => match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout); - } - (Either::Right(handler), Either::Right(info)) => { - handler.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout); - } - _ => unreachable!(), - }, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_listen_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - ); - } - (Either::Right(handler), Either::Right(info)) => { - handler.inject_listen_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - ); - } - _ => unreachable!(), - }, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { - match (self, info) { - (Either::Left(handler), Either::Left(info)) => { - handler.inject_listen_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), - ); - } - _ => unreachable!(), - } - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { - match (self, info) { - (Either::Right(handler), Either::Right(info)) => { - handler.inject_listen_upgrade_error( - info, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), - ); - } - _ => unreachable!(), - } - } - } - } - fn connection_keep_alive(&self) -> KeepAlive { match self { Either::Left(handler) => handler.connection_keep_alive(), @@ -327,4 +163,199 @@ where Poll::Ready(event) } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: output, + info, + }) => match (self, output, info) { + (Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => + { + #[allow(deprecated)] + handler.inject_fully_negotiated_outbound(output, info) + } + (Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => + { + #[allow(deprecated)] + handler.inject_fully_negotiated_outbound(output, info) + } + _ => unreachable!(), + }, + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: output, + info, + }) => match (self, output, info) { + (Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => + { + #[allow(deprecated)] + handler.inject_fully_negotiated_inbound(output, info) + } + (Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => + { + #[allow(deprecated)] + handler.inject_fully_negotiated_inbound(output, info) + } + _ => unreachable!(), + }, + ConnectionEvent::AddressChange(AddressChange { new_address: addr }) => match self { + #[allow(deprecated)] + Either::Left(handler) => handler.inject_address_change(addr), + #[allow(deprecated)] + Either::Right(handler) => handler.inject_address_change(addr), + }, + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => match error { + ConnectionHandlerUpgrErr::Timer => match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); + } + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); + } + _ => unreachable!(), + }, + ConnectionHandlerUpgrErr::Timeout => match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout); + } + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout); + } + _ => unreachable!(), + }, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => { + match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + ); + } + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + ); + } + _ => unreachable!(), + } + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { + match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + ); + } + _ => unreachable!(), + } + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { + match (self, info) { + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler.inject_dial_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + ); + } + _ => unreachable!(), + } + } + }, + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info, error }) => { + match error { + ConnectionHandlerUpgrErr::Timer => match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler + .inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); + } + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler + .inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer); + } + _ => unreachable!(), + }, + ConnectionHandlerUpgrErr::Timeout => match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler.inject_listen_upgrade_error( + info, + ConnectionHandlerUpgrErr::Timeout, + ); + } + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler.inject_listen_upgrade_error( + info, + ConnectionHandlerUpgrErr::Timeout, + ); + } + _ => unreachable!(), + }, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => { + match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler.inject_listen_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + ); + } + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler.inject_listen_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + ); + } + _ => unreachable!(), + } + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { + match (self, info) { + (Either::Left(handler), Either::Left(info)) => { + #[allow(deprecated)] + handler.inject_listen_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + ); + } + _ => unreachable!(), + } + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { + match (self, info) { + (Either::Right(handler), Either::Right(info)) => { + #[allow(deprecated)] + handler.inject_listen_upgrade_error( + info, + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + ); + } + _ => unreachable!(), + } + } + } + } + } + } } diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index a209225045e..326a6f8f4f9 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -19,11 +19,10 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError, SubstreamProtocol, }; -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; -use libp2p_core::Multiaddr; use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll}; /// Wrapper around a protocol handler that turns the input event into something else. @@ -64,48 +63,13 @@ where self.inner.listen_protocol() } - fn inject_fully_negotiated_inbound( - &mut self, - protocol: ::Output, - info: Self::InboundOpenInfo, - ) { - self.inner.inject_fully_negotiated_inbound(protocol, info) - } - - fn inject_fully_negotiated_outbound( - &mut self, - protocol: ::Output, - info: Self::OutboundOpenInfo, - ) { - self.inner.inject_fully_negotiated_outbound(protocol, info) - } - - fn inject_event(&mut self, event: TNewIn) { + fn on_behaviour_event(&mut self, event: TNewIn) { if let Some(event) = (self.map)(event) { + #[allow(deprecated)] self.inner.inject_event(event); } } - fn inject_address_change(&mut self, addr: &Multiaddr) { - self.inner.inject_address_change(addr) - } - - fn inject_dial_upgrade_error( - &mut self, - info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - self.inner.inject_dial_upgrade_error(info, error) - } - - fn inject_listen_upgrade_error( - &mut self, - info: Self::InboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - self.inner.inject_listen_upgrade_error(info, error) - } - fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } @@ -123,4 +87,45 @@ where > { self.inner.poll(cx) } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol, info }) => + { + #[allow(deprecated)] + self.inner.inject_fully_negotiated_inbound(protocol, info) + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, + info, + }) => + { + #[allow(deprecated)] + self.inner.inject_fully_negotiated_outbound(protocol, info) + } + ConnectionEvent::AddressChange(AddressChange { new_address }) => + { + #[allow(deprecated)] + self.inner.inject_address_change(new_address) + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => + { + #[allow(deprecated)] + self.inner.inject_dial_upgrade_error(info, error) + } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info, error }) => + { + #[allow(deprecated)] + self.inner.inject_listen_upgrade_error(info, error) + } + } + } } diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index 2eb0c2f9bdc..87306dc48c6 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -19,11 +19,10 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError, SubstreamProtocol, }; -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; -use libp2p_core::Multiaddr; use std::fmt::Debug; use std::task::{Context, Poll}; @@ -59,46 +58,11 @@ where self.inner.listen_protocol() } - fn inject_fully_negotiated_inbound( - &mut self, - protocol: ::Output, - info: Self::InboundOpenInfo, - ) { - self.inner.inject_fully_negotiated_inbound(protocol, info) - } - - fn inject_fully_negotiated_outbound( - &mut self, - protocol: ::Output, - info: Self::OutboundOpenInfo, - ) { - self.inner.inject_fully_negotiated_outbound(protocol, info) - } - - fn inject_event(&mut self, event: Self::InEvent) { + fn on_behaviour_event(&mut self, event: Self::InEvent) { + #[allow(deprecated)] self.inner.inject_event(event) } - fn inject_address_change(&mut self, addr: &Multiaddr) { - self.inner.inject_address_change(addr) - } - - fn inject_dial_upgrade_error( - &mut self, - info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - self.inner.inject_dial_upgrade_error(info, error) - } - - fn inject_listen_upgrade_error( - &mut self, - info: Self::InboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - self.inner.inject_listen_upgrade_error(info, error) - } - fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } @@ -122,4 +86,45 @@ where } }) } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol, info }) => + { + #[allow(deprecated)] + self.inner.inject_fully_negotiated_inbound(protocol, info) + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, + info, + }) => + { + #[allow(deprecated)] + self.inner.inject_fully_negotiated_outbound(protocol, info) + } + ConnectionEvent::AddressChange(AddressChange { new_address }) => + { + #[allow(deprecated)] + self.inner.inject_address_change(new_address) + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => + { + #[allow(deprecated)] + self.inner.inject_dial_upgrade_error(info, error) + } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info, error }) => + { + #[allow(deprecated)] + self.inner.inject_listen_upgrade_error(info, error) + } + } + } } diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 07c1168b132..c1f937c1cb3 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -127,6 +127,7 @@ where (key, arg): Self::OutboundOpenInfo, ) { if let Some(h) = self.handlers.get_mut(&key) { + #[allow(deprecated)] h.inject_fully_negotiated_outbound(protocol, arg) } else { log::error!("inject_fully_negotiated_outbound: no handler for key") @@ -140,6 +141,7 @@ where ) { if let Some(h) = self.handlers.get_mut(&key) { if let Some(i) = info.take(&key) { + #[allow(deprecated)] h.inject_fully_negotiated_inbound(arg, i) } } else { @@ -147,8 +149,9 @@ where } } - fn inject_event(&mut self, (key, event): Self::InEvent) { + fn on_behaviour_event(&mut self, (key, event): Self::InEvent) { if let Some(h) = self.handlers.get_mut(&key) { + #[allow(deprecated)] h.inject_event(event) } else { log::error!("inject_event: no handler for key") @@ -157,6 +160,7 @@ where fn inject_address_change(&mut self, addr: &Multiaddr) { for h in self.handlers.values_mut() { + #[allow(deprecated)] h.inject_address_change(addr) } } @@ -167,6 +171,7 @@ where error: ConnectionHandlerUpgrErr<::Error>, ) { if let Some(h) = self.handlers.get_mut(&key) { + #[allow(deprecated)] h.inject_dial_upgrade_error(arg, error) } else { log::error!("inject_dial_upgrade_error: no handler for protocol") @@ -182,6 +187,7 @@ where ConnectionHandlerUpgrErr::Timer => { for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { + #[allow(deprecated)] h.inject_listen_upgrade_error(i, ConnectionHandlerUpgrErr::Timer) } } @@ -189,6 +195,7 @@ where ConnectionHandlerUpgrErr::Timeout => { for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { + #[allow(deprecated)] h.inject_listen_upgrade_error(i, ConnectionHandlerUpgrErr::Timeout) } } @@ -196,6 +203,7 @@ where ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { + #[allow(deprecated)] h.inject_listen_upgrade_error( i, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( @@ -214,6 +222,7 @@ where let e = NegotiationError::ProtocolError(ProtocolError::IoError( e.kind().into(), )); + #[allow(deprecated)] h.inject_listen_upgrade_error( i, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)), @@ -225,6 +234,7 @@ where for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); + #[allow(deprecated)] h.inject_listen_upgrade_error( i, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)), @@ -236,6 +246,7 @@ where for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); + #[allow(deprecated)] h.inject_listen_upgrade_error( i, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)), @@ -248,6 +259,7 @@ where if let Some(i) = info.take(k) { let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); + #[allow(deprecated)] h.inject_listen_upgrade_error( i, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)), @@ -259,6 +271,7 @@ where ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => { if let Some(h) = self.handlers.get_mut(&k) { if let Some(i) = info.take(&k) { + #[allow(deprecated)] h.inject_listen_upgrade_error( i, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index c599ff801e1..e8cd03ebed8 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -19,7 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; @@ -132,42 +133,10 @@ where self.listen_protocol.clone() } - fn inject_fully_negotiated_inbound( - &mut self, - out: ::Output, - (): Self::InboundOpenInfo, - ) { - // If we're shutting down the connection for inactivity, reset the timeout. - if !self.keep_alive.is_yes() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout); - } - - self.events_out.push(out.into()); - } - - fn inject_fully_negotiated_outbound( - &mut self, - out: ::Output, - _: Self::OutboundOpenInfo, - ) { - self.dial_negotiated -= 1; - self.events_out.push(out.into()); - } - - fn inject_event(&mut self, event: Self::InEvent) { + fn on_behaviour_event(&mut self, event: Self::InEvent) { self.send_request(event); } - fn inject_dial_upgrade_error( - &mut self, - _info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, - ) { - if self.pending_error.is_none() { - self.pending_error = Some(error); - } - } - fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -212,6 +181,44 @@ where Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: out, + .. + }) => { + // If we're shutting down the connection for inactivity, reset the timeout. + if !self.keep_alive.is_yes() { + self.keep_alive = + KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout); + } + + self.events_out.push(out.into()); + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: out, + .. + }) => { + self.dial_negotiated -= 1; + self.events_out.push(out.into()); + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { + if self.pending_error.is_none() { + self.pending_error = Some(error); + } + } + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } + } } /// Configuration parameters for the `OneShotHandler` diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs index 04c1696515c..2efa949dd71 100644 --- a/swarm/src/handler/pending.rs +++ b/swarm/src/handler/pending.rs @@ -20,14 +20,10 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - SubstreamProtocol, -}; -use crate::NegotiatedSubstream; -use libp2p_core::{ - upgrade::{InboundUpgrade, OutboundUpgrade, PendingUpgrade}, - Multiaddr, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, FullyNegotiatedInbound, + FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; +use libp2p_core::upgrade::PendingUpgrade; use std::task::{Context, Poll}; use void::Void; @@ -56,50 +52,10 @@ impl ConnectionHandler for PendingConnectionHandler { SubstreamProtocol::new(PendingUpgrade::new(self.protocol_name.clone()), ()) } - fn inject_fully_negotiated_inbound( - &mut self, - protocol: >::Output, - _: Self::InboundOpenInfo, - ) { - void::unreachable(protocol) - } - - fn inject_fully_negotiated_outbound( - &mut self, - protocol: >::Output, - _info: Self::OutboundOpenInfo, - ) { - void::unreachable(protocol); - #[allow(unreachable_code)] - { - void::unreachable(_info); - } - } - - fn inject_event(&mut self, v: Self::InEvent) { + fn on_behaviour_event(&mut self, v: Self::InEvent) { void::unreachable(v) } - fn inject_address_change(&mut self, _: &Multiaddr) {} - - fn inject_dial_upgrade_error( - &mut self, - _: Self::OutboundOpenInfo, - _: ConnectionHandlerUpgrErr< - >::Error, - >, - ) { - } - - fn inject_listen_upgrade_error( - &mut self, - _: Self::InboundOpenInfo, - _: ConnectionHandlerUpgrErr< - >::Error, - >, - ) { - } - fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No } @@ -117,4 +73,33 @@ impl ConnectionHandler for PendingConnectionHandler { > { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => void::unreachable(protocol), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, + info: _info, + }) => { + void::unreachable(protocol); + #[allow(unreachable_code)] + { + void::unreachable(_info); + } + } + ConnectionEvent::AddressChange(_) + | ConnectionEvent::DialUpgradeError(_) + | ConnectionEvent::ListenUpgradeError(_) => {} + } + } } diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 70a6f3c26f6..65508c0b6a5 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -19,15 +19,16 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, + KeepAlive, ListenUpgradeError, SubstreamProtocol, }; -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; +use crate::upgrade::SendWrapper; use libp2p_core::{ either::{EitherError, EitherOutput}, upgrade::{EitherUpgrade, NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, - ConnectedPoint, Multiaddr, PeerId, + ConnectedPoint, PeerId, }; use std::{cmp, task::Context, task::Poll}; @@ -97,45 +98,30 @@ impl ConnectionHandlerSelect { } } -impl ConnectionHandler for ConnectionHandlerSelect +impl ConnectionHandlerSelect where TProto1: ConnectionHandler, TProto2: ConnectionHandler, { - type InEvent = EitherOutput; - type OutEvent = EitherOutput; - type Error = EitherError; - type InboundProtocol = SelectUpgrade< - SendWrapper<::InboundProtocol>, - SendWrapper<::InboundProtocol>, - >; - type OutboundProtocol = EitherUpgrade< - SendWrapper, - SendWrapper, - >; - type OutboundOpenInfo = EitherOutput; - type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo); - - fn listen_protocol(&self) -> SubstreamProtocol { - let proto1 = self.proto1.listen_protocol(); - let proto2 = self.proto2.listen_protocol(); - let timeout = *std::cmp::max(proto1.timeout(), proto2.timeout()); - let (u1, i1) = proto1.into_upgrade(); - let (u2, i2) = proto2.into_upgrade(); - let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2)); - SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout) - } - - fn inject_fully_negotiated_outbound( + fn on_fully_negotiated_outbound( &mut self, - protocol: ::Output, - endpoint: Self::OutboundOpenInfo, + FullyNegotiatedOutbound { + protocol, + info: endpoint, + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, ) { match (protocol, endpoint) { - (EitherOutput::First(protocol), EitherOutput::First(info)) => { + (EitherOutput::First(protocol), EitherOutput::First(info)) => + { + #[allow(deprecated)] self.proto1.inject_fully_negotiated_outbound(protocol, info) } - (EitherOutput::Second(protocol), EitherOutput::Second(info)) => { + (EitherOutput::Second(protocol), EitherOutput::Second(info)) => + { + #[allow(deprecated)] self.proto2.inject_fully_negotiated_outbound(protocol, info) } (EitherOutput::First(_), EitherOutput::Second(_)) => { @@ -147,45 +133,47 @@ where } } - fn inject_fully_negotiated_inbound( + fn on_fully_negotiated_inbound( &mut self, - protocol: ::Output, - (i1, i2): Self::InboundOpenInfo, + FullyNegotiatedInbound { + protocol, + info: (i1, i2), + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, ) { match protocol { - EitherOutput::First(protocol) => { + EitherOutput::First(protocol) => + { + #[allow(deprecated)] self.proto1.inject_fully_negotiated_inbound(protocol, i1) } - EitherOutput::Second(protocol) => { + EitherOutput::Second(protocol) => + { + #[allow(deprecated)] self.proto2.inject_fully_negotiated_inbound(protocol, i2) } } } - fn inject_event(&mut self, event: Self::InEvent) { - match event { - EitherOutput::First(event) => self.proto1.inject_event(event), - EitherOutput::Second(event) => self.proto2.inject_event(event), - } - } - - fn inject_address_change(&mut self, new_address: &Multiaddr) { - self.proto1.inject_address_change(new_address); - self.proto2.inject_address_change(new_address) - } - - fn inject_dial_upgrade_error( + fn on_dial_upgrade_error( &mut self, - info: Self::OutboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + DialUpgradeError { info, error }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, ) { match (info, error) { + #[allow(deprecated)] (EitherOutput::First(info), ConnectionHandlerUpgrErr::Timer) => self .proto1 .inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer), + #[allow(deprecated)] (EitherOutput::First(info), ConnectionHandlerUpgrErr::Timeout) => self .proto1 .inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout), + #[allow(deprecated)] ( EitherOutput::First(info), ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), @@ -193,6 +181,7 @@ where info, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), ), + #[allow(deprecated)] ( EitherOutput::First(info), ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))), @@ -206,12 +195,15 @@ where ) => { panic!("Wrong API usage; the upgrade error doesn't match the outbound open info"); } + #[allow(deprecated)] (EitherOutput::Second(info), ConnectionHandlerUpgrErr::Timeout) => self .proto2 .inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout), + #[allow(deprecated)] (EitherOutput::Second(info), ConnectionHandlerUpgrErr::Timer) => self .proto2 .inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer), + #[allow(deprecated)] ( EitherOutput::Second(info), ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), @@ -219,6 +211,7 @@ where info, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), ), + #[allow(deprecated)] ( EitherOutput::Second(info), ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))), @@ -235,31 +228,42 @@ where } } - fn inject_listen_upgrade_error( + fn on_listen_upgrade_error( &mut self, - (i1, i2): Self::InboundOpenInfo, - error: ConnectionHandlerUpgrErr<::Error>, + ListenUpgradeError { + info: (i1, i2), + error, + }: ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, ) { match error { ConnectionHandlerUpgrErr::Timer => { + #[allow(deprecated)] self.proto1 .inject_listen_upgrade_error(i1, ConnectionHandlerUpgrErr::Timer); + #[allow(deprecated)] self.proto2 .inject_listen_upgrade_error(i2, ConnectionHandlerUpgrErr::Timer) } ConnectionHandlerUpgrErr::Timeout => { + #[allow(deprecated)] self.proto1 .inject_listen_upgrade_error(i1, ConnectionHandlerUpgrErr::Timeout); + #[allow(deprecated)] self.proto2 .inject_listen_upgrade_error(i2, ConnectionHandlerUpgrErr::Timeout) } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + #[allow(deprecated)] self.proto1.inject_listen_upgrade_error( i1, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::Failed, )), ); + #[allow(deprecated)] self.proto2.inject_listen_upgrade_error( i2, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( @@ -291,22 +295,28 @@ where e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols) } } + #[allow(deprecated)] self.proto1.inject_listen_upgrade_error( i1, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)), ); + #[allow(deprecated)] self.proto2.inject_listen_upgrade_error( i2, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)), ) } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => + { + #[allow(deprecated)] self.proto1.inject_listen_upgrade_error( i1, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), ) } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => + { + #[allow(deprecated)] self.proto2.inject_listen_upgrade_error( i2, ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), @@ -314,6 +324,45 @@ where } } } +} + +impl ConnectionHandler for ConnectionHandlerSelect +where + TProto1: ConnectionHandler, + TProto2: ConnectionHandler, +{ + type InEvent = EitherOutput; + type OutEvent = EitherOutput; + type Error = EitherError; + type InboundProtocol = SelectUpgrade< + SendWrapper<::InboundProtocol>, + SendWrapper<::InboundProtocol>, + >; + type OutboundProtocol = EitherUpgrade< + SendWrapper, + SendWrapper, + >; + type OutboundOpenInfo = EitherOutput; + type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo); + + fn listen_protocol(&self) -> SubstreamProtocol { + let proto1 = self.proto1.listen_protocol(); + let proto2 = self.proto2.listen_protocol(); + let timeout = *std::cmp::max(proto1.timeout(), proto2.timeout()); + let (u1, i1) = proto1.into_upgrade(); + let (u2, i2) = proto2.into_upgrade(); + let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2)); + SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout) + } + + fn on_behaviour_event(&mut self, event: Self::InEvent) { + match event { + #[allow(deprecated)] + EitherOutput::First(event) => self.proto1.inject_event(event), + #[allow(deprecated)] + EitherOutput::Second(event) => self.proto2.inject_event(event), + } + } fn connection_keep_alive(&self) -> KeepAlive { cmp::max( @@ -369,4 +418,35 @@ where Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + ConnectionEvent::AddressChange(address) => { + #[allow(deprecated)] + self.proto1.inject_address_change(address.new_address); + #[allow(deprecated)] + self.proto2.inject_address_change(address.new_address) + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + } + } } diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index 29a0eca1e21..bd1ed812b8b 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -1,14 +1,11 @@ use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use crate::handler::{ - ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol, + ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, + KeepAlive, SubstreamProtocol, }; -use crate::NegotiatedSubstream; use libp2p_core::connection::ConnectionId; +use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::PeerId; -use libp2p_core::{ - upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}, - Multiaddr, -}; use std::task::{Context, Poll}; use void::Void; @@ -76,46 +73,10 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { SubstreamProtocol::new(DeniedUpgrade, ()) } - fn inject_fully_negotiated_inbound( - &mut self, - protocol: >::Output, - _: Self::InboundOpenInfo, - ) { - void::unreachable(protocol); - } - - fn inject_fully_negotiated_outbound( - &mut self, - protocol: >::Output, - _: Self::OutboundOpenInfo, - ) { - void::unreachable(protocol) - } - - fn inject_event(&mut self, v: Self::InEvent) { + fn on_behaviour_event(&mut self, v: Self::InEvent) { void::unreachable(v) } - fn inject_address_change(&mut self, _: &Multiaddr) {} - - fn inject_dial_upgrade_error( - &mut self, - _: Self::OutboundOpenInfo, - _: ConnectionHandlerUpgrErr< - >::Error, - >, - ) { - } - - fn inject_listen_upgrade_error( - &mut self, - _: Self::InboundOpenInfo, - _: ConnectionHandlerUpgrErr< - >::Error, - >, - ) { - } - fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Yes } @@ -133,4 +94,26 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { > { Poll::Pending } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => void::unreachable(protocol), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, .. + }) => void::unreachable(protocol), + ConnectionEvent::DialUpgradeError(_) + | ConnectionEvent::ListenUpgradeError(_) + | ConnectionEvent::AddressChange(_) => {} + } + } }