Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Expose KeepAlive information to NetworkBehaviour #1717

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnecti

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use std::{borrow::Cow, error::Error, fmt, pin::Pin, task::Context, task::Poll};
use std::hash::Hash;
use substream::{Muxing, SubstreamEvent};

Expand Down Expand Up @@ -199,6 +199,8 @@ pub enum Event<T> {
Handler(T),
/// Address of the remote has changed.
AddressChange(Multiaddr),
/// The protocol keeping the connection alive changed.
KeepAliveProtocolChange(Cow<'static, [u8]>),
}

/// A multiplexed connection to a peer with an associated `ConnectionHandler`.
Expand Down Expand Up @@ -307,6 +309,9 @@ where
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Ok(ConnectionHandlerEvent::KeepAliveProtocolChange(p))) => {
return Poll::Ready(Ok(Event::KeepAliveProtocolChange(p)))
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
}
Expand Down
14 changes: 11 additions & 3 deletions core/src/connection/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{Multiaddr, PeerId};
use std::{task::Context, task::Poll};
use std::{borrow::Cow, task::Context, task::Poll};
use super::{Connected, SubstreamEndpoint};

/// The interface of a connection handler.
Expand Down Expand Up @@ -92,11 +92,14 @@ where
}

/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote.
OutboundSubstreamRequest(TOutboundOpenInfo),

/// The protocol keeping the connection alive changed.
KeepAliveProtocolChange(Cow<'static, [u8]>),

/// Other event.
Custom(TCustom),
}
Expand All @@ -112,6 +115,9 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCust
ConnectionHandlerEvent::OutboundSubstreamRequest(map(val))
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val),
ConnectionHandlerEvent::KeepAliveProtocolChange(p) => {
ConnectionHandlerEvent::KeepAliveProtocolChange(p)
},
}
}

Expand All @@ -123,8 +129,10 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCust
ConnectionHandlerEvent::OutboundSubstreamRequest(val) => {
ConnectionHandlerEvent::OutboundSubstreamRequest(val)
},
ConnectionHandlerEvent::KeepAliveProtocolChange(p) => {
ConnectionHandlerEvent::KeepAliveProtocolChange(p)
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(map(val)),
}
}
}

14 changes: 13 additions & 1 deletion core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use futures::{
stream::FuturesUnordered
};
use std::{
borrow::Cow,
collections::hash_map,
error,
fmt,
Expand Down Expand Up @@ -234,6 +235,12 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
/// The new [`ConnectedPoint`].
new_endpoint: ConnectedPoint,
},

/// A connection is being kept alive by a different protocol.
KeepAliveProtocolChange{
entry: EstablishedEntry<'a, I, C>,
new_protocol: Cow<'static, [u8]>,
},
}

impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
Expand Down Expand Up @@ -399,6 +406,12 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
new_endpoint: new,
}
}
task::Event::KeepAliveProtocolChange { id: _, new_protocol } => {
Event::KeepAliveProtocolChange {
entry: EstablishedEntry { task },
new_protocol,
}
}
task::Event::Closed { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
Expand Down Expand Up @@ -532,4 +545,3 @@ impl<'a, I, C> PendingEntry<'a, I, C> {
self.task.remove();
}
}

11 changes: 10 additions & 1 deletion core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
},
};
use futures::{prelude::*, channel::mpsc, stream};
use std::{pin::Pin, task::Context, task::Poll};
use std::{borrow::Cow, pin::Pin, task::Context, task::Poll};
use super::ConnectResult;

