Skip to content

Commit

Permalink
libp2p-ping improvements.
Browse files Browse the repository at this point in the history
  * re libp2p#950: Removes use of the `OneShotHandler`, but still sending each
    ping over a new substream, as seems to be intentional since libp2p#828.

  * re libp2p#842: Adds an integration test that exercises the ping behaviour through
    a Swarm, requiring the RTT to be below a threshold. This requires disabling
    Nagle's algorithm as it can interact badly with delayed ACKs (and has been
    observed to do so in the context of the new ping example and integration test).

  * re libp2p#864: Control of the inbound and outbound (sub)stream protocol upgrade
    timeouts has been moved from the `NodeHandlerWrapperBuilder` to the
    `ProtocolsHandler`. That may also alleviate the need for a custom timeout
    on an `OutboundSubstreamRequest` as a `ProtocolsHandler` is now free to
    adjust these timeouts over time.

Other changes:

  * A new ping example.
  * Documentation improvements.
  • Loading branch information
Roman S. Borschel committed Apr 9, 2019
1 parent 98b2517 commit 732a219
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 273 deletions.
72 changes: 50 additions & 22 deletions core/src/protocols_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,35 @@ pub trait ProtocolsHandler {
/// and will be passed back in `inject_substream` or `inject_outbound_closed`.
type OutboundOpenInfo;

/// Produces a `ConnectionUpgrade` for the protocol or protocols to accept when listening.
/// The timeout for protocol negotiation on inbound substreams.
fn inbound_timeout(&self) -> Duration {
Duration::from_secs(10)
}

/// The timeout for protocol negotiation on outbound substreams.
fn outbound_timeout(&self) -> Duration {
Duration::from_secs(10)
}

/// The [`InboundUpgrade`] to apply on inbound substreams to negotiate the
/// desired protocols.
///
/// > **Note**: You should always accept all the protocols you support, even if in a specific
/// > context you wouldn't accept one in particular (eg. only allow one substream at
/// > a time for a given protocol). The reason is that remotes are allowed to put the
/// > list of supported protocols in a cache in order to avoid spurious queries.
/// > **Note**: The returned `InboundUpgrade` should always accept all the generally
/// > supported protocols, even if in a specific context a particular one is
/// > not supported, (eg. when only allowing one substream at a time for a protocol).
/// > This allows a remote to put the list of supported protocols in a cache.
fn listen_protocol(&self) -> Self::InboundProtocol;

/// Injects a fully-negotiated substream in the handler.
///
/// This method is called when a substream has been successfully opened and negotiated.
/// Injects the output of a successful upgrade on a new inbound substream.
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<Self::Substream>>::Output
);

/// Injects the output of a successful upgrade on a new outbound substream.
///
/// The second argument is the information that was previously passed to
/// [`ProtocolsHandlerEvent::OutboundSubstreamRequest`].
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Output,
Expand All @@ -134,28 +147,41 @@ pub trait ProtocolsHandler {
fn inject_event(&mut self, event: Self::InEvent);

/// Indicates to the handler that upgrading a substream to the given protocol has failed.
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>);
fn inject_dial_upgrade_error(
&mut self,
info: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error
>
);

/// Returns until when the connection should be kept alive.
///
/// If returns `Until`, that indicates that this connection may be closed and this handler
/// destroyed after the returned `Instant` has elapsed if they think that they will no longer
/// need the connection in the future. Returning `Forever` is equivalent to "infinite".
/// Returning `Now` is equivalent to `Until(Instant::now())`.
/// This method is called by the `Swarm` after each invocation of
/// [`ProtocolsHandler::poll`] to determine if the connection and the associated
/// `ProtocolsHandler`s should be kept alive and if so, for how long.
///
/// Returning [`KeepAlive::Now`] indicates that the connection should be
/// closed and this handler destroyed immediately.
///
/// On the other hand, the return value is only an indication and doesn't mean that the user
/// will not close the connection.
/// Returning [`KeepAlive::Until`] indicates that the connection may be closed
/// and this handler destroyed after the specified `Instant`.
///
/// When multiple `ProtocolsHandler` are combined together, the largest `KeepAlive` should be
/// used.
/// Returning [`KeepAlive::Forever`] indicates that the connection should
/// always be kept alive. The connection will only be closed if
/// [`ProtocolsHandler::poll`] returns an error.
///
/// The result of this method should be checked every time `poll()` is invoked.
/// When multiple `ProtocolsHandler`s are combined, the largest `KeepAlive`
/// takes precedence.
fn connection_keep_alive(&self) -> KeepAlive;

