Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implementation of a relay manager #96

Merged
merged 22 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
fa7d829
chore: Placeholder for libp2p-relay-manager
dariusc93 Sep 3, 2023
ff6bbcb
chore: Basic implementation
dariusc93 Sep 4, 2023
20efd1b
chore: Remove pending selection
dariusc93 Sep 5, 2023
bedb50a
chore: Use connection id and emit event for address expiring
dariusc93 Sep 5, 2023
8d8f3b3
chore: Update Cargo.lock
dariusc93 Sep 5, 2023
005f5bb
chore: Add example
dariusc93 Sep 5, 2023
3a4e2f3
chore: Updated example
dariusc93 Sep 6, 2023
38d8a52
chore: Use correct address for external confirmation
dariusc93 Sep 6, 2023
c826923
chore: Simplify code and remove select option
dariusc93 Sep 6, 2023
9023b2f
chore: Simplify the state
dariusc93 Sep 7, 2023
7ae2e24
chore: Remove pending reservation, simplify logic, and update example
dariusc93 Sep 8, 2023
22aa62d
chore: Add function to list stored relays
dariusc93 Sep 10, 2023
7b54d10
feat: Implement relay functions
dariusc93 Sep 10, 2023
c8d2cd9
chore: Emit events
dariusc93 Sep 10, 2023
0ed0ecc
chore: Return peer id for random relays
dariusc93 Sep 10, 2023
b06204c
chore: Return results to oneshot
dariusc93 Sep 10, 2023
0e0b046
chore: return error
dariusc93 Sep 10, 2023
ace54e8
chore: Update pubsub example
dariusc93 Sep 10, 2023
c2b318f
Merge branch 'libp2p-next' into feat/relay-manager-r0
dariusc93 Sep 10, 2023
4ad6da6
chore: Cleanup, notation and added small placeholder for future imple…
dariusc93 Sep 11, 2023
2c676b2
chore: Cleanup and remove pending connection
dariusc93 Sep 11, 2023
97e5ef2
chore: Format code
dariusc93 Sep 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
597 changes: 568 additions & 29 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ libipld = "0.16"
clap = { version = "4.3", features = ["derive"] }
rust-ipns = { version = "0.1", path = "packages/rust-ipns" }
chrono = { version = "0.4" }
libp2p-relay-manager = { version = "0.0", path = "packages/libp2p-relay-manager" }

[dependencies]
anyhow = "1.0"
Expand All @@ -45,6 +46,7 @@ hash_hasher = "2.0.3"
rust-unixfs = { workspace = true }

rust-ipns = { workspace = true, optional = true }
libp2p-relay-manager = { workspace = true }

chrono.workspace = true

Expand All @@ -71,7 +73,7 @@ libp2p = { features = [
"serde",
"request-response",
"rendezvous",
"quic"
"quic",
], workspace = true }

libp2p-mplex = "0.40"
Expand Down
102 changes: 25 additions & 77 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use clap::Parser;
use futures::{channel::mpsc, pin_mut, FutureExt};
use futures::{pin_mut, FutureExt};
use libipld::ipld;
use libp2p::{futures::StreamExt, swarm::SwarmEvent};
use rust_ipfs::{BehaviourEvent, Ipfs, Protocol, PubsubEvent};
use libp2p::futures::StreamExt;
use libp2p::Multiaddr;
use rust_ipfs::p2p::MultiaddrExt;
use rust_ipfs::{Ipfs, PubsubEvent};

use rust_ipfs::UninitializedIpfsNoop as UninitializedIpfs;

