Skip to content

Commit

Permalink
Permit concurrent dialing attempts per peer.
Browse files Browse the repository at this point in the history
This is a follow-up to libp2p#1440
and relates to libp2p#925.
This change permits multiple dialing attempts per peer.
Note though that `libp2p-swarm` does not yet make use of this ability,
retaining the current behaviour. The essence of the changes are that the
`Peer` API now provides `Peer::dial()`, i.e. regardless of the state in
which the peer is. A dialing attempt is always made up of one or more
addresses tried sequentially, as before, but now there can be multiple
dialing attempts per peer. A configurable per-peer limit for outgoing
connections and thus concurrent dialing attempts is also included.
  • Loading branch information
Roman S. Borschel committed Apr 8, 2020
1 parent c1191d5 commit cb15028
Show file tree
Hide file tree
Showing 5 changed files with 418 additions and 302 deletions.
34 changes: 28 additions & 6 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,7 @@ where
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
Ok(self.add_pending(future, handler, endpoint, None))
}

Expand Down Expand Up @@ -267,6 +262,11 @@ where
TPeerId: Clone + Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;

if let Some(peer) = &info.peer_id {
self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?;
}

let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
Expand Down Expand Up @@ -465,6 +465,13 @@ where
self.established.get(peer).map_or(0, |conns| conns.len())
}

/// Counts the number of pending outgoing connections to the given peer.
pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize {
self.iter_pending_outgoing()
.filter(|info| info.peer_id == Some(peer))
.count()
}

/// Returns an iterator over all established connections of `peer`.
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
-> EstablishedConnectionIter<'a,
Expand Down Expand Up @@ -837,6 +844,7 @@ pub struct PoolLimits {
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
pub max_outgoing_per_peer: Option<usize>,
}

impl PoolLimits {
Expand All @@ -854,6 +862,20 @@ impl PoolLimits {
Self::check(current, self.max_outgoing)
}

fn check_incoming<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_pending_incoming)
}

fn check_outgoing_per_peer<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_outgoing_per_peer)
}

fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
Expand Down
74 changes: 44 additions & 30 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::{
};
use fnv::{FnvHashMap};
use futures::{prelude::*, future};
use smallvec::SmallVec;
use std::{
collections::hash_map,
convert::TryFrom as _,
Expand Down Expand Up @@ -78,21 +79,17 @@ where

/// The ongoing dialing attempts.
///
/// The `Network` enforces a single ongoing dialing attempt per peer,
/// even if multiple (established) connections per peer are allowed.
/// However, a single dialing attempt operates on a list of addresses
/// to connect to, which can be extended with new addresses while
/// the connection attempt is still in progress. Thereby each
/// dialing attempt is associated with a new connection and hence a new
/// connection ID.
/// There may be multiple ongoing dialing attempts to the same peer.
/// Each dialing attempt is associated with a new connection and hence
/// a new connection ID.
///
/// > **Note**: `dialing` must be consistent with the pending outgoing
/// > connections in `pool`. That is, for every entry in `dialing`
/// > there must exist a pending outgoing connection in `pool` with
/// > the same connection ID. This is ensured by the implementation of
/// > `Network` (see `dial_peer_impl` and `on_connection_failed`)
/// > together with the implementation of `DialingConnection::abort`.
dialing: FnvHashMap<TPeerId, peer::DialingAttempt>,
/// > together with the implementation of `DialingAttempt::abort`.
dialing: FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
}

impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
Expand Down Expand Up @@ -381,8 +378,11 @@ where
Poll::Pending => return Poll::Pending,
Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
match self.dialing.entry(connection.peer_id().clone()) {
hash_map::Entry::Occupied(e) if e.get().id == connection.id() => {
e.remove();
hash_map::Entry::Occupied(mut e) => {
e.get_mut().retain(|s| s.current.0 != connection.id());
if e.get().is_empty() {
e.remove();
}
},
_ => {}
}
Expand Down Expand Up @@ -453,7 +453,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans, TConnInfo, TPee
transport: TTrans,
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<TPeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
where
Expand Down Expand Up @@ -489,14 +489,12 @@ where
};

if let Ok(id) = &result {
let former = dialing.insert(opts.peer,
peer::DialingAttempt {
id: *id,
current: opts.address,
next: opts.remaining,
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
remaining: opts.remaining,
},
);
debug_assert!(former.is_none());
}

result
Expand All @@ -508,7 +506,7 @@ where
/// If the failed connection attempt was a dialing attempt and there
/// are more addresses to try, new `DialingOpts` are returned.
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
Expand All @@ -521,26 +519,33 @@ where
TPeerId: Eq + Hash + Clone,
{
// Check if the failed connection is associated with a dialing attempt.
// TODO: could be more optimal than iterating over everything
let dialing_peer = dialing.iter() // (1)
.find(|(_, a)| a.id == id)
.map(|(p, _)| p.clone());
let dialing_failed = dialing.iter_mut()
.find_map(|(peer, attempts)| {
if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
let attempt = attempts.remove(pos);
let last = attempts.is_empty();
Some((peer.clone(), attempt, last))
} else {
None
}
});

if let Some(peer_id) = dialing_peer {
// A pending outgoing connection to a known peer failed.
let mut attempt = dialing.remove(&peer_id).expect("by (1)");
if let Some((peer_id, mut attempt, last)) = dialing_failed {
if last {
dialing.remove(&peer_id);
}

let num_remain = u32::try_from(attempt.next.len()).unwrap();
let failed_addr = attempt.current.clone();
let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
let failed_addr = attempt.current.1.clone();

let opts =
if num_remain > 0 {
let next_attempt = attempt.next.remove(0);
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id.clone(),
handler,
address: next_attempt,
remaining: attempt.next
remaining: attempt.remaining
};
Some(opts)
} else {
Expand Down Expand Up @@ -575,9 +580,13 @@ where
/// Information about the network obtained by [`Network::info()`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
/// The total number of connected peers.
pub num_peers: usize,
/// The total number of connections, both established and pending.
pub num_connections: usize,
/// The total number of pending connections, both incoming and outgoing.
pub num_connections_pending: usize,
/// The total number of established connections.
pub num_connections_established: usize,
}

Expand Down Expand Up @@ -627,4 +636,9 @@ impl NetworkConfig {
self.pool_limits.max_established_per_peer = Some(n);
self
}

pub fn set_outgoing_per_peer_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing_per_peer = Some(n);
self
}
}
Loading

0 comments on commit cb15028

Please sign in to comment.