Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an example showcasing how to implement a custom network behavior and access streams #4983

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"examples/chat",
"examples/dcutr",
"examples/distributed-key-value-store",
"examples/escape-hatch",
"examples/file-sharing",
"examples/identify",
"examples/ipfs-kad",
Expand Down
25 changes: 25 additions & 0 deletions examples/escape-hatch/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "escape-hatch"
version = "0.1.0"
edition = "2021"
publish = false
license = "MIT"

[package.metadata.release]
release = false

[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
futures = "0.3.29"
futures-timer = "3.0.2"
libp2p = { path = "../../libp2p", features = [ "async-std", "ping"] }
libp2p-core = { workspace = true }
rand = "0.8"
tokio = { version = "1.33", features = ["macros", "net", "rt", "signal"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
void = "1.0"

[lints]
workspace = true
Empty file added examples/escape-hatch/README.md
Empty file.
78 changes: 78 additions & 0 deletions examples/escape-hatch/src/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use libp2p::swarm::{ConnectionHandler, SubstreamProtocol, ConnectionHandlerEvent};
use libp2p::StreamProtocol;
use libp2p_core::upgrade::ReadyUpgrade;
use std::task::Context;
use std::{task::Poll, time::Duration};
use void::Void;

use crate::{Config, Error, PROTOCOL_NAME};

pub struct Connection {
config: Config,
}

impl Connection {
pub fn new(config: Config) -> Self {
Self { config }
}
}

impl ConnectionHandler for Connection {
type FromBehaviour = Void;

type ToBehaviour = Result<Duration, Error>;

type InboundProtocol = ReadyUpgrade<StreamProtocol>;

type OutboundProtocol = ReadyUpgrade<StreamProtocol>;

type InboundOpenInfo = ();

type OutboundOpenInfo = ();

fn listen_protocol(
&self,
) -> libp2p::swarm::SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
}

fn on_behaviour_event(&mut self, _: Void) {}

fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
libp2p::swarm::ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
>,
> {
let protocol = self.listen_protocol();
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol,
});
}

fn on_connection_event(
&mut self,
event: libp2p::swarm::handler::ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedInbound(_) => todo!(),
libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound(_) => todo!(),
libp2p::swarm::handler::ConnectionEvent::AddressChange(_) => todo!(),
libp2p::swarm::handler::ConnectionEvent::DialUpgradeError(_) => todo!(),
libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(_) => todo!(),
libp2p::swarm::handler::ConnectionEvent::LocalProtocolsChange(_) => todo!(),
libp2p::swarm::handler::ConnectionEvent::RemoteProtocolsChange(_) => todo!(),
_ => todo!(),
}
todo!()
}
}
161 changes: 161 additions & 0 deletions examples/escape-hatch/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use futures::channel::oneshot;
use handler::Connection;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, SubstreamProtocol, THandler, THandlerOutEvent,
ToSwarm,
};
use libp2p::{swarm::NetworkBehaviour, PeerId, Stream, StreamProtocol};
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_core::{Endpoint, Multiaddr};
use std::task::Context;
use std::{collections::VecDeque, io, task::Poll, time::Duration};
use std::sync::mpsc;

mod handler;

pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/ping/1.0.0");

#[derive(Debug)]
pub enum Error {
NotConnected(PeerId),
UnsupportedProtocol,
Io(io::Error),
}

/// A behaviour that manages requests to open new streams which are directly handed to the user.
pub struct Behaviour {
/// Config
config: Config,
/// Queue of events to yield to the swarm.
events: VecDeque<Event>,
/// Protocol
protocol: StreamProtocol,

receiver: mpsc::Receiver<Event>
}

#[derive(Debug, Clone)]
pub struct Config {
/// The timeout of an outbound ping.
timeout: Duration,
/// The duration between outbound pings.
interval: Duration,
}

pub struct Event {
pub peer_id: PeerId, // Assuming PeerId is a type representing a peer
pub response_channel: oneshot::Sender<Result<Stream, Error>>, // For sending back the result
}

pub struct IncomingStreams {
/// Queue of events to yield to the swarm.
events: VecDeque<Event>,
}

/// A control acts as a "remote" that allows users to request streams without interacting with the `Swarm` directly.

pub struct Control {
sender: mpsc::Sender<Event>,
}

impl IncomingStreams {
pub async fn next(&mut self) -> (PeerId, Stream) {
if let Some(e) = self.events.pop_back() {
} else {
}

todo!()
Comment on lines +63 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to Control::open_stream, this needs to listen on an mpsc::Receiver where the mpsc::Sender lives in Behaviour. Any inbound streams from the ConnectionHandler need to be sent this one.

}

pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<(PeerId, Stream)> {
todo!()
}
}

impl Behaviour {
pub fn new(protocol: StreamProtocol) -> (Self, Control, IncomingStreams) {
let (sender, receiver) = mpsc::channel::<Event>();
let behaviour = Self {
config: Config {
timeout: Duration::from_secs(1),
interval: Duration::from_secs(1),
},
events: VecDeque::new(),
protocol,
receiver
};

(
behaviour,
Control {
sender,
},
IncomingStreams {
events: VecDeque::new(),
},
)
}
}

impl Control {
pub async fn open_stream(&self, peer_id: PeerId) -> Result<Stream, Error> {
let (response_tx, response_rx) = oneshot::channel();
let message = Event {
peer_id,
response_channel: response_tx,
};
// Send the message to NetworkBehaviour
self.sender.send(message).expect("Failed to send message");
response_rx.await.expect("Failed to receive response")
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Connection;

type ToSwarm = Event;

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Connection::new(self.config.clone()))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Connection::new(self.config.clone()))
}

fn on_swarm_event(&mut self, _event: FromSwarm) {}

fn on_connection_handler_event(
&mut self,
peer: PeerId,
connection: ConnectionId,
result: THandlerOutEvent<Self>,
) {
todo!()
}

fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
let Ok(message) = self.receiver.recv() else {
return Poll::Pending;
};
let stream = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());

// message.response_channel.send(t)
Poll::Ready(ToSwarm::GenerateEvent(message))
}
}
15 changes: 15 additions & 0 deletions examples/escape-hatch/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use escape_hatch::Behaviour;
use futures::prelude::*;
use futures::StreamExt;
use libp2p::StreamProtocol;
use libp2p_core::{
multiaddr::multiaddr,
transport::{memory::MemoryTransport, ListenerId, Transport},
};
use rand::{thread_rng, Rng};

#[tokio::main]
async fn main() {
let stream_protocol = StreamProtocol::new("/escape-hatch/");
let (behaviour, control, incoming_stream) = Behaviour::new(stream_protocol);
}
Loading