Skip to content

Commit

Permalink
Traffic statistics (paritytech#4017)
Browse files Browse the repository at this point in the history
* Network stats

* Fixed tests
  • Loading branch information
arkpar authored and gavofyork committed Nov 5, 2019
1 parent 8e945c7 commit 44a0d9e
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 171 deletions.
33 changes: 15 additions & 18 deletions core/network/src/legacy_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
use crate::{DiscoveryNetBehaviour, config::ProtocolId};
use crate::legacy_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn};
use crate::legacy_proto::upgrade::RegisteredProtocol;
use crate::protocol::message::Message;
use bytes::BytesMut;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _};
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use log::{debug, error, trace, warn};
use rand::distributions::{Distribution as _, Uniform};
use sr_primitives::traits::Block as BlockT;
use smallvec::SmallVec;
use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -61,9 +60,9 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to
/// us, we accept the connection. The "banning" system is only about delaying dialing attempts.
///
pub struct LegacyProto<B: BlockT, TSubstream> {
pub struct LegacyProto< TSubstream> {
/// List of protocols to open with peers. Never modified.
protocol: RegisteredProtocol<B>,
protocol: RegisteredProtocol,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: peerset::Peerset,
Expand All @@ -80,7 +79,7 @@ pub struct LegacyProto<B: BlockT, TSubstream> {
next_incoming_index: peerset::IncomingIndex,

/// Events to produce from `poll()`.
events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn<B>, LegacyProtoOut<B>>; 4]>,
events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn, LegacyProtoOut>; 4]>,

/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
Expand Down Expand Up @@ -189,7 +188,7 @@ struct IncomingPeer {

/// Event that can be emitted by the `LegacyProto`.
#[derive(Debug)]
pub enum LegacyProtoOut<B: BlockT> {
pub enum LegacyProtoOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Version of the protocol that has been opened.
Expand All @@ -213,7 +212,7 @@ pub enum LegacyProtoOut<B: BlockT> {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Message that has been received.
message: Message<B>,
message: BytesMut,
},

/// The substream used by the protocol is pretty large. We should print avoid sending more
Expand All @@ -222,11 +221,11 @@ pub enum LegacyProtoOut<B: BlockT> {
/// Id of the peer which is clogged.
peer_id: PeerId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Message<B>>,
messages: Vec<Vec<u8>>,
},
}

impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> {
impl<TSubstream> LegacyProto<TSubstream> {
/// Creates a `CustomProtos`.
pub fn new(
protocol: impl Into<ProtocolId>,
Expand Down Expand Up @@ -350,8 +349,7 @@ impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> {
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, message: Message<B>)
where B: BlockT {
pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) {
if !self.is_open(target) {
return;
}
Expand Down Expand Up @@ -607,7 +605,7 @@ impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> {
}
}

impl<B: BlockT, TSubstream> DiscoveryNetBehaviour for LegacyProto<B, TSubstream> {
impl<TSubstream> DiscoveryNetBehaviour for LegacyProto<TSubstream> {
fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
Expand All @@ -616,13 +614,12 @@ impl<B: BlockT, TSubstream> DiscoveryNetBehaviour for LegacyProto<B, TSubstream>
}
}

impl<B, TSubstream> NetworkBehaviour for LegacyProto<B, TSubstream>
impl<TSubstream> NetworkBehaviour for LegacyProto<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
type ProtocolsHandler = CustomProtoHandlerProto<B, TSubstream>;
type OutEvent = LegacyProtoOut<B>;
type ProtocolsHandler = CustomProtoHandlerProto<TSubstream>;
type OutEvent = LegacyProtoOut;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
CustomProtoHandlerProto::new(self.protocol.clone())
Expand Down Expand Up @@ -825,7 +822,7 @@ where
fn inject_node_event(
&mut self,
source: PeerId,
event: CustomProtoHandlerOut<B>,
event: CustomProtoHandlerOut,
) {
match event {
CustomProtoHandlerOut::CustomProtocolClosed { reason } => {
Expand Down Expand Up @@ -954,7 +951,7 @@ where
_params: &mut impl PollParameters,
) -> Async<
NetworkBehaviourAction<
CustomProtoHandlerIn<B>,
CustomProtoHandlerIn,
Self::OutEvent,
>,
> {
Expand Down
78 changes: 37 additions & 41 deletions core/network/src/legacy_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::legacy_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream};
use crate::protocol::message::Message;
use bytes::BytesMut;
use futures::prelude::*;
use futures03::{compat::Compat, TryFutureExt as _};
use futures_timer::Delay;
Expand All @@ -29,7 +29,6 @@ use libp2p::swarm::{
SubstreamProtocol,
};
use log::{debug, error};
use sr_primitives::traits::Block as BlockT;
use smallvec::{smallvec, SmallVec};
use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -88,36 +87,34 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// We consider that we are now "closed" if the remote closes all the existing substreams.
/// Re-opening it can then be performed by closing all active substream and re-opening one.
///
pub struct CustomProtoHandlerProto<B, TSubstream> {
pub struct CustomProtoHandlerProto<TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<B>,
protocol: RegisteredProtocol,

/// Marker to pin the generic type.
marker: PhantomData<TSubstream>,
}

impl<B, TSubstream> CustomProtoHandlerProto<B, TSubstream>
impl<TSubstream> CustomProtoHandlerProto<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
/// Builds a new `CustomProtoHandlerProto`.
pub fn new(protocol: RegisteredProtocol<B>) -> Self {
pub fn new(protocol: RegisteredProtocol) -> Self {
CustomProtoHandlerProto {
protocol,
marker: PhantomData,
}
}
}

impl<B, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<B, TSubstream>
impl<TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
type Handler = CustomProtoHandler<B, TSubstream>;
type Handler = CustomProtoHandler<TSubstream>;

fn inbound_protocol(&self) -> RegisteredProtocol<B> {
fn inbound_protocol(&self) -> RegisteredProtocol {
self.protocol.clone()
}

Expand All @@ -136,12 +133,12 @@ where
}

/// The actual handler once the connection has been established.
pub struct CustomProtoHandler<B: BlockT, TSubstream> {
pub struct CustomProtoHandler<TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<B>,
protocol: RegisteredProtocol,

/// State of the communications with the remote.
state: ProtocolState<B, TSubstream>,
state: ProtocolState<TSubstream>,

/// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have
/// any influence on the behaviour.
Expand All @@ -155,15 +152,15 @@ pub struct CustomProtoHandler<B: BlockT, TSubstream> {
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>; 16]>,
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>; 16]>,
}

/// State of the handler.
enum ProtocolState<B, TSubstream> {
enum ProtocolState<TSubstream> {
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
Init {
/// List of substreams opened by the remote but that haven't been processed yet.
substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>,
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
/// Deadline after which the initialization is abnormally long.
init_deadline: Compat<Delay>,
},
Expand All @@ -179,17 +176,17 @@ enum ProtocolState<B, TSubstream> {
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
Normal {
/// The substreams where bidirectional communications happen.
substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>,
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>,
/// Contains substreams which are being shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>,
shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>,
},

/// We are disabled. Contains substreams that are being closed.
/// If we are in this state, either we have sent a `CustomProtocolClosed` message to the
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
Disabled {
/// List of substreams to shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>,
shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,

/// If true, we should reactivate the handler after all the substreams in `shutdown` have
/// been closed.
Expand All @@ -210,7 +207,7 @@ enum ProtocolState<B, TSubstream> {

/// Event that can be received by a `CustomProtoHandler`.
#[derive(Debug)]
pub enum CustomProtoHandlerIn<B: BlockT> {
pub enum CustomProtoHandlerIn {
/// The node should start using custom protocols.
Enable,

Expand All @@ -220,13 +217,13 @@ pub enum CustomProtoHandlerIn<B: BlockT> {
/// Sends a message through a custom protocol substream.
SendCustomMessage {
/// The message to send.
message: Message<B>,
message: Vec<u8>,
},
}

/// Event that can be emitted by a `CustomProtoHandler`.
#[derive(Debug)]
pub enum CustomProtoHandlerOut<B: BlockT> {
pub enum CustomProtoHandlerOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Version of the protocol that has been opened.
Expand All @@ -242,14 +239,14 @@ pub enum CustomProtoHandlerOut<B: BlockT> {
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Message that has been received.
message: Message<B>,
message: BytesMut,
},

/// A substream to the remote is clogged. The send buffer is very large, and we should print
/// a diagnostic message and/or avoid sending more data.
Clogged {
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Message<B>>,
messages: Vec<Vec<u8>>,
},

/// An error has happened on the protocol level with this node.
Expand All @@ -261,10 +258,9 @@ pub enum CustomProtoHandlerOut<B: BlockT> {
},
}

impl<B, TSubstream> CustomProtoHandler<B, TSubstream>
impl<TSubstream> CustomProtoHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
/// Enables the handler.
fn enable(&mut self) {
Expand Down Expand Up @@ -342,7 +338,7 @@ where
/// Polls the state for events. Optionally returns an event to produce.
#[must_use]
fn poll_state(&mut self)
-> Option<ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>> {
-> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>> {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
Expand Down Expand Up @@ -471,7 +467,7 @@ where
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
fn inject_fully_negotiated(
&mut self,
mut substream: RegisteredProtocolSubstream<B, TSubstream>
mut substream: RegisteredProtocolSubstream<TSubstream>
) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
Expand Down Expand Up @@ -516,7 +512,7 @@ where
}

/// Sends a message to the remote.
fn send_message(&mut self, message: Message<B>) {
fn send_message(&mut self, message: Vec<u8>) {
match self.state {
ProtocolState::Normal { ref mut substreams, .. } =>
substreams[0].send_message(message),
Expand All @@ -527,14 +523,14 @@ where
}
}

impl<B, TSubstream> ProtocolsHandler for CustomProtoHandler<B, TSubstream>
where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
type InEvent = CustomProtoHandlerIn<B>;
type OutEvent = CustomProtoHandlerOut<B>;
impl<TSubstream> ProtocolsHandler for CustomProtoHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite {
type InEvent = CustomProtoHandlerIn;
type OutEvent = CustomProtoHandlerOut;
type Substream = TSubstream;
type Error = ConnectionKillError;
type InboundProtocol = RegisteredProtocol<B>;
type OutboundProtocol = RegisteredProtocol<B>;
type InboundProtocol = RegisteredProtocol;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
Expand All @@ -556,7 +552,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
self.inject_fully_negotiated(proto);
}

fn inject_event(&mut self, message: CustomProtoHandlerIn<B>) {
fn inject_event(&mut self, message: CustomProtoHandlerIn) {
match message {
CustomProtoHandlerIn::Disable => self.disable(),
CustomProtoHandlerIn::Enable => self.enable(),
Expand Down Expand Up @@ -613,7 +609,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
}
}

impl<B: BlockT, TSubstream> fmt::Debug for CustomProtoHandler<B, TSubstream>
impl<TSubstream> fmt::Debug for CustomProtoHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
Expand All @@ -625,9 +621,9 @@ where

/// Given a list of substreams, tries to shut them down. The substreams that have been successfully
/// shut down are removed from the list.
fn shutdown_list<B, TSubstream>
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<B, TSubstream>>>)
where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
fn shutdown_list<TSubstream>
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<TSubstream>>>)
where TSubstream: AsyncRead + AsyncWrite {
'outer: for n in (0..list.len()).rev() {
let mut substream = list.swap_remove(n);
loop {
Expand Down
Loading

0 comments on commit 44a0d9e

Please sign in to comment.