Skip to content

Commit

Permalink
feat: introduce libp2p-stream
Browse files Browse the repository at this point in the history
For a while now, `rust-libp2p` provided the `request-response` abstraction which makes it easy for users to build request-response based protocols without having to implement a `NetworkBehaviour` themselves. This PR introduces an alpha version of `libp2p-stream`: a `NetworkBehaviour` that directly gives access to negotiated streams.

In addition to complementing `request-response`, `libp2p-stream` also diverges in its design from the remaining modules by offering a clonable `Control` that provides `async` functions.

Resolves: libp2p#4457.

Pull-Request: libp2p#5027.
  • Loading branch information
thomaseizinger authored Jan 16, 2024
0 parents commit d650aee
Show file tree
Hide file tree
Showing 10 changed files with 830 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 0.1.0-alpha

Initial release.
27 changes: 27 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "libp2p-stream"
version = "0.1.0-alpha"
edition = "2021"
rust-version.workspace = true
description = "Generic stream protocols for libp2p"
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
futures = "0.3.29"
libp2p-core = { workspace = true }
libp2p-identity = { workspace = true, features = ["peerid"] }
libp2p-swarm = { workspace = true }
tracing = "0.1.37"
void = "1"
rand = "0.8"

[dev-dependencies]
libp2p-swarm-test = { workspace = true }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints]
workspace = true
69 changes: 69 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Generic (stream) protocols

This module provides a generic [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour) for stream-oriented protocols.
Streams are the fundamental primitive of libp2p and all other protocols are implemented using streams.
In contrast to other [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour)s, this module takes a different design approach.
All interaction happens through a [`Control`] that can be obtained via [`Behaviour::new_control`].
[`Control`]s can be cloned and thus shared across your application.

## Inbound

To accept streams for a particular [`StreamProtocol`](libp2p_swarm::StreamProtocol) using this module, use [`Control::accept`]:

### Example

```rust,no_run
# fn main() {
# use libp2p_swarm::{Swarm, StreamProtocol};
# use libp2p_stream as stream;
# use futures::StreamExt as _;
let mut swarm: Swarm<stream::Behaviour> = todo!();
let mut control = swarm.behaviour().new_control();
let mut incoming = control.accept(StreamProtocol::new("/my-protocol")).unwrap();
let handler_future = async move {
while let Some((peer, stream)) = incoming.next().await {
// Execute your protocol using `stream`.
}
};
# }
```

### Resource management

[`Control::accept`] returns you an instance of [`IncomingStreams`].
This struct implements [`Stream`](futures::Stream) and like other streams, is lazy.
You must continuously poll it to make progress.
In the example above, this taken care of by using the [`StreamExt::next`](futures::StreamExt::next) helper.

Internally, we will drop streams if your application falls behind in processing these incoming streams, i.e. if whatever loop calls `.next()` is not fast enough.

### Drop

As soon as you drop [`IncomingStreams`], the protocol will be de-registered.
Any further attempt by remote peers to open a stream using the provided protocol will result in a negotiation error.

## Outbound

To open a new outbound stream for a particular protocol, use [`Control::open_stream`].

### Example

```rust,no_run
# fn main() {
# use libp2p_swarm::{Swarm, StreamProtocol};
# use libp2p_stream as stream;
# use libp2p_identity::PeerId;
let mut swarm: Swarm<stream::Behaviour> = todo!();
let peer_id: PeerId = todo!();
let mut control = swarm.behaviour().new_control();
let protocol_future = async move {
let stream = control.open_stream(peer_id, StreamProtocol::new("/my-protocol")).await.unwrap();
// Execute your protocol here using `stream`.
};
# }
```
143 changes: 143 additions & 0 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use core::fmt;
use std::{
sync::{Arc, Mutex},
task::{Context, Poll},
};

use futures::{channel::mpsc, StreamExt};
use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
self as swarm, dial_opts::DialOpts, ConnectionDenied, ConnectionId, FromSwarm,
NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use swarm::{
behaviour::ConnectionEstablished, dial_opts::PeerCondition, ConnectionClosed, DialError,
DialFailure,
};

use crate::{handler::Handler, shared::Shared, Control};

/// A generic behaviour for stream-oriented protocols.
pub struct Behaviour {
shared: Arc<Mutex<Shared>>,
dial_receiver: mpsc::Receiver<PeerId>,
}

impl Default for Behaviour {
fn default() -> Self {
Self::new()
}
}

impl Behaviour {
pub fn new() -> Self {
let (dial_sender, dial_receiver) = mpsc::channel(0);

Self {
shared: Arc::new(Mutex::new(Shared::new(dial_sender))),
dial_receiver,
}
}

/// Obtain a new [`Control`].
pub fn new_control(&self) -> Control {
Control::new(self.shared.clone())
}
}

/// The protocol is already registered.
#[derive(Debug)]
pub struct AlreadyRegistered;

