Skip to content
This repository has been archived by the owner on Jul 1, 2023. It is now read-only.

rust/: update dependencies #91

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libp2p = { version = "0.50.0", default-features = false, features = ["async-std", "dns", "macros", "noise", "plaintext", "rsa", "tcp", "yamux"] }
futures_codec = "0.4"
libp2p = { version = "0.51.0", default-features = false, features = ["async-std", "dns", "macros", "noise", "plaintext", "rsa", "tcp", "yamux"] }
either = "1"
futures = "0.3.1"
async-std = { version = "1.12.0", features = ["attributes"] }
bytes = "1.3.0"
Expand Down
87 changes: 36 additions & 51 deletions rust/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::handler::{PerfHandler, PerfHandlerIn, PerfHandlerOut};
use libp2p::{
core::{connection::ConnectionId, transport::ListenerId, ConnectedPoint},
swarm::{
ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters,
behaviour::{ConnectionEstablished, ListenerClosed, ListenerError},
ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters,
},
Multiaddr, PeerId,
};
Expand All @@ -13,12 +13,7 @@ use std::time::Duration;

#[derive(Default)]
pub struct Perf {
outbox: Vec<
NetworkBehaviourAction<
<Self as NetworkBehaviour>::OutEvent,
<Self as NetworkBehaviour>::ConnectionHandler,
>,
>,
outbox: Vec<NetworkBehaviourAction<PerfEvent, PerfHandlerIn>>,
}

impl NetworkBehaviour for Perf {
Expand All @@ -34,64 +29,54 @@ impl NetworkBehaviour for Perf {
vec![]
}

fn inject_connection_established(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
connected_point: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
_other_established: usize,
) {
if connected_point.is_dialer() {
self.outbox.push(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
event: PerfHandlerIn::StartPerf,
handler: NotifyHandler::Any,
})
};
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(event) => {
let ConnectionEstablished {
endpoint, peer_id, ..
} = event;
if endpoint.is_dialer() {
self.outbox.push(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: PerfHandlerIn::StartPerf,
handler: NotifyHandler::Any,
})
};
}
FromSwarm::DialFailure(_) => {
panic!("inject dial failure");
}
FromSwarm::ListenerError(event) => {
let ListenerError { err, .. } = event;
panic!("listener error {:?}", err);
}
FromSwarm::ListenerClosed(event) => {
let ListenerClosed { reason, .. } = event;
panic!("listener closed {:?}", reason);
}
_ => {}
}
}

fn inject_event(
fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
_connection_id: ConnectionId,
event: PerfHandlerOut,
) {
match event {
//PerfHandlerOut::PerfRunDone(duration, transfered) => {},
PerfHandlerOut::PerfRunDone(duration, transfered) => self.outbox.push(
NetworkBehaviourAction::GenerateEvent(PerfEvent::PerfRunDone(duration, transfered)),
),
}
}

fn inject_dial_failure(
&mut self,
_peer_id: Option<PeerId>,
_handler: PerfHandler,
_error: &DialError,
) {
panic!("inject dial failure");
}

fn inject_new_listen_addr(&mut self, _: ListenerId, _addr: &Multiaddr) {}

fn inject_expired_listen_addr(&mut self, _: ListenerId, _addr: &Multiaddr) {}

fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {}

