Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an example showcasing how to implement a custom network behavior and access streams #4983

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"examples/chat",
"examples/dcutr",
"examples/distributed-key-value-store",
"examples/escape-hatch",
"examples/file-sharing",
"examples/identify",
"examples/ipfs-kad",
Expand Down
25 changes: 25 additions & 0 deletions examples/escape-hatch/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "escape-hatch"
version = "0.1.0"
edition = "2021"
publish = false
license = "MIT"

[package.metadata.release]
release = false

[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
futures = "0.3.29"
futures-timer = "3.0.2"
libp2p = { path = "../../libp2p", features = [ "async-std", "dns", "kad", "mdns", "noise", "macros", "tcp", "yamux"] }
libp2p-core = { workspace = true }
rand = "0.8"
tokio = { version = "1.33", features = ["macros", "net", "rt", "signal"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
void = "1.0"

[lints]
workspace = true
Empty file added examples/escape-hatch/README.md
Empty file.
68 changes: 68 additions & 0 deletions examples/escape-hatch/src/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use libp2p::swarm::{ConnectionHandler, SubstreamProtocol, ConnectionHandlerEvent};
use libp2p::StreamProtocol;
use libp2p_core::upgrade::ReadyUpgrade;
use std::task::Context;
use std::{task::Poll, time::Duration};
use void::Void;

use crate::{Config, Error, PROTOCOL_NAME};

pub struct Connection {
config: Config,
}

impl Connection {
pub fn new(config: Config) -> Self {
Self { config }
}
}

impl ConnectionHandler for Connection {
type FromBehaviour = Void;

type ToBehaviour = Result<Duration, Error>;

type InboundProtocol = ReadyUpgrade<StreamProtocol>;

type OutboundProtocol = ReadyUpgrade<StreamProtocol>;

type InboundOpenInfo = ();

type OutboundOpenInfo = ();

fn listen_protocol(
&self,
) -> libp2p::swarm::SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
}

fn on_behaviour_event(&mut self, _: Void) {}

fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
libp2p::swarm::ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
>,
> {
let protocol = self.listen_protocol();
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol,
});
}

fn on_connection_event(
&mut self,
event: libp2p::swarm::handler::ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
todo!()
}
}
173 changes: 173 additions & 0 deletions examples/escape-hatch/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use futures::channel::oneshot;
use handler::Connection;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, SubstreamProtocol, THandler, THandlerOutEvent,
ToSwarm,
};
use libp2p::{swarm::NetworkBehaviour, tcp::tokio::TcpStream, PeerId, Stream, StreamProtocol};
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_core::{Endpoint, Multiaddr};
use std::task::Context;
use std::{collections::VecDeque, io, task::Poll, time::Duration};
use std::sync::mpsc;

mod handler;

pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/ping/1.0.0");

#[derive(Debug)]
pub enum Error {
NotConnected(PeerId),
UnsupportedProtocol,
Io(io::Error),
}

/// Event generated by the `Ping` network behaviour.
#[derive(Debug)]
pub struct Event {
/// The peer ID of the remote.
pub peer: PeerId,
/// The connection the ping was executed on.
pub connection: ConnectionId,
/// The result of an inbound or outbound ping.
pub result: Result<Duration, Error>,
}

/// A behaviour that manages requests to open new streams which are directly handed to the user.
pub struct Behaviour {
/// Config
config: Config,
/// Queue of events to yield to the swarm.
events: VecDeque<Event>,
/// Protocol
protocol: StreamProtocol,
}

#[derive(Debug, Clone)]
pub struct Config {
/// The timeout of an outbound ping.
timeout: Duration,
/// The duration between outbound pings.
interval: Duration,
}

struct StreamMessage {
peer_id: PeerId, // Assuming PeerId is a type representing a peer
response_channel: oneshot::Sender<Result<Stream, Error>>, // For sending back the result
}

pub struct IncomingStreams {
/// Queue of events to yield to the swarm.
events: VecDeque<Event>,
}

/// A control acts as a "remote" that allows users to request streams without interacting with the `Swarm` directly.

pub struct Control {
sender: mpsc::Sender<StreamMessage>,
receiver: mpsc::Receiver<StreamMessage>
}

impl IncomingStreams {
pub async fn next(&mut self) -> (PeerId, Stream) {
if let Some(e) = self.events.pop_back() {
} else {
}

todo!()
Comment on lines +63 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to Control::open_stream, this needs to listen on an mpsc::Receiver where the mpsc::Sender lives in Behaviour. Any inbound streams from the ConnectionHandler need to be sent this one.

}

pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<(PeerId, Stream)> {
todo!()
}
}

impl Behaviour {
pub fn new(protocol: StreamProtocol) -> (Self, Control, IncomingStreams) {
let (sender, receiver) = mpsc::channel::<StreamMessage>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The receiver needs to go into Behaviour so you can call poll_next on it and actually act on the message from the Control.

let behaviour = Self {
config: Config {
timeout: Duration::from_secs(1),
interval: Duration::from_secs(1),
},
events: VecDeque::new(),
protocol,
};

(
behaviour,
Control {
sender,
receiver
},
IncomingStreams {
events: VecDeque::new(),
},
)
}
}

impl Control {
pub async fn open_stream(&self, peer_id: PeerId) -> Result<Stream, Error> {
let (response_tx, response_rx) = oneshot::channel();
let message = StreamMessage {
peer_id,
response_channel: response_tx,
};
// Send the message to NetworkBehaviour
self.sender.send(message).expect("Failed to send message");
response_rx.await.expect("Failed to receive response")
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Connection;

type ToSwarm = Event;

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Connection::new(self.config.clone()))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Connection::new(self.config.clone()))
}

fn on_swarm_event(&mut self, _event: FromSwarm) {}

fn on_connection_handler_event(
&mut self,
peer: PeerId,
connection: ConnectionId,
result: THandlerOutEvent<Self>,
) {
self.events.push_front(Event {
peer,
connection,
result,
})
}

fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
if let Some(e) = self.events.pop_back() {
Poll::Ready(ToSwarm::GenerateEvent(e))
} else {
Poll::Pending
}
}
}
15 changes: 15 additions & 0 deletions examples/escape-hatch/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use escape_hatch::Behaviour;
use futures::prelude::*;
use futures::StreamExt;
use libp2p::StreamProtocol;
use libp2p_core::{
multiaddr::multiaddr,
transport::{memory::MemoryTransport, ListenerId, Transport},
};
use rand::{thread_rng, Rng};

#[tokio::main]
async fn main() {
let stream_protocol = StreamProtocol::new("/escape-hatch/");
let (behaviour, control, incoming_stream) = Behaviour::new(stream_protocol);
}
1 change: 1 addition & 0 deletions examples/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio = { version = "1", features = ["full"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"

[lints]
workspace = true
Loading