Expand All @@ -21,6 +23,8 @@ struct Opt {
#[clap(long)]
use_relay: bool,
#[clap(long)]
relay_addrs: Vec<Multiaddr>,
#[clap(long)]
use_upnp: bool,
#[clap(long)]
topic: Option<String>,
Expand Down Expand Up @@ -52,91 +56,35 @@ async fn main() -> anyhow::Result<()> {
uninitialized = uninitialized.enable_upnp();
}

let (tx, mut rx) = mpsc::unbounded();
let ipfs: Ipfs = uninitialized
.swarm_events({
move |_, event| {
if let SwarmEvent::Behaviour(BehaviourEvent::Autonat(
libp2p::autonat::Event::StatusChanged { new, .. },
)) = event
{
match new {
libp2p::autonat::NatStatus::Public(_) => {
let _ = tx.unbounded_send(true);
}
libp2p::autonat::NatStatus::Private
| libp2p::autonat::NatStatus::Unknown => {
let _ = tx.unbounded_send(false);
}
}
}
}
})
.start()
.await?;


let ipfs: Ipfs = uninitialized.start().await?;

let identity = ipfs.identity(None).await?;
let peer_id = identity.peer_id;
let (mut rl, mut stdout) = Readline::new(format!("{peer_id} >"))?;

ipfs.default_bootstrap().await?;

if opt.bootstrap {
ipfs.default_bootstrap().await?;
tokio::spawn({
let ipfs = ipfs.clone();
async move { if let Err(_e) = ipfs.bootstrap().await {} }
});
if let Err(_e) = ipfs.bootstrap().await {}
}

let cancel = Arc::new(Notify::new());
if opt.use_relay {
//Until autorelay is implemented and/or functions to use relay more directly, we will manually listen to the relays (using libp2p bootstrap, though you can add your own)
tokio::spawn({
let ipfs = ipfs.clone();
let mut stdout = stdout.clone();
let cancel = cancel.clone();
async move {
let mut listening_addrs = vec![];
let mut relay_used = false;
loop {
let flag = tokio::select! {
flag = rx.next() => {
flag.unwrap_or_default()
},
_ = cancel.notified() => break
};

match flag {
true => {
if relay_used {
writeln!(stdout, "Disabling Relay...")?;
for addr in listening_addrs.drain(..) {
if let Err(_e) = ipfs.remove_listening_address(addr).await {}
}
relay_used = false;
}
}
false => {
if !relay_used {
writeln!(stdout, "Enabling Relay...")?;
for addr in ipfs.get_bootstraps().await? {
let circuit = addr.with(Protocol::P2pCircuit);
if let Ok(addr) =
ipfs.add_listening_address(circuit.clone()).await
{
listening_addrs.push(addr)
}
}
relay_used = !listening_addrs.is_empty();
}
}
}
}
let bootstrap_nodes = ipfs.get_bootstraps().await.expect("Bootstrap exist");
let addrs = opt
.relay_addrs
.iter()
.chain(bootstrap_nodes.iter())
.cloned();

for mut addr in addrs {
let peer_id = addr.extract_peer_id().expect("Bootstrap to contain peer id");
ipfs.add_relay(peer_id, addr).await?;
}

Ok::<_, anyhow::Error>(())
}
});
if let Err(e) = ipfs.enable_relay(None).await {
writeln!(stdout, "> Error selecting a relay: {e}")?;
}
}

let mut event_stream = ipfs.pubsub_events(&topic).await?;
Expand Down
29 changes: 29 additions & 0 deletions packages/libp2p-relay-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "libp2p-relay-manager"
version = "0.0.0"
edition = "2021"
license = "Apache-2.0 OR MIT"
description = "(WIP) Implementation of a relay-manager"
repository = "https://github.com/dariusc93/rust-ipfs"
readme = "README.md"
keywords = ["libp2p", "p2p", "networking"]
authors = ["Darius Clark"]
exclude = [".gitignore"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libp2p = { workspace = true, features = ["relay"] }
thiserror = "1.0"
anyhow = "1.0"
futures = "0.3"
futures-timer = "3.0"
log = "0.4"
void = "1.0"
rand = "0.8"

[dev-dependencies]
libp2p = { version = "0.52", features = ["full"] }
async-std = { version = "1", features = ["attributes"] }
clap = { version = "4.0", features = ["derive"] }
env_logger = "0.10"
9 changes: 9 additions & 0 deletions packages/libp2p-relay-manager/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# libp2p-relay-manager

Basic implementation of a relay manager

Note: This crate is a WIP and bound to change.

## License

This crate is licensed under MIT or Apache 2.0.
Loading