/// Should behave like `Stream::poll()`.
///
/// Returning an error will close the connection to the remote.
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>;
fn poll(&mut self) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error
>;

/// Adds a closure that turns the input event into something else.
#[inline]
Expand Down Expand Up @@ -230,7 +256,8 @@ pub enum ProtocolsHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
ProtocolsHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
{
/// If this is an `OutboundSubstreamRequest`, maps the `info` member from a `TOutboundOpenInfo` to something else.
/// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
/// `TOutboundOpenInfo` to something else.
#[inline]
pub fn map_outbound_open_info<F, I>(
self,
Expand All @@ -250,7 +277,8 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
}
}

/// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) to something else.
/// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
/// to something else.
#[inline]
pub fn map_protocol<F, I>(
self,
Expand Down Expand Up @@ -356,7 +384,7 @@ pub trait IntoProtocolsHandler {
where
Self: Sized,
{
NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10))
NodeHandlerWrapperBuilder::new(self)
}
}

Expand Down
44 changes: 9 additions & 35 deletions core/src/protocols_handler/node_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,13 @@ use crate::{
}
};
use futures::prelude::*;
use std::{error, fmt, time::Duration, time::Instant};
use std::{error, fmt, time::Instant};
use tokio_timer::{Delay, Timeout};

/// Prototype for a `NodeHandlerWrapper`.
pub struct NodeHandlerWrapperBuilder<TIntoProtoHandler> {
/// The underlying handler.
handler: TIntoProtoHandler,
/// Timeout for incoming substreams negotiation.
in_timeout: Duration,
/// Timeout for outgoing substreams negotiation.
out_timeout: Duration,
}

impl<TIntoProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler>
Expand All @@ -50,28 +46,12 @@ where
{
/// Builds a `NodeHandlerWrapperBuilder`.
#[inline]
pub(crate) fn new(handler: TIntoProtoHandler, in_timeout: Duration, out_timeout: Duration) -> Self {
pub(crate) fn new(handler: TIntoProtoHandler) -> Self {
NodeHandlerWrapperBuilder {
handler,
in_timeout,
out_timeout,
}
}

/// Sets the timeout to use when negotiating a protocol on an ingoing substream.
#[inline]
pub fn with_in_negotiation_timeout(mut self, timeout: Duration) -> Self {
self.in_timeout = timeout;
self
}

/// Sets the timeout to use when negotiating a protocol on an outgoing substream.
#[inline]
pub fn with_out_negotiation_timeout(mut self, timeout: Duration) -> Self {
self.out_timeout = timeout;
self
}

/// Builds the `NodeHandlerWrapper`.
#[deprecated(note = "Pass the NodeHandlerWrapperBuilder directly")]
#[inline]
Expand All @@ -82,8 +62,6 @@ where
handler: self.handler,
negotiating_in: Vec::new(),
negotiating_out: Vec::new(),
in_timeout: self.in_timeout,
out_timeout: self.out_timeout,
queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0,
connection_shutdown: None,
Expand All @@ -105,8 +83,6 @@ where
handler: self.handler.into_handler(remote_peer_id),
negotiating_in: Vec::new(),
negotiating_out: Vec::new(),
in_timeout: self.in_timeout,
out_timeout: self.out_timeout,
queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0,
connection_shutdown: None,
Expand All @@ -131,10 +107,6 @@ where
TProtoHandler::OutboundOpenInfo,
Timeout<OutboundUpgradeApply<TProtoHandler::Substream, TProtoHandler::OutboundProtocol>>,
)>,
/// Timeout for incoming substreams negotiation.
in_timeout: Duration,
/// Timeout for outgoing substreams negotiation.
out_timeout: Duration,
/// For each outbound substream request, how to upgrade it. The first element of the tuple
/// is the unique identifier (see `unique_dial_upgrade_id`).
queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>,
Expand Down Expand Up @@ -208,7 +180,8 @@ where
NodeHandlerEndpoint::Listener => {
let protocol = self.handler.listen_protocol();
let upgrade = upgrade::apply_inbound(substream, protocol);
let with_timeout = Timeout::new(upgrade, self.in_timeout);
let timeout = self.handler.inbound_timeout();
let with_timeout = Timeout::new(upgrade, timeout);
self.negotiating_in.push(with_timeout);
}
NodeHandlerEndpoint::Dialer((upgrade_id, user_data)) => {
Expand All @@ -226,7 +199,8 @@ where

let (_, proto_upgrade) = self.queued_dial_upgrades.remove(pos);
let upgrade = upgrade::apply_outbound(substream, proto_upgrade);
let with_timeout = Timeout::new(upgrade, self.out_timeout);
let timeout = self.handler.outbound_timeout();
let with_timeout = Timeout::new(upgrade, timeout);
self.negotiating_out.push((user_data, with_timeout));
}
}
Expand Down Expand Up @@ -270,7 +244,7 @@ where
} else {
debug_assert!(err.is_inner());
let err = err.into_inner().expect("Timeout error is one of {elapsed, \
timer, inner}; is_inner and is_elapsed are both false; error is \
timer, inner}; is_elapsed and is_timer are both false; error is \
inner; QED");
ProtocolsHandlerUpgrErr::Upgrade(err)
};
Expand All @@ -280,8 +254,8 @@ where
}
}

