Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Traffic statistics #4017

Merged
merged 2 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions core/network/src/legacy_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
use crate::{DiscoveryNetBehaviour, config::ProtocolId};
use crate::legacy_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn};
use crate::legacy_proto::upgrade::RegisteredProtocol;
use crate::protocol::message::Message;
use bytes::BytesMut;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _};
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use log::{debug, error, trace, warn};
use rand::distributions::{Distribution as _, Uniform};
use sr_primitives::traits::Block as BlockT;
use smallvec::SmallVec;
use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -61,9 +60,9 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to
/// us, we accept the connection. The "banning" system is only about delaying dialing attempts.
///
pub struct LegacyProto<B: BlockT, TSubstream> {
pub struct LegacyProto< TSubstream> {
/// List of protocols to open with peers. Never modified.
protocol: RegisteredProtocol<B>,
protocol: RegisteredProtocol,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: peerset::Peerset,
Expand All @@ -80,7 +79,7 @@ pub struct LegacyProto<B: BlockT, TSubstream> {
next_incoming_index: peerset::IncomingIndex,

/// Events to produce from `poll()`.
events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn<B>, LegacyProtoOut<B>>; 4]>,
events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn, LegacyProtoOut>; 4]>,

/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
Expand Down Expand Up @@ -189,7 +188,7 @@ struct IncomingPeer {

/// Event that can be emitted by the `LegacyProto`.
#[derive(Debug)]
pub enum LegacyProtoOut<B: BlockT> {
pub enum LegacyProtoOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Version of the protocol that has been opened.
Expand All @@ -213,7 +212,7 @@ pub enum LegacyProtoOut<B: BlockT> {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Message that has been received.
message: Message<B>,
message: BytesMut,
},

/// The substream used by the protocol is pretty large. We should print avoid sending more
Expand All @@ -222,11 +221,11 @@ pub enum LegacyProtoOut<B: BlockT> {
/// Id of the peer which is clogged.
peer_id: PeerId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Message<B>>,
messages: Vec<Vec<u8>>,
},
}

impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> {
impl<TSubstream> LegacyProto<TSubstream> {
/// Creates a `CustomProtos`.
pub fn new(
protocol: impl Into<ProtocolId>,
Expand Down Expand Up @@ -350,8 +349,7 @@ impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> {
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, message: Message<B>)
where B: BlockT {
pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) {
if !self.is_open(target) {
return;
}
Expand Down Expand Up @@ -607,7 +605,7 @@ impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> {
}
}

impl<B: BlockT, TSubstream> DiscoveryNetBehaviour for LegacyProto<B, TSubstream> {
impl<TSubstream> DiscoveryNetBehaviour for LegacyProto<TSubstream> {
fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
Expand All @@ -616,13 +614,12 @@ impl<B: BlockT, TSubstream> DiscoveryNetBehaviour for LegacyProto<B, TSubstream>
}
}

impl<B, TSubstream> NetworkBehaviour for LegacyProto<B, TSubstream>
impl<TSubstream> NetworkBehaviour for LegacyProto<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
type ProtocolsHandler = CustomProtoHandlerProto<B, TSubstream>;
type OutEvent = LegacyProtoOut<B>;
type ProtocolsHandler = CustomProtoHandlerProto<TSubstream>;
type OutEvent = LegacyProtoOut;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
CustomProtoHandlerProto::new(self.protocol.clone())
Expand Down Expand Up @@ -825,7 +822,7 @@ where
fn inject_node_event(
&mut self,
source: PeerId,
event: CustomProtoHandlerOut<B>,
event: CustomProtoHandlerOut,
) {
match event {
CustomProtoHandlerOut::CustomProtocolClosed { reason } => {
Expand Down Expand Up @@ -954,7 +951,7 @@ where
_params: &mut impl PollParameters,
) -> Async<
NetworkBehaviourAction<
CustomProtoHandlerIn<B>,
CustomProtoHandlerIn,
Self::OutEvent,
>,
> {
Expand Down
78 changes: 37 additions & 41 deletions core/network/src/legacy_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::legacy_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream};
use crate::protocol::message::Message;
use bytes::BytesMut;
use futures::prelude::*;
use futures03::{compat::Compat, TryFutureExt as _};
use futures_timer::Delay;
Expand All @@ -29,7 +29,6 @@ use libp2p::swarm::{
SubstreamProtocol,
};
use log::{debug, error};
use sr_primitives::traits::Block as BlockT;
use smallvec::{smallvec, SmallVec};
use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -88,36 +87,34 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// We consider that we are now "closed" if the remote closes all the existing substreams.
/// Re-opening it can then be performed by closing all active substream and re-opening one.
///
pub struct CustomProtoHandlerProto<B, TSubstream> {
pub struct CustomProtoHandlerProto<TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<B>,
protocol: RegisteredProtocol,

/// Marker to pin the generic type.
marker: PhantomData<TSubstream>,
}

impl<B, TSubstream> CustomProtoHandlerProto<B, TSubstream>
impl<TSubstream> CustomProtoHandlerProto<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
/// Builds a new `CustomProtoHandlerProto`.
pub fn new(protocol: RegisteredProtocol<B>) -> Self {
pub fn new(protocol: RegisteredProtocol) -> Self {
CustomProtoHandlerProto {
protocol,
marker: PhantomData,
}
}
}

impl<B, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<B, TSubstream>
impl<TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
type Handler = CustomProtoHandler<B, TSubstream>;
type Handler = CustomProtoHandler<TSubstream>;

fn inbound_protocol(&self) -> RegisteredProtocol<B> {
fn inbound_protocol(&self) -> RegisteredProtocol {
self.protocol.clone()
}

Expand All @@ -136,12 +133,12 @@ where
}

/// The actual handler once the connection has been established.
pub struct CustomProtoHandler<B: BlockT, TSubstream> {
pub struct CustomProtoHandler<TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<B>,
protocol: RegisteredProtocol,

/// State of the communications with the remote.
state: ProtocolState<B, TSubstream>,
state: ProtocolState<TSubstream>,

/// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have
/// any influence on the behaviour.
Expand All @@ -155,15 +152,15 @@ pub struct CustomProtoHandler<B: BlockT, TSubstream> {
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>; 16]>,
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>; 16]>,
}

/// State of the handler.
enum ProtocolState<B, TSubstream> {
enum ProtocolState<TSubstream> {
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
Init {
/// List of substreams opened by the remote but that haven't been processed yet.
substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>,
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
/// Deadline after which the initialization is abnormally long.
init_deadline: Compat<Delay>,
},
Expand All @@ -179,17 +176,17 @@ enum ProtocolState<B, TSubstream> {
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
Normal {
/// The substreams where bidirectional communications happen.
substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>,
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>,
/// Contains substreams which are being shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>,
shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>,
},

/// We are disabled. Contains substreams that are being closed.
/// If we are in this state, either we have sent a `CustomProtocolClosed` message to the
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
Disabled {
/// List of substreams to shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>,
shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,

/// If true, we should reactivate the handler after all the substreams in `shutdown` have
/// been closed.
Expand All @@ -210,7 +207,7 @@ enum ProtocolState<B, TSubstream> {

/// Event that can be received by a `CustomProtoHandler`.
#[derive(Debug)]
pub enum CustomProtoHandlerIn<B: BlockT> {
pub enum CustomProtoHandlerIn {
/// The node should start using custom protocols.
Enable,

Expand All @@ -220,13 +217,13 @@ pub enum CustomProtoHandlerIn<B: BlockT> {
/// Sends a message through a custom protocol substream.
SendCustomMessage {
/// The message to send.
message: Message<B>,
message: Vec<u8>,
},
}

/// Event that can be emitted by a `CustomProtoHandler`.
#[derive(Debug)]
pub enum CustomProtoHandlerOut<B: BlockT> {
pub enum CustomProtoHandlerOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Version of the protocol that has been opened.
Expand All @@ -242,14 +239,14 @@ pub enum CustomProtoHandlerOut<B: BlockT> {
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Message that has been received.
message: Message<B>,
message: BytesMut,
},

/// A substream to the remote is clogged. The send buffer is very large, and we should print
/// a diagnostic message and/or avoid sending more data.
Clogged {
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Message<B>>,
messages: Vec<Vec<u8>>,
},

/// An error has happened on the protocol level with this node.
Expand All @@ -261,10 +258,9 @@ pub enum CustomProtoHandlerOut<B: BlockT> {
},
}

impl<B, TSubstream> CustomProtoHandler<B, TSubstream>
impl<TSubstream> CustomProtoHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
B: BlockT,
{
/// Enables the handler.
fn enable(&mut self) {
Expand Down Expand Up @@ -342,7 +338,7 @@ where
/// Polls the state for events. Optionally returns an event to produce.
#[must_use]
fn poll_state(&mut self)
-> Option<ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>> {
-> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>> {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
Expand Down Expand Up @@ -471,7 +467,7 @@ where
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
fn inject_fully_negotiated(
&mut self,
mut substream: RegisteredProtocolSubstream<B, TSubstream>
mut substream: RegisteredProtocolSubstream<TSubstream>
) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
Expand Down Expand Up @@ -516,7 +512,7 @@ where
}

/// Sends a message to the remote.
fn send_message(&mut self, message: Message<B>) {
fn send_message(&mut self, message: Vec<u8>) {
match self.state {
ProtocolState::Normal { ref mut substreams, .. } =>
substreams[0].send_message(message),
Expand All @@ -527,14 +523,14 @@ where
}
}

impl<B, TSubstream> ProtocolsHandler for CustomProtoHandler<B, TSubstream>
where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
type InEvent = CustomProtoHandlerIn<B>;
type OutEvent = CustomProtoHandlerOut<B>;
impl<TSubstream> ProtocolsHandler for CustomProtoHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite {
type InEvent = CustomProtoHandlerIn;
type OutEvent = CustomProtoHandlerOut;
type Substream = TSubstream;
type Error = ConnectionKillError;
type InboundProtocol = RegisteredProtocol<B>;
type OutboundProtocol = RegisteredProtocol<B>;
type InboundProtocol = RegisteredProtocol;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
Expand All @@ -556,7 +552,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
self.inject_fully_negotiated(proto);
}

fn inject_event(&mut self, message: CustomProtoHandlerIn<B>) {
fn inject_event(&mut self, message: CustomProtoHandlerIn) {
match message {
CustomProtoHandlerIn::Disable => self.disable(),
CustomProtoHandlerIn::Enable => self.enable(),
Expand Down Expand Up @@ -613,7 +609,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
}
}

impl<B: BlockT, TSubstream> fmt::Debug for CustomProtoHandler<B, TSubstream>
impl<TSubstream> fmt::Debug for CustomProtoHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
Expand All @@ -625,9 +621,9 @@ where

/// Given a list of substreams, tries to shut them down. The substreams that have been successfully
/// shut down are removed from the list.
fn shutdown_list<B, TSubstream>
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<B, TSubstream>>>)
where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
fn shutdown_list<TSubstream>
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<TSubstream>>>)
where TSubstream: AsyncRead + AsyncWrite {
'outer: for n in (0..list.len()).rev() {
let mut substream = list.swap_remove(n);
loop {
Expand Down
Loading