Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Implement request-responses protocols #6634

Merged
16 commits merged into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl NetworkParams {
listen_addresses,
public_addresses,
notifications_protocols: Vec::new(),
request_response_protocols: Vec::new(),
node_key,
node_name: node_name.to_string(),
client_version: client_id.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build = "0.6.1"

[dependencies]
async-trait = "0.1"
bitflags = "1.2.0"
bs58 = "0.3.1"
bytes = "0.5.0"
Expand Down Expand Up @@ -65,7 +66,7 @@ zeroize = "1.0.0"
[dependencies.libp2p]
version = "0.21.1"
default-features = false
features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"]
features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"]

[dev-dependencies]
async-std = "1.6.2"
Expand Down
98 changes: 79 additions & 19 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

use crate::{
config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests,
peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
request_responses, peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol},
Event, ObservedRole, DhtEvent, ExHashT,
service::maybe_utf8_bytes_to_string, Event, ObservedRole, DhtEvent, ExHashT,
};

use codec::Encode as _;
Expand All @@ -37,6 +37,10 @@ use std::{
time::Duration,
};

pub use crate::request_responses::{
InboundError, InboundFailure, OutboundFailure, RequestId, SendRequestError
};

/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")]
Expand All @@ -48,6 +52,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
peer_info: peer_info::PeerInfoBehaviour,
/// Discovers nodes of the network.
discovery: DiscoveryBehaviour,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Block request handling.
block_requests: block_requests::BlockRequests<B>,
/// Finality proof request handling.
Expand All @@ -74,22 +80,40 @@ pub enum BehaviourOut<B: BlockT> {
RandomKademliaStarted(ProtocolId),

/// We have received a request from a peer and answered it.
AnsweredRequest {
///
/// This event is generated for statistics purposes.
InboundRequest {
/// Peer which sent us a request.
peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
/// Time it took to build the response.
build_time: Duration,
protocol: Cow<'static, str>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do protocol names have to be strings instead of bytes?

Copy link
Contributor Author

@tomaka tomaka Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While libp2p allows bytes, I went for restricting this to strings in Substrate because:

  • In practice they're always strings anyway.
  • These protocol names are reported to Prometheus, and Prometheus expects strings (which is the reason for maybe_utf8_bytes_to_string).

/// If `Ok`, contains the time elapsed between when we received the request and when we
/// sent back the response. If `Err`, the error that happened.
outcome: Result<Duration, InboundError>,
tomaka marked this conversation as resolved.
Show resolved Hide resolved
},

/// A request initiated using [`Behaviour::send_request`] has succeeded or failed.
RequestFinished {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RequestFinished {
OutboundRequestFinished {

Would be easier for me to understand without having to read the doc comment.

/// Request that has succeeded.
request_id: RequestId,
/// Response sent by the remote or reason for failure.
outcome: Result<Vec<u8>, OutboundFailure>,
tomaka marked this conversation as resolved.
Show resolved Hide resolved
},

/// Started a new request with the given node.
RequestStarted {
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestStarted {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
OpaqueRequestStarted {
OutboundRequestStarted {

Would that not be more descriptive? Would as well match with InboundRequest above.

peer: PeerId,
/// Protocol name of the request.
protocol: Vec<u8>,
},
/// Finished, successfully or not, a previously-started request.
RequestFinished {
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestFinished {
/// Who we were requesting.
peer: PeerId,
/// Protocol name of the request.
Expand All @@ -116,17 +140,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
finality_proof_requests: finality_requests::FinalityProofRequests<B>,
light_client_handler: light_client_handler::LightClientHandler<B>,
disco_config: DiscoveryConfig,
) -> Self {
Behaviour {
request_response_protocols: Vec<request_responses::ProtocolConfig>,
) -> Result<Self, request_responses::RegisterError> {
Ok(Behaviour {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
discovery: disco_config.finish(),
request_responses:
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
block_requests,
finality_proof_requests,
light_client_handler,
events: VecDeque::new(),
role,
}
})
}

/// Returns the list of nodes that we know exist in the network.
Expand Down Expand Up @@ -163,6 +190,16 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
self.peer_info.node(peer_id)
}

/// Initiates sending a request.
///
/// An error is returned if we are not connected to the target peer of if the protocol doesn't
/// match one that has been registered.
pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec<u8>)
-> Result<RequestId, SendRequestError>
{
self.request_responses.send_request(target, protocol, request)
}

/// Registers a new notifications protocol.
///
/// After that, you can call `write_notifications`.
Expand Down Expand Up @@ -255,18 +292,18 @@ Behaviour<B, H> {
CustomMessageOutcome::BlockRequest { target, request } => {
match self.block_requests.send_request(&target, request) {
block_requests::SendRequestOutcome::Ok => {
self.events.push_back(BehaviourOut::RequestStarted {
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_vec(),
});
},
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
self.events.push_back(BehaviourOut::RequestFinished {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: target.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
});
self.events.push_back(BehaviourOut::RequestStarted {
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_vec(),
});
Expand Down Expand Up @@ -307,18 +344,41 @@ Behaviour<B, H> {
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B, H> {
fn inject_event(&mut self, event: request_responses::Event) {
match event {
request_responses::Event::InboundRequest { peer, protocol, outcome } => {
romanb marked this conversation as resolved.
Show resolved Hide resolved
self.events.push_back(BehaviourOut::InboundRequest {
peer,
protocol,
outcome,
});
}

request_responses::Event::OutboundFinished { request_id, outcome } => {
self.events.push_back(BehaviourOut::RequestFinished {
request_id,
outcome,
});
},
}
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> {
fn inject_event(&mut self, event: block_requests::Event<B>) {
match event {
block_requests::Event::AnsweredRequest { peer, total_handling_time } => {
self.events.push_back(BehaviourOut::AnsweredRequest {
let protocol = maybe_utf8_bytes_to_string(&self.block_requests.protocol_name())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to its documentation maybe_utf8_bytes_to_string is meant for diagnostics only and non-UTF-8 bytes are just format!ed as a string. If protocol names are now supposed to be strings (why?), should BlockRequests::protocol_name not return a &str?

Copy link
Contributor Author

@tomaka tomaka Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The protocol name contained in BlockRequests unfortunately contains the ProtocolId, which might theoretically not be UTF-8. In practice, this ProtocolId is always UTF-8, and we could add a requirement that the ProtocolId must be UTF-8, but this was a bit too off-topic for this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this would be handled consistently. If changing BlockRequests::protocol_name to return a &str is too much for this PR maybe protocol names should continue to be &[u8] for the time being and a follow-up PR could transition to &str everywhere at once.

Copy link
Contributor Author

@tomaka tomaka Jul 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that you hate maybe_utf8_bytes_to_string, but this is not something that this PR introduces, and the change is out of scope to me. I've opened #6660.

.into_owned();
self.events.push_back(BehaviourOut::InboundRequest {
peer,
protocol: self.block_requests.protocol_name().to_vec(),
build_time: total_handling_time,
protocol: From::from(protocol),
outcome: Ok(total_handling_time),
});
},
block_requests::Event::Response { peer, original_request: _, response, request_duration } => {
self.events.push_back(BehaviourOut::RequestFinished {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
Expand All @@ -330,7 +390,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report cancellations or timeouts yet, so
// we process them by disconnecting the node.
self.events.push_back(BehaviourOut::RequestFinished {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_vec(),
request_duration,
Expand Down
13 changes: 8 additions & 5 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

pub use crate::chain::{Client, FinalityProofProvider};
pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand};
pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig};
pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr};

// Note: this re-export shouldn't be part of the public API of the crate and will be removed in
Expand All @@ -34,9 +35,10 @@ use crate::ExHashT;

use core::{fmt, iter};
use futures::future;
use libp2p::identity::{ed25519, Keypair};
use libp2p::wasm_ext;
use libp2p::{multiaddr, Multiaddr, PeerId};
use libp2p::{
identity::{ed25519, Keypair},
multiaddr, wasm_ext, Multiaddr, PeerId,
};
use prometheus_endpoint::Registry;
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
Expand Down Expand Up @@ -407,6 +409,8 @@ pub struct NetworkConfiguration {
/// List of notifications protocols that the node supports. Must also include a
/// `ConsensusEngineId` for backwards-compatibility.
pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, [u8]>)>,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Maximum allowed number of incoming connections.
pub in_peers: u32,
/// Number of outgoing connections we're trying to maintain.
Expand Down Expand Up @@ -442,6 +446,7 @@ impl NetworkConfiguration {
boot_nodes: Vec::new(),
node_key,
notifications_protocols: Vec::new(),
request_response_protocols: Vec::new(),
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
Expand All @@ -458,9 +463,7 @@ impl NetworkConfiguration {
allow_non_globals_in_dht: false,
}
}
}

impl NetworkConfiguration {
/// Create new default configuration for localhost-only connection with random port (useful for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new(
Expand Down
9 changes: 8 additions & 1 deletion client/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use crate::config::TransportConfig;
use libp2p::{PeerId, Multiaddr};

use std::fmt;
use std::{borrow::Cow, fmt};

/// Result type alias for the network.
pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -61,6 +61,12 @@ pub enum Error {
/// The invalid addresses.
addresses: Vec<Multiaddr>,
},
/// The same request-response protocol has been registered multiple times.
#[display(fmt = "Request-response protocol registered multiple times: {}", protocol)]
DuplicateRequestResponseProtocol {
/// Name of the protocol registered multiple times.
protocol: Cow<'static, str>,
},
}

// Make `Debug` use the `Display` implementation.
Expand All @@ -78,6 +84,7 @@ impl std::error::Error for Error {
Error::DuplicateBootnode { .. } => None,
Error::Prometheus(ref err) => Some(err),
Error::AddressesForAnotherTransport { .. } => None,
Error::DuplicateRequestResponseProtocol { .. } => None,
}
}
}
10 changes: 4 additions & 6 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ mod finality_requests;
mod light_client_handler;
mod on_demand_layer;
mod protocol;
mod request_responses;
mod schema;
mod service;
mod transport;
Expand All @@ -260,13 +261,10 @@ pub mod config;
pub mod error;
pub mod network_state;

pub use service::{NetworkService, NetworkWorker};
pub use protocol::PeerInfo;
pub use protocol::event::{Event, DhtEvent, ObservedRole};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
#[doc(inline)]
pub use libp2p::multiaddr;
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo};
pub use service::{NetworkService, NetworkWorker, OutboundFailure};

pub use sc_peerset::ReputationChange;
use sp_runtime::traits::{Block as BlockT, NumberFor};
Expand Down
Loading