diff --git a/src/lib.rs b/src/lib.rs index 2585ba859..923dad752 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,18 @@ impl fmt::Debug for IpfsOptions { } } +impl IpfsOptions { + /// Creates an inmemory store backed node for tests + pub fn inmemory_with_generated_keys(mdns: bool) -> Self { + Self::new( + std::env::temp_dir().into(), + Keypair::generate_ed25519(), + vec![], + mdns, + ) + } +} + /// Workaround for libp2p::identity::Keypair missing a Debug impl, works with references and owned /// keypairs. #[derive(Clone)] @@ -407,84 +419,98 @@ impl Future for IpfsFuture { fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { use futures::Stream; - use libp2p::Swarm; + use libp2p::{swarm::SwarmEvent, Swarm}; + + // begin by polling the swarm so that initially it'll first have chance to bind listeners + // and such. TODO: this no longer needs to be a swarm event but perhaps we should + // consolidate logging of these events here, if necessary? loop { - // temporary pinning of the receivers should be safe as we are pinning through the - // already pinned self. with the receivers we can also safely ignore exhaustion - // as those are fused. - loop { - let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) { - Poll::Ready(Some(evt)) => evt, - // doing teardown also after the `Ipfs` has been dropped - Poll::Ready(None) => IpfsEvent::Exit, + let inner = { + let next = self.swarm.next_event(); + futures::pin_mut!(next); + match next.poll(ctx) { + Poll::Ready(inner) => inner, Poll::Pending => break, - }; - - match inner { - IpfsEvent::Connect(addr, ret) => { - let fut = self.swarm.connect(addr); - task::spawn(async move { - let res = fut.await.map_err(|err| format_err!("{}", err)); - ret.send(res).ok(); - }); - } - IpfsEvent::Addresses(ret) => { - let addrs = self.swarm.addrs(); - ret.send(Ok(addrs)).ok(); - } - IpfsEvent::Listeners(ret) => { - let listeners = Swarm::listeners(&self.swarm).cloned().collect(); - ret.send(Ok(listeners)).ok(); - } - IpfsEvent::Connections(ret) => { - let connections = self.swarm.connections(); - ret.send(Ok(connections)).ok(); - } - IpfsEvent::Disconnect(addr, ret) => { - if let Some(disconnector) = self.swarm.disconnect(addr) { - disconnector.disconnect(&mut self.swarm); - } - ret.send(Ok(())).ok(); - } - IpfsEvent::GetAddresses(ret) => { - // perhaps this could be moved under `IpfsEvent` or free functions? - let mut addresses = Vec::new(); - addresses.extend(Swarm::listeners(&self.swarm).cloned()); - addresses.extend(Swarm::external_addresses(&self.swarm).cloned()); - // ignore error, perhaps caller went away already - let _ = ret.send(addresses); - } - IpfsEvent::Exit => { - // FIXME: we could do a proper teardown - return Poll::Ready(()); - } } + }; + match inner { + SwarmEvent::Behaviour(()) => {} + SwarmEvent::Connected(_peer_id) => {} + SwarmEvent::Disconnected(_peer_id) => {} + SwarmEvent::NewListenAddr(_addr) => {} + SwarmEvent::ExpiredListenAddr(_addr) => {} + SwarmEvent::UnreachableAddr { + peer_id: _peer_id, + address: _address, + error: _error, + } => {} + SwarmEvent::StartConnect(_peer_id) => {} } + } - // Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy - // wants this to be written with a `while let`. - while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) { - match evt { - RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), - RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid), - RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid), - } - } + // temporary pinning of the receivers should be safe as we are pinning through the + // already pinned self. with the receivers we can also safely ignore exhaustion + // as those are fused. + loop { + let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) { + Poll::Ready(Some(evt)) => evt, + // doing teardown also after the `Ipfs` has been dropped + Poll::Ready(None) => IpfsEvent::Exit, + Poll::Pending => break, + }; - { - let poll = Pin::new(&mut self.swarm).poll_next(ctx); - match poll { - Poll::Ready(Some(_)) => {} - Poll::Ready(None) => { - // this should never happen with libp2p swarm - return Poll::Ready(()); - } - Poll::Pending => { - return Poll::Pending; + match inner { + IpfsEvent::Connect(addr, ret) => { + let fut = self.swarm.connect(addr); + task::spawn(async move { + let res = fut.await.map_err(|err| format_err!("{}", err)); + ret.send(res).ok(); + }); + } + IpfsEvent::Addresses(ret) => { + let addrs = self.swarm.addrs(); + ret.send(Ok(addrs)).ok(); + } + IpfsEvent::Listeners(ret) => { + let listeners = Swarm::listeners(&self.swarm).cloned().collect(); + ret.send(Ok(listeners)).ok(); + } + IpfsEvent::Connections(ret) => { + let connections = self.swarm.connections(); + ret.send(Ok(connections)).ok(); + } + IpfsEvent::Disconnect(addr, ret) => { + if let Some(disconnector) = self.swarm.disconnect(addr) { + disconnector.disconnect(&mut self.swarm); } + ret.send(Ok(())).ok(); + } + IpfsEvent::GetAddresses(ret) => { + // perhaps this could be moved under `IpfsEvent` or free functions? + let mut addresses = Vec::new(); + addresses.extend(Swarm::listeners(&self.swarm).cloned()); + addresses.extend(Swarm::external_addresses(&self.swarm).cloned()); + // ignore error, perhaps caller went away already + let _ = ret.send(addresses); + } + IpfsEvent::Exit => { + // FIXME: we could do a proper teardown + return Poll::Ready(()); } } } + + // Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy + // wants this to be written with a `while let`. + while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) { + match evt { + RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), + RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid), + RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid), + } + } + + Poll::Pending } } diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index a73b599b5..42c4383c9 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -4,8 +4,9 @@ use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p::swarm::protocols_handler::{ DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler, }; -use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, Swarm}; +use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm}; use std::collections::{HashMap, HashSet, VecDeque}; +use std::task::Waker; use std::time::Duration; #[derive(Clone, Debug)] @@ -26,13 +27,18 @@ impl Disconnector { } } +// Currently this is swarm::NetworkBehaviourAction +type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, ::OutEvent>; + #[derive(Debug, Default)] pub struct SwarmApi { - events: VecDeque>, + events: VecDeque, peers: HashSet, connect_registry: SubscriptionRegistry>, connections: HashMap, connected_peers: HashMap, + /// The waker of the last polled task, if any. + waker: Option, } impl SwarmApi { @@ -65,15 +71,23 @@ impl SwarmApi { } pub fn connect(&mut self, address: Multiaddr) -> SubscriptionFuture> { - log::trace!("connect {}", address.to_string()); - self.events.push_back(NetworkBehaviourAction::DialAddress { + log::trace!("starting to connect to {}", address); + self.push_action(NetworkBehaviourAction::DialAddress { address: address.clone(), }); self.connect_registry.create_subscription(address) } + fn push_action(&mut self, action: NetworkBehaviourAction) { + self.events.push_back(action); + + if let Some(waker) = self.waker.as_ref() { + waker.wake_by_ref(); + } + } + pub fn disconnect(&mut self, address: Multiaddr) -> Option { - log::trace!("disconnect {}", address.to_string()); + log::trace!("disconnect {}", address); self.connections.remove(&address); let peer_id = self .connections @@ -144,10 +158,13 @@ impl NetworkBehaviour for SwarmApi { .finish_subscription(addr, Err(format!("{}", error))); } - #[allow(clippy::type_complexity)] - fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{ - log::trace!("poll"); + fn poll( + &mut self, + ctx: &mut Context, + _: &mut impl PollParameters, + ) -> Poll { + // store the poller so that we can wake the task on next push_action + self.waker = Some(ctx.waker().clone()); if let Some(event) = self.events.pop_front() { Poll::Ready(event) } else { diff --git a/tests/connect_two.rs b/tests/connect_two.rs new file mode 100644 index 000000000..3298237ba --- /dev/null +++ b/tests/connect_two.rs @@ -0,0 +1,71 @@ +use async_std::task; + +/// Make sure two instances of ipfs can be connected. +#[test] +fn connect_two_nodes() { + // env_logger::init(); + + // make sure the connection will only happen through explicit connect + let mdns = false; + + let (tx, rx) = futures::channel::oneshot::channel(); + + let node_a = task::spawn(async move { + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns); + let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) + .await + .start() + .await + .unwrap(); + + let jh = task::spawn(fut); + + let (pk, addrs) = ipfs + .identity() + .await + .expect("failed to read identity() on node_a"); + assert!(!addrs.is_empty()); + tx.send((pk, addrs, ipfs, jh)).unwrap(); + }); + + task::block_on(async move { + let (other_pk, other_addrs, other_ipfs, other_jh) = rx.await.unwrap(); + + println!("got back from the other node: {:?}", other_addrs); + + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns); + let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) + .await + .start() + .await + .unwrap(); + let jh = task::spawn(fut); + + let _other_peerid = other_pk.into_peer_id(); + + let mut connected = None; + + for addr in other_addrs { + println!("trying {}", addr); + match ipfs.connect(addr.clone()).await { + Ok(_) => { + connected = Some(addr); + break; + } + Err(e) => { + println!("Failed connecting to {}: {}", addr, e); + } + } + } + + let connected = connected.expect("Failed to connect to anything"); + println!("connected to {}", connected); + + other_ipfs.exit_daemon().await; + other_jh.await; + node_a.await; + + ipfs.exit_daemon().await; + jh.await; + }); +}