/// Identifier of a [`Task`] in a [`Manager`](super::Manager).
Expand All @@ -60,6 +60,8 @@ pub enum Event<T, H, TE, HE, C> {
Failed { id: TaskId, error: PendingConnectionError<TE>, handler: H },
/// A node we are connected to has changed its address.
AddressChange { id: TaskId, new_address: Multiaddr },
/// The protocol keeping the connection alive changed.
KeepAliveProtocolChange{ id: TaskId, new_protocol: Cow<'static, [u8]> },
/// Notify the manager of an event from the connection.
Notify { id: TaskId, event: T },
/// A connection closed, possibly due to an error.
Expand All @@ -75,6 +77,7 @@ impl<T, H, TE, HE, C> Event<T, H, TE, HE, C> {
Event::Established { id, .. } => id,
Event::Failed { id, .. } => id,
Event::AddressChange { id, .. } => id,
Event::KeepAliveProtocolChange { id, .. } => id,
Event::Notify { id, .. } => id,
Event::Closed { id, .. } => id,
}
Expand Down Expand Up @@ -300,6 +303,12 @@ where
event: Some(Event::AddressChange { id, new_address })
};
}
Poll::Ready(Ok(connection::Event::KeepAliveProtocolChange(new_protocol))) => {
this.state = State::Established {
connection,
event: Some(Event::KeepAliveProtocolChange{ id, new_protocol })
};
}
Poll::Ready(Err(error)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
Expand Down
27 changes: 26 additions & 1 deletion core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use either::Either;
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{convert::TryFrom as _, error, fmt, hash::Hash, num::NonZeroU32, task::Context, task::Poll};
use std::{borrow::Cow, convert::TryFrom as _, error, fmt, hash::Hash, num::NonZeroU32, task::Context, task::Poll};

/// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo = PeerId, TPeerId = PeerId> {
Expand Down Expand Up @@ -152,6 +152,14 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
/// The old endpoint.
old_endpoint: ConnectedPoint,
},

/// The connection to a node is being kept alive by a different protocol.
KeepAliveProtocolChange {
/// The connection that is being kept alive by a different protocol.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
/// The identifier of the protocol keeping the connection alive.
new_protocol: Cow<'static, [u8]>,
}
}

impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
Expand Down Expand Up @@ -196,6 +204,12 @@ where
.field("old_endpoint", old_endpoint)
.finish()
},
PoolEvent::KeepAliveProtocolChange { ref connection, ref new_protocol } => {
f.debug_struct("PoolEvent::KeepAliveProtocolChange")
.field("conn_info", connection.info())
.field("new_protocol", new_protocol)
.finish()
},
}
}
}
Expand Down Expand Up @@ -738,6 +752,17 @@ where
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
},
manager::Event::KeepAliveProtocolChange { entry, new_protocol } => {
let id = entry.id();
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::KeepAliveProtocolChange {
connection,
new_protocol
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
},
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ where
old_endpoint,
}
}
Poll::Ready(PoolEvent::KeepAliveProtocolChange { connection, new_protocol }) => {
NetworkEvent::KeepAliveProtocolChange {
connection,
new_protocol,
}
}
};

Poll::Ready(event)
Expand Down
16 changes: 15 additions & 1 deletion core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
transport::{Transport, TransportError},
};
use futures::prelude::*;
use std::{error, fmt, hash::Hash, num::NonZeroU32};
use std::{borrow::Cow, error, fmt, hash::Hash, num::NonZeroU32};

/// Event that can happen on the `Network`.
pub enum NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Expand Down Expand Up @@ -174,6 +174,14 @@ where
/// Old endpoint of this connection.
old_endpoint: ConnectedPoint,
},

/// An established connection is being kept alive by a different protocol.
KeepAliveProtocolChange {
/// The connection whose address has changed.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
/// New endpoint of this connection.
new_protocol: Cow<'static, [u8]>,
},
}

impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
Expand Down Expand Up @@ -267,6 +275,12 @@ where
.field("old_endpoint", old_endpoint)
.finish()
}
NetworkEvent::KeepAliveProtocolChange { connection, new_protocol } => {
f.debug_struct("KeepAliveProtocolChange")
.field("connection", connection)
.field("new_protocol", new_protocol)
.finish()
}
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,18 @@ impl GossipsubHandler {
max_transmit_size: usize,
validation_mode: ValidationMode,
) -> Self {
let protocol_id = protocol_id.into();
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::new(
protocol_id,
protocol_id.clone(),
max_transmit_size,
validation_mode,
)),
inbound_substream: None,
outbound_substream: None,
outbound_substream_establishing: false,
send_queue: SmallVec::new(),
keep_alive: KeepAlive::Yes,
keep_alive: KeepAlive::Yes { protocol: protocol_id },
}
}
}
Expand Down Expand Up @@ -161,7 +162,7 @@ impl ProtocolsHandler for GossipsubHandler {
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
self.keep_alive.clone()
}

