Merge branch 'libp2p/master' into kad-query-info #4

Merged: 8 commits into kad-query-info from libp2p/master on May 16, 2020
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
# Version ???

- `libp2p-core`, `libp2p-swarm`: Added support for multiple dialing
attempts per peer, with a configurable limit.
[PR 1506](https://github.com/libp2p/rust-libp2p/pull/1506)

- `libp2p-core`: Updated to multihash 0.11.0.
[PR 1566](https://github.com/libp2p/rust-libp2p/pull/1566)

- `libp2p-core`: Make the number of events buffered to/from tasks configurable.
[PR 1574](https://github.com/libp2p/rust-libp2p/pull/1574)

- `libp2p-noise`: Added the `X25519Spec` protocol suite which uses
libp2p-noise-spec compliant signatures on static keys as well as the
`/noise` protocol upgrade, hence providing a libp2p-noise-spec compliant
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -55,7 +55,7 @@ secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"]
bytes = "0.5"
futures = "0.3.1"
multiaddr = { package = "parity-multiaddr", version = "0.8.0", path = "misc/multiaddr" }
multihash = "0.10"
multihash = "0.11.0"
lazy_static = "1.2"
libp2p-mplex = { version = "0.18.0", path = "muxers/mplex", optional = true }
libp2p-identify = { version = "0.18.0", path = "protocols/identify", optional = true }
2 changes: 1 addition & 1 deletion core/Cargo.toml
@@ -21,7 +21,7 @@ lazy_static = "1.2"
libsecp256k1 = { version = "0.3.1", optional = true }
log = "0.4"
multiaddr = { package = "parity-multiaddr", version = "0.8.0", path = "../misc/multiaddr" }
multihash = "0.10"
multihash = "0.11.0"
multistream-select = { version = "0.8.0", path = "../misc/multistream-select" }
parking_lot = "0.10.0"
pin-project = "0.4.6"
40 changes: 35 additions & 5 deletions core/src/connection/manager.rs
@@ -99,6 +99,9 @@ pub struct Manager<I, O, H, E, HE, C> {
/// Next available identifier for a new connection / task.
next_task_id: TaskId,

/// Size of the task command buffer (per task).
task_command_buffer_size: usize,

/// The executor to use for running the background tasks. If `None`,
/// the tasks are kept in `local_spawns` instead and polled on the
/// current thread when the manager is polled for new events.
@@ -127,6 +130,32 @@
}
}

/// Configuration options when creating a [`Manager`].
///
/// The default configuration specifies no dedicated task executor, a
/// task event buffer size of 32, and a task command buffer size of 7.
#[non_exhaustive]
pub struct ManagerConfig {
/// Executor to use to spawn tasks.
pub executor: Option<Box<dyn Executor + Send>>,

/// Size of the task command buffer (per task).
pub task_command_buffer_size: usize,

/// Size of the task event buffer (for all tasks).
pub task_event_buffer_size: usize,
}

impl Default for ManagerConfig {
fn default() -> Self {
ManagerConfig {
executor: None,
task_event_buffer_size: 32,
task_command_buffer_size: 7,
}
}
}
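
A minimal usage sketch (not part of this diff): because `ManagerConfig` is `#[non_exhaustive]`, callers outside the crate start from `Default::default()` and overwrite the public fields rather than using a struct literal.

```rust
// Hypothetical caller-side configuration; the defaults are no executor,
// a task event buffer of 32 and a task command buffer of 7.
let mut config = ManagerConfig::default();
config.task_event_buffer_size = 64;   // shared channel: all tasks -> manager
config.task_command_buffer_size = 16; // per-task channel: manager -> task
// `config` is then consumed by `Manager::new(config)` (see below).
```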

/// Internal information about a running task.
///
/// Contains the sender to deliver event messages to the task, and
@@ -196,12 +225,13 @@ pub enum Event<'a, I, O, H, TE, HE, C> {

impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// Creates a new connection manager.
-pub fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
-let (tx, rx) = mpsc::channel(1);
+pub fn new(config: ManagerConfig) -> Self {
+let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
Self {
tasks: FnvHashMap::default(),
next_task_id: TaskId(0),
-executor,
+task_command_buffer_size: config.task_command_buffer_size,
+executor: config.executor,
local_spawns: FuturesUnordered::new(),
events_tx: tx,
events_rx: rx
@@ -234,7 +264,7 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
let task_id = self.next_task_id;
self.next_task_id.0 += 1;

-let (tx, rx) = mpsc::channel(4);
+let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
self.tasks.insert(task_id, TaskInfo { sender: tx, state: TaskState::Pending });

let task = Box::pin(Task::pending(task_id, self.events_tx.clone(), rx, future, handler));
@@ -269,7 +299,7 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
let task_id = self.next_task_id;
self.next_task_id.0 += 1;

-let (tx, rx) = mpsc::channel(4);
+let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
self.tasks.insert(task_id, TaskInfo {
sender: tx, state: TaskState::Established(info)
});
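
One caveat when tuning these sizes (an aside, not from this PR): the bounded MPSC channel in `futures` reserves one extra slot per sender, so the configured buffer is a backpressure threshold rather than a hard cap. A standalone illustration:

```rust
use futures::channel::mpsc;

fn main() {
    // `mpsc::channel(n)` holds `n` messages plus one per live sender,
    // so a command buffer of 7 admits 8 queued sends from one sender
    // before backpressure kicks in.
    let (mut tx, _rx) = mpsc::channel::<&'static str>(7);
    for _ in 0..8 {
        assert!(tx.try_send("cmd").is_ok());
    }
    // The next send fails until the receiver drains the channel.
    assert!(tx.try_send("cmd").is_err());
}
```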
41 changes: 31 additions & 10 deletions core/src/connection/pool.rs
@@ -19,7 +19,6 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
-Executor,
ConnectedPoint,
PeerId,
connection::{
@@ -36,7 +35,7 @@ use crate::{
OutgoingInfo,
Substream,
PendingConnectionError,
-manager::{self, Manager},
+manager::{self, Manager, ManagerConfig},
},
muxing::StreamMuxer,
};
@@ -175,13 +174,13 @@
/// Creates a new empty `Pool`.
pub fn new(
local_id: TPeerId,
executor: Option<Box<dyn Executor + Send>>,
manager_config: ManagerConfig,
limits: PoolLimits
) -> Self {
Pool {
local_id,
limits,
-manager: Manager::new(executor),
+manager: Manager::new(manager_config),
established: Default::default(),
pending: Default::default(),
}
@@ -225,12 +224,7 @@
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
-if let Some(limit) = self.limits.max_incoming {
-    let current = self.iter_pending_incoming().count();
-    if current >= limit {
-        return Err(ConnectionLimit { limit, current })
-    }
-}
+self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
Ok(self.add_pending(future, handler, endpoint, None))
}

@@ -267,6 +261,11 @@
TPeerId: Clone + Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;

if let Some(peer) = &info.peer_id {
self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?;
}

let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
@@ -465,6 +464,13 @@
self.established.get(peer).map_or(0, |conns| conns.len())
}

/// Counts the number of pending outgoing connections to the given peer.
pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize {
self.iter_pending_outgoing()
.filter(|info| info.peer_id == Some(peer))
.count()
}

/// Returns an iterator over all established connections of `peer`.
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
-> EstablishedConnectionIter<'a,
@@ -837,6 +843,7 @@ pub struct PoolLimits {
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
pub max_outgoing_per_peer: Option<usize>,
}
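
For orientation, a hypothetical way to populate these limits (all fields are public `Option`s, so `None` leaves a dimension unlimited), including the per-peer dial cap added in this PR:

```rust
// Hypothetical limits: at most 2 concurrent dial attempts per peer,
// alongside caps on the other dimensions.
let limits = PoolLimits {
    max_outgoing: Some(1024),
    max_incoming: Some(1024),
    max_established_per_peer: Some(8),
    max_outgoing_per_peer: Some(2),
};
```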

impl PoolLimits {
@@ -854,6 +861,20 @@ impl PoolLimits {
Self::check(current, self.max_outgoing)
}

fn check_incoming<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_incoming)
}

fn check_outgoing_per_peer<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_outgoing_per_peer)
}

fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
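
The body of the generic `check` is collapsed in this view. Judging from the inline `max_incoming` check it replaces in `add_incoming` above, it plausibly reads as follows (a reconstruction, not verbatim PR code):

```rust
fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
where
    F: FnOnce() -> usize
{
    // Only evaluate the (potentially costly) count if a limit is set.
    if let Some(limit) = limit {
        let current = current();
        if current >= limit {
            return Err(ConnectionLimit { limit, current })
        }
    }
    Ok(())
}
```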