fn inject_listener_error(&mut self, _id: ListenerId, err: &(dyn std::error::Error + 'static)) {
panic!("listener error {:?}", err);
}

fn inject_listener_closed(&mut self, _id: ListenerId, reason: Result<(), &std::io::Error>) {
panic!("listener closed {:?}", reason);
}

fn poll(
&mut self,
_cx: &mut Context,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<NetworkBehaviourAction<PerfEvent, PerfHandlerIn>> {
if let Some(action) = self.outbox.pop() {
return Poll::Ready(action);
}
Expand Down
2 changes: 1 addition & 1 deletion rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn main() {
loop {
match client.next().await.expect("Infinite stream.") {
SwarmEvent::Behaviour(e) => {
println!("{}", e);
println!("{e}");

// TODO: Fix hack
//
Expand Down
4 changes: 2 additions & 2 deletions rust/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ async fn main() {

poll_fn(|cx| loop {
match server.poll_next_unpin(cx) {
Poll::Ready(Some(e)) => println!("{:?}", e),
Poll::Ready(Some(e)) => println!("{e:?}"),
Poll::Ready(None) => panic!("Unexpected server termination."),
Poll::Pending => {
if !listening {
if let Some(a) = Swarm::listeners(&server).next() {
println!("Listening on {:?}.", a);
println!("Listening on {a:?}.");
listening = true;
}
}
Expand Down
71 changes: 36 additions & 35 deletions rust/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use futures::stream::FuturesUnordered;
use libp2p::{
core::upgrade::{InboundUpgrade, OutboundUpgrade},
swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
},
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream,
SubstreamProtocol,
},
};
use std::io;
Expand Down Expand Up @@ -242,33 +246,7 @@ impl ConnectionHandler for PerfHandler {
SubstreamProtocol::new(PerfProtocolConfig {}, ())
}

/// Injects the output of a successful upgrade on a new inbound substream.
fn inject_fully_negotiated_inbound(
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) {
self.perf_runs.push(PerfRun::new(PerfRunStream::Receiver(
substream,
vec![0; BUFFER_SIZE],
)));
}

/// Injects the output of a successful upgrade on a new outbound substream.
///
/// The second argument is the information that was previously passed to
/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
fn inject_fully_negotiated_outbound(
&mut self,
substream: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::OutboundOpenInfo,
) {
self.perf_runs
.push(PerfRun::new(PerfRunStream::Sender(substream)));
}

/// Injects an event coming from the outside in the handler.
fn inject_event(&mut self, event: Self::InEvent) {
fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
PerfHandlerIn::StartPerf => {
self.outbox
Expand All @@ -279,15 +257,38 @@ impl ConnectionHandler for PerfHandler {
}
}

/// Indicates to the handler that upgrading a substream to the given protocol has failed.
fn inject_dial_upgrade_error(
fn on_connection_event(
&mut self,
_info: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
panic!("{:?}", error);
match event {
ConnectionEvent::FullyNegotiatedInbound(event) => {
let FullyNegotiatedInbound { protocol, .. } = event;
self.perf_runs.push(PerfRun::new(PerfRunStream::Receiver(
protocol,
vec![0; BUFFER_SIZE],
)));
}
ConnectionEvent::FullyNegotiatedOutbound(event) => {
let FullyNegotiatedOutbound { protocol, .. } = event;
self.perf_runs
.push(PerfRun::new(PerfRunStream::Sender(protocol)));
}
ConnectionEvent::DialUpgradeError(event) => {
let DialUpgradeError { error, .. } = event;
panic!("{:?}", error);
}
ConnectionEvent::ListenUpgradeError(event) => {
let ListenUpgradeError { error, .. } = event;
panic!("listener upgrade error {:?}", error);
}
ConnectionEvent::AddressChange(_) => {}
}
}

/// Returns until when the connection should be kept alive.
Expand Down
18 changes: 6 additions & 12 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ mod handler;
mod protocol;

pub use behaviour::{Perf, PerfEvent};
use either::Either;
use futures::executor::block_on;

use libp2p::{
core::{
self,
either::{EitherOutput, EitherTransport},
muxing::StreamMuxerBox,
transport::{MemoryTransport, Transport},
upgrade::{InboundUpgradeExt, OptionalUpgrade, OutboundUpgradeExt, SelectUpgrade},
Expand Down Expand Up @@ -40,7 +40,7 @@ impl std::str::FromStr for TransportSecurity {

impl std::fmt::Display for TransportSecurity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}

Expand Down Expand Up @@ -138,9 +138,9 @@ pub fn build_transport(
};

let transport = if in_memory {
EitherTransport::Left(MemoryTransport::new())
Either::Left(MemoryTransport::new())
} else {
EitherTransport::Right(block_on(dns::DnsConfig::system(
Either::Right(block_on(dns::DnsConfig::system(
tcp::async_io::Transport::new(tcp::Config::new().nodelay(true)),
))?)
};
Expand All @@ -149,14 +149,8 @@ pub fn build_transport(
.upgrade(core::upgrade::Version::V1)
.authenticate(
transport_security_config
.map_inbound(move |result| match result {
EitherOutput::First((peer_id, o)) => (peer_id, EitherOutput::First(o)),
EitherOutput::Second((peer_id, o)) => (peer_id, EitherOutput::Second(o)),
})
.map_outbound(move |result| match result {
EitherOutput::First((peer_id, o)) => (peer_id, EitherOutput::First(o)),
EitherOutput::Second((peer_id, o)) => (peer_id, EitherOutput::Second(o)),
}),
.map_inbound(move |result| result.factor_first())
.map_outbound(move |result| result.factor_first()),
)
.multiplex(yamux_config)
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
Expand Down