From 129e330e72199d0e1b3cbed9e03d8b855c4a61f3 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Mon, 6 Feb 2023 10:08:58 -0500 Subject: [PATCH 1/3] refactor: Use libp2p-nat for handling portmapping --- Cargo.toml | 2 +- src/igd.rs | 103 ------------------------------------------- src/lib.rs | 61 +------------------------ src/p2p/behaviour.rs | 4 ++ src/p2p/mod.rs | 5 +++ 5 files changed, 12 insertions(+), 163 deletions(-) delete mode 100644 src/igd.rs diff --git a/Cargo.toml b/Cargo.toml index dbaa7a4fa..8de816ada 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ void = { default-features = false, version = "1.0" } fs2 = "0.4" sled = "0.34" once_cell = "1.16" -igd-next = { version = "0.13", features = ["aio_tokio"] } +libp2p-nat = { version = "0.1.1" } prost = { default-features = false, version = "0.11" } diff --git a/src/igd.rs b/src/igd.rs deleted file mode 100644 index 9cfff2106..000000000 --- a/src/igd.rs +++ /dev/null @@ -1,103 +0,0 @@ -use anyhow::Error; -use futures::channel::oneshot; -use igd_next::{aio, PortMappingProtocol, SearchOptions}; -use libp2p::multiaddr::Protocol; -use libp2p::Multiaddr; -use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; -use std::time::Duration; -use tokio::sync::broadcast::Receiver; -use tokio::time::{self, Instant}; -use tracing::debug; - -pub(crate) fn multiaddr_to_socket_port( - addr: Multiaddr, -) -> Result<(SocketAddr, u16, PortMappingProtocol), Error> { - let mut iter = addr.iter(); - let mut addr = iter - .next() - .and_then(|proto| match proto { - Protocol::Ip4(addr) if addr.is_private() => { - Some(SocketAddr::V4(SocketAddrV4::new(addr, 0))) - } - Protocol::Ip6(addr) - if !addr.is_loopback() - && (addr.segments()[0] & 0xffc0) != 0xfe80 - && (addr.segments()[0] & 0xfe00) != 0xfc00 => - { - Some(SocketAddr::V6(SocketAddrV6::new(addr, 0, 0, 0))) - } - _ => None, - }) - .ok_or_else(|| anyhow::anyhow!("Invalid address type"))?; - - let (protocol, port) = iter - .next() - .and_then(|proto| match proto { - Protocol::Tcp(port) => Some((PortMappingProtocol::TCP, port)), - Protocol::Udp(port) => Some((PortMappingProtocol::UDP, port)), - _ => None, - }) - .ok_or_else(|| anyhow::anyhow!("Invalid protocol type"))?; - - addr.set_port(port); - - Ok((addr, port, protocol)) -} - -pub(crate) fn forward_port( - addr: Multiaddr, - lease_interval: Duration, - mut termination_rx: Receiver<()>, -) -> Option>> { - - if multiaddr_to_socket_port(addr.clone()).is_err() { - return None; - } - - let lease_interval = lease_interval.min(Duration::from_secs(u32::MAX.into())); - let lease_interval_u32 = lease_interval.as_secs() as u32; - let (tx, rx) = oneshot::channel(); - - // Start a tokio task to renew the lease periodically. - tokio::spawn(async move { - match add_or_renewal_port(addr.clone(), lease_interval_u32).await { - Ok(_) => { - let _ = tx.send(Ok(())); - } - Err(e) => { - let _ = tx.send(Err(e)); - return; - } - }; - let mut timer = time::interval_at(Instant::now() + lease_interval, lease_interval); - - loop { - tokio::select! { - _e = termination_rx.recv() => { - debug!("Terminate renewal task"); - break; - }, - _ = timer.tick() => { - debug!("Renewing lease for {}", addr); - - if let Err(error) = add_or_renewal_port(addr.clone(), lease_interval_u32).await { - //TODO: Do we want to break loop to end the task or should we continue to retry on an interval? - error!("Failed to renew lease: {}", error); - } - } - }; - } - }); - Some(rx) -} - -pub(crate) async fn add_or_renewal_port(addr: Multiaddr, lease_duration: u32) -> Result<(), Error> { - let (local_addr, ext_port, protocol) = multiaddr_to_socket_port(addr)?; - let gateway = aio::search_gateway(SearchOptions::default()).await?; - - gateway - .add_port(protocol, ext_port, local_addr, lease_duration, "rust-ipfs") - .await?; - - Ok(()) -} diff --git a/src/lib.rs b/src/lib.rs index 0f70dd6e5..1cad429b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,8 +33,6 @@ pub mod repo; mod subscription; pub mod unixfs; -mod igd; - #[macro_use] extern crate tracing; @@ -47,7 +45,7 @@ use futures::{ }, sink::SinkExt, stream::{BoxStream, Fuse, Stream}, - FutureExt, StreamExt, + StreamExt, }; use ipfs_bitswap::BitswapEvent; @@ -55,7 +53,7 @@ use p2p::{ IdentifyConfiguration, KadStoreConfig, PeerInfo, ProviderStream, RecordStream, RelayConfig, }; use subscription::SubscriptionRegistry; -use tokio::{sync::broadcast, task::JoinHandle}; +use tokio::task::JoinHandle; use tracing::Span; use tracing_futures::Instrument; use unixfs::UnixfsStatus; @@ -567,7 +565,6 @@ impl UninitializedIpfs { let listener_subscriptions = Default::default(); let listeners = Default::default(); let bootstraps = Default::default(); - let mapping_task = HashMap::new(); let IpfsOptions { listening_addrs, .. @@ -588,9 +585,6 @@ impl UninitializedIpfs { autonat_counter, bootstraps, swarm_event, - mapping_task, - mapping_pending: Default::default(), - port_mapping: options.port_mapping, }; for addr in listening_addrs.into_iter() { @@ -1672,15 +1666,6 @@ struct IpfsFuture { autonat_counter: Arc, bootstraps: HashSet, swarm_event: Option, - mapping_task: HashMap>, - mapping_pending: HashMap< - Multiaddr, - ( - futures::channel::oneshot::Receiver>, - broadcast::Sender<()>, - ), - >, - port_mapping: bool, } impl Future for IpfsFuture { @@ -1720,19 +1705,6 @@ impl Future for IpfsFuture { self.listening_addresses .insert(address.clone(), listener_id); - if self.port_mapping - && address.iter().any(|p| matches!(p, Protocol::Ip4(_))) - { - let (tx, rx) = broadcast::channel(1); - if let Some(forward_rx) = igd::forward_port( - address.clone(), - std::time::Duration::from_secs(2 * 60), - rx, - ) { - self.mapping_pending - .insert(address.clone(), (forward_rx, tx)); - } - } self.listener_subscriptions .finish_subscription(listener_id.into(), Ok(Some(Some(address)))); } @@ -1742,9 +1714,6 @@ impl Future for IpfsFuture { } => { self.listeners.remove(&listener_id); self.listening_addresses.remove(&address); - if let Some(tx) = self.mapping_task.remove(&address) { - let _ = tx.send(()); - } self.listener_subscriptions .finish_subscription(listener_id.into(), Ok(Some(None))); } @@ -1756,9 +1725,6 @@ impl Future for IpfsFuture { self.listeners.remove(&listener_id); for address in addresses { self.listening_addresses.remove(&address); - if let Some(tx) = self.mapping_task.remove(&address) { - let _ = tx.send(()); - } } let reason = reason.map(|_| Some(None)).map_err(|e| e.to_string()); self.listener_subscriptions @@ -2655,29 +2621,6 @@ impl Future for IpfsFuture { } } - let pending_addr_list = self.mapping_pending.keys().cloned().collect::>(); - - for address in pending_addr_list { - if let Entry::Occupied(mut entry) = self.mapping_pending.entry(address.clone()) { - let result = { - let (nat_rx, _) = entry.get_mut(); - match Pin::new(nat_rx).poll_unpin(ctx) { - Poll::Ready(r) => Some(r), - Poll::Pending => continue, - } - }; - - if let Some(result) = result { - let (_, tx) = entry.remove(); - if let Ok(Ok(_)) = result { - self.mapping_task.insert(address.clone(), tx); - } - } - } - } - - self.mapping_pending.shrink_to_fit(); - done = true; } } diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index ab7c38682..0f503f936 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -43,6 +43,7 @@ pub struct Behaviour { pub keepalive: Toggle, pub pubsub: GossipsubStream, pub autonat: autonat::Behaviour, + pub upnp: Toggle, pub relay: Toggle, pub relay_client: Toggle, pub dcutr: Toggle, @@ -360,6 +361,8 @@ impl Behaviour { .then(|| Relay::new(peer_id, relay_config)), ); + let upnp = Toggle::from(options.portmapping.then_some(libp2p_nat::Behaviour::new().await?)); + let (transport, relay_client) = match options.relay { true => { let (transport, client) = RelayClient::new_transport_and_behaviour(peer_id); @@ -382,6 +385,7 @@ impl Behaviour { dcutr, relay, relay_client, + upnp, }, transport, )) diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 4f72f7fe8..e72e18dfe 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -124,6 +124,8 @@ pub struct SwarmOptions { /// Kad store config /// Note: Only supports MemoryStoreConfig at this time pub kad_store_config: Option, + /// UPnP/PortMapping + pub portmapping: bool, /// Keep alive pub keep_alive: bool, /// Relay client @@ -149,6 +151,8 @@ impl From<&IpfsOptions> for SwarmOptions { let keep_alive = options.keep_alive; let identify_config = options.identify_configuration.clone(); + let portmapping = options.port_mapping; + SwarmOptions { keypair, peer_id, @@ -164,6 +168,7 @@ impl From<&IpfsOptions> for SwarmOptions { ping_config, keep_alive, identify_config, + portmapping, } } } From 321d9a0f62385e1dda74e23ccb8648c29138805d Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Wed, 8 Feb 2023 21:54:45 -0500 Subject: [PATCH 2/3] chore: chaange libp2p-nat version to 0.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8de816ada..7a367f5fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ void = { default-features = false, version = "1.0" } fs2 = "0.4" sled = "0.34" once_cell = "1.16" -libp2p-nat = { version = "0.1.1" } +libp2p-nat = { version = "0.1" } prost = { default-features = false, version = "0.11" } From 6dfe8c1e7389df987a278675a07bd2ac652aab65 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Wed, 8 Feb 2023 21:56:56 -0500 Subject: [PATCH 3/3] chore: Updated changelog --- CHANGELOG.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f77479118..d251d83eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ +# 0.3.0-alpha.3 [unreleased] +- feat/upnp: Use libp2p-nat for handling port forwarding [PR 23] + +[PR 23]: https://github.com/dariusc93/rust-ipfs/pull/23 + # 0.3.0-alpha.2 -- fix: Poll receiving oneshot channel directly [PR #20] +- fix: Poll receiving oneshot channel directly [PR 20] [PR 20]: https://github.com/dariusc93/rust-ipfs/pull/20