// Poll the handler at the end so that we see the consequences of the method calls on
// `self.handler`.
// Poll the handler at the end so that we see the consequences of the method
// calls on `self.handler`.
let poll_result = self.handler.poll()?;

self.connection_shutdown = match self.handler.connection_keep_alive() {
Expand Down
31 changes: 17 additions & 14 deletions core/src/swarm/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait NetworkBehaviour {
/// Event generated by the swarm.
type OutEvent;

/// Builds a new `ProtocolsHandler`.
/// Creates a new `ProtocolsHandler` for a connection with a peer.
fn new_handler(&mut self) -> Self::ProtocolsHandler;

/// Addresses that this behaviour is aware of for this specific peer, and that may allow
Expand Down Expand Up @@ -95,45 +95,48 @@ pub trait NetworkBehaviourEventProcess<TEvent> {
fn inject_event(&mut self, event: TEvent);
}

/// Action to perform.
/// An action that a [`NetworkBehaviour`] can trigger in the [`Swarm`]
/// in whose context it is executing.
#[derive(Debug, Clone)]
pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
/// Generate an event for the outside.
/// Instructs the `Swarm` to return an event when it is being polled.
GenerateEvent(TOutEvent),

// TODO: report new raw connection for usage after intercepting an address dial

/// Instructs the swarm to dial the given multiaddress without any expectation of a peer id.
/// Instructs the swarm to dial the given multiaddress, without a known `PeerId`.
DialAddress {
/// The address to dial.
address: Multiaddr,
},

/// Instructs the swarm to try reach the given peer.
/// Instructs the swarm to dial a known `PeerId`.
///
/// In the future, a corresponding `inject_dial_failure` or `inject_connected` function call
/// must be performed.
/// On success, [`NetworkBehaviour::inject_connected`] is invoked.
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
DialPeer {
/// The peer to try reach.
peer_id: PeerId,
},

/// If we're connected to the given peer, sends a message to the protocol handler.
/// Instructs the `Swarm` to send a message to a connected peer.
///
/// If we're not connected to this peer, does nothing. If necessary, the implementation of
/// `NetworkBehaviour` is supposed to track which peers we are connected to.
/// If the `Swarm` is connected to the peer, the message is delivered to the remote's
/// protocol handler. If there is no connection to the peer, the message is ignored.
/// To ensure delivery, the `NetworkBehaviour` must keep track of connected peers.
SendEvent {
/// The peer which to send the message to.
/// The peer to which to send the message.
peer_id: PeerId,
/// Event to send to the peer.
/// The message to send.
event: TInEvent,
},

/// Reports that a remote observes us as this address.
/// Informs the `Swarm` about a multi-address observed by a remote for
/// the local node.
///
/// The swarm will pass this address through the transport's NAT traversal.
ReportObservedAddr {
/// The address we're being observed as.
/// The observed address of the local node.
address: Multiaddr,
},
}
2 changes: 2 additions & 0 deletions protocols/ping/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ void = "1.0"

[dev-dependencies]
libp2p-tcp = { version = "0.6.0", path = "../../transports/tcp" }
libp2p-secio = { version = "0.6.0", path = "../../protocols/secio" }
libp2p-yamux = { version = "0.6.0", path = "../../muxers/yamux" }
tokio = "0.1"
tokio-tcp = "0.1"
Loading

0 comments on commit 732a219

Please sign in to comment.