fn poll(
Expand Down
8 changes: 4 additions & 4 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{RemoteInfo, IdentifyProtocolConfig, ReplySubstream};
use crate::protocol::{DEFAULT_PROTO_NAME, RemoteInfo, IdentifyProtocolConfig, ReplySubstream};
use futures::prelude::*;
use libp2p_core::upgrade::{
InboundUpgrade,
Expand All @@ -34,7 +34,7 @@ use libp2p_swarm::{
ProtocolsHandlerUpgrErr
};
use smallvec::SmallVec;
use std::{pin::Pin, task::Context, task::Poll, time::Duration};
use std::{borrow::Cow, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;

/// Delay between the moment we connect and the first time we identify.
Expand Down Expand Up @@ -81,7 +81,7 @@ impl IdentifyHandler {
config: IdentifyProtocolConfig,
events: SmallVec::new(),
next_id: Delay::new(DELAY_TO_FIRST_ID),
keep_alive: KeepAlive::Yes,
keep_alive: KeepAlive::Yes { protocol: Cow::Borrowed(DEFAULT_PROTO_NAME) },
}
}
}
Expand Down Expand Up @@ -129,7 +129,7 @@ impl ProtocolsHandler for IdentifyHandler {
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
self.keep_alive.clone()
}

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<
Expand Down
5 changes: 4 additions & 1 deletion protocols/identify/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use prost::Message;
use std::convert::TryFrom;
use std::{fmt, io, iter, pin::Pin};

/// The protocol name used for negotiating with multistream-select.
pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/id/1.0.0";

/// Configuration for an upgrade to the `Identify` protocol.
#[derive(Debug, Clone)]
pub struct IdentifyProtocolConfig;
Expand Down Expand Up @@ -115,7 +118,7 @@ impl UpgradeInfo for IdentifyProtocolConfig {
type InfoIter = iter::Once<Self::Info>;

fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/ipfs/id/1.0.0")
iter::once(DEFAULT_PROTO_NAME)
}
}

Expand Down
21 changes: 16 additions & 5 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,10 @@ struct UniqueConnecId(u64);
impl<TUserData> KademliaHandler<TUserData> {
/// Create a [`KademliaHandler`] using the given configuration.
pub fn new(config: KademliaHandlerConfig) -> Self {
let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);
let keep_alive = KeepAlive::Until {
deadline: Instant::now() + config.idle_timeout,
protocol: config.protocol_config.protocol_name().clone(),
};

KademliaHandler {
config,
Expand Down Expand Up @@ -607,7 +610,7 @@ where

#[inline]
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
self.keep_alive.clone()
}

fn poll(
Expand All @@ -632,7 +635,10 @@ where
}
(None, Some(event), _) => {
if self.substreams.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
self.keep_alive = KeepAlive::Until {
deadline: Instant::now() + self.config.idle_timeout,
protocol: self.config.protocol_config.protocol_name().clone(),
};
}
return Poll::Ready(event);
}
Expand All @@ -653,9 +659,14 @@ where

if self.substreams.is_empty() {
// We destroyed all substreams in this function.
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
self.keep_alive = KeepAlive::Until {
deadline: Instant::now() + self.config.idle_timeout,
protocol: self.config.protocol_config.protocol_name().clone(),
}
} else {
self.keep_alive = KeepAlive::Yes;
self.keep_alive = KeepAlive::Yes {
protocol: self.config.protocol_config.protocol_name().clone(),
};
}

Poll::Pending
Expand Down
Loading