Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add a protocol that answers finality proofs #5718

Merged
merged 5 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/network/build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const PROTOS: &[&str] = &[
"src/protocol/schema/api.v1.proto",
"src/protocol/schema/finality.v1.proto",
"src/protocol/schema/light.v1.proto"
];

Expand Down
6 changes: 5 additions & 1 deletion client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use codec::Encode as _;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle};
use log::debug;
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification};
Expand All @@ -45,6 +45,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
discovery: DiscoveryBehaviour,
/// Block request handling.
block_requests: protocol::BlockRequests<B>,
/// Finality proof request handling.
finality_proof_requests: Toggle<protocol::FinalityProofRequests<B>>,
/// Light client request handling.
light_client_handler: protocol::LightClientHandler<B>,

Expand Down Expand Up @@ -75,6 +77,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
user_agent: String,
local_public_key: PublicKey,
block_requests: protocol::BlockRequests<B>,
finality_proof_requests: Option<protocol::FinalityProofRequests<B>>,
light_client_handler: protocol::LightClientHandler<B>,
disco_config: DiscoveryConfig,
) -> Self {
Expand All @@ -83,6 +86,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()),
discovery: disco_config.finish(),
block_requests,
finality_proof_requests: From::from(finality_proof_requests),
light_client_handler,
events: Vec::new(),
role,
Expand Down
5 changes: 5 additions & 0 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ use wasm_timer::Instant;
pub mod api {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/api.v1.rs"));
pub mod finality {
include!(concat!(env!("OUT_DIR"), "/api.v1.finality.rs"));
}
pub mod light {
include!(concat!(env!("OUT_DIR"), "/api.v1.light.rs"));
}
Expand All @@ -72,12 +75,14 @@ mod generic_proto;
mod util;

pub mod block_requests;
pub mod finality_requests;
pub mod message;
pub mod event;
pub mod light_client_handler;
pub mod sync;

pub use block_requests::BlockRequests;
pub use finality_requests::FinalityProofRequests;
pub use light_client_handler::LightClientHandler;
pub use generic_proto::LegacyConnectionKillError;

Expand Down
266 changes: 266 additions & 0 deletions client/network/src/protocol/finality_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
//
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! `NetworkBehaviour` implementation which handles incoming finality proof requests.
//!
//! Every request is coming in on a separate connection substream which gets
//! closed after we have sent the response back. Incoming requests are encoded
//! as protocol buffers (cf. `finality.v1.proto`).

#![allow(unused)]

use bytes::Bytes;
use codec::{Encode, Decode};
use crate::{
chain::FinalityProofProvider,
config::ProtocolId,
protocol::{api, message::BlockAttributes}
};
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
use libp2p::{
core::{
ConnectedPoint,
Multiaddr,
PeerId,
connection::ConnectionId,
upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated},
upgrade::{DeniedUpgrade, read_one, write_one}
},
swarm::{
NegotiatedSubstream,
NetworkBehaviour,
NetworkBehaviourAction,
OneShotHandler,
OneShotHandlerConfig,
PollParameters,
SubstreamProtocol
}
};
use prost::Message;
use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
use std::{
cmp::min,
io,
iter,
sync::Arc,
time::Duration,
task::{Context, Poll}
};
use void::{Void, unreachable};

// Type alias for convenience.
pub type Error = Box<dyn std::error::Error + 'static>;

/// Configuration options for `FinalityProofRequests`.
#[derive(Debug, Clone)]
pub struct Config {
max_request_len: usize,
inactivity_timeout: Duration,
protocol: Bytes,
}

impl Config {
/// Create a fresh configuration with the following options:
///
/// - max. request size = 1 MiB
/// - inactivity timeout = 15s
pub fn new(id: &ProtocolId) -> Self {
let mut c = Config {
max_request_len: 1024 * 1024,
inactivity_timeout: Duration::from_secs(15),
protocol: Bytes::new(),
};
c.set_protocol(id);
c
}

/// Limit the max. length of incoming finality proof request bytes.
pub fn set_max_request_len(&mut self, v: usize) -> &mut Self {
self.max_request_len = v;
self
}

/// Limit the max. duration the substream may remain inactive before closing it.
pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self {
self.inactivity_timeout = v;
self
}

/// Set protocol to use for upgrade negotiation.
pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self {
let mut v = Vec::new();
v.extend_from_slice(b"/");
v.extend_from_slice(id.as_bytes());
v.extend_from_slice(b"/finality-proof/1");
self.protocol = v.into();
self
}
}

/// The finality proof request handling behaviour.
pub struct FinalityProofRequests<B: Block> {
/// This behaviour's configuration.
config: Config,
/// How to construct finality proofs.
finality_proof_provider: Arc<dyn FinalityProofProvider<B>>,
/// Futures sending back the finality proof request responses.
outgoing: FuturesUnordered<BoxFuture<'static, ()>>,
}

