Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): Make executor for connection tasks explicit #3097

Merged
merged 40 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ce2cb5c
Initial prototype commit
umgefahren Nov 8, 2022
6cb302f
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 8, 2022
b69dc35
Update swarm/src/connection/pool.rs
umgefahren Nov 9, 2022
12e0a32
Moving tokio and async std executors
umgefahren Nov 9, 2022
ad2ad9b
Implement other changes
umgefahren Nov 9, 2022
92da6a2
Formatted
umgefahren Nov 9, 2022
e7253d1
Update swarm/src/connection/pool.rs
umgefahren Nov 10, 2022
3a24540
Update swarm/src/lib.rs
umgefahren Nov 10, 2022
98dc7ce
Implemented changes
umgefahren Nov 10, 2022
38b21d7
Merge branch 'executor-aware-swarm' of github.com:umgefahren/rust-lib…
umgefahren Nov 10, 2022
804b7d6
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 10, 2022
8fd5e56
Fix tests
umgefahren Nov 10, 2022
ee9354c
Modified CI
umgefahren Nov 10, 2022
5381a2c
Corrected some clippy issues
umgefahren Nov 10, 2022
dc5f1b7
Update core/Cargo.toml
umgefahren Nov 10, 2022
b82db0b
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 11, 2022
5365389
Implemented suggested changes
umgefahren Nov 11, 2022
50ee927
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 12, 2022
e055d2a
Apply suggestions from code review
umgefahren Nov 12, 2022
5feb2e5
Implemented requested changes
umgefahren Nov 12, 2022
e6761c6
Use new api
umgefahren Nov 13, 2022
20c5a09
Update swarm/src/lib.rs
umgefahren Nov 13, 2022
eb7836d
Update swarm/src/lib.rs
umgefahren Nov 13, 2022
361ec51
Update swarm/src/lib.rs
umgefahren Nov 13, 2022
da955ba
Implemented more ergonomic api
umgefahren Nov 13, 2022
7d8cbe3
Move executor
umgefahren Nov 13, 2022
15607ba
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 13, 2022
f0493ed
Correct bad merge
umgefahren Nov 13, 2022
91676a9
Do fmt
umgefahren Nov 13, 2022
d99b802
Added changelog entries
umgefahren Nov 13, 2022
05deb81
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 13, 2022
69fa671
Apply suggestions from code review
umgefahren Nov 13, 2022
f27fdf8
Implement small changes
umgefahren Nov 13, 2022
742f555
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 13, 2022
f61686d
Apply suggestions from code review
umgefahren Nov 14, 2022
d193da3
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 14, 2022
30c5020
Addressed Max's concerns.
umgefahren Nov 14, 2022
f4ddadd
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 15, 2022
51e98fb
Removed unnecessary inlines
umgefahren Nov 15, 2022
1edb41d
Merge branch 'master' into executor-aware-swarm
umgefahren Nov 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ secp256k1 = ["libp2p-core/secp256k1"]
rsa = ["libp2p-core/rsa"]
ecdsa = ["libp2p-core/ecdsa"]
serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"]
tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio"]
async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std"]
tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-swarm/tokio"]
async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-swarm/async-std"]

[dependencies]
bytes = "1"
Expand Down
7 changes: 7 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ mod peer_record_proto {
include!(concat!(env!("OUT_DIR"), "/peer_record_proto.rs"));
}

use futures::executor::ThreadPool;
/// Multi-address re-export.
pub use multiaddr;
pub type Negotiated<T> = multistream_select::Negotiated<T>;
Expand Down Expand Up @@ -102,3 +103,9 @@ impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for F {
self(f)
}
}

impl Executor for ThreadPool {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
self.spawn_ok(future)
}
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
23 changes: 6 additions & 17 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p::{
TokioMdns,
},
mplex, noise,
swarm::{SwarmBuilder, SwarmEvent},
swarm::SwarmEvent,
tcp, Multiaddr, NetworkBehaviour, PeerId, Transport,
};
use std::error::Error;
Expand Down Expand Up @@ -97,23 +97,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Create a Swarm to manage peers and events.
let mut swarm = {
let mdns = TokioMdns::new(Default::default())?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns,
};

behaviour.floodsub.subscribe(floodsub_topic.clone());