impl fmt::Display for AlreadyRegistered {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "The protocol is already registered")
}
}

impl std::error::Error for AlreadyRegistered {}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type ToSwarm = ();

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(
peer,
self.shared.clone(),
Shared::lock(&self.shared).receiver(peer, connection_id),
))
}

fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(
peer,
self.shared.clone(),
Shared::lock(&self.shared).receiver(peer, connection_id),
))
}

fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => Shared::lock(&self.shared).on_connection_established(connection_id, peer_id),
FromSwarm::ConnectionClosed(ConnectionClosed { connection_id, .. }) => {
Shared::lock(&self.shared).on_connection_closed(connection_id)
}
FromSwarm::DialFailure(DialFailure {
peer_id: Some(peer_id),
error:
error @ (DialError::Transport(_)
| DialError::Denied { .. }
| DialError::NoAddresses
| DialError::WrongPeerId { .. }),
..
}) => {
let reason = error.to_string(); // We can only forward the string repr but it is better than nothing.

Shared::lock(&self.shared).on_dial_failure(peer_id, reason)
}
_ => {}
}
}

fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
void::unreachable(event);
}

fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Poll::Ready(Some(peer)) = self.dial_receiver.poll_next_unpin(cx) {
return Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(peer)
.condition(PeerCondition::DisconnectedAndNotDialing)
.build(),
});
}

Poll::Pending
}
}
124 changes: 124 additions & 0 deletions src/control.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use core::fmt;
use std::{
io,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};

use crate::AlreadyRegistered;
use crate::{handler::NewStream, shared::Shared};

use futures::{
channel::{mpsc, oneshot},
SinkExt as _, StreamExt as _,
};
use libp2p_identity::PeerId;
use libp2p_swarm::{Stream, StreamProtocol};

/// A (remote) control for opening new streams and registration of inbound protocols.
///
/// A [`Control`] can be cloned and thus allows for concurrent access.
#[derive(Clone)]
pub struct Control {
shared: Arc<Mutex<Shared>>,
}

impl Control {
pub(crate) fn new(shared: Arc<Mutex<Shared>>) -> Self {
Self { shared }
}

/// Attempt to open a new stream for the given protocol and peer.
///
/// In case we are currently not connected to the peer, we will attempt to make a new connection.
///
/// ## Backpressure
///
/// [`Control`]s support backpressure similarly to bounded channels:
/// Each [`Control`] has a guaranteed slot for internal messages.
/// A single control will always open one stream at a time which is enforced by requiring `&mut self`.
///
/// This backpressure mechanism breaks if you clone [`Control`]s excessively.
pub async fn open_stream(
&mut self,
peer: PeerId,
protocol: StreamProtocol,
) -> Result<Stream, OpenStreamError> {
tracing::debug!(%peer, "Requesting new stream");

let mut new_stream_sender = Shared::lock(&self.shared).sender(peer);

let (sender, receiver) = oneshot::channel();

new_stream_sender
.send(NewStream { protocol, sender })
.await
.map_err(|e| io::Error::new(io::ErrorKind::ConnectionReset, e))?;

let stream = receiver
.await
.map_err(|e| io::Error::new(io::ErrorKind::ConnectionReset, e))??;

Ok(stream)
}

/// Accept inbound streams for the provided protocol.
///
/// To stop accepting streams, simply drop the returned [`IncomingStreams`] handle.
pub fn accept(
&mut self,
protocol: StreamProtocol,
) -> Result<IncomingStreams, AlreadyRegistered> {
Shared::lock(&self.shared).accept(protocol)
}
}

/// Errors while opening a new stream.
#[derive(Debug)]
#[non_exhaustive]
pub enum OpenStreamError {
/// The remote does not support the requested protocol.
UnsupportedProtocol(StreamProtocol),
/// IO Error that occurred during the protocol handshake.
Io(std::io::Error),
}

impl From<std::io::Error> for OpenStreamError {
fn from(v: std::io::Error) -> Self {
Self::Io(v)
}
}

impl fmt::Display for OpenStreamError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OpenStreamError::UnsupportedProtocol(p) => {
write!(f, "failed to open stream: remote peer does not support {p}")
}
OpenStreamError::Io(e) => {
write!(f, "failed to open stream: io error: {e}")
}
}
}
}

/// A handle to inbound streams for a particular protocol.
#[must_use = "Streams do nothing unless polled."]
pub struct IncomingStreams {
receiver: mpsc::Receiver<(PeerId, Stream)>,
}

impl IncomingStreams {
pub(crate) fn new(receiver: mpsc::Receiver<(PeerId, Stream)>) -> Self {
Self { receiver }
}
}

impl futures::Stream for IncomingStreams {
type Item = (PeerId, Stream);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_next_unpin(cx)
}
}
Loading

0 comments on commit d650aee

Please sign in to comment.