Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use libp2p-nat for handling portmapping #23

Merged
merged 3 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# 0.3.0-alpha.3 [unreleased]
- feat/upnp: Use libp2p-nat for handling port forwarding [PR 23]

[PR 23]: https://github.com/dariusc93/rust-ipfs/pull/23

# 0.3.0-alpha.2
- fix: Poll receiving oneshot channel directly [PR #20]
- fix: Poll receiving oneshot channel directly [PR 20]

[PR 20]: https://github.com/dariusc93/rust-ipfs/pull/20

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void = { default-features = false, version = "1.0" }
fs2 = "0.4"
sled = "0.34"
once_cell = "1.16"
igd-next = { version = "0.13", features = ["aio_tokio"] }
libp2p-nat = { version = "0.1" }

prost = { default-features = false, version = "0.11" }

Expand Down
103 changes: 0 additions & 103 deletions src/igd.rs

This file was deleted.

61 changes: 2 additions & 59 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ pub mod repo;
mod subscription;
pub mod unixfs;

mod igd;

#[macro_use]
extern crate tracing;

Expand All @@ -47,15 +45,15 @@ use futures::{
},
sink::SinkExt,
stream::{BoxStream, Fuse, Stream},
FutureExt, StreamExt,
StreamExt,
};

use ipfs_bitswap::BitswapEvent;
use p2p::{
IdentifyConfiguration, KadStoreConfig, PeerInfo, ProviderStream, RecordStream, RelayConfig,
};
use subscription::SubscriptionRegistry;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio::task::JoinHandle;
use tracing::Span;
use tracing_futures::Instrument;
use unixfs::UnixfsStatus;
Expand Down Expand Up @@ -567,7 +565,6 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
let listener_subscriptions = Default::default();
let listeners = Default::default();
let bootstraps = Default::default();
let mapping_task = HashMap::new();

let IpfsOptions {
listening_addrs, ..
Expand All @@ -588,9 +585,6 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
autonat_counter,
bootstraps,
swarm_event,
mapping_task,
mapping_pending: Default::default(),
port_mapping: options.port_mapping,
};

for addr in listening_addrs.into_iter() {
Expand Down Expand Up @@ -1672,15 +1666,6 @@ struct IpfsFuture<Types: IpfsTypes> {
autonat_counter: Arc<AtomicU64>,
bootstraps: HashSet<MultiaddrWithPeerId>,
swarm_event: Option<TSwarmEventFn>,
mapping_task: HashMap<Multiaddr, broadcast::Sender<()>>,
mapping_pending: HashMap<
Multiaddr,
(
futures::channel::oneshot::Receiver<Result<(), Error>>,
broadcast::Sender<()>,
),
>,
port_mapping: bool,
}

impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
Expand Down Expand Up @@ -1720,19 +1705,6 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
self.listening_addresses
.insert(address.clone(), listener_id);

if self.port_mapping
&& address.iter().any(|p| matches!(p, Protocol::Ip4(_)))
{
let (tx, rx) = broadcast::channel(1);
if let Some(forward_rx) = igd::forward_port(
address.clone(),
std::time::Duration::from_secs(2 * 60),
rx,
) {
self.mapping_pending
.insert(address.clone(), (forward_rx, tx));
}
}
self.listener_subscriptions
.finish_subscription(listener_id.into(), Ok(Some(Some(address))));
}
Expand All @@ -1742,9 +1714,6 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
} => {
self.listeners.remove(&listener_id);
self.listening_addresses.remove(&address);
if let Some(tx) = self.mapping_task.remove(&address) {
let _ = tx.send(());
}
self.listener_subscriptions
.finish_subscription(listener_id.into(), Ok(Some(None)));
}
Expand All @@ -1756,9 +1725,6 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
self.listeners.remove(&listener_id);
for address in addresses {
self.listening_addresses.remove(&address);
if let Some(tx) = self.mapping_task.remove(&address) {
let _ = tx.send(());
}
}
let reason = reason.map(|_| Some(None)).map_err(|e| e.to_string());
self.listener_subscriptions
Expand Down Expand Up @@ -2655,29 +2621,6 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
}
}

let pending_addr_list = self.mapping_pending.keys().cloned().collect::<Vec<_>>();

for address in pending_addr_list {
if let Entry::Occupied(mut entry) = self.mapping_pending.entry(address.clone()) {
let result = {
let (nat_rx, _) = entry.get_mut();
match Pin::new(nat_rx).poll_unpin(ctx) {
Poll::Ready(r) => Some(r),
Poll::Pending => continue,
}
};

if let Some(result) = result {
let (_, tx) = entry.remove();
if let Ok(Ok(_)) = result {
self.mapping_task.insert(address.clone(), tx);
}
}
}
}

self.mapping_pending.shrink_to_fit();

done = true;
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct Behaviour {
pub keepalive: Toggle<KeepAliveBehaviour>,
pub pubsub: GossipsubStream,
pub autonat: autonat::Behaviour,
pub upnp: Toggle<libp2p_nat::Behaviour>,
pub relay: Toggle<Relay>,
pub relay_client: Toggle<RelayClient>,
pub dcutr: Toggle<Dcutr>,
Expand Down Expand Up @@ -360,6 +361,8 @@ impl Behaviour {
.then(|| Relay::new(peer_id, relay_config)),
);

let upnp = Toggle::from(options.portmapping.then_some(libp2p_nat::Behaviour::new().await?));

let (transport, relay_client) = match options.relay {
true => {
let (transport, client) = RelayClient::new_transport_and_behaviour(peer_id);
Expand All @@ -382,6 +385,7 @@ impl Behaviour {
dcutr,
relay,
relay_client,
upnp,
},
transport,
))
Expand Down
5 changes: 5 additions & 0 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ pub struct SwarmOptions {
/// Kad store config
/// Note: Only supports MemoryStoreConfig at this time
pub kad_store_config: Option<KadStoreConfig>,
/// UPnP/PortMapping
pub portmapping: bool,
/// Keep alive
pub keep_alive: bool,
/// Relay client
Expand All @@ -149,6 +151,8 @@ impl From<&IpfsOptions> for SwarmOptions {

let keep_alive = options.keep_alive;
let identify_config = options.identify_configuration.clone();
let portmapping = options.port_mapping;

SwarmOptions {
keypair,
peer_id,
Expand All @@ -164,6 +168,7 @@ impl From<&IpfsOptions> for SwarmOptions {
ping_config,
keep_alive,
identify_config,
portmapping,
}
}
}
Expand Down