This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Add connect integration test #108

Merged · 6 commits · Mar 22, 2020
Changes from all commits
162 changes: 94 additions & 68 deletions src/lib.rs
@@ -105,6 +105,18 @@ impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
}
}

impl IpfsOptions<TestTypes> {
/// Creates an in-memory store backed node for tests
pub fn inmemory_with_generated_keys(mdns: bool) -> Self {
Self::new(
std::env::temp_dir().into(),
Keypair::generate_ed25519(),
vec![],
mdns,
)
}
}
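
For context, a node is built from these options the same way the integration test below does; a minimal usage sketch (mdns disabled so discovery stays explicit):

let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts).await.start().await.unwrap();
let task = async_std::task::spawn(fut); // drive the node's background future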

/// Workaround for libp2p::identity::Keypair missing a Debug impl, works with references and owned
/// keypairs.
#[derive(Clone)]
@@ -407,84 +419,98 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
use futures::Stream;
use libp2p::{swarm::SwarmEvent, Swarm};

// begin by polling the swarm so that initially it'll first have a chance to bind
// listeners and such. TODO: this no longer needs to be a swarm event but perhaps we
// should consolidate logging of these events here, if necessary?
loop {
let inner = {
let next = self.swarm.next_event();
futures::pin_mut!(next);
match next.poll(ctx) {
Poll::Ready(inner) => inner,
Poll::Pending => break,
}
};
match inner {
SwarmEvent::Behaviour(()) => {}
SwarmEvent::Connected(_peer_id) => {}
SwarmEvent::Disconnected(_peer_id) => {}
SwarmEvent::NewListenAddr(_addr) => {}
SwarmEvent::ExpiredListenAddr(_addr) => {}
SwarmEvent::UnreachableAddr {
peer_id: _peer_id,
address: _address,
error: _error,
} => {}
SwarmEvent::StartConnect(_peer_id) => {}
Comment on lines +436 to +447 (Collaborator, PR author):
I see no harm in keeping this because it's easy to put dbg!(inner) on line 436 here.

}
}
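
For reference, the debugging aid the comment mentions is a one-line change; a sketch (not part of the diff), assuming the polled SwarmEvent implements Debug:

match dbg!(inner) {
    // ...same arms as above, now with each event printed as it is polled
    SwarmEvent::Behaviour(()) => {}
    _ => {}
}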

// temporary pinning of the receivers should be safe as we are pinning through the
// already pinned self. with the receivers we can also safely ignore exhaustion
// as those are fused.
loop {
let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) {
Poll::Ready(Some(evt)) => evt,
// doing teardown also after the `Ipfs` has been dropped
Poll::Ready(None) => IpfsEvent::Exit,
Poll::Pending => break,
};

match inner {
IpfsEvent::Connect(addr, ret) => {
let fut = self.swarm.connect(addr);
task::spawn(async move {
let res = fut.await.map_err(|err| format_err!("{}", err));
ret.send(res).ok();
});
}
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm).cloned().collect();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
ret.send(Ok(connections)).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// perhaps this could be moved under `IpfsEvent` or free functions?
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
}
}
}

// Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy
// wants this to be written with a `while let`.
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid),
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
}
}

Poll::Pending
}
}

35 changes: 26 additions & 9 deletions src/p2p/swarm.rs
@@ -4,8 +4,9 @@ use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::protocols_handler::{
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
};
use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::Waker;
use std::time::Duration;

#[derive(Clone, Debug)]
@@ -26,13 +27,18 @@ impl Disconnector {
}
}

// Currently this is swarm::NetworkBehaviourAction<Void, Void>
type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<<SwarmApi as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, <SwarmApi as NetworkBehaviour>::OutEvent>;

#[derive(Debug, Default)]
pub struct SwarmApi {
events: VecDeque<NetworkBehaviourAction<void::Void, void::Void>>,
events: VecDeque<NetworkBehaviourAction>,
peers: HashSet<PeerId>,
connect_registry: SubscriptionRegistry<Multiaddr, Result<(), String>>,
connections: HashMap<Multiaddr, Connection>,
connected_peers: HashMap<PeerId, Multiaddr>,
/// The waker of the last polled task, if any.
waker: Option<Waker>,
}

impl SwarmApi {
@@ -65,15 +71,23 @@ impl SwarmApi {
}

pub fn connect(&mut self, address: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
log::trace!("connect {}", address.to_string());
self.events.push_back(NetworkBehaviourAction::DialAddress {
log::trace!("starting to connect to {}", address);
self.push_action(NetworkBehaviourAction::DialAddress {
address: address.clone(),
});
self.connect_registry.create_subscription(address)
}

fn push_action(&mut self, action: NetworkBehaviourAction) {
self.events.push_back(action);

if let Some(waker) = self.waker.as_ref() {
waker.wake_by_ref();
}
}

pub fn disconnect(&mut self, address: Multiaddr) -> Option<Disconnector> {
log::trace!("disconnect {}", address.to_string());
log::trace!("disconnect {}", address);
self.connections.remove(&address);
let peer_id = self
.connections
@@ -144,10 +158,13 @@ impl NetworkBehaviour for SwarmApi {
.finish_subscription(addr, Err(format!("{}", error)));
}

fn poll(
&mut self,
ctx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction> {
// store the poller so that we can wake the task on next push_action
self.waker = Some(ctx.waker().clone());
if let Some(event) = self.events.pop_front() {
Poll::Ready(event)
} else {
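
The waker field stored in poll above is the other half of push_action: without the wake_by_ref call, an action queued from outside poll would sit in the queue until something else happened to wake the task. A self-contained sketch of this queue-plus-waker pattern (illustrative names, not part of the diff):

use std::collections::VecDeque;
use std::task::{Context, Poll, Waker};

struct WakeQueue<T> {
    items: VecDeque<T>,
    waker: Option<Waker>,
}

impl<T> WakeQueue<T> {
    fn push(&mut self, item: T) {
        self.items.push_back(item);
        // wake the task that last polled us so it re-polls and sees the item
        if let Some(waker) = self.waker.as_ref() {
            waker.wake_by_ref();
        }
    }

    fn poll_pop(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.items.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                // remember the current task so push() can wake it later
                self.waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }
}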
71 changes: 71 additions & 0 deletions tests/connect_two.rs
@@ -0,0 +1,71 @@
use async_std::task;

/// Make sure two instances of ipfs can be connected.
#[test]
fn connect_two_nodes() {
// env_logger::init();

// make sure the connection will only happen through explicit connect
let mdns = false;

let (tx, rx) = futures::channel::oneshot::channel();

let node_a = task::spawn(async move {
let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns);
let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts)
.await
.start()
.await
.unwrap();

let jh = task::spawn(fut);

let (pk, addrs) = ipfs
.identity()
.await
.expect("failed to read identity() on node_a");
assert!(!addrs.is_empty());
tx.send((pk, addrs, ipfs, jh)).unwrap();
});

task::block_on(async move {
let (other_pk, other_addrs, other_ipfs, other_jh) = rx.await.unwrap();

println!("got back from the other node: {:?}", other_addrs);

let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns);
let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts)
.await
.start()
.await
.unwrap();
let jh = task::spawn(fut);

let _other_peerid = other_pk.into_peer_id();

let mut connected = None;

for addr in other_addrs {
println!("trying {}", addr);
match ipfs.connect(addr.clone()).await {
Ok(_) => {
connected = Some(addr);
break;
}
Err(e) => {
println!("Failed connecting to {}: {}", addr, e);
}
}
Comment on lines +50 to +58 (Collaborator, PR author):
My initial thinking for connect was that it could be connect(PeerId, Vec<Multiaddr>), since by the looks of libp2p it might dial through all of the addresses. This will probably change somehow in the next version of libp2p, because support for multiple connections to a peer was merged in libp2p/rust-libp2p#1440 and libp2p/rust-libp2p#1493.

}

let connected = connected.expect("Failed to connect to anything");
println!("connected to {}", connected);

other_ipfs.exit_daemon().await;
other_jh.await;
node_a.await;

ipfs.exit_daemon().await;
jh.await;
});
}
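
On the connect(PeerId, Vec<Multiaddr>) idea from the review comment above, the dialing loop in this test could be factored into a helper; a hypothetical sketch (the helper name and generic bounds are assumptions, not part of the PR):

/// Try each address in turn, returning the first one that connects.
async fn connect_first<T: ipfs::IpfsTypes>(
    ipfs: &ipfs::Ipfs<T>,
    addrs: Vec<libp2p::Multiaddr>,
) -> Option<libp2p::Multiaddr> {
    for addr in addrs {
        // reuses the same connect call the test above exercises
        if ipfs.connect(addr.clone()).await.is_ok() {
            return Some(addr);
        }
    }
    None
}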