SwarmBuilder::new(transport, behaviour, peer_id)
// We want the connection background tasks to be spawned
// onto the tokio runtime.
.executor(Box::new(|fut| {
tokio::spawn(fut);
}))
.build()
let mdns = TokioMdns::new(Default::default())?;
let behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns,
};
let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id);

// Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) {
Expand Down
2 changes: 1 addition & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
};

behaviour.floodsub.subscribe(floodsub_topic.clone());
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_threadpool_executor(transport, behaviour, local_peer_id)
};

// Reach out to another node if specified
Expand Down
5 changes: 2 additions & 3 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ mod network {

// Build the Swarm, connecting the lower layer transport logic with the
// higher layer network behaviour logic.
let swarm = SwarmBuilder::new(
let swarm = Swarm::with_threadpool_executor(
libp2p::development_transport(id_keys).await?,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
Expand All @@ -262,8 +262,7 @@ mod network {
),
},
peer_id,
)
.build();
);

let (command_sender, command_receiver) = mpsc::channel(0);
let (event_sender, event_receiver) = mpsc::channel(0);
Expand Down
7 changes: 5 additions & 2 deletions protocols/dcutr/examples/dcutr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use clap::Parser;
use futures::executor::block_on;
use futures::executor::{block_on, ThreadPool};
use futures::future::FutureExt;
use futures::stream::StreamExt;
use libp2p::core::multiaddr::{Multiaddr, Protocol};
Expand Down Expand Up @@ -155,7 +155,10 @@ fn main() -> Result<(), Box<dyn Error>> {
dcutr: dcutr::behaviour::Behaviour::new(),
};

let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id)
let mut swarm = match ThreadPool::new() {
Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, Box::new(tp)),
Err(_) => SwarmBuilder::without_executor(transport, behaviour, local_peer_id),
}
.dial_concurrency_factor(10_u8.try_into().unwrap())
.build();

Expand Down
8 changes: 2 additions & 6 deletions protocols/rendezvous/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use libp2p::core::upgrade::SelectUpgrade;
use libp2p::core::{identity, Multiaddr, PeerId, Transport};
use libp2p::mplex::MplexConfig;
use libp2p::noise::NoiseAuthenticated;
use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent};
use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent};
use libp2p::yamux::YamuxConfig;
use std::fmt::Debug;
use std::time::Duration;
Expand All @@ -53,11 +53,7 @@ where
.timeout(Duration::from_secs(5))
.boxed();

SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id)
.executor(Box::new(|future| {
let _ = tokio::spawn(future);
}))
.build()
Swarm::with_tokio_executor(transport, behaviour_fn(peer_id, identity), peer_id)
}

fn get_rand_memory_address() -> Multiaddr {
Expand Down
6 changes: 6 additions & 0 deletions swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ rand = "0.8"
smallvec = "1.6.1"
thiserror = "1.0"
void = "1"
tokio = { version = "1.15", features = ["rt"], optional = true }
async-std = { version = "1.6.2", optional = true }

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.9"
libp2p = { path = "..", features = ["full"] }
quickcheck = { package = "quickcheck-ext", path = "../misc/quickcheck-ext" }

[features]
tokio = ["dep:tokio"]
async-std = ["dep:async-std"]

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
[package.metadata.docs.rs]
Expand Down
89 changes: 50 additions & 39 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,41 @@ use std::{
use void::Void;

mod concurrent_dial;
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub mod executor;
mod task;

enum ExecSwitch {
Executor(Box<dyn Executor + Send>),
umgefahren marked this conversation as resolved.
Show resolved Hide resolved
LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
}

impl ExecSwitch {
// advance the local queue
umgefahren marked this conversation as resolved.
Show resolved Hide resolved
#[inline]
umgefahren marked this conversation as resolved.
Show resolved Hide resolved
fn advance_local(&mut self, cx: &mut Context) {
match self {
ExecSwitch::Executor(_) => {}
ExecSwitch::LocalSpawn(local) => {
while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
}
}
}

#[inline]
fn spawn(&mut self, task: BoxFuture<'static, ()>) {
match self {
Self::Executor(executor) => executor.exec(task),
Self::LocalSpawn(local) => local.push(task),
}
}
}

/// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<THandler: IntoConnectionHandler, TTrans>
pub struct Pool<THandler, TTrans>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
local_id: PeerId,

Expand Down Expand Up @@ -93,14 +122,9 @@ where
/// See [`Connection::max_negotiating_inbound_streams`].
max_negotiating_inbound_streams: usize,

/// The executor to use for running the background tasks. If `None`,
/// the tasks are kept in `local_spawns` instead and polled on the
/// current thread when the [`Pool`] is polled for new events.
executor: Option<Box<dyn Executor + Send>>,

/// If no `executor` is configured, tasks are kept in this set and
/// polled on the current thread when the [`Pool`] is polled for new events.
local_spawns: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// The executor to use for running connection tasks. Can either be a global executor
/// or a local queue.
executor: ExecSwitch,

/// Sender distributed to pending tasks for reporting events back
/// to the pool.
Expand Down Expand Up @@ -299,6 +323,10 @@ where
mpsc::channel(config.task_event_buffer_size);
let (established_connection_events_tx, established_connection_events_rx) =
mpsc::channel(config.task_event_buffer_size);
let executor = match config.executor {
Some(exec) => ExecSwitch::Executor(exec),
None => ExecSwitch::LocalSpawn(Default::default()),
};
Pool {
local_id,
counters: ConnectionCounters::new(limits),
Expand All @@ -309,8 +337,7 @@ where
dial_concurrency_factor: config.dial_concurrency_factor,
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
executor: config.executor,
local_spawns: FuturesUnordered::new(),
executor,
pending_connection_events_tx,
pending_connection_events_rx,
established_connection_events_tx,
Expand Down Expand Up @@ -399,11 +426,7 @@ where
}

fn spawn(&mut self, task: BoxFuture<'static, ()>) {
if let Some(executor) = &mut self.executor {
executor.exec(task);
} else {
self.local_spawns.push(task);
}
self.executor.spawn(task)
}
}

Expand Down Expand Up @@ -821,7 +844,7 @@ where
}