impl<B> FinalityProofRequests<B>
where
B: Block,
{
/// Initializes the behaviour.
pub fn new(cfg: Config, finality_proof_provider: Arc<dyn FinalityProofProvider<B>>) -> Self {
FinalityProofRequests {
config: cfg,
finality_proof_provider,
outgoing: FuturesUnordered::new(),
}
}

/// Callback, invoked when a new finality request has been received from remote.
fn on_finality_request(&mut self, peer: &PeerId, request: &api::v1::finality::FinalityProofRequest)
-> Result<api::v1::finality::FinalityProofResponse, Error>
{
let block_hash = Decode::decode(&mut request.block_hash.as_ref())?;

log::trace!(target: "sync", "Finality proof request from {} for {}", peer, block_hash);

let finality_proof = self.finality_proof_provider
.prove_finality(block_hash, &request.request)?
.unwrap_or(Vec::new());
// Note that an empty Vec is sent if no proof is available.

Ok(api::v1::finality::FinalityProofResponse { proof: finality_proof })
}
}

impl<B> NetworkBehaviour for FinalityProofRequests<B>
where
B: Block
{
type ProtocolsHandler = OneShotHandler<Protocol, DeniedUpgrade, Request<NegotiatedSubstream>>;
type OutEvent = Void;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
let p = Protocol {
max_request_len: self.config.max_request_len,
protocol: self.config.protocol.clone(),
};
let mut cfg = OneShotHandlerConfig::default();
cfg.inactive_timeout = self.config.inactivity_timeout;
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
}

fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}

fn inject_connected(&mut self, _peer: &PeerId) {
}

fn inject_disconnected(&mut self, _peer: &PeerId) {
}

fn inject_event(
&mut self,
peer: PeerId,
connection: ConnectionId,
Request(request, mut stream): Request<NegotiatedSubstream>
) {
match self.on_finality_request(&peer, &request) {
Ok(res) => {
log::trace!("enqueueing finality response for peer {}", peer);
let mut data = Vec::with_capacity(res.encoded_len());
if let Err(e) = res.encode(&mut data) {
log::debug!("error encoding finality response for peer {}: {}", peer, e)
} else {
let future = async move {
if let Err(e) = write_one(&mut stream, data).await {
log::debug!("error writing finality response: {}", e)
}
};
self.outgoing.push(future.boxed())
}
}
Err(e) => log::debug!("error handling finality request from peer {}: {}", peer, e)
}
}

fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<DeniedUpgrade, Void>> {
while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {}
Poll::Pending
}
}

/// The incoming finality proof request.
///
/// Holds the protobuf value and the connection substream which made the
/// request and over which to send the response.
#[derive(Debug)]
pub struct Request<T>(api::v1::finality::FinalityProofRequest, T);

impl<T> From<Void> for Request<T> {
fn from(v: Void) -> Self {
unreachable(v)
}
}

/// Substream upgrade protocol.
///
/// We attempt to parse an incoming protobuf encoded request (cf. `Request`)
/// which will be handled by the `FinalityProofRequests` behaviour, i.e. the request
/// will become visible via `inject_node_event` which then dispatches to the
/// relevant callback to process the message and prepare a response.
#[derive(Debug, Clone)]
pub struct Protocol {
/// The max. request length in bytes.
max_request_len: usize,
/// The protocol to use during upgrade negotiation.
protocol: Bytes,
}

impl UpgradeInfo for Protocol {
type Info = Bytes;
type InfoIter = iter::Once<Self::Info>;

fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol.clone())
}
}

impl<T> InboundUpgrade<T> for Protocol
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
{
type Output = Request<T>;
type Error = ReadOneError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
async move {
let len = self.max_request_len;
let vec = read_one(&mut s, len).await?;
match api::v1::finality::FinalityProofRequest::decode(&vec[..]) {
Ok(r) => Ok(Request(r, s)),
Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
}
}.boxed()
}
}

19 changes: 19 additions & 0 deletions client/network/src/protocol/schema/finality.v1.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Schema definition for finality proof request/responses.

syntax = "proto3";

package api.v1.finality;

// Request a finality proof from a peer.
message FinalityProofRequest {
// SCALE-encoded hash of the block to request.
bytes block_hash = 1;
// Opaque chain-specific additional request data.
bytes request = 2;
}

// Response to a finality proof request.
message FinalityProofResponse {
// Opaque chain-specific finality proof. Empty if no such proof exists.
bytes proof = 1; // optional
}
13 changes: 11 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
let config = protocol::block_requests::Config::new(&params.protocol_id);
protocol::BlockRequests::new(config, params.chain.clone())
};
let finality_proof_requests = if let Some(pb) = &params.finality_proof_provider {
let config = protocol::finality_requests::Config::new(&params.protocol_id);
Some(protocol::FinalityProofRequests::new(config, pb.clone()))
} else {
None
};
let light_client_handler = {
let config = protocol::light_client_handler::Config::new(&params.protocol_id);
protocol::LightClientHandler::new(
Expand Down Expand Up @@ -261,6 +267,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
user_agent,
local_public,
block_requests,
finality_proof_requests,
light_client_handler,
discovery_config
);
Expand Down Expand Up @@ -1113,10 +1120,12 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
ConnectionError::IO(_) =>
metrics.connections_closed_total.with_label_values(&[dir, "transport-error"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::B(EitherError::A(PingFailure::Timeout))))))) =>
EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout)))))))) =>
metrics.connections_closed_total.with_label_values(&[dir, "ping-timeout"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::B(LegacyConnectionKillError))))))) =>
EitherError::A(EitherError::A(EitherError::A(
EitherError::B(LegacyConnectionKillError)))))))) =>
metrics.connections_closed_total.with_label_values(&[dir, "force-closed"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) =>
metrics.connections_closed_total.with_label_values(&[dir, "protocol-error"]).inc(),
Expand Down