// Advance the tasks in `local_spawns`.
umgefahren marked this conversation as resolved.
Show resolved Hide resolved
while let Poll::Ready(Some(())) = self.local_spawns.poll_next_unpin(cx) {}
self.executor.advance_local(cx);

Poll::Pending
}
Expand Down Expand Up @@ -1073,34 +1096,21 @@ pub struct PoolConfig {
max_negotiating_inbound_streams: usize,
}

impl Default for PoolConfig {
fn default() -> Self {
PoolConfig {
executor: None,
task_event_buffer_size: 32,
task_command_buffer_size: 7,
// Set to a default of 8 based on frequency of dialer connections
impl PoolConfig {
pub fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
Self {
executor,
task_command_buffer_size: 32,
task_event_buffer_size: 7,
dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
substream_upgrade_protocol_override: None,
max_negotiating_inbound_streams: 128,
}
}
}

impl PoolConfig {
/// Configures the executor to use for spawning connection background tasks.
pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.executor = Some(e);
self
}

/// Configures the executor to use for spawning connection background tasks,
/// only if no executor has already been configured.
pub fn or_else_with_executor<F>(mut self, f: F) -> Self
where
F: FnOnce() -> Option<Box<dyn Executor + Send>>,
{
self.executor = self.executor.or_else(f);
pub fn with_executor(mut self, executor: Box<dyn Executor + Send>) -> Self {
umgefahren marked this conversation as resolved.
Show resolved Hide resolved
self.executor = Some(executor);
self
}

Expand Down Expand Up @@ -1175,9 +1185,10 @@ mod tests {
fn exec(&self, _: Pin<Box<dyn Future<Output = ()> + Send>>) {}
}

// TODO: This test has to be redesigned.
umgefahren marked this conversation as resolved.
Show resolved Hide resolved
#[test]
fn set_executor() {
PoolConfig::default()
PoolConfig::new(None)
.with_executor(Box::new(Dummy))
.with_executor(Box::new(|f| {
async_std::task::spawn(f);
Expand Down
24 changes: 24 additions & 0 deletions swarm/src/connection/pool/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use libp2p_core::Executor;
use std::{future::Future, pin::Pin};

#[cfg(feature = "tokio")]
#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TokioExecutor;

#[cfg(feature = "tokio")]
impl Executor for TokioExecutor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = tokio::spawn(future);
}
}

#[cfg(feature = "async-std")]
#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct AsyncStdExecutor;

#[cfg(feature = "async-std")]
impl Executor for AsyncStdExecutor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = async_std::task::spawn(future);